tuwunel_api/client/sync/v5/
extensions.rs1mod account_data;
2mod e2ee;
3mod receipts;
4mod to_device;
5mod typing;
6
7use std::fmt::Debug;
8
9use futures::{FutureExt, future::join5};
10use ruma::{
11 RoomId,
12 api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response},
13};
14use tuwunel_core::{Result, apply, at, extract_variant, utils::BoolExt};
15use tuwunel_service::sync::Connection;
16
17use super::{SyncInfo, Window, share_encrypted_room};
18
19#[tracing::instrument(
20 name = "extensions",
21 level = "debug",
22 skip_all,
23 fields(
24 next_batch = conn.next_batch,
25 window = window.len(),
26 rooms = conn.rooms.len(),
27 subs = conn.subscriptions.len(),
28 )
29)]
30pub(super) async fn handle(
31 sync_info: SyncInfo<'_>,
32 conn: &Connection,
33 window: &Window,
34) -> Result<response::Extensions> {
35 let SyncInfo { .. } = sync_info;
36
37 let account_data = conn
38 .extensions
39 .account_data
40 .enabled
41 .unwrap_or(false)
42 .then_async(|| account_data::collect(sync_info, conn, window));
43
44 let receipts = conn
45 .extensions
46 .receipts
47 .enabled
48 .unwrap_or(false)
49 .then_async(|| receipts::collect(sync_info, conn, window));
50
51 let typing = conn
52 .extensions
53 .typing
54 .enabled
55 .unwrap_or(false)
56 .then_async(|| typing::collect(sync_info, conn, window));
57
58 let to_device = conn
59 .extensions
60 .to_device
61 .enabled
62 .unwrap_or(false)
63 .then_async(|| to_device::collect(sync_info, conn));
64
65 let e2ee = conn
66 .extensions
67 .e2ee
68 .enabled
69 .unwrap_or(false)
70 .then_async(|| e2ee::collect(sync_info, conn));
71
72 let (account_data, receipts, typing, to_device, e2ee) =
73 join5(account_data, receipts, typing, to_device, e2ee)
74 .map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default()))))
75 .await;
76
77 Ok(response::Extensions {
78 account_data: account_data?,
79 receipts: receipts?,
80 typing: typing?,
81 to_device: to_device?,
82 e2ee: e2ee?,
83 })
84}
85
86#[tracing::instrument(
87 name = "selector",
88 level = "trace",
89 skip_all,
90 fields(?implicit, ?explicit),
91)]
92fn selector<'a, ListIter, SubsIter>(
93 SyncInfo { .. }: SyncInfo<'a>,
94 conn: &'a Connection,
95 window: &'a Window,
96 implicit: Option<ListIter>,
97 explicit: Option<SubsIter>,
98) -> impl Iterator<Item = &'a RoomId> + Send + Sync + 'a
99where
100 ListIter: Iterator<Item = &'a ListId> + Clone + Debug + Send + Sync + 'a,
101 SubsIter: Iterator<Item = &'a ExtensionRoomConfig> + Clone + Debug + Send + Sync + 'a,
102{
103 let has_all_subscribed = explicit
104 .clone()
105 .into_iter()
106 .flatten()
107 .any(|erc| matches!(erc, ExtensionRoomConfig::AllSubscribed));
108
109 let all_subscribed = has_all_subscribed
110 .then(|| conn.subscriptions.keys())
111 .into_iter()
112 .flatten()
113 .map(AsRef::as_ref);
114
115 let rooms_explicit = has_all_subscribed
116 .is_false()
117 .then(move || {
118 explicit
119 .into_iter()
120 .flatten()
121 .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room))
122 .map(AsRef::as_ref)
123 })
124 .into_iter()
125 .flatten();
126
127 let rooms_selected = window
128 .iter()
129 .filter(move |(_, room)| {
130 implicit.as_ref().is_none_or(|lists| {
131 lists
132 .clone()
133 .any(|list| room.lists.contains(list))
134 })
135 })
136 .map(at!(0))
137 .map(AsRef::as_ref);
138
139 all_subscribed
140 .chain(rooms_explicit)
141 .chain(rooms_selected)
142}