Skip to main content

tuwunel_api/server/
get_missing_events.rs

1use axum::extract::State;
2use ruma::api::federation::event::get_missing_events;
3use tuwunel_core::{Result, debug, debug_error, utils::to_canonical_object};
4
5use super::AccessCheck;
6use crate::Ruma;
7
8/// arbitrary number but synapse's is 20 and we can handle lots of these anyways
9const LIMIT_MAX: usize = 50;
10/// spec says default is 10
11const LIMIT_DEFAULT: usize = 10;
12
13/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
14///
15/// Retrieves events that the sender is missing.
16pub(crate) async fn get_missing_events_route(
17	State(services): State<crate::State>,
18	body: Ruma<get_missing_events::v1::Request>,
19) -> Result<get_missing_events::v1::Response> {
20	AccessCheck {
21		services: &services,
22		origin: body.origin(),
23		room_id: &body.room_id,
24		event_id: None,
25	}
26	.check()
27	.await?;
28
29	let limit = body
30		.limit
31		.try_into()
32		.unwrap_or(LIMIT_DEFAULT)
33		.min(LIMIT_MAX);
34
35	let room_version = services
36		.state
37		.get_room_version(&body.room_id)
38		.await
39		.ok();
40
41	let mut queued_events = body.latest_events.clone();
42	// the vec will never have more entries the limit
43	let mut events = Vec::with_capacity(limit);
44
45	let mut i: usize = 0;
46	while i < queued_events.len() && events.len() < limit {
47		let Ok(pdu) = services.timeline.get_pdu(&queued_events[i]).await else {
48			debug!(
49				?body.origin,
50				"Event {} does not exist locally, skipping", &queued_events[i]
51			);
52			i = i.saturating_add(1);
53			continue;
54		};
55
56		if body.earliest_events.contains(&queued_events[i]) {
57			i = i.saturating_add(1);
58			continue;
59		}
60
61		if !services
62			.state_accessor
63			.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
64			.await
65		{
66			debug!(
67				?body.origin,
68				"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
69			);
70			i = i.saturating_add(1);
71			continue;
72		}
73
74		let Ok(event) = to_canonical_object(&pdu) else {
75			debug_error!(
76				?body.origin,
77				"Failed to convert PDU in database to canonical JSON: {pdu:?}"
78			);
79			i = i.saturating_add(1);
80			continue;
81		};
82
83		let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
84
85		let event = services
86			.federation
87			.format_pdu_into(event, room_version.as_ref())
88			.await;
89
90		queued_events.extend(prev_events);
91		events.push(event);
92	}
93
94	Ok(get_missing_events::v1::Response { events })
95}