tuwunel_api/client/sync/v5/extensions/
to_device.rs1use futures::StreamExt;
2use ruma::api::client::sync::sync_events::v5::response;
3use tuwunel_core::{self, Result, at};
4
5use super::{Connection, SyncInfo};
6
7#[tracing::instrument(name = "to_device", level = "trace", skip_all, ret)]
8pub(super) async fn collect(
9 SyncInfo { services, sender_user, sender_device, .. }: SyncInfo<'_>,
10 conn: &Connection,
11) -> Result<Option<response::ToDevice>> {
12 let Some(sender_device) = sender_device else {
13 return Ok(None);
14 };
15
16 let since = conn
21 .extensions
22 .to_device
23 .since
24 .as_deref()
25 .and_then(|s| s.parse::<u64>().ok())
26 .unwrap_or(conn.globalsince)
27 .min(conn.next_batch);
28
29 services
30 .users
31 .remove_to_device_events(sender_user, sender_device, since)
32 .await;
33
34 let events: Vec<_> = services
35 .users
36 .get_to_device_events(sender_user, sender_device, Some(since), Some(conn.next_batch))
37 .map(at!(1))
38 .collect()
39 .await;
40
41 let to_device = events
42 .is_empty()
43 .eq(&false)
44 .then(|| response::ToDevice {
45 next_batch: conn.next_batch.to_string().into(),
46 events,
47 });
48
49 Ok(to_device)
50}