Skip to main content

tuwunel_api/client/sync/v5/extensions/
account_data.rs

1use futures::{StreamExt, future::join};
2use ruma::{api::client::sync::sync_events::v5::response, events::AnyRawAccountDataEvent};
3use tuwunel_core::{
4	Result, extract_variant,
5	utils::{IterStream, ReadyExt, stream::BroadbandExt},
6};
7use tuwunel_service::sync::Room;
8
9use super::{Connection, SyncInfo, Window, selector};
10use crate::client::is_empty_account_data_event;
11
12#[tracing::instrument(name = "account_data", level = "trace", skip_all)]
13pub(super) async fn collect(
14	sync_info: SyncInfo<'_>,
15	conn: &Connection,
16	window: &Window,
17) -> Result<response::AccountData> {
18	let SyncInfo { services, sender_user, .. } = sync_info;
19
20	let implicit = conn
21		.extensions
22		.account_data
23		.lists
24		.as_deref()
25		.map(<[_]>::iter);
26
27	let explicit = conn
28		.extensions
29		.account_data
30		.rooms
31		.as_deref()
32		.map(<[_]>::iter);
33
34	let rooms = selector(sync_info, conn, window, implicit, explicit)
35		.stream()
36		.broad_filter_map(async |room_id| {
37			let &Room { roomsince, .. } = conn.rooms.get(room_id)?;
38			let changes: Vec<_> = services
39				.account_data
40				.changes_since(Some(room_id), sender_user, roomsince, Some(conn.next_batch))
41				.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
42				.ready_filter(move |e| roomsince != 0 || !is_empty_account_data_event(e))
43				.collect()
44				.await;
45
46			changes
47				.is_empty()
48				.eq(&false)
49				.then(move || (room_id.to_owned(), changes))
50		})
51		.collect();
52
53	let globalsince = conn.globalsince;
54	let global = services
55		.account_data
56		.changes_since(None, sender_user, globalsince, Some(conn.next_batch))
57		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
58		.ready_filter(move |e| globalsince != 0 || !is_empty_account_data_event(e))
59		.collect();
60
61	let (global, rooms) = join(global, rooms).await;
62
63	Ok(response::AccountData { global, rooms })
64}