Skip to main content

tuwunel_api/client/sync/v5/
selector.rs

1use std::cmp::Ordering;
2
3use futures::{
4	FutureExt, StreamExt, TryFutureExt,
5	future::{join, join5},
6};
7use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
8use tuwunel_core::{
9	apply, is_true,
10	matrix::PduCount,
11	trace,
12	utils::{
13		BoolExt, TryFutureExtExt,
14		math::usize_from_ruma,
15		option::OptionExt,
16		stream::{BroadbandExt, IterStream},
17	},
18};
19use tuwunel_service::sync::Connection;
20
21use super::{
22	ListIds, ResponseLists, SyncInfo, Window, WindowRoom,
23	filter::{filter_room, filter_room_meta},
24};
25
26#[tracing::instrument(level = "debug", skip_all)]
27pub(super) async fn selector(
28	conn: &mut Connection,
29	sync_info: SyncInfo<'_>,
30) -> (Window, ResponseLists) {
31	use MembershipState::*;
32
33	let SyncInfo { services, sender_user, .. } = sync_info;
34
35	let mut rooms = services
36		.state_cache
37		.user_memberships(sender_user, Some(&[Join, Invite, Knock]))
38		.map(|(membership, room_id)| (room_id.to_owned(), Some(membership)))
39		.broad_filter_map(|(room_id, membership)| matcher(sync_info, conn, room_id, membership))
40		.collect::<Vec<_>>()
41		.await;
42
43	rooms.sort_by(room_sort);
44	rooms
45		.iter_mut()
46		.enumerate()
47		.for_each(|(i, room)| {
48			room.ranked = i;
49
50			conn.rooms
51				.entry(room.room_id.clone())
52				.or_default();
53		});
54
55	trace!(?rooms);
56	let lists = response_lists(rooms.iter());
57
58	trace!(?lists);
59	let window = window(sync_info, conn, rooms.iter(), &lists).await;
60
61	trace!(?window);
62	(window, lists)
63}
64
65#[tracing::instrument(
66	name = "matcher",
67	level = "trace",
68	skip_all,
69	fields(?room_id, ?membership)
70)]
71async fn matcher(
72	sync_info: SyncInfo<'_>,
73	conn: &Connection,
74	room_id: OwnedRoomId,
75	membership: Option<MembershipState>,
76) -> Option<WindowRoom> {
77	let SyncInfo { services, sender_user, .. } = sync_info;
78
79	let (matched, lists) = conn
80		.lists
81		.iter()
82		.stream()
83		.filter_map(async |(id, list)| {
84			list.filters
85				.clone()
86				.map_async(async |filters| {
87					filter_room(sync_info, &filters, &room_id, membership.as_ref()).await
88				})
89				.await
90				.is_none_or(is_true!())
91				.then(|| id.clone())
92		})
93		.collect::<ListIds>()
94		.map(|lists| (lists.is_empty().is_false(), lists))
95		.await;
96
97	let last_notification = matched.then_async(|| {
98		services
99			.pusher
100			.last_notification_read(sender_user, &room_id)
101			.unwrap_or_default()
102	});
103
104	let last_privateread = matched.then_async(|| {
105		services
106			.read_receipt
107			.last_privateread_update(sender_user, &room_id)
108	});
109
110	let last_receipt = matched.then_async(|| {
111		services
112			.read_receipt
113			.last_receipt_count(&room_id, sender_user.into(), None)
114			.unwrap_or_default()
115	});
116
117	let last_account = matched.then_async(|| {
118		services
119			.account_data
120			.last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
121			.unwrap_or_default()
122	});
123
124	let last_timeline = matched.then_async(|| {
125		services
126			.timeline
127			.last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
128			.map_ok(PduCount::into_unsigned)
129			.unwrap_or_default()
130	});
131
132	let (last_timeline, last_notification, last_account, last_receipt, last_privateread) =
133		join5(last_timeline, last_notification, last_account, last_receipt, last_privateread)
134			.await;
135
136	Some(WindowRoom {
137		room_id: room_id.clone(),
138		membership,
139		lists,
140		ranked: 0,
141		last_count: [
142			last_timeline,
143			last_notification,
144			last_account,
145			last_receipt,
146			last_privateread,
147		]
148		.into_iter()
149		.map(Option::unwrap_or_default)
150		.filter(|count| conn.next_batch.ge(count))
151		.max()
152		.unwrap_or_default(),
153	})
154}
155
156#[tracing::instrument(
157	level = "debug",
158	skip_all,
159	fields(rooms = rooms.clone().count())
160)]
161async fn window<'a, Rooms>(
162	sync_info: SyncInfo<'_>,
163	conn: &Connection,
164	rooms: Rooms,
165	lists: &ResponseLists,
166) -> Window
167where
168	Rooms: Iterator<Item = &'a WindowRoom> + Clone + Send + Sync,
169{
170	static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX);
171
172	let SyncInfo { services, sender_user, .. } = sync_info;
173
174	let selections = lists
175		.keys()
176		.cloned()
177		.filter_map(|id| conn.lists.get(&id).map(|list| (id, list)))
178		.flat_map(|(id, list)| {
179			let full_range = list
180				.ranges
181				.is_empty()
182				.then_some(&FULL_RANGE)
183				.into_iter();
184
185			list.ranges
186				.iter()
187				.chain(full_range)
188				.map(apply!(2, usize_from_ruma))
189				.map(move |range| (id.clone(), range))
190		})
191		.flat_map(|(id, (start, end))| {
192			rooms
193				.clone()
194				.filter(move |&room| room.lists.contains(&id))
195				.filter(|&room| {
196					conn.rooms
197						.get(&room.room_id)
198						.is_some_and(|conn_room| {
199							conn_room.roomsince == 0 || room.last_count > conn_room.roomsince
200						})
201				})
202				.enumerate()
203				.skip_while(move |&(i, _)| i < start)
204				.take(end.saturating_add(1).saturating_sub(start))
205				.map(|(_, room)| (room.room_id.clone(), room.clone()))
206		})
207		.stream();
208
209	let subscriptions = conn
210		.subscriptions
211		.iter()
212		.stream()
213		.broad_filter_map(async |(room_id, _)| {
214			let membership = services
215				.state_cache
216				.user_membership(sender_user, room_id);
217
218			let filter = filter_room_meta(sync_info, room_id);
219
220			let (membership, filter) = join(membership, filter).await;
221
222			filter.then(|| WindowRoom {
223				room_id: room_id.clone(),
224				lists: Default::default(),
225				ranked: usize::MAX,
226				last_count: 0,
227				membership,
228			})
229		})
230		.map(|room| (room.room_id.clone(), room));
231
232	subscriptions.chain(selections).collect().await
233}
234
235fn response_lists<'a, Rooms>(rooms: Rooms) -> ResponseLists
236where
237	Rooms: Iterator<Item = &'a WindowRoom>,
238{
239	rooms
240		.flat_map(|room| room.lists.iter())
241		.fold(ResponseLists::default(), |mut lists, id| {
242			let list = lists.entry(id.clone()).or_default();
243			list.count = list
244				.count
245				.checked_add(uint!(1))
246				.expect("list count must not overflow JsInt");
247
248			lists
249		})
250}
251
252fn room_sort(a: &WindowRoom, b: &WindowRoom) -> Ordering { b.last_count.cmp(&a.last_count) }