1use std::collections::HashMap;
2
3use futures::{FutureExt, TryStreamExt, future::try_join5};
4use ruma::{
5 CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
6 RoomId, RoomVersionId, ServerName, UserId,
7 events::{
8 AnyStrippedStateEvent, StateEventType,
9 room::member::{MembershipState, RoomMemberEventContent},
10 },
11};
12use tuwunel_core::{
13 Err, Result, debug,
14 debug::INFO_SPAN_LEVEL,
15 debug_warn, err, implement,
16 matrix::{Event, PduCount, PduEvent, pdu::MAX_PREV_EVENTS, room_version::from_create_event},
17 smallvec::SmallVec,
18 trace,
19 utils::{
20 BoolExt,
21 stream::{IterStream, TryWidebandExt},
22 },
23 warn,
24};
25
26use super::backoff::{Context, Disposition};
27use crate::rooms::timeline::RawPduId;
28
/// Collected per-prev-event results; a `SmallVec` with inline capacity for
/// `MAX_PREV_EVENTS` entries.
type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>;
/// A prev event's ID paired with the outcome of handling it.
type PrevHandled = (OwnedEventId, Handled);
/// One side of the prev-event split made in `handle_prev_events`; a `SmallVec`
/// with inline capacity for `MAX_PREV_EVENTS` event IDs.
type PrevSplit = SmallVec<[OwnedEventId; MAX_PREV_EVENTS]>;

