Skip to main content

tuwunel_api/client/
events.rs

1use std::iter::once;
2
3use axum::extract::State;
4use futures::StreamExt;
5use ruma::api::client::peeking::listen_to_new_events::v3::{Request, Response};
6use tokio::time::{Duration, Instant, timeout_at};
7use tuwunel_core::{
8	Err, Event, Result, at,
9	matrix::PduCount,
10	utils::{
11		BoolExt,
12		result::FlatOk,
13		stream::{IterStream, ReadyExt},
14	},
15};
16
17use crate::Ruma;
18
/// Maximum number of timeline events gathered into a single response;
/// `events_route` applies it with `.take(EVENT_LIMIT)` when collecting PDUs.
19const EVENT_LIMIT: usize = 50;
20
21/// GET `/_matrix/client/v3/events`
22pub(crate) async fn events_route(
23	State(services): State<crate::State>,
24	body: Ruma<Request>,
25) -> Result<Response> {
26	let sender_user = body.sender_user();
27
28	let from = body
29		.body
30		.from
31		.as_deref()
32		.map(str::parse)
33		.flat_ok()
34		.unwrap_or_default();
35
36	let timeout = body
37		.body
38		.timeout
39		.as_ref()
40		.map(Duration::as_millis)
41		.map(TryInto::try_into)
42		.flat_ok()
43		.unwrap_or(services.config.client_sync_timeout_default)
44		.max(services.config.client_sync_timeout_min)
45		.min(services.config.client_sync_timeout_max);
46
47	let room_id = body.room_id.as_ref();
48
49	if !services
50		.state_accessor
51		.user_can_see_state_events(sender_user, room_id)
52		.await
53	{
54		return Err!(Request(Forbidden("No room preview available.")));
55	}
56
57	let stop_at = Instant::now()
58		.checked_add(Duration::from_millis(timeout))
59		.expect("configuration must limit maximum timeout");
60
61	loop {
62		let watchers = services
63			.sync
64			.watch(sender_user, body.sender_device.as_deref(), once(room_id).stream())
65			.await;
66
67		let next_batch = services.globals.wait_pending().await?;
68
69		let events = services
70			.timeline
71			.pdus(Some(sender_user), room_id, Some(PduCount::Normal(from)))
72			.ready_filter_map(Result::ok)
73			.ready_take_while(|(count, _)| PduCount::Normal(next_batch).ge(count))
74			.take(EVENT_LIMIT)
75			.collect::<Vec<_>>()
76			.await;
77
78		if !events.is_empty() {
79			return Ok(Response {
80				start: events
81					.first()
82					.map(at!(0))
83					.as_ref()
84					.map(ToString::to_string),
85
86				end: events
87					.last()
88					.map(at!(0))
89					.as_ref()
90					.map(ToString::to_string),
91
92				chunk: events
93					.into_iter()
94					.map(at!(1))
95					.map(Event::into_format)
96					.collect(),
97			});
98		}
99
100		if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
101			return Ok(Response {
102				chunk: Default::default(),
103				start: body.body.from,
104				end: services
105					.server
106					.is_stopping()
107					.is_false()
108					.then_some(next_batch)
109					.as_ref()
110					.map(ToString::to_string),
111			});
112		}
113	}
114}