Skip to main content

tuwunel_api/client/sync/v5/extensions/
to_device.rs

1use futures::StreamExt;
2use ruma::api::client::sync::sync_events::v5::response;
3use tuwunel_core::{self, Result, at};
4
5use super::{Connection, SyncInfo};
6
7#[tracing::instrument(name = "to_device", level = "trace", skip_all, ret)]
8pub(super) async fn collect(
9	SyncInfo { services, sender_user, sender_device, .. }: SyncInfo<'_>,
10	conn: &Connection,
11) -> Result<Option<response::ToDevice>> {
12	let Some(sender_device) = sender_device else {
13		return Ok(None);
14	};
15
16	// Per MSC3885, the to-device extension carries its own opaque `since` token
17	// (independent of the global `pos`). matrix-rust-sdk persists it separately
18	// from `pos`, so trust the request's claim when present and fall back to
19	// `globalsince` only when the client did not provide one.
20	let since = conn
21		.extensions
22		.to_device
23		.since
24		.as_deref()
25		.and_then(|s| s.parse::<u64>().ok())
26		.unwrap_or(conn.globalsince)
27		.min(conn.next_batch);
28
29	services
30		.users
31		.remove_to_device_events(sender_user, sender_device, since)
32		.await;
33
34	let events: Vec<_> = services
35		.users
36		.get_to_device_events(sender_user, sender_device, Some(since), Some(conn.next_batch))
37		.map(at!(1))
38		.collect()
39		.await;
40
41	let to_device = events
42		.is_empty()
43		.eq(&false)
44		.then(|| response::ToDevice {
45			next_batch: conn.next_batch.to_string().into(),
46			events,
47		});
48
49	Ok(to_device)
50}