Skip to main content

tuwunel_service/rooms/event_handler/
handle_prev_pdu.rs

1use std::{ops::Range, time::Duration};
2
3use futures::FutureExt;
4use ruma::{
5	CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
6};
7use tuwunel_core::{
8	Err, Result, debug,
9	debug::INFO_SPAN_LEVEL,
10	debug_warn, implement,
11	matrix::{Event, PduEvent, pdu::RawPduId},
12};
13
14#[implement(super::Service)]
15#[expect(clippy::too_many_arguments)]
16#[tracing::instrument(
17	name = "prev",
18	level = INFO_SPAN_LEVEL,
19	skip_all,
20	fields(%prev_id),
21)]
22pub(super) async fn handle_prev_pdu(
23	&self,
24	origin: &ServerName,
25	room_id: &RoomId,
26	event_id: &EventId,
27	eventid_info: Option<(PduEvent, CanonicalJsonObject)>,
28	room_version: &RoomVersionId,
29	recursion_level: usize,
30	first_ts_in_room: MilliSecondsSinceUnixEpoch,
31	prev_id: &EventId,
32	create_event_id: &EventId,
33) -> Result<Option<(RawPduId, bool)>> {
34	// Check for disabled again because it might have changed
35	if self.services.metadata.is_disabled(room_id).await {
36		return Err!(Request(Forbidden(debug_warn!(
37			"Federaton of room {room_id} is currently disabled on this server. Request by \
38			 origin {origin} and event ID {event_id}"
39		))));
40	}
41
42	let Some((pdu, json)) = eventid_info else {
43		debug!(?prev_id, "Missing eventid_info.");
44		return Ok(None);
45	};
46
47	// Skip old events
48	if pdu.origin_server_ts() < first_ts_in_room {
49		debug_warn!(?prev_id, "origin_server_ts older than room");
50		return Ok(None);
51	}
52
53	if self.is_backed_off(prev_id, Range {
54		start: Duration::from_mins(5),
55		end: Duration::from_hours(24),
56	}) {
57		debug!(?prev_id, "Backing off from prev_event");
58		return Ok(None);
59	}
60
61	self.upgrade_outlier_to_timeline_pdu(
62		origin,
63		room_id,
64		pdu,
65		json,
66		room_version,
67		recursion_level,
68		create_event_id,
69	)
70	.boxed()
71	.await
72}