Skip to main content

tuwunel_api/client/sync/v5/
selector.rs

1use std::cmp::Ordering;
2
3use futures::{
4	FutureExt, StreamExt, TryFutureExt,
5	future::{join, join3},
6};
7use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
8use tuwunel_core::{
9	apply, is_true,
10	matrix::PduCount,
11	trace,
12	utils::{
13		BoolExt, ReadyExt, TryFutureExtExt,
14		math::usize_from_ruma,
15		option::OptionExt,
16		stream::{BroadbandExt, IterStream},
17	},
18};
19use tuwunel_service::sync::Connection;
20
21use super::{
22	ListIds, ResponseLists, SyncInfo, Window, WindowRoom,
23	filter::{filter_room, filter_room_meta},
24};
25
26#[tracing::instrument(level = "debug", skip_all)]
27pub(super) async fn selector(
28	conn: &mut Connection,
29	sync_info: SyncInfo<'_>,
30) -> (Window, ResponseLists) {
31	use MembershipState::*;
32
33	let SyncInfo { services, sender_user, .. } = sync_info;
34
35	// MSC4380: when m.invite_permission_config blocks invites, omit invited
36	// rooms from the sliding-sync window; an unblock re-exposes them.
37	let invites_blocked = services.users.invites_blocked(sender_user).await;
38
39	let actives = services
40		.state_cache
41		.user_memberships(sender_user, Some(&[Join, Invite, Knock]))
42		.ready_filter(move |(m, _)| !invites_blocked || !matches!(m, Invite))
43		.map(|(membership, room_id)| (room_id.to_owned(), Some(membership)));
44
45	// Source retractions from tracked rooms, not a full left-state scan.
46	let retracted = conn
47		.rooms
48		.keys()
49		.stream()
50		.broad_filter_map(async |room_id| {
51			services
52				.state_cache
53				.is_left(sender_user, room_id)
54				.await
55				.then_some((room_id.clone(), Some(Leave)))
56		});
57
58	let mut rooms = actives
59		.chain(retracted)
60		.broad_filter_map(|(room_id, membership)| matcher(sync_info, conn, room_id, membership))
61		.collect::<Vec<_>>()
62		.await;
63
64	rooms.sort_by(room_sort);
65	rooms
66		.iter_mut()
67		.enumerate()
68		.for_each(|(i, room)| {
69			room.ranked = i;
70
71			conn.rooms
72				.entry(room.room_id.clone())
73				.or_default();
74		});
75
76	trace!(?rooms);
77	let lists = response_lists(rooms.iter());
78
79	trace!(?lists);
80	let window = window(sync_info, conn, rooms.iter(), &lists, invites_blocked).await;
81
82	trace!(?window);
83	(window, lists)
84}
85
86#[tracing::instrument(
87	name = "matcher",
88	level = "trace",
89	skip_all,
90	fields(?room_id, ?membership)
91)]
92async fn matcher(
93	sync_info: SyncInfo<'_>,
94	conn: &Connection,
95	room_id: OwnedRoomId,
96	membership: Option<MembershipState>,
97) -> Option<WindowRoom> {
98	let SyncInfo { services, sender_user, .. } = sync_info;
99
100	let (matched, lists) = conn
101		.lists
102		.iter()
103		.stream()
104		.filter_map(async |(id, list)| {
105			list.filters
106				.clone()
107				.map_async(async |filters| {
108					filter_room(sync_info, &filters, &room_id, membership.as_ref()).await
109				})
110				.await
111				.is_none_or(is_true!())
112				.then(|| id.clone())
113		})
114		.collect::<ListIds>()
115		.map(|lists| (lists.is_empty().is_false(), lists))
116		.await;
117
118	let last_notification = matched.then_async(|| {
119		services
120			.pusher
121			.last_notification_read(sender_user, &room_id)
122			.unwrap_or_default()
123	});
124
125	let last_privateread = matched.then_async(|| {
126		services
127			.read_receipt
128			.last_privateread_update(sender_user, &room_id)
129	});
130
131	let last_receipt = matched.then_async(|| {
132		services
133			.read_receipt
134			.last_receipt_count(&room_id, sender_user.into(), None)
135			.unwrap_or_default()
136	});
137
138	let last_account = matched.then_async(|| {
139		services
140			.account_data
141			.last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
142			.unwrap_or_default()
143	});
144
145	let last_timeline = matched.then_async(|| {
146		services
147			.timeline
148			.last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
149			.map_ok(PduCount::into_unsigned)
150			.unwrap_or_default()
151	});
152
153	let last_membership = matched.then_async(async || match &membership {
154		| Some(MembershipState::Invite) => services
155			.state_cache
156			.get_invite_count(&room_id, sender_user)
157			.await
158			.unwrap_or_default(),
159		| Some(MembershipState::Leave | MembershipState::Ban) => services
160			.state_cache
161			.get_left_count(&room_id, sender_user)
162			.await
163			.unwrap_or_default(),
164		| _ => 0,
165	});
166
167	let (
168		(last_timeline, last_notification, last_account),
169		(last_receipt, last_privateread, last_membership),
170	) = join(
171		join3(last_timeline, last_notification, last_account),
172		join3(last_receipt, last_privateread, last_membership),
173	)
174	.await;
175
176	// A departed room surfaces only on its own leave count, never on room-global
177	// timeline activity the user no longer receives, so the retraction is one-shot.
178	let last_count = match &membership {
179		| Some(MembershipState::Leave | MembershipState::Ban) => last_membership
180			.filter(|count| conn.next_batch.ge(count))
181			.unwrap_or_default(),
182		| _ => [
183			last_timeline,
184			last_notification,
185			last_account,
186			last_receipt,
187			last_privateread,
188			last_membership,
189		]
190		.into_iter()
191		.map(Option::unwrap_or_default)
192		.filter(|count| conn.next_batch.ge(count))
193		.max()
194		.unwrap_or_default(),
195	};
196
197	Some(WindowRoom {
198		room_id: room_id.clone(),
199		membership,
200		lists,
201		ranked: 0,
202		last_count,
203	})
204}
205
206#[tracing::instrument(
207	level = "debug",
208	skip_all,
209	fields(rooms = rooms.clone().count())
210)]
211async fn window<'a, Rooms>(
212	sync_info: SyncInfo<'_>,
213	conn: &Connection,
214	rooms: Rooms,
215	lists: &ResponseLists,
216	invites_blocked: bool,
217) -> Window
218where
219	Rooms: Iterator<Item = &'a WindowRoom> + Clone + Send + Sync,
220{
221	static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX);
222
223	let SyncInfo { services, sender_user, .. } = sync_info;
224
225	let selections = lists
226		.keys()
227		.cloned()
228		.filter_map(|id| conn.lists.get(&id).map(|list| (id, list)))
229		.flat_map(|(id, list)| {
230			let full_range = list
231				.ranges
232				.is_empty()
233				.then_some(&FULL_RANGE)
234				.into_iter();
235
236			list.ranges
237				.iter()
238				.chain(full_range)
239				.map(apply!(2, usize_from_ruma))
240				.map(move |range| (id.clone(), range))
241		})
242		.flat_map(|(id, (start, end))| {
243			rooms
244				.clone()
245				.filter(move |&room| room.lists.contains(&id))
246				.filter(|&room| {
247					conn.rooms
248						.get(&room.room_id)
249						.is_some_and(|conn_room| {
250							conn_room.roomsince == 0 || room.last_count > conn_room.roomsince
251						})
252				})
253				.enumerate()
254				.skip_while(move |&(i, _)| i < start)
255				.take(end.saturating_add(1).saturating_sub(start))
256				.map(|(_, room)| (room.room_id.clone(), room.clone()))
257		})
258		.stream();
259
260	let subscriptions = conn
261		.subscriptions
262		.iter()
263		.stream()
264		.broad_filter_map(async |(room_id, _)| {
265			let membership = services
266				.state_cache
267				.user_membership(sender_user, room_id);
268
269			let filter = filter_room_meta(sync_info, room_id);
270
271			let (membership, filter) = join(membership, filter).await;
272
273			// MSC4380: suppress invited-room subscriptions when invites are blocked.
274			let suppress = invites_blocked && matches!(membership, Some(MembershipState::Invite));
275
276			(filter && !suppress).then(|| WindowRoom {
277				room_id: room_id.clone(),
278				lists: Default::default(),
279				ranked: usize::MAX,
280				last_count: 0,
281				membership,
282			})
283		})
284		.map(|room| (room.room_id.clone(), room));
285
286	subscriptions.chain(selections).collect().await
287}
288
289fn response_lists<'a, Rooms>(rooms: Rooms) -> ResponseLists
290where
291	Rooms: Iterator<Item = &'a WindowRoom>,
292{
293	rooms
294		.filter(|room| {
295			!matches!(room.membership, Some(MembershipState::Leave | MembershipState::Ban))
296		})
297		.flat_map(|room| room.lists.iter())
298		.fold(ResponseLists::default(), |mut lists, id| {
299			let list = lists.entry(id.clone()).or_default();
300			list.count = list
301				.count
302				.checked_add(uint!(1))
303				.expect("list count must not overflow JsInt");
304
305			lists
306		})
307}
308
309fn room_sort(a: &WindowRoom, b: &WindowRoom) -> Ordering { b.last_count.cmp(&a.last_count) }