Skip to main content

tuwunel_api/client/sync/v5/
extensions.rs

1mod account_data;
2mod e2ee;
3mod receipts;
4mod to_device;
5mod typing;
6
7use std::fmt::Debug;
8
9use futures::{FutureExt, future::join5};
10use ruma::{
11	RoomId,
12	api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response},
13};
14use tuwunel_core::{Result, apply, at, extract_variant, utils::BoolExt};
15use tuwunel_service::sync::Connection;
16
17use super::{SyncInfo, Window, share_encrypted_room};
18
19#[tracing::instrument(
20	name = "extensions",
21	level = "debug",
22	skip_all,
23	fields(
24		next_batch = conn.next_batch,
25		window = window.len(),
26		rooms = conn.rooms.len(),
27		subs = conn.subscriptions.len(),
28	)
29)]
30pub(super) async fn handle(
31	sync_info: SyncInfo<'_>,
32	conn: &Connection,
33	window: &Window,
34) -> Result<response::Extensions> {
35	let SyncInfo { .. } = sync_info;
36
37	let account_data = conn
38		.extensions
39		.account_data
40		.enabled
41		.unwrap_or(false)
42		.then_async(|| account_data::collect(sync_info, conn, window));
43
44	let receipts = conn
45		.extensions
46		.receipts
47		.enabled
48		.unwrap_or(false)
49		.then_async(|| receipts::collect(sync_info, conn, window));
50
51	let typing = conn
52		.extensions
53		.typing
54		.enabled
55		.unwrap_or(false)
56		.then_async(|| typing::collect(sync_info, conn, window));
57
58	let to_device = conn
59		.extensions
60		.to_device
61		.enabled
62		.unwrap_or(false)
63		.then_async(|| to_device::collect(sync_info, conn));
64
65	let e2ee = conn
66		.extensions
67		.e2ee
68		.enabled
69		.unwrap_or(false)
70		.then_async(|| e2ee::collect(sync_info, conn));
71
72	let (account_data, receipts, typing, to_device, e2ee) =
73		join5(account_data, receipts, typing, to_device, e2ee)
74			.map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default()))))
75			.await;
76
77	Ok(response::Extensions {
78		account_data: account_data?,
79		receipts: receipts?,
80		typing: typing?,
81		to_device: to_device?,
82		e2ee: e2ee?,
83	})
84}
85
86#[tracing::instrument(
87	name = "selector",
88	level = "trace",
89	skip_all,
90	fields(?implicit, ?explicit),
91)]
92fn selector<'a, ListIter, SubsIter>(
93	SyncInfo { .. }: SyncInfo<'a>,
94	conn: &'a Connection,
95	window: &'a Window,
96	implicit: Option<ListIter>,
97	explicit: Option<SubsIter>,
98) -> impl Iterator<Item = &'a RoomId> + Send + Sync + 'a
99where
100	ListIter: Iterator<Item = &'a ListId> + Clone + Debug + Send + Sync + 'a,
101	SubsIter: Iterator<Item = &'a ExtensionRoomConfig> + Clone + Debug + Send + Sync + 'a,
102{
103	let has_all_subscribed = explicit
104		.clone()
105		.into_iter()
106		.flatten()
107		.any(|erc| matches!(erc, ExtensionRoomConfig::AllSubscribed));
108
109	let all_subscribed = has_all_subscribed
110		.then(|| conn.subscriptions.keys())
111		.into_iter()
112		.flatten()
113		.map(AsRef::as_ref);
114
115	let rooms_explicit = has_all_subscribed
116		.is_false()
117		.then(move || {
118			explicit
119				.into_iter()
120				.flatten()
121				.filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room))
122				.map(AsRef::as_ref)
123		})
124		.into_iter()
125		.flatten();
126
127	let rooms_selected = window
128		.iter()
129		.filter(move |(_, room)| {
130			implicit.as_ref().is_none_or(|lists| {
131				lists
132					.clone()
133					.any(|list| room.lists.contains(list))
134			})
135		})
136		.map(at!(0))
137		.map(AsRef::as_ref);
138
139	all_subscribed
140		.chain(rooms_explicit)
141		.chain(rooms_selected)
142}