Skip to main content

tuwunel_api/server/
get_missing_events.rs

1use axum::extract::State;
2use ruma::{CanonicalJsonValue, EventId, api::federation::event::get_missing_events};
3use tuwunel_core::{Result, debug};
4
5use super::AccessCheck;
6use crate::Ruma;
7
/// Upper bound on the number of events returned in a single response.
///
/// The value is arbitrary: synapse's is 20, and we can handle lots of these
/// anyway.
const LIMIT_MAX: usize = 50;
/// Limit used when the requested limit cannot be converted to `usize`.
///
/// The spec says the default is 10.
const LIMIT_DEFAULT: usize = 10;
12
13/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
14///
15/// Retrieves events that the sender is missing.
16pub(crate) async fn get_missing_events_route(
17	State(services): State<crate::State>,
18	body: Ruma<get_missing_events::v1::Request>,
19) -> Result<get_missing_events::v1::Response> {
20	AccessCheck {
21		services: &services,
22		origin: body.origin(),
23		room_id: &body.room_id,
24		event_id: None,
25	}
26	.check()
27	.await?;
28
29	let limit = body
30		.limit
31		.try_into()
32		.unwrap_or(LIMIT_DEFAULT)
33		.min(LIMIT_MAX);
34
35	let room_version = services
36		.state
37		.get_room_version(&body.room_id)
38		.await
39		.ok();
40
41	let mut queued_events = body.latest_events.clone();
42	// the vec will never have more entries the limit
43	let mut events = Vec::with_capacity(limit);
44
45	let mut i: usize = 0;
46	while i < queued_events.len() && events.len() < limit {
47		let Ok(event) = services
48			.timeline
49			.get_pdu_json(&queued_events[i])
50			.await
51		else {
52			debug!(
53				?body.origin,
54				event_id = %queued_events[i],
55				"Event does not exist locally, skipping"
56			);
57			i = i.saturating_add(1);
58			continue;
59		};
60
61		if body.earliest_events.contains(&queued_events[i]) {
62			i = i.saturating_add(1);
63			continue;
64		}
65
66		if !services
67			.state_accessor
68			.server_can_see_event(body.origin(), &body.room_id, &queued_events[i])
69			.await
70		{
71			debug!(
72				?body.origin,
73				event_id = %queued_events[i],
74				room_id = %body.room_id,
75				"Server cannot see event, skipping"
76			);
77			i = i.saturating_add(1);
78			continue;
79		}
80
81		let prev_events = event
82			.get("prev_events")
83			.and_then(CanonicalJsonValue::as_array)
84			.into_iter()
85			.flatten()
86			.filter_map(CanonicalJsonValue::as_str)
87			.filter_map(|id| EventId::parse(id).ok());
88
89		queued_events.extend(prev_events);
90
91		let event = services
92			.federation
93			.format_pdu_into(event, room_version.as_ref())
94			.await;
95
96		events.push(event);
97	}
98
99	Ok(get_missing_events::v1::Response { events })
100}