tuwunel_api/client/sync/v5/extensions/
account_data.rs1use futures::{StreamExt, future::join};
2use ruma::{api::client::sync::sync_events::v5::response, events::AnyRawAccountDataEvent};
3use tuwunel_core::{
4 Result, extract_variant,
5 utils::{IterStream, ReadyExt, stream::BroadbandExt},
6};
7use tuwunel_service::sync::Room;
8
9use super::{Connection, SyncInfo, Window, selector};
10use crate::client::is_empty_account_data_event;
11
12#[tracing::instrument(name = "account_data", level = "trace", skip_all)]
13pub(super) async fn collect(
14 sync_info: SyncInfo<'_>,
15 conn: &Connection,
16 window: &Window,
17) -> Result<response::AccountData> {
18 let SyncInfo { services, sender_user, .. } = sync_info;
19
20 let implicit = conn
21 .extensions
22 .account_data
23 .lists
24 .as_deref()
25 .map(<[_]>::iter);
26
27 let explicit = conn
28 .extensions
29 .account_data
30 .rooms
31 .as_deref()
32 .map(<[_]>::iter);
33
34 let rooms = selector(sync_info, conn, window, implicit, explicit)
35 .stream()
36 .broad_filter_map(async |room_id| {
37 let &Room { roomsince, .. } = conn.rooms.get(room_id)?;
38 let changes: Vec<_> = services
39 .account_data
40 .changes_since(Some(room_id), sender_user, roomsince, Some(conn.next_batch))
41 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
42 .ready_filter(move |e| roomsince != 0 || !is_empty_account_data_event(e))
43 .collect()
44 .await;
45
46 changes
47 .is_empty()
48 .eq(&false)
49 .then(move || (room_id.to_owned(), changes))
50 })
51 .collect();
52
53 let globalsince = conn.globalsince;
54 let global = services
55 .account_data
56 .changes_since(None, sender_user, globalsince, Some(conn.next_batch))
57 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
58 .ready_filter(move |e| globalsince != 0 || !is_empty_account_data_event(e))
59 .collect();
60
61 let (global, rooms) = join(global, rooms).await;
62
63 Ok(response::AccountData { global, rooms })
64}