1mod extensions;
2mod filter;
3mod rooms;
4mod selector;
5
6use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
7
8use axum::extract::{Extension, State};
9use futures::{
10 FutureExt, TryFutureExt,
11 future::{join, try_join},
12};
13use ruma::{
14 DeviceId, OwnedRoomId, UserId,
15 api::client::sync::sync_events::v5::{ListId, Request, Response, response},
16 events::room::member::MembershipState,
17};
18use tokio::{
19 sync::Notify,
20 time::{Instant, timeout_at},
21};
22use tuwunel_core::{
23 Err, Result, debug,
24 debug::INFO_SPAN_LEVEL,
25 debug_warn,
26 error::inspect_log,
27 smallvec::SmallVec,
28 trace,
29 utils::{TryFutureExtExt, result::FlatOk},
30};
31use tuwunel_service::{
32 Services,
33 sync::{Connection, into_connection_key},
34};
35
36use super::share_encrypted_room;
37use crate::{ClientIp, Ruma};
38
/// Borrowed, per-request context shared by the sync sub-handlers.
///
/// Built once in `sync_events_v5_route` and passed by value (it is `Copy`) to
/// `selector::selector`, `rooms::handle` and `extensions::handle`.
#[derive(Copy, Clone)]
struct SyncInfo<'a> {
    /// Handle to the server's service collection.
    services: &'a Services,
    /// User on whose behalf the sync request is made.
    sender_user: &'a UserId,
    /// Device that issued the request, when the request carries one.
    sender_device: Option<&'a DeviceId>,
}
45
/// One room entry of a [`Window`].
///
/// NOTE(review): the fields are populated outside this part of the file; the
/// per-field notes below are inferred from names and types — confirm against
/// the `selector` module.
#[derive(Clone, Debug)]
struct WindowRoom {
    /// Identifier of the room this entry describes.
    room_id: OwnedRoomId,
    /// Membership state associated with the room, if known (presumably the
    /// syncing user's membership — TODO confirm).
    membership: Option<MembershipState>,
    /// Identifiers of the lists associated with this room (presumably the
    /// request lists that selected it — TODO confirm).
    lists: ListIds,
    /// Presumably the room's position in the ranked ordering — TODO confirm.
    ranked: usize,
    /// Presumably the latest stream count seen for the room — TODO confirm.
    last_count: u64,
}
54
/// Rooms selected for a sync iteration, keyed (and therefore ordered) by room
/// id.
type Window = BTreeMap<OwnedRoomId, WindowRoom>;
/// Per-list response data keyed by list id.
type ResponseLists = BTreeMap<ListId, response::List>;
/// Small vector of list ids with inline storage for a single element, so the
/// one-list case needs no heap allocation.
type ListIds = SmallVec<[ListId; 1]>;
58
/// Route handler for sliding sync (`sync_events::v5`) requests.
///
/// Overview of what the body does:
///
/// 1. Parses the request's `pos` into a numeric `since` (0 when absent or
///    unparsable) and derives a long-poll timeout in milliseconds, capped at
///    `services.config.client_sync_timeout_max` (0 when no timeout is given).
/// 2. Loads or initialises the connection for the (user, device, `conn_id`)
///    key and locks it, pinging presence concurrently.
/// 3. Validates `since` against the connection's `next_batch` / `globalsince`
///    and resets the connection when `since` is 0.
/// 4. Loops: registers watchers, runs the selector, builds the rooms and
///    extensions parts of the response and returns as soon as the response is
///    non-empty. Otherwise it waits on the watchers until the deadline, an
///    interrupt notification or server shutdown, and then returns an empty
///    response carrying the current position.
///
/// `interrupted` is a shared [`Notify`]; when it fires while waiting, the
/// request completes early with an empty response. `client` is forwarded to
/// the presence ping.
///
/// # Errors
///
/// * `UnknownPos` when `since` is non-zero but the connection's `next_batch`
///   is still 0, or when `since` neither equals `next_batch` nor is a non-zero
///   value at or below `globalsince`.
/// * Any error returned by `services.globals.wait_pending()`, `rooms::handle`
///   or `extensions::handle` is propagated.
///
/// # Panics
///
/// Panics if adding the timeout to the current [`Instant`] overflows.
#[tracing::instrument(
    name = "sync",
    level = INFO_SPAN_LEVEL,
    skip_all,
    fields(
        user_id = %body.sender_user().localpart(),
        device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
        conn_id = ?body.body.conn_id.clone().unwrap_or_default(),
        since = ?body.body.pos.clone().unwrap_or_default(),
    )
)]
pub(crate) async fn sync_events_v5_route(
    Extension(interrupted): Extension<Arc<Notify>>,
    ClientIp(client): ClientIp,
    State(ref services): State<crate::State>,
    body: Ruma<Request>,
) -> Result<Response> {
    let sender_user = body.sender_user();
    let sender_device = body.sender_device.as_deref();
    let request = &body.body;

    // Stream position supplied by the client. A missing or unparsable `pos`
    // is treated as 0, which restarts the stream further below.
    let since = request
        .pos
        .as_ref()
        .and_then(|string| string.parse().ok())
        .unwrap_or(0);

    // Long-poll timeout in milliseconds, capped by configuration. Falls back
    // to 0 (no waiting) when the request has no timeout; `flat_ok` presumably
    // also yields `None` when the millisecond count does not fit in `u64` —
    // TODO confirm.
    let timeout = request
        .timeout
        .as_ref()
        .map(Duration::as_millis)
        .map(TryInto::try_into)
        .flat_ok()
        .map(|timeout: u64| timeout.min(services.config.client_sync_timeout_max))
        .unwrap_or(0);

    let conn_key = into_connection_key(sender_user, sender_device, request.conn_id.as_deref());
    let conn_val = services
        .sync
        .load_or_init_connection(&conn_key)
        .await;

    // Neither future is awaited on its own: acquiring the connection lock and
    // pinging presence are driven together by `join` below. A presence
    // failure is handed to `inspect_log` and then discarded, so it never
    // fails the sync request.
    let conn = conn_val.lock();
    let ping_presence = services
        .presence
        .maybe_ping_presence(sender_user, sender_device, Some(client), &request.set_presence)
        .inspect_err(inspect_log)
        .ok();

    let (mut conn, _) = join(conn, ping_presence).await;

    // The client wants to resume from a position, but this connection has no
    // recorded batch to resume from.
    if since != 0 && conn.next_batch == 0 {
        return Err!(Request(UnknownPos(warn!("Connection lost; restarting sync stream."))));
    }

    // A `since` of 0 restarts the stream: drop all cached connection state
    // and persist the fresh connection.
    if since == 0 {
        *conn = Connection::default();
        conn.store(&services.sync, &conn_key);
        debug_warn!(?conn_key, "Client cleared cache and reloaded.");
    }

    // advancing: the client presents exactly the last position stored as
    //            `next_batch`.
    // retarding: the client presents a non-zero position at or before
    //            `globalsince`, i.e. it is replaying an earlier token.
    // NOTE(review): after the reset above `since == 0` only counts as
    // advancing if `Connection::default()` leaves `next_batch` at 0 — confirm.
    let advancing = since == conn.next_batch;
    let retarding = since != 0 && since <= conn.globalsince;
    if !advancing && !retarding {
        return Err!(Request(UnknownPos(warn!(
            "Requesting unknown or invalid stream position."
        ))));
    }

    // Always true after the early return above; kept as an explicit statement
    // of the invariant in debug builds.
    debug_assert!(
        advancing || retarding,
        "Request should either be advancing or replaying the since token."
    );

    // These updates run for both the advancing and the replaying case.
    // NOTE(review): `wait_pending` presumably yields the current stream
    // counter once pending writes have settled — confirm.
    conn.next_batch = services.globals.wait_pending().await?;
    // Clamp so that `globalsince <= next_batch`, as asserted in the loop.
    conn.globalsince = since.min(conn.next_batch);
    conn.update_cache(request);
    // `since` is only passed along when the client is replaying.
    conn.update_rooms_prologue(retarding.then_some(since));

    let mut response = Response {
        txn_id: request.txn_id.clone(),
        lists: Default::default(),
        pos: Default::default(),
        rooms: Default::default(),
        extensions: Default::default(),
    };

    // Absolute deadline computed once, so every loop iteration shares the
    // same overall timeout budget instead of restarting it.
    let stop_at = Instant::now()
        .checked_add(Duration::from_millis(timeout))
        .expect("configuration must limit maximum timeout");

    let sync_info = SyncInfo { services, sender_user, sender_device };
    loop {
        debug_assert!(
            conn.globalsince <= conn.next_batch,
            "since should not be greater than next_batch."
        );

        // Declared up front so the tuple assignment below can fill it
        // together with `response.lists`.
        let window;

        // Watchers are obtained before `next_batch` is re-read; presumably
        // this ordering ensures changes landing in between still wake the
        // wait further down — TODO confirm against `services.sync.watch`.
        let watchers = services
            .sync
            .watch(sender_user, sender_device, services.state_cache.rooms_joined(sender_user))
            .await;

        conn.next_batch = services.globals.wait_pending().await?;
        (window, response.lists) = selector::selector(&mut conn, sync_info)
            .boxed()
            .await;

        // Rooms and extensions are only computed when the range
        // `globalsince..next_batch` is non-empty.
        if conn.globalsince < conn.next_batch {
            // Both sub-handlers are driven together by `try_join`; on success
            // each closure writes its result into its own field of `response`.
            let rooms = rooms::handle(sync_info, &conn, &window)
                .map_ok(|response_rooms| response.rooms = response_rooms);

            let extensions = extensions::handle(sync_info, &conn, &window)
                .map_ok(|response_extensions| response.extensions = response_extensions);

            try_join(rooms, extensions).boxed().await?;

            conn.update_rooms_epilogue(window.keys().map(AsRef::as_ref));

            // Something to deliver: stamp the new position, persist the
            // connection and finish the request.
            if !is_empty_response(&response) {
                response.pos = conn.next_batch.to_string().into();
                trace!(conn.globalsince, conn.next_batch, "response {response:?}");
                conn.store(&services.sync, &conn_key);
                return Ok(response);
            }
        }

        // Resolves to `true` when the request should end with an empty
        // response: either the interrupt notification fired or the deadline
        // elapsed (`timeout_at` returned an error). Resolves to `false` when
        // the watchers completed before the deadline.
        let waiter = async || {
            tokio::select! {
                () = interrupted.notified() => true,
                watch = timeout_at(stop_at, watchers) => watch.is_err(),
            }
        };

        // Short-circuits: with a zero timeout or during shutdown the waiter
        // is never started.
        if timeout == 0 || services.server.is_stopping() || waiter().boxed().await {
            response.pos = conn.next_batch.to_string().into();
            trace!(conn.globalsince, conn.next_batch, "empty response {response:?}");
            conn.store(&services.sync, &conn_key);
            return Ok(response);
        }

        debug!(
            ?timeout,
            last_since = conn.globalsince,
            last_batch = conn.next_batch,
            pend_count = ?services.globals.pending_count(),
            "notified by watcher"
        );

        // The range examined so far produced nothing; move the lower bound up
        // so the next iteration only looks at what arrived afterwards.
        conn.globalsince = conn.next_batch;
    }
}
222
223fn is_empty_response(response: &Response) -> bool {
224 response.extensions.is_empty() && response.rooms.is_empty()
225}