// tuwunel_service/rooms/event_handler/handle_incoming_pdu.rs

1use std::collections::HashMap;
2
3use futures::{FutureExt, TryStreamExt, future::try_join5};
4use ruma::{
5	CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
6	RoomId, RoomVersionId, ServerName, UserId,
7	events::{
8		AnyStrippedStateEvent, StateEventType,
9		room::member::{MembershipState, RoomMemberEventContent},
10	},
11};
12use tuwunel_core::{
13	Err, Result, debug,
14	debug::INFO_SPAN_LEVEL,
15	debug_warn, err, implement,
16	matrix::{Event, PduCount, PduEvent, pdu::MAX_PREV_EVENTS, room_version::from_create_event},
17	smallvec::SmallVec,
18	trace,
19	utils::{
20		BoolExt,
21		stream::{IterStream, TryWidebandExt},
22	},
23	warn,
24};
25
26use super::backoff::{Context, Disposition};
27use crate::rooms::timeline::RawPduId;
28
29type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>;
30type PrevHandled = (OwnedEventId, Handled);
31type PrevSplit = SmallVec<[OwnedEventId; MAX_PREV_EVENTS]>;
32
33type Handled = Option<(RawPduId, bool)>;
34
35/// When receiving an event one needs to:
36/// 0. Check the server is in the room
37/// 1. Skip the PDU if we already know about it
38/// 1.1. Remove unsigned field
39/// 2. Check signatures, otherwise drop
40/// 3. Check content hash, redact if doesn't match
41/// 4. Fetch any missing auth events doing all checks listed here starting at 1.
42///    These are not timeline events
43/// 5. Reject "due to auth events" if can't get all the auth events or some of
44///    the auth events are also rejected "due to auth events"
45/// 6. Reject "due to auth events" if the event doesn't pass auth based on the
46///    auth events
47/// 7. Persist this event as an outlier
48/// 8. If not timeline event: stop
49/// 9. Fetch any missing prev events doing all checks listed here starting at 1.
50///    These are timeline events
51/// 10. Fetch missing state and auth chain events by calling `/state_ids` at
52///     backwards extremities doing all the checks in this list starting at
53///     1. These are not timeline events
54/// 11. Check the auth of the event passes based on the state of the event
55/// 12. Ensure that the state is derived from the previous current state (i.e.
56///     we calculated by doing state res where one of the inputs was a
57///     previously trusted set of state, don't just trust a set of state we got
58///     from a remote)
59/// 13. Use state resolution to find new room state
60/// 14. Check if the event passes auth based on the "current state" of the room,
61///     if not soft fail it
62#[implement(super::Service)]
63#[tracing::instrument(
64	name = "pdu",
65	level = INFO_SPAN_LEVEL,
66	skip_all,
67	fields(%room_id, %event_id),
68	ret(level = "debug"),
69)]
70pub async fn handle_incoming_pdu<'a>(
71	&'a self,
72	origin: &'a ServerName,
73	room_id: &'a RoomId,
74	event_id: &'a EventId,
75	pdu: CanonicalJsonObject,
76	is_timeline_event: bool,
77) -> Result<Handled> {
78	// 1. Skip the PDU if we already have it as a timeline event
79	if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
80		debug!(?pdu_id, "Exists.");
81		return Ok(Some((pdu_id, false)));
82	}
83
84	// 1.1 Check the server is in the room
85	let meta_exists = self.services.metadata.exists(room_id).map(Ok);
86
87	// 1.2 Check if the room is disabled
88	let is_disabled = self
89		.services
90		.metadata
91		.is_disabled(room_id)
92		.map(Ok);
93
94	// 1.3.1 Check room ACL on origin field/server
95	let origin_acl_check = self.acl_check(origin, room_id);
96
97	// 1.3.2 Check room ACL on sender's server name
98	let sender: &UserId = pdu
99		.get("sender")
100		.try_into()
101		.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
102
103	let sender_acl_check = sender
104		.server_name()
105		.ne(origin)
106		.then_async(|| self.acl_check(sender.server_name(), room_id));
107
108	// Fetch create event; absent when we are not resident in the room.
109	let create_event = self
110		.services
111		.state_accessor
112		.room_state_get(room_id, &StateEventType::RoomCreate, "")
113		.map(|result| Ok(result.ok()));
114
115	let (meta_exists, is_disabled, (), (), create_event) = try_join5(
116		meta_exists,
117		is_disabled,
118		origin_acl_check,
119		sender_acl_check.map(|o| o.unwrap_or(Ok(()))),
120		create_event,
121	)
122	.await?;
123
124	// When not resident, the only event we can act on is a leave rescinding an
125	// out-of-band invite we hold for a local user.
126	if !meta_exists {
127		return if self
128			.handle_rescinded_invite(room_id, &pdu)
129			.await?
130		{
131			Ok(None)
132		} else {
133			Err!(Request(NotFound("Room is unknown to this server")))
134		};
135	}
136
137	if is_disabled {
138		return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
139	}
140
141	let create_event =
142		create_event.ok_or_else(|| err!(Request(NotFound("Room is unknown to this server"))))?;
143
144	let room_version = from_create_event(&create_event)?;
145	let recursion_level = 0;
146
147	let (incoming_pdu, pdu) = self
148		.handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, recursion_level, false)
149		.await?;
150
151	// 8. if not timeline event: stop
152	if !is_timeline_event {
153		debug!(
154			kind = ?incoming_pdu.event_type(),
155			"Not a timeline event.",
156		);
157		return Ok(None);
158	}
159
160	// Skip old events
161	let first_ts_in_room = self
162		.services
163		.timeline
164		.first_pdu_in_room(room_id)
165		.await?
166		.origin_server_ts();
167
168	if incoming_pdu.origin_server_ts() < first_ts_in_room {
169		debug!(
170			origin_server_ts = ?incoming_pdu.origin_server_ts(),
171			?first_ts_in_room,
172			"Skipping old event."
173		);
174		return Ok(None);
175	}
176
177	// 9. Fetch any missing prev events doing all checks listed here starting at 1.
178	//    These are timeline events
179	let (sorted_prev_events, eventid_info) = self
180		.fetch_prev(
181			origin,
182			room_id,
183			event_id,
184			incoming_pdu.prev_events(),
185			&room_version,
186			recursion_level,
187			first_ts_in_room,
188		)
189		.await?;
190
191	self.handle_prev_events(
192		origin,
193		room_id,
194		event_id,
195		sorted_prev_events,
196		eventid_info,
197		&room_version,
198		recursion_level,
199		first_ts_in_room,
200		create_event.event_id(),
201	)
202	.boxed()
203	.await?;
204
205	// Done with prev events, now handling the incoming event
206	self.upgrade_outlier_to_timeline_pdu(
207		origin,
208		room_id,
209		incoming_pdu,
210		pdu,
211		&room_version,
212		recursion_level,
213		create_event.event_id(),
214	)
215	.boxed()
216	.await
217}
218
219/// Apply a federated leave that rescinds an out-of-band invite for a local
220/// user.
221///
222/// We are not resident in the room, so the kick cannot be processed as a normal
223/// timeline event for lack of room state; but it must still clear the invite so
224/// the invited user's `/sync` reflects the rescission. Mirrors Synapse's
225/// out-of-band membership handling: only a kick from the original inviter is
226/// honored, since without the room state we cannot judge any other sender's
227/// authority. Returns `true` when a rescission was applied.
228#[implement(super::Service)]
229#[tracing::instrument(skip_all, level = "debug", fields(%room_id))]
230async fn handle_rescinded_invite(
231	&self,
232	room_id: &RoomId,
233	pdu: &CanonicalJsonObject,
234) -> Result<bool> {
235	if pdu
236		.get("type")
237		.and_then(CanonicalJsonValue::as_str)
238		!= Some("m.room.member")
239	{
240		return Ok(false);
241	}
242
243	let Some(target) = pdu
244		.get("state_key")
245		.and_then(CanonicalJsonValue::as_str)
246		.and_then(|state_key| UserId::parse(state_key).ok())
247	else {
248		return Ok(false);
249	};
250
251	let Some(sender) = pdu
252		.get("sender")
253		.and_then(CanonicalJsonValue::as_str)
254		.and_then(|sender| UserId::parse(sender).ok())
255	else {
256		return Ok(false);
257	};
258
259	if sender == target || !self.services.globals.user_is_local(&target) {
260		return Ok(false);
261	}
262
263	let Some(content) = pdu
264		.get("content")
265		.cloned()
266		.map(Into::into)
267		.and_then(|content| serde_json::from_value::<RoomMemberEventContent>(content).ok())
268	else {
269		return Ok(false);
270	};
271
272	if content.membership != MembershipState::Leave {
273		return Ok(false);
274	}
275
276	if self
277		.services
278		.state_cache
279		.user_membership(&target, room_id)
280		.await != Some(MembershipState::Invite)
281	{
282		return Ok(false);
283	}
284
285	// Recover the inviter and the room version from the stored stripped state.
286	let invite_state = self
287		.services
288		.state_cache
289		.invite_state(&target, room_id)
290		.await?;
291
292	let inviter = invite_state
293		.iter()
294		.find_map(|event| match event.deserialize() {
295			| Ok(AnyStrippedStateEvent::RoomMember(member)) if member.state_key == target =>
296				Some(member.sender),
297			| _ => None,
298		});
299
300	// Honor the rescission only from the original inviter.
301	if inviter.as_ref() != Some(&sender) {
302		return Ok(false);
303	}
304
305	let Some(room_version_id) = super::room_version_of(&invite_state) else {
306		return Ok(false);
307	};
308
309	// Verify the kick is signed by the sender's server before acting on it.
310	self.services
311		.server_keys
312		.verify_event(pdu, Some(&room_version_id))
313		.await
314		.map_err(|e| {
315			err!(Request(InvalidParam("Invite rescission signature is invalid: {e}")))
316		})?;
317
318	let count = self.services.globals.next_count();
319	self.services
320		.state_cache
321		.update_membership(
322			room_id,
323			&target,
324			RoomMemberEventContent::new(MembershipState::Leave),
325			&sender,
326			None,
327			None,
328			false,
329			PduCount::Normal(*count),
330		)
331		.await?;
332
333	debug!(%room_id, %target, %sender, "Applied a federated invite rescission.");
334
335	Ok(true)
336}
337
338/// Upgrade an incoming PDU's previous events, walking interior events after
339/// their parents so each derives state locally instead of refetching it.
340#[implement(super::Service)]
341#[expect(clippy::too_many_arguments)]
342async fn handle_prev_events(
343	&self,
344	origin: &ServerName,
345	room_id: &RoomId,
346	event_id: &EventId,
347	sorted_prev_events: Vec<OwnedEventId>,
348	mut eventid_info: HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>,
349	room_version: &RoomVersionId,
350	recursion_level: usize,
351	first_ts_in_room: MilliSecondsSinceUnixEpoch,
352	create_event_id: &EventId,
353) -> Result<()> {
354	trace!(
355		events = sorted_prev_events.len(),
356		event_ids = ?sorted_prev_events,
357		"Handling previous events"
358	);
359
360	let (interior, extremities): (PrevSplit, PrevSplit) = sorted_prev_events
361		.into_iter()
362		.partition(|prev_id| {
363			eventid_info.get(prev_id).is_some_and(|(pdu, _)| {
364				pdu.prev_events()
365					.any(|prev| eventid_info.contains_key(prev))
366			})
367		});
368
369	extremities
370		.into_iter()
371		.try_stream()
372		.map_ok(|prev_id| (eventid_info.remove(&prev_id), prev_id))
373		.widen_and_then(MAX_PREV_EVENTS, async |(info, prev_id)| {
374			self.upgrade_prev_event(
375				origin,
376				room_id,
377				event_id,
378				info,
379				room_version,
380				recursion_level,
381				first_ts_in_room,
382				prev_id,
383				create_event_id,
384			)
385			.await
386		})
387		.try_collect::<PrevResultsHandled>()
388		.boxed()
389		.await?;
390
391	// Walk interior events forward so each parent commits before its children.
392	interior
393		.into_iter()
394		.try_stream()
395		.map_ok(|prev_id| (eventid_info.remove(&prev_id), prev_id))
396		.try_for_each(async |(info, prev_id)| {
397			self.upgrade_prev_event(
398				origin,
399				room_id,
400				event_id,
401				info,
402				room_version,
403				recursion_level,
404				first_ts_in_room,
405				prev_id,
406				create_event_id,
407			)
408			.await?;
409
410			Ok(())
411		})
412		.boxed()
413		.await
414}
415
416/// Upgrade one previous event, folding a transient failure into a no-op so a
417/// single bad prev does not abort the batch; a shutdown still propagates.
418#[implement(super::Service)]
419#[expect(clippy::too_many_arguments)]
420async fn upgrade_prev_event(
421	&self,
422	origin: &ServerName,
423	room_id: &RoomId,
424	event_id: &EventId,
425	info: Option<(PduEvent, CanonicalJsonObject)>,
426	room_version: &RoomVersionId,
427	recursion_level: usize,
428	first_ts_in_room: MilliSecondsSinceUnixEpoch,
429	prev_id: OwnedEventId,
430	create_event_id: &EventId,
431) -> Result<PrevHandled> {
432	self.services.server.check_running()?;
433	match self
434		.handle_prev_pdu(
435			origin,
436			room_id,
437			event_id,
438			info,
439			room_version,
440			recursion_level,
441			first_ts_in_room,
442			&prev_id,
443			create_event_id,
444		)
445		.await
446	{
447		| Ok(handled) => {
448			if handled.is_some() {
449				self.record_success(Context::Upgrade, &prev_id)
450					.await;
451				debug!(?prev_id, ?handled, "Prev event processed.");
452			} else {
453				debug_warn!(?prev_id, "Prev event not processed.");
454			}
455
456			Ok((prev_id, handled))
457		},
458		| Err(e) => {
459			self.record_outcome(Context::Upgrade, &prev_id, Disposition::Transient);
460			warn!(?prev_id, ?event_id, ?room_id, "Prev event processing failed: {e}");
461
462			Ok((prev_id, None))
463		},
464	}
465}