tuwunel_api/client/sync/v5/
selector.rs1use std::cmp::Ordering;
2
3use futures::{
4 FutureExt, StreamExt, TryFutureExt,
5 future::{join, join5},
6};
7use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
8use tuwunel_core::{
9 apply, is_true,
10 matrix::PduCount,
11 trace,
12 utils::{
13 BoolExt, TryFutureExtExt,
14 math::usize_from_ruma,
15 option::OptionExt,
16 stream::{BroadbandExt, IterStream},
17 },
18};
19use tuwunel_service::sync::Connection;
20
21use super::{
22 ListIds, ResponseLists, SyncInfo, Window, WindowRoom,
23 filter::{filter_room, filter_room_meta},
24};
25
26#[tracing::instrument(level = "debug", skip_all)]
27pub(super) async fn selector(
28 conn: &mut Connection,
29 sync_info: SyncInfo<'_>,
30) -> (Window, ResponseLists) {
31 use MembershipState::*;
32
33 let SyncInfo { services, sender_user, .. } = sync_info;
34
35 let mut rooms = services
36 .state_cache
37 .user_memberships(sender_user, Some(&[Join, Invite, Knock]))
38 .map(|(membership, room_id)| (room_id.to_owned(), Some(membership)))
39 .broad_filter_map(|(room_id, membership)| matcher(sync_info, conn, room_id, membership))
40 .collect::<Vec<_>>()
41 .await;
42
43 rooms.sort_by(room_sort);
44 rooms
45 .iter_mut()
46 .enumerate()
47 .for_each(|(i, room)| {
48 room.ranked = i;
49
50 conn.rooms
51 .entry(room.room_id.clone())
52 .or_default();
53 });
54
55 trace!(?rooms);
56 let lists = response_lists(rooms.iter());
57
58 trace!(?lists);
59 let window = window(sync_info, conn, rooms.iter(), &lists).await;
60
61 trace!(?window);
62 (window, lists)
63}
64
65#[tracing::instrument(
66 name = "matcher",
67 level = "trace",
68 skip_all,
69 fields(?room_id, ?membership)
70)]
71async fn matcher(
72 sync_info: SyncInfo<'_>,
73 conn: &Connection,
74 room_id: OwnedRoomId,
75 membership: Option<MembershipState>,
76) -> Option<WindowRoom> {
77 let SyncInfo { services, sender_user, .. } = sync_info;
78
79 let (matched, lists) = conn
80 .lists
81 .iter()
82 .stream()
83 .filter_map(async |(id, list)| {
84 list.filters
85 .clone()
86 .map_async(async |filters| {
87 filter_room(sync_info, &filters, &room_id, membership.as_ref()).await
88 })
89 .await
90 .is_none_or(is_true!())
91 .then(|| id.clone())
92 })
93 .collect::<ListIds>()
94 .map(|lists| (lists.is_empty().is_false(), lists))
95 .await;
96
97 let last_notification = matched.then_async(|| {
98 services
99 .pusher
100 .last_notification_read(sender_user, &room_id)
101 .unwrap_or_default()
102 });
103
104 let last_privateread = matched.then_async(|| {
105 services
106 .read_receipt
107 .last_privateread_update(sender_user, &room_id)
108 });
109
110 let last_receipt = matched.then_async(|| {
111 services
112 .read_receipt
113 .last_receipt_count(&room_id, sender_user.into(), None)
114 .unwrap_or_default()
115 });
116
117 let last_account = matched.then_async(|| {
118 services
119 .account_data
120 .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
121 .unwrap_or_default()
122 });
123
124 let last_timeline = matched.then_async(|| {
125 services
126 .timeline
127 .last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
128 .map_ok(PduCount::into_unsigned)
129 .unwrap_or_default()
130 });
131
132 let (last_timeline, last_notification, last_account, last_receipt, last_privateread) =
133 join5(last_timeline, last_notification, last_account, last_receipt, last_privateread)
134 .await;
135
136 Some(WindowRoom {
137 room_id: room_id.clone(),
138 membership,
139 lists,
140 ranked: 0,
141 last_count: [
142 last_timeline,
143 last_notification,
144 last_account,
145 last_receipt,
146 last_privateread,
147 ]
148 .into_iter()
149 .map(Option::unwrap_or_default)
150 .filter(|count| conn.next_batch.ge(count))
151 .max()
152 .unwrap_or_default(),
153 })
154}
155
156#[tracing::instrument(
157 level = "debug",
158 skip_all,
159 fields(rooms = rooms.clone().count())
160)]
161async fn window<'a, Rooms>(
162 sync_info: SyncInfo<'_>,
163 conn: &Connection,
164 rooms: Rooms,
165 lists: &ResponseLists,
166) -> Window
167where
168 Rooms: Iterator<Item = &'a WindowRoom> + Clone + Send + Sync,
169{
170 static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX);
171
172 let SyncInfo { services, sender_user, .. } = sync_info;
173
174 let selections = lists
175 .keys()
176 .cloned()
177 .filter_map(|id| conn.lists.get(&id).map(|list| (id, list)))
178 .flat_map(|(id, list)| {
179 let full_range = list
180 .ranges
181 .is_empty()
182 .then_some(&FULL_RANGE)
183 .into_iter();
184
185 list.ranges
186 .iter()
187 .chain(full_range)
188 .map(apply!(2, usize_from_ruma))
189 .map(move |range| (id.clone(), range))
190 })
191 .flat_map(|(id, (start, end))| {
192 rooms
193 .clone()
194 .filter(move |&room| room.lists.contains(&id))
195 .filter(|&room| {
196 conn.rooms
197 .get(&room.room_id)
198 .is_some_and(|conn_room| {
199 conn_room.roomsince == 0 || room.last_count > conn_room.roomsince
200 })
201 })
202 .enumerate()
203 .skip_while(move |&(i, _)| i < start)
204 .take(end.saturating_add(1).saturating_sub(start))
205 .map(|(_, room)| (room.room_id.clone(), room.clone()))
206 })
207 .stream();
208
209 let subscriptions = conn
210 .subscriptions
211 .iter()
212 .stream()
213 .broad_filter_map(async |(room_id, _)| {
214 let membership = services
215 .state_cache
216 .user_membership(sender_user, room_id);
217
218 let filter = filter_room_meta(sync_info, room_id);
219
220 let (membership, filter) = join(membership, filter).await;
221
222 filter.then(|| WindowRoom {
223 room_id: room_id.clone(),
224 lists: Default::default(),
225 ranked: usize::MAX,
226 last_count: 0,
227 membership,
228 })
229 })
230 .map(|room| (room.room_id.clone(), room));
231
232 subscriptions.chain(selections).collect().await
233}
234
235fn response_lists<'a, Rooms>(rooms: Rooms) -> ResponseLists
236where
237 Rooms: Iterator<Item = &'a WindowRoom>,
238{
239 rooms
240 .flat_map(|room| room.lists.iter())
241 .fold(ResponseLists::default(), |mut lists, id| {
242 let list = lists.entry(id.clone()).or_default();
243 list.count = list
244 .count
245 .checked_add(uint!(1))
246 .expect("list count must not overflow JsInt");
247
248 lists
249 })
250}
251
252fn room_sort(a: &WindowRoom, b: &WindowRoom) -> Ordering { b.last_count.cmp(&a.last_count) }