Skip to main content

tuwunel_service/rooms/event_handler/
handle_incoming_pdu.rs

1use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5};
2use ruma::{
3	CanonicalJsonObject, EventId, OwnedEventId, RoomId, ServerName, UserId,
4	events::StateEventType,
5};
6use tuwunel_core::{
7	Err, Result, debug,
8	debug::INFO_SPAN_LEVEL,
9	debug_warn, err, implement,
10	matrix::{Event, pdu::MAX_PREV_EVENTS, room_version},
11	smallvec::SmallVec,
12	trace,
13	utils::{
14		BoolExt,
15		stream::{IterStream, TryWidebandExt},
16	},
17	warn,
18};
19
20use crate::rooms::timeline::RawPduId;
21
22type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>;
23type PrevHandled = (OwnedEventId, Handled);
24
25type PrevResults = SmallVec<[PrevResult; MAX_PREV_EVENTS]>;
26type PrevResult = (OwnedEventId, Result<Handled>);
27
28type Handled = Option<(RawPduId, bool)>;
29
30/// When receiving an event one needs to:
31/// 0. Check the server is in the room
32/// 1. Skip the PDU if we already know about it
33/// 1.1. Remove unsigned field
34/// 2. Check signatures, otherwise drop
35/// 3. Check content hash, redact if doesn't match
36/// 4. Fetch any missing auth events doing all checks listed here starting at 1.
37///    These are not timeline events
38/// 5. Reject "due to auth events" if can't get all the auth events or some of
39///    the auth events are also rejected "due to auth events"
40/// 6. Reject "due to auth events" if the event doesn't pass auth based on the
41///    auth events
42/// 7. Persist this event as an outlier
43/// 8. If not timeline event: stop
44/// 9. Fetch any missing prev events doing all checks listed here starting at 1.
45///    These are timeline events
46/// 10. Fetch missing state and auth chain events by calling `/state_ids` at
47///     backwards extremities doing all the checks in this list starting at
48///     1. These are not timeline events
49/// 11. Check the auth of the event passes based on the state of the event
50/// 12. Ensure that the state is derived from the previous current state (i.e.
51///     we calculated by doing state res where one of the inputs was a
52///     previously trusted set of state, don't just trust a set of state we got
53///     from a remote)
54/// 13. Use state resolution to find new room state
55/// 14. Check if the event passes auth based on the "current state" of the room,
56///     if not soft fail it
57#[implement(super::Service)]
58#[tracing::instrument(
59	name = "pdu",
60	level = INFO_SPAN_LEVEL,
61	skip_all,
62	fields(%room_id, %event_id),
63	ret(level = "debug"),
64)]
65pub async fn handle_incoming_pdu<'a>(
66	&'a self,
67	origin: &'a ServerName,
68	room_id: &'a RoomId,
69	event_id: &'a EventId,
70	pdu: CanonicalJsonObject,
71	is_timeline_event: bool,
72) -> Result<Handled> {
73	// 1. Skip the PDU if we already have it as a timeline event
74	if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
75		debug!(?pdu_id, "Exists.");
76		return Ok(Some((pdu_id, false)));
77	}
78
79	// 1.1 Check the server is in the room
80	let meta_exists = self.services.metadata.exists(room_id).map(Ok);
81
82	// 1.2 Check if the room is disabled
83	let is_disabled = self
84		.services
85		.metadata
86		.is_disabled(room_id)
87		.map(Ok);
88
89	// 1.3.1 Check room ACL on origin field/server
90	let origin_acl_check = self.acl_check(origin, room_id);
91
92	// 1.3.2 Check room ACL on sender's server name
93	let sender: &UserId = pdu
94		.get("sender")
95		.try_into()
96		.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
97
98	let sender_acl_check = sender
99		.server_name()
100		.ne(origin)
101		.then_async(|| self.acl_check(sender.server_name(), room_id));
102
103	// Fetch create event
104	let create_event =
105		self.services
106			.state_accessor
107			.room_state_get(room_id, &StateEventType::RoomCreate, "");
108
109	let (meta_exists, is_disabled, (), (), ref create_event) = try_join5(
110		meta_exists,
111		is_disabled,
112		origin_acl_check,
113		sender_acl_check.map(|o| o.unwrap_or(Ok(()))),
114		create_event,
115	)
116	.await?;
117
118	if !meta_exists {
119		return Err!(Request(NotFound("Room is unknown to this server")));
120	}
121
122	if is_disabled {
123		return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
124	}
125
126	let room_version = room_version::from_create_event(create_event)?;
127	let recursion_level = 0;
128
129	let (incoming_pdu, pdu) = self
130		.handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, recursion_level, false)
131		.await?;
132
133	// 8. if not timeline event: stop
134	if !is_timeline_event {
135		debug!(
136			kind = ?incoming_pdu.event_type(),
137			"Not a timeline event.",
138		);
139		return Ok(None);
140	}
141
142	// Skip old events
143	let first_ts_in_room = self
144		.services
145		.timeline
146		.first_pdu_in_room(room_id)
147		.await?
148		.origin_server_ts();
149
150	if incoming_pdu.origin_server_ts() < first_ts_in_room {
151		debug!(
152			origin_server_ts = ?incoming_pdu.origin_server_ts(),
153			?first_ts_in_room,
154			"Skipping old event."
155		);
156		return Ok(None);
157	}
158
159	// 9. Fetch any missing prev events doing all checks listed here starting at 1.
160	//    These are timeline events
161	let (sorted_prev_events, mut eventid_info) = self
162		.fetch_prev(
163			origin,
164			room_id,
165			incoming_pdu.prev_events(),
166			&room_version,
167			recursion_level,
168			first_ts_in_room,
169		)
170		.await?;
171
172	trace!(
173		events = sorted_prev_events.len(),
174		event_ids = ?sorted_prev_events,
175		"Handling previous events"
176	);
177	let _prev_handles: PrevResultsHandled = sorted_prev_events
178		.into_iter()
179		.enumerate()
180		.try_stream()
181		.map_ok(|(i, prev_id)| (i, eventid_info.remove(&prev_id), prev_id))
182		.widen_and_then(MAX_PREV_EVENTS, async |(i, eventid_info, prev_id)| {
183			self.services.server.check_running()?;
184			match self
185				.handle_prev_pdu(
186					origin,
187					room_id,
188					event_id,
189					eventid_info,
190					&room_version,
191					recursion_level,
192					first_ts_in_room,
193					&prev_id,
194					create_event.event_id(),
195				)
196				.await
197			{
198				| Ok(Some(handled)) => {
199					self.cancel_back_off(&prev_id);
200					debug!(?i, ?prev_id, ?handled, "Prev event processed.");
201
202					Ok((prev_id, Ok(Some(handled))))
203				},
204				| Ok(None) => {
205					debug_warn!(?i, ?prev_id, "Prev event not processed.");
206
207					Ok((prev_id, Ok(None)))
208				},
209				| Err(e) => {
210					self.back_off(&prev_id);
211					warn!(?i, ?prev_id, ?event_id, ?room_id, "Prev event processing failed: {e}");
212
213					Ok((prev_id, Err(e)))
214				},
215			}
216		})
217		.try_collect::<PrevResults>()
218		.map_ok(PrevResults::into_iter)
219		.map_ok(IterStream::stream)
220		.map_ok(|s| s.map(|(id, res)| res.map(|res| (id, res))))
221		.try_flatten_stream()
222		.try_collect()
223		.boxed()
224		.await?;
225
226	// Done with prev events, now handling the incoming event
227	self.upgrade_outlier_to_timeline_pdu(
228		origin,
229		room_id,
230		incoming_pdu,
231		pdu,
232		&room_version,
233		recursion_level,
234		create_event.event_id(),
235	)
236	.boxed()
237	.await
238}