tuwunel_service/rooms/event_handler/
handle_prev_pdu.rs1use std::time::Duration;
2
3use futures::FutureExt;
4use ruma::{
5 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
6};
7use tuwunel_core::{
8 Err, Result, debug,
9 debug::INFO_SPAN_LEVEL,
10 debug_warn, implement,
11 matrix::{Event, PduEvent, pdu::RawPduId},
12};
13
14use super::backoff::Context;
15
16#[implement(super::Service)]
17#[expect(clippy::too_many_arguments)]
18#[tracing::instrument(
19 name = "prev",
20 level = INFO_SPAN_LEVEL,
21 skip_all,
22 fields(%prev_id),
23)]
24pub(super) async fn handle_prev_pdu(
25 &self,
26 origin: &ServerName,
27 room_id: &RoomId,
28 event_id: &EventId,
29 eventid_info: Option<(PduEvent, CanonicalJsonObject)>,
30 room_version: &RoomVersionId,
31 recursion_level: usize,
32 first_ts_in_room: MilliSecondsSinceUnixEpoch,
33 prev_id: &EventId,
34 create_event_id: &EventId,
35) -> Result<Option<(RawPduId, bool)>> {
36 if self.services.metadata.is_disabled(room_id).await {
38 return Err!(Request(Forbidden(debug_warn!(
39 "Federaton of room {room_id} is currently disabled on this server. Request by \
40 origin {origin} and event ID {event_id}"
41 ))));
42 }
43
44 let Some((pdu, json)) = eventid_info else {
45 debug!(?prev_id, "Missing eventid_info.");
46 return Ok(None);
47 };
48
49 if pdu.origin_server_ts() < first_ts_in_room {
51 debug_warn!(?prev_id, "origin_server_ts older than room");
52 return Ok(None);
53 }
54
55 if self
56 .is_suppressed(
57 Context::Upgrade,
58 prev_id,
59 Duration::from_mins(5)..Duration::from_hours(24),
60 )
61 .await
62 .is_deny()
63 {
64 debug!(?prev_id, "Backing off from prev_event");
65 return Ok(None);
66 }
67
68 self.record_attempt(Context::Upgrade, prev_id);
69
70 self.upgrade_outlier_to_timeline_pdu(
71 origin,
72 room_id,
73 pdu,
74 json,
75 room_version,
76 recursion_level,
77 create_event_id,
78 )
79 .boxed()
80 .await
81}