tuwunel_api/client/sync/v5/
selector.rs1use std::cmp::Ordering;
2
3use futures::{
4 FutureExt, StreamExt, TryFutureExt,
5 future::{join, join3},
6};
7use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
8use tuwunel_core::{
9 apply, is_true,
10 matrix::PduCount,
11 trace,
12 utils::{
13 BoolExt, ReadyExt, TryFutureExtExt,
14 math::usize_from_ruma,
15 option::OptionExt,
16 stream::{BroadbandExt, IterStream},
17 },
18};
19use tuwunel_service::sync::Connection;
20
21use super::{
22 ListIds, ResponseLists, SyncInfo, Window, WindowRoom,
23 filter::{filter_room, filter_room_meta},
24};
25
26#[tracing::instrument(level = "debug", skip_all)]
27pub(super) async fn selector(
28 conn: &mut Connection,
29 sync_info: SyncInfo<'_>,
30) -> (Window, ResponseLists) {
31 use MembershipState::*;
32
33 let SyncInfo { services, sender_user, .. } = sync_info;
34
35 let invites_blocked = services.users.invites_blocked(sender_user).await;
38
39 let actives = services
40 .state_cache
41 .user_memberships(sender_user, Some(&[Join, Invite, Knock]))
42 .ready_filter(move |(m, _)| !invites_blocked || !matches!(m, Invite))
43 .map(|(membership, room_id)| (room_id.to_owned(), Some(membership)));
44
45 let retracted = conn
47 .rooms
48 .keys()
49 .stream()
50 .broad_filter_map(async |room_id| {
51 services
52 .state_cache
53 .is_left(sender_user, room_id)
54 .await
55 .then_some((room_id.clone(), Some(Leave)))
56 });
57
58 let mut rooms = actives
59 .chain(retracted)
60 .broad_filter_map(|(room_id, membership)| matcher(sync_info, conn, room_id, membership))
61 .collect::<Vec<_>>()
62 .await;
63
64 rooms.sort_by(room_sort);
65 rooms
66 .iter_mut()
67 .enumerate()
68 .for_each(|(i, room)| {
69 room.ranked = i;
70
71 conn.rooms
72 .entry(room.room_id.clone())
73 .or_default();
74 });
75
76 trace!(?rooms);
77 let lists = response_lists(rooms.iter());
78
79 trace!(?lists);
80 let window = window(sync_info, conn, rooms.iter(), &lists, invites_blocked).await;
81
82 trace!(?window);
83 (window, lists)
84}
85
86#[tracing::instrument(
87 name = "matcher",
88 level = "trace",
89 skip_all,
90 fields(?room_id, ?membership)
91)]
92async fn matcher(
93 sync_info: SyncInfo<'_>,
94 conn: &Connection,
95 room_id: OwnedRoomId,
96 membership: Option<MembershipState>,
97) -> Option<WindowRoom> {
98 let SyncInfo { services, sender_user, .. } = sync_info;
99
100 let (matched, lists) = conn
101 .lists
102 .iter()
103 .stream()
104 .filter_map(async |(id, list)| {
105 list.filters
106 .clone()
107 .map_async(async |filters| {
108 filter_room(sync_info, &filters, &room_id, membership.as_ref()).await
109 })
110 .await
111 .is_none_or(is_true!())
112 .then(|| id.clone())
113 })
114 .collect::<ListIds>()
115 .map(|lists| (lists.is_empty().is_false(), lists))
116 .await;
117
118 let last_notification = matched.then_async(|| {
119 services
120 .pusher
121 .last_notification_read(sender_user, &room_id)
122 .unwrap_or_default()
123 });
124
125 let last_privateread = matched.then_async(|| {
126 services
127 .read_receipt
128 .last_privateread_update(sender_user, &room_id)
129 });
130
131 let last_receipt = matched.then_async(|| {
132 services
133 .read_receipt
134 .last_receipt_count(&room_id, sender_user.into(), None)
135 .unwrap_or_default()
136 });
137
138 let last_account = matched.then_async(|| {
139 services
140 .account_data
141 .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
142 .unwrap_or_default()
143 });
144
145 let last_timeline = matched.then_async(|| {
146 services
147 .timeline
148 .last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
149 .map_ok(PduCount::into_unsigned)
150 .unwrap_or_default()
151 });
152
153 let last_membership = matched.then_async(async || match &membership {
154 | Some(MembershipState::Invite) => services
155 .state_cache
156 .get_invite_count(&room_id, sender_user)
157 .await
158 .unwrap_or_default(),
159 | Some(MembershipState::Leave | MembershipState::Ban) => services
160 .state_cache
161 .get_left_count(&room_id, sender_user)
162 .await
163 .unwrap_or_default(),
164 | _ => 0,
165 });
166
167 let (
168 (last_timeline, last_notification, last_account),
169 (last_receipt, last_privateread, last_membership),
170 ) = join(
171 join3(last_timeline, last_notification, last_account),
172 join3(last_receipt, last_privateread, last_membership),
173 )
174 .await;
175
176 let last_count = match &membership {
179 | Some(MembershipState::Leave | MembershipState::Ban) => last_membership
180 .filter(|count| conn.next_batch.ge(count))
181 .unwrap_or_default(),
182 | _ => [
183 last_timeline,
184 last_notification,
185 last_account,
186 last_receipt,
187 last_privateread,
188 last_membership,
189 ]
190 .into_iter()
191 .map(Option::unwrap_or_default)
192 .filter(|count| conn.next_batch.ge(count))
193 .max()
194 .unwrap_or_default(),
195 };
196
197 Some(WindowRoom {
198 room_id: room_id.clone(),
199 membership,
200 lists,
201 ranked: 0,
202 last_count,
203 })
204}
205
206#[tracing::instrument(
207 level = "debug",
208 skip_all,
209 fields(rooms = rooms.clone().count())
210)]
211async fn window<'a, Rooms>(
212 sync_info: SyncInfo<'_>,
213 conn: &Connection,
214 rooms: Rooms,
215 lists: &ResponseLists,
216 invites_blocked: bool,
217) -> Window
218where
219 Rooms: Iterator<Item = &'a WindowRoom> + Clone + Send + Sync,
220{
221 static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX);
222
223 let SyncInfo { services, sender_user, .. } = sync_info;
224
225 let selections = lists
226 .keys()
227 .cloned()
228 .filter_map(|id| conn.lists.get(&id).map(|list| (id, list)))
229 .flat_map(|(id, list)| {
230 let full_range = list
231 .ranges
232 .is_empty()
233 .then_some(&FULL_RANGE)
234 .into_iter();
235
236 list.ranges
237 .iter()
238 .chain(full_range)
239 .map(apply!(2, usize_from_ruma))
240 .map(move |range| (id.clone(), range))
241 })
242 .flat_map(|(id, (start, end))| {
243 rooms
244 .clone()
245 .filter(move |&room| room.lists.contains(&id))
246 .filter(|&room| {
247 conn.rooms
248 .get(&room.room_id)
249 .is_some_and(|conn_room| {
250 conn_room.roomsince == 0 || room.last_count > conn_room.roomsince
251 })
252 })
253 .enumerate()
254 .skip_while(move |&(i, _)| i < start)
255 .take(end.saturating_add(1).saturating_sub(start))
256 .map(|(_, room)| (room.room_id.clone(), room.clone()))
257 })
258 .stream();
259
260 let subscriptions = conn
261 .subscriptions
262 .iter()
263 .stream()
264 .broad_filter_map(async |(room_id, _)| {
265 let membership = services
266 .state_cache
267 .user_membership(sender_user, room_id);
268
269 let filter = filter_room_meta(sync_info, room_id);
270
271 let (membership, filter) = join(membership, filter).await;
272
273 let suppress = invites_blocked && matches!(membership, Some(MembershipState::Invite));
275
276 (filter && !suppress).then(|| WindowRoom {
277 room_id: room_id.clone(),
278 lists: Default::default(),
279 ranked: usize::MAX,
280 last_count: 0,
281 membership,
282 })
283 })
284 .map(|room| (room.room_id.clone(), room));
285
286 subscriptions.chain(selections).collect().await
287}
288
289fn response_lists<'a, Rooms>(rooms: Rooms) -> ResponseLists
290where
291 Rooms: Iterator<Item = &'a WindowRoom>,
292{
293 rooms
294 .filter(|room| {
295 !matches!(room.membership, Some(MembershipState::Leave | MembershipState::Ban))
296 })
297 .flat_map(|room| room.lists.iter())
298 .fold(ResponseLists::default(), |mut lists, id| {
299 let list = lists.entry(id.clone()).or_default();
300 list.count = list
301 .count
302 .checked_add(uint!(1))
303 .expect("list count must not overflow JsInt");
304
305 lists
306 })
307}
308
309fn room_sort(a: &WindowRoom, b: &WindowRoom) -> Ordering { b.last_count.cmp(&a.last_count) }