Skip to main content

// tuwunel_api/client/sync/v5.rs

1mod extensions;
2mod filter;
3mod rooms;
4mod selector;
5
6use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
7
8use axum::extract::{Extension, State};
9use futures::{
10	FutureExt, TryFutureExt,
11	future::{join, try_join},
12};
13use ruma::{
14	DeviceId, OwnedRoomId, UserId,
15	api::client::sync::sync_events::v5::{ListId, Request, Response, response},
16	events::room::member::MembershipState,
17};
18use tokio::{
19	sync::Notify,
20	time::{Instant, timeout_at},
21};
22use tuwunel_core::{
23	Err, Result, debug,
24	debug::INFO_SPAN_LEVEL,
25	debug_warn,
26	error::inspect_log,
27	smallvec::SmallVec,
28	trace,
29	utils::{TryFutureExtExt, result::FlatOk},
30};
31use tuwunel_service::{
32	Services,
33	sync::{Connection, into_connection_key},
34};
35
36use super::share_encrypted_room;
37use crate::{ClientIp, Ruma};
38
39#[derive(Copy, Clone)]
40struct SyncInfo<'a> {
41	services: &'a Services,
42	sender_user: &'a UserId,
43	sender_device: Option<&'a DeviceId>,
44}
45
46#[derive(Clone, Debug)]
47struct WindowRoom {
48	room_id: OwnedRoomId,
49	membership: Option<MembershipState>,
50	lists: ListIds,
51	ranked: usize,
52	last_count: u64,
53}
54
55type Window = BTreeMap<OwnedRoomId, WindowRoom>;
56type ResponseLists = BTreeMap<ListId, response::List>;
57type ListIds = SmallVec<[ListId; 1]>;
58
59/// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync`
60/// ([MSC4186])
61///
62/// A simplified version of sliding sync ([MSC3575]).
63///
64/// Get all new events in a sliding window of rooms since the last sync or a
65/// given point in time.
66///
67/// [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575
68/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
69#[tracing::instrument(
70	name = "sync",
71	level = INFO_SPAN_LEVEL,
72	skip_all,
73	fields(
74		user_id = %body.sender_user().localpart(),
75		device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
76		conn_id = ?body.body.conn_id.clone().unwrap_or_default(),
77		since = ?body.body.pos.clone().unwrap_or_default(),
78	)
79)]
80pub(crate) async fn sync_events_v5_route(
81	Extension(interrupted): Extension<Arc<Notify>>,
82	ClientIp(client): ClientIp,
83	State(ref services): State<crate::State>,
84	body: Ruma<Request>,
85) -> Result<Response> {
86	let sender_user = body.sender_user();
87	let sender_device = body.sender_device.as_deref();
88	let request = &body.body;
89	let since = request
90		.pos
91		.as_ref()
92		.and_then(|string| string.parse().ok())
93		.unwrap_or(0);
94
95	let timeout = request
96		.timeout
97		.as_ref()
98		.map(Duration::as_millis)
99		.map(TryInto::try_into)
100		.flat_ok()
101		.map(|timeout: u64| timeout.min(services.config.client_sync_timeout_max))
102		.unwrap_or(0);
103
104	let conn_key = into_connection_key(sender_user, sender_device, request.conn_id.as_deref());
105	let conn_val = services
106		.sync
107		.load_or_init_connection(&conn_key)
108		.await;
109
110	let conn = conn_val.lock();
111	let ping_presence = services
112		.presence
113		.maybe_ping_presence(sender_user, sender_device, Some(client), &request.set_presence)
114		.inspect_err(inspect_log)
115		.ok();
116
117	let (mut conn, _) = join(conn, ping_presence).await;
118
119	if since != 0 && conn.next_batch == 0 {
120		return Err!(Request(UnknownPos(warn!("Connection lost; restarting sync stream."))));
121	}
122
123	if since == 0 {
124		*conn = Connection::default();
125		conn.store(&services.sync, &conn_key);
126		debug_warn!(?conn_key, "Client cleared cache and reloaded.");
127	}
128
129	let advancing = since == conn.next_batch;
130	let retarding = since != 0 && since <= conn.globalsince;
131	if !advancing && !retarding {
132		return Err!(Request(UnknownPos(warn!(
133			"Requesting unknown or invalid stream position."
134		))));
135	}
136
137	debug_assert!(
138		advancing || retarding,
139		"Request should either be advancing or replaying the since token."
140	);
141
142	// Update parameters regardless of replay or advance
143	conn.next_batch = services.globals.wait_pending().await?;
144	conn.globalsince = since.min(conn.next_batch);
145	conn.update_cache(request);
146	conn.update_rooms_prologue(retarding.then_some(since));
147
148	let mut response = Response {
149		txn_id: request.txn_id.clone(),
150		lists: Default::default(),
151		pos: Default::default(),
152		rooms: Default::default(),
153		extensions: Default::default(),
154	};
155
156	let stop_at = Instant::now()
157		.checked_add(Duration::from_millis(timeout))
158		.expect("configuration must limit maximum timeout");
159
160	let sync_info = SyncInfo { services, sender_user, sender_device };
161	loop {
162		debug_assert!(
163			conn.globalsince <= conn.next_batch,
164			"since should not be greater than next_batch."
165		);
166
167		let window;
168		let watchers = services
169			.sync
170			.watch(sender_user, sender_device, services.state_cache.rooms_joined(sender_user))
171			.await;
172
173		conn.next_batch = services.globals.wait_pending().await?;
174		(window, response.lists) = selector::selector(&mut conn, sync_info)
175			.boxed()
176			.await;
177
178		if conn.globalsince < conn.next_batch {
179			let rooms = rooms::handle(sync_info, &conn, &window)
180				.map_ok(|response_rooms| response.rooms = response_rooms);
181
182			let extensions = extensions::handle(sync_info, &conn, &window)
183				.map_ok(|response_extensions| response.extensions = response_extensions);
184
185			try_join(rooms, extensions).boxed().await?;
186
187			conn.update_rooms_epilogue(window.keys().map(AsRef::as_ref));
188
189			if !is_empty_response(&response) {
190				response.pos = conn.next_batch.to_string().into();
191				trace!(conn.globalsince, conn.next_batch, "response {response:?}");
192				conn.store(&services.sync, &conn_key);
193				return Ok(response);
194			}
195		}
196
197		let waiter = async || {
198			tokio::select! {
199				() = interrupted.notified() => true,
200				watch = timeout_at(stop_at, watchers) => watch.is_err(),
201			}
202		};
203
204		if timeout == 0 || services.server.is_stopping() || waiter().boxed().await {
205			response.pos = conn.next_batch.to_string().into();
206			trace!(conn.globalsince, conn.next_batch, "empty response {response:?}");
207			conn.store(&services.sync, &conn_key);
208			return Ok(response);
209		}
210
211		debug!(
212			?timeout,
213			last_since = conn.globalsince,
214			last_batch = conn.next_batch,
215			pend_count = ?services.globals.pending_count(),
216			"notified by watcher"
217		);
218
219		conn.globalsince = conn.next_batch;
220	}
221}
222
223fn is_empty_response(response: &Response) -> bool {
224	response.extensions.is_empty() && response.rooms.is_empty()
225}