tuwunel_service/rooms/event_handler/
handle_prev_pdu.rs1use std::{ops::Range, time::Duration};
2
3use futures::FutureExt;
4use ruma::{
5 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
6};
7use tuwunel_core::{
8 Err, Result, debug,
9 debug::INFO_SPAN_LEVEL,
10 debug_warn, implement,
11 matrix::{Event, PduEvent, pdu::RawPduId},
12};
13
14#[implement(super::Service)]
15#[expect(clippy::too_many_arguments)]
16#[tracing::instrument(
17 name = "prev",
18 level = INFO_SPAN_LEVEL,
19 skip_all,
20 fields(%prev_id),
21)]
22pub(super) async fn handle_prev_pdu(
23 &self,
24 origin: &ServerName,
25 room_id: &RoomId,
26 event_id: &EventId,
27 eventid_info: Option<(PduEvent, CanonicalJsonObject)>,
28 room_version: &RoomVersionId,
29 recursion_level: usize,
30 first_ts_in_room: MilliSecondsSinceUnixEpoch,
31 prev_id: &EventId,
32 create_event_id: &EventId,
33) -> Result<Option<(RawPduId, bool)>> {
34 if self.services.metadata.is_disabled(room_id).await {
36 return Err!(Request(Forbidden(debug_warn!(
37 "Federaton of room {room_id} is currently disabled on this server. Request by \
38 origin {origin} and event ID {event_id}"
39 ))));
40 }
41
42 let Some((pdu, json)) = eventid_info else {
43 debug!(?prev_id, "Missing eventid_info.");
44 return Ok(None);
45 };
46
47 if pdu.origin_server_ts() < first_ts_in_room {
49 debug_warn!(?prev_id, "origin_server_ts older than room");
50 return Ok(None);
51 }
52
53 if self.is_backed_off(prev_id, Range {
54 start: Duration::from_mins(5),
55 end: Duration::from_hours(24),
56 }) {
57 debug!(?prev_id, "Backing off from prev_event");
58 return Ok(None);
59 }
60
61 self.upgrade_outlier_to_timeline_pdu(
62 origin,
63 room_id,
64 pdu,
65 json,
66 room_version,
67 recursion_level,
68 create_event_id,
69 )
70 .boxed()
71 .await
72}