tuwunel_api/client/
events.rs1use std::iter::once;
2
3use axum::extract::State;
4use futures::StreamExt;
5use ruma::api::client::peeking::listen_to_new_events::v3::{Request, Response};
6use tokio::time::{Duration, Instant, timeout_at};
7use tuwunel_core::{
8 Err, Event, Result, at,
9 matrix::PduCount,
10 utils::{
11 BoolExt,
12 result::FlatOk,
13 stream::{IterStream, ReadyExt},
14 },
15};
16
17use crate::Ruma;
18
19const EVENT_LIMIT: usize = 50;
20
21pub(crate) async fn events_route(
23 State(services): State<crate::State>,
24 body: Ruma<Request>,
25) -> Result<Response> {
26 let sender_user = body.sender_user();
27
28 let from = body
29 .body
30 .from
31 .as_deref()
32 .map(str::parse)
33 .flat_ok()
34 .unwrap_or_default();
35
36 let timeout = body
37 .body
38 .timeout
39 .as_ref()
40 .map(Duration::as_millis)
41 .map(TryInto::try_into)
42 .flat_ok()
43 .unwrap_or(services.config.client_sync_timeout_default)
44 .max(services.config.client_sync_timeout_min)
45 .min(services.config.client_sync_timeout_max);
46
47 let room_id = body.room_id.as_ref();
48
49 if !services
50 .state_accessor
51 .user_can_see_state_events(sender_user, room_id)
52 .await
53 {
54 return Err!(Request(Forbidden("No room preview available.")));
55 }
56
57 let stop_at = Instant::now()
58 .checked_add(Duration::from_millis(timeout))
59 .expect("configuration must limit maximum timeout");
60
61 loop {
62 let watchers = services
63 .sync
64 .watch(sender_user, body.sender_device.as_deref(), once(room_id).stream())
65 .await;
66
67 let next_batch = services.globals.wait_pending().await?;
68
69 let events = services
70 .timeline
71 .pdus(Some(sender_user), room_id, Some(PduCount::Normal(from)))
72 .ready_filter_map(Result::ok)
73 .ready_take_while(|(count, _)| PduCount::Normal(next_batch).ge(count))
74 .take(EVENT_LIMIT)
75 .collect::<Vec<_>>()
76 .await;
77
78 if !events.is_empty() {
79 return Ok(Response {
80 start: events
81 .first()
82 .map(at!(0))
83 .as_ref()
84 .map(ToString::to_string),
85
86 end: events
87 .last()
88 .map(at!(0))
89 .as_ref()
90 .map(ToString::to_string),
91
92 chunk: events
93 .into_iter()
94 .map(at!(1))
95 .map(Event::into_format)
96 .collect(),
97 });
98 }
99
100 if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
101 return Ok(Response {
102 chunk: Default::default(),
103 start: body.body.from,
104 end: services
105 .server
106 .is_stopping()
107 .is_false()
108 .then_some(next_batch)
109 .as_ref()
110 .map(ToString::to_string),
111 });
112 }
113 }
114}