/// Outcome of handling a PDU: `None` when nothing was stored on the timeline,
/// otherwise the PDU's ID plus a flag.
///
/// NOTE(review): `handle_incoming_pdu` sets the flag to `false` when the event
/// was already present; presumably `true` marks a newly added event — confirm
/// against `upgrade_outlier_to_timeline_pdu`.
type Handled = Option<(RawPduId, bool)>;
34
35#[implement(super::Service)]
63#[tracing::instrument(
64 name = "pdu",
65 level = INFO_SPAN_LEVEL,
66 skip_all,
67 fields(%room_id, %event_id),
68 ret(level = "debug"),
69)]
70pub async fn handle_incoming_pdu<'a>(
71 &'a self,
72 origin: &'a ServerName,
73 room_id: &'a RoomId,
74 event_id: &'a EventId,
75 pdu: CanonicalJsonObject,
76 is_timeline_event: bool,
77) -> Result<Handled> {
78 if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
80 debug!(?pdu_id, "Exists.");
81 return Ok(Some((pdu_id, false)));
82 }
83
84 let meta_exists = self.services.metadata.exists(room_id).map(Ok);
86
87 let is_disabled = self
89 .services
90 .metadata
91 .is_disabled(room_id)
92 .map(Ok);
93
94 let origin_acl_check = self.acl_check(origin, room_id);
96
97 let sender: &UserId = pdu
99 .get("sender")
100 .try_into()
101 .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
102
103 let sender_acl_check = sender
104 .server_name()
105 .ne(origin)
106 .then_async(|| self.acl_check(sender.server_name(), room_id));
107
108 let create_event = self
110 .services
111 .state_accessor
112 .room_state_get(room_id, &StateEventType::RoomCreate, "")
113 .map(|result| Ok(result.ok()));
114
115 let (meta_exists, is_disabled, (), (), create_event) = try_join5(
116 meta_exists,
117 is_disabled,
118 origin_acl_check,
119 sender_acl_check.map(|o| o.unwrap_or(Ok(()))),
120 create_event,
121 )
122 .await?;
123
124 if !meta_exists {
127 return if self
128 .handle_rescinded_invite(room_id, &pdu)
129 .await?
130 {
131 Ok(None)
132 } else {
133 Err!(Request(NotFound("Room is unknown to this server")))
134 };
135 }
136
137 if is_disabled {
138 return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
139 }
140
141 let create_event =
142 create_event.ok_or_else(|| err!(Request(NotFound("Room is unknown to this server"))))?;
143
144 let room_version = from_create_event(&create_event)?;
145 let recursion_level = 0;
146
147 let (incoming_pdu, pdu) = self
148 .handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, recursion_level, false)
149 .await?;
150
151 if !is_timeline_event {
153 debug!(
154 kind = ?incoming_pdu.event_type(),
155 "Not a timeline event.",
156 );
157 return Ok(None);
158 }
159
160 let first_ts_in_room = self
162 .services
163 .timeline
164 .first_pdu_in_room(room_id)
165 .await?
166 .origin_server_ts();
167
168 if incoming_pdu.origin_server_ts() < first_ts_in_room {
169 debug!(
170 origin_server_ts = ?incoming_pdu.origin_server_ts(),
171 ?first_ts_in_room,
172 "Skipping old event."
173 );
174 return Ok(None);
175 }
176
177 let (sorted_prev_events, eventid_info) = self
180 .fetch_prev(
181 origin,
182 room_id,
183 event_id,
184 incoming_pdu.prev_events(),
185 &room_version,
186 recursion_level,
187 first_ts_in_room,
188 )
189 .await?;
190
191 self.handle_prev_events(
192 origin,
193 room_id,
194 event_id,
195 sorted_prev_events,
196 eventid_info,
197 &room_version,
198 recursion_level,
199 first_ts_in_room,
200 create_event.event_id(),
201 )
202 .boxed()
203 .await?;
204
205 self.upgrade_outlier_to_timeline_pdu(
207 origin,
208 room_id,
209 incoming_pdu,
210 pdu,
211 &room_version,
212 recursion_level,
213 create_event.event_id(),
214 )
215 .boxed()
216 .await
217}
218
219#[implement(super::Service)]
229#[tracing::instrument(skip_all, level = "debug", fields(%room_id))]
230async fn handle_rescinded_invite(
231 &self,
232 room_id: &RoomId,
233 pdu: &CanonicalJsonObject,
234) -> Result<bool> {
235 if pdu
236 .get("type")
237 .and_then(CanonicalJsonValue::as_str)
238 != Some("m.room.member")
239 {
240 return Ok(false);
241 }
242
243 let Some(target) = pdu
244 .get("state_key")
245 .and_then(CanonicalJsonValue::as_str)
246 .and_then(|state_key| UserId::parse(state_key).ok())
247 else {
248 return Ok(false);
249 };
250
251 let Some(sender) = pdu
252 .get("sender")
253 .and_then(CanonicalJsonValue::as_str)
254 .and_then(|sender| UserId::parse(sender).ok())
255 else {
256 return Ok(false);
257 };
258
259 if sender == target || !self.services.globals.user_is_local(&target) {
260 return Ok(false);
261 }
262
263 let Some(content) = pdu
264 .get("content")
265 .cloned()
266 .map(Into::into)
267 .and_then(|content| serde_json::from_value::<RoomMemberEventContent>(content).ok())
268 else {
269 return Ok(false);
270 };
271
272 if content.membership != MembershipState::Leave {
273 return Ok(false);
274 }
275
276 if self
277 .services
278 .state_cache
279 .user_membership(&target, room_id)
280 .await != Some(MembershipState::Invite)
281 {
282 return Ok(false);
283 }
284
285 let invite_state = self
287 .services
288 .state_cache
289 .invite_state(&target, room_id)
290 .await?;
291
292 let inviter = invite_state
293 .iter()
294 .find_map(|event| match event.deserialize() {
295 | Ok(AnyStrippedStateEvent::RoomMember(member)) if member.state_key == target =>
296 Some(member.sender),
297 | _ => None,
298 });
299
300 if inviter.as_ref() != Some(&sender) {
302 return Ok(false);
303 }
304
305 let Some(room_version_id) = super::room_version_of(&invite_state) else {
306 return Ok(false);
307 };
308
309 self.services
311 .server_keys
312 .verify_event(pdu, Some(&room_version_id))
313 .await
314 .map_err(|e| {
315 err!(Request(InvalidParam("Invite rescission signature is invalid: {e}")))
316 })?;
317
318 let count = self.services.globals.next_count();
319 self.services
320 .state_cache
321 .update_membership(
322 room_id,
323 &target,
324 RoomMemberEventContent::new(MembershipState::Leave),
325 &sender,
326 None,
327 None,
328 false,
329 PduCount::Normal(*count),
330 )
331 .await?;
332
333 debug!(%room_id, %target, %sender, "Applied a federated invite rescission.");
334
335 Ok(true)
336}
337
338#[implement(super::Service)]
341#[expect(clippy::too_many_arguments)]
342async fn handle_prev_events(
343 &self,
344 origin: &ServerName,
345 room_id: &RoomId,
346 event_id: &EventId,
347 sorted_prev_events: Vec<OwnedEventId>,
348 mut eventid_info: HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>,
349 room_version: &RoomVersionId,
350 recursion_level: usize,
351 first_ts_in_room: MilliSecondsSinceUnixEpoch,
352 create_event_id: &EventId,
353) -> Result<()> {
354 trace!(
355 events = sorted_prev_events.len(),
356 event_ids = ?sorted_prev_events,
357 "Handling previous events"
358 );
359
360 let (interior, extremities): (PrevSplit, PrevSplit) = sorted_prev_events
361 .into_iter()
362 .partition(|prev_id| {
363 eventid_info.get(prev_id).is_some_and(|(pdu, _)| {
364 pdu.prev_events()
365 .any(|prev| eventid_info.contains_key(prev))
366 })
367 });
368
369 extremities
370 .into_iter()
371 .try_stream()
372 .map_ok(|prev_id| (eventid_info.remove(&prev_id), prev_id))
373 .widen_and_then(MAX_PREV_EVENTS, async |(info, prev_id)| {
374 self.upgrade_prev_event(
375 origin,
376 room_id,
377 event_id,
378 info,
379 room_version,
380 recursion_level,
381 first_ts_in_room,
382 prev_id,
383 create_event_id,
384 )
385 .await
386 })
387 .try_collect::<PrevResultsHandled>()
388 .boxed()
389 .await?;
390
391 interior
393 .into_iter()
394 .try_stream()
395 .map_ok(|prev_id| (eventid_info.remove(&prev_id), prev_id))
396 .try_for_each(async |(info, prev_id)| {
397 self.upgrade_prev_event(
398 origin,
399 room_id,
400 event_id,
401 info,
402 room_version,
403 recursion_level,
404 first_ts_in_room,
405 prev_id,
406 create_event_id,
407 )
408 .await?;
409
410 Ok(())
411 })
412 .boxed()
413 .await
414}
415
416#[implement(super::Service)]
419#[expect(clippy::too_many_arguments)]
420async fn upgrade_prev_event(
421 &self,
422 origin: &ServerName,
423 room_id: &RoomId,
424 event_id: &EventId,
425 info: Option<(PduEvent, CanonicalJsonObject)>,
426 room_version: &RoomVersionId,
427 recursion_level: usize,
428 first_ts_in_room: MilliSecondsSinceUnixEpoch,
429 prev_id: OwnedEventId,
430 create_event_id: &EventId,
431) -> Result<PrevHandled> {
432 self.services.server.check_running()?;
433 match self
434 .handle_prev_pdu(
435 origin,
436 room_id,
437 event_id,
438 info,
439 room_version,
440 recursion_level,
441 first_ts_in_room,
442 &prev_id,
443 create_event_id,
444 )
445 .await
446 {
447 | Ok(handled) => {
448 if handled.is_some() {
449 self.record_success(Context::Upgrade, &prev_id)
450 .await;
451 debug!(?prev_id, ?handled, "Prev event processed.");
452 } else {
453 debug_warn!(?prev_id, "Prev event not processed.");
454 }
455
456 Ok((prev_id, handled))
457 },
458 | Err(e) => {
459 self.record_outcome(Context::Upgrade, &prev_id, Disposition::Transient);
460 warn!(?prev_id, ?event_id, ?room_id, "Prev event processing failed: {e}");
461
462 Ok((prev_id, None))
463 },
464 }
465}