Skip to main content

tuwunel_api/client/sync/v5/
rooms.rs

1mod bump_stamp;
2mod heroes;
3
4use std::collections::{BTreeMap, HashSet};
5
6use futures::{
7	FutureExt, StreamExt, TryFutureExt,
8	future::{join, join3, join4},
9};
10use ruma::{
11	JsOption, MxcUri, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId,
12	api::client::sync::sync_events::{
13		UnreadNotificationsCount,
14		v5::{DisplayName, response, response::Heroes},
15	},
16	events::{
17		AnySyncStateEvent, StateEventType, TimelineEventType, room::member::MembershipState,
18	},
19	serde::Raw,
20};
21use tuwunel_core::{
22	Result, at, err, error, is_equal_to,
23	itertools::Itertools,
24	matrix::{
25		Event, StateKey,
26		pdu::{PduCount, PduEvent},
27	},
28	ref_at,
29	utils::{
30		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
31		math::usize_from_ruma,
32		result::FlatOk,
33		stream::{BroadbandExt, WidebandExt},
34	},
35};
36use tuwunel_service::{Services, sync::Room};
37
38use self::{bump_stamp::room_bump_stamp, heroes::calculate_heroes};
39use super::{super::load_timeline, Connection, ListIds, SyncInfo, Window, WindowRoom};
40use crate::client::{annotate_membership, ignored_filter, with_membership};
41
42type ThreadCounts = BTreeMap<OwnedEventId, (u64, u64)>;
43
44#[tracing::instrument(
45    name = "rooms",
46    level = "debug",
47    skip_all,
48    fields(
49        next_batch = conn.next_batch,
50        window = window.len(),
51    )
52)]
53pub(super) async fn handle(
54	sync_info: SyncInfo<'_>,
55	conn: &Connection,
56	window: &Window,
57) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
58	window
59		.iter()
60		.stream()
61		.broad_filter_map(async |(room_id, room)| {
62			handle_room(sync_info, conn, room)
63				.map_ok(move |room| (room_id.clone(), room))
64				.inspect_err(|e| error!(?room_id, "sync handler: {e:?}"))
65				.await
66				.ok()
67		})
68		.collect()
69		.map(Ok)
70		.await
71}
72
73#[tracing::instrument(
74	name = "room",
75	level = "debug",
76	skip_all,
77	fields(room_id, roomsince)
78)]
79async fn handle_room(
80	sync_info: SyncInfo<'_>,
81	conn: &Connection,
82	window_room: &WindowRoom,
83) -> Result<response::Room> {
84	let SyncInfo { services, sender_user, .. } = sync_info;
85	let WindowRoom {
86		lists, membership, room_id, last_count, ..
87	} = window_room;
88
89	let &Room { roomsince, .. } = conn
90		.rooms
91		.get(room_id)
92		.ok_or_else(|| err!("Missing connection state for {room_id}"))?;
93
94	debug_assert!(
95		*last_count > roomsince || *last_count == 0 || roomsince == 0,
96		"Stale room shouldn't be in the window"
97	);
98
99	if matches!(*membership, Some(MembershipState::Leave | MembershipState::Ban)) {
100		return leave_or_ban_response(sync_info, conn, window_room, roomsince).await;
101	}
102
103	let is_invite = *membership == Some(MembershipState::Invite);
104
105	let encrypted = services
106		.state_accessor
107		.is_encrypted_room(room_id)
108		.await;
109
110	let (timeline_limit, required_state) = merged_room_details(conn, lists, room_id);
111
112	let timeline = is_invite.is_false().then_async(|| {
113		load_timeline(
114			services,
115			sender_user,
116			room_id,
117			PduCount::Normal(roomsince),
118			Some(PduCount::from(conn.next_batch)),
119			timeline_limit,
120		)
121	});
122
123	let (timeline_pdus, limited, last_timeline_count) = timeline
124		.await
125		.flat_ok()
126		.unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
127
128	let required_state = required_state
129		.into_iter()
130		.filter(|_| !timeline_pdus.is_empty())
131		.collect::<Vec<_>>();
132
133	let prev_batch = timeline_pdus
134		.first()
135		.map(at!(0))
136		.map(PduCount::into_unsigned)
137		.as_ref()
138		.map(ToString::to_string);
139
140	let bump_stamp = room_bump_stamp(
141		services,
142		sender_user,
143		room_id,
144		PduCount::Normal(roomsince),
145		PduCount::from(conn.next_batch),
146		last_timeline_count,
147	)
148	.await;
149
150	let num_live = roomsince
151		.ne(&0)
152		.and_is(limited || timeline_pdus.len() >= timeline_limit)
153		.then_async(|| {
154			services
155				.timeline
156				.pdus(None, room_id, Some(roomsince.into()))
157				.count()
158				.map(TryInto::try_into)
159				.map(Result::ok)
160		});
161
162	let required_state = collect_required_state(
163		services,
164		sender_user,
165		room_id,
166		&required_state,
167		&timeline_pdus,
168		encrypted,
169	);
170
171	// TODO: figure out a timestamp we can use for remote invites
172	let invite_state = is_invite.then_async(|| {
173		services
174			.state_cache
175			.invite_state(sender_user, room_id)
176			.ok()
177	});
178
179	let timeline = timeline_pdus
180		.iter()
181		.stream()
182		.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
183		.map(at!(1))
184		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
185		.wide_then(|pdu| {
186			services
187				.pdu_metadata
188				.bundle_aggregations(sender_user, pdu)
189		})
190		.map(Event::into_format)
191		.collect();
192
193	let meta = room_meta_future(services, sender_user, room_id);
194	let events = join4(timeline, num_live, required_state, invite_state);
195	let member_counts = member_counts_future(services, room_id);
196	let notification_counts = notification_counts_future(services, sender_user, room_id);
197	let (
198		(room_name, room_avatar, is_dm),
199		(timeline, num_live, required_state, invite_state),
200		(joined_count, invited_count),
201		(highlight_count, notification_count, _last_notification_read, thread_counts),
202	) = join4(meta, events, member_counts, notification_counts)
203		.boxed()
204		.await;
205
206	let (heroes, heroes_name, heroes_avatar) = resolve_heroes(
207		services,
208		sender_user,
209		room_id,
210		room_name.as_ref(),
211		room_avatar.as_deref(),
212	)
213	.await;
214
215	Ok(response::Room {
216		initial: roomsince.eq(&0).then_some(true),
217		lists: lists.clone(),
218		membership: membership.clone(),
219		name: room_name.or(heroes_name),
220		avatar: JsOption::from_option(room_avatar.or(heroes_avatar)),
221		is_dm,
222		heroes,
223		required_state,
224		invite_state: invite_state.flatten(),
225		prev_batch: prev_batch.as_deref().map(Into::into),
226		num_live: num_live.flatten(),
227		limited,
228		timeline,
229		bump_stamp,
230		joined_count,
231		invited_count,
232		unread_notifications: merge_unread_notifications(
233			highlight_count,
234			notification_count,
235			&thread_counts,
236		),
237	})
238}
239
240async fn leave_or_ban_response(
241	SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
242	conn: &Connection,
243	WindowRoom { lists, membership, room_id, .. }: &WindowRoom,
244	roomsince: u64,
245) -> Result<response::Room> {
246	let member_event = services
247		.state_accessor
248		.room_state_get(room_id, &StateEventType::RoomMember, sender_user.as_str())
249		.map_ok(Event::into_format)
250		.await?;
251
252	Ok(response::Room {
253		initial: roomsince.eq(&0).then_some(true),
254		lists: lists.clone(),
255		membership: membership.clone(),
256		prev_batch: Some(conn.next_batch.to_string().into()),
257		limited: true,
258		required_state: vec![member_event],
259		..Default::default()
260	})
261}
262
263fn merged_room_details(
264	conn: &Connection,
265	lists: &ListIds,
266	room_id: &RoomId,
267) -> (usize, HashSet<(StateEventType, StateKey)>) {
268	lists
269		.iter()
270		.filter_map(|list_id| conn.lists.get(list_id))
271		.map(|list| &list.room_details)
272		.chain(conn.subscriptions.get(room_id))
273		.fold((0_usize, HashSet::new()), |(timeline_limit, mut required_state), config| {
274			required_state.extend(config.required_state.clone());
275			(timeline_limit.max(usize_from_ruma(config.timeline_limit)), required_state)
276		})
277}
278
279async fn resolve_heroes(
280	services: &Services,
281	sender_user: &UserId,
282	room_id: &RoomId,
283	room_name: Option<&DisplayName>,
284	room_avatar: Option<&MxcUri>,
285) -> (Option<Heroes>, Option<DisplayName>, Option<OwnedMxcUri>) {
286	services
287		.config
288		.calculate_heroes
289		.then_async(|| calculate_heroes(services, sender_user, room_id, room_name, room_avatar))
290		.await
291		.unwrap_or_default()
292}
293
294fn room_meta_future<'a>(
295	services: &'a Services,
296	sender_user: &'a UserId,
297	room_id: &'a RoomId,
298) -> impl Future<Output = (Option<DisplayName>, Option<OwnedMxcUri>, Option<bool>)> + Send + 'a {
299	let room_name = services
300		.state_accessor
301		.get_name(room_id)
302		.map_ok(Into::into)
303		.map(Result::ok);
304
305	let room_avatar = services
306		.state_accessor
307		.get_avatar(room_id)
308		.map_ok(|content| content.url)
309		.ok()
310		.map(Option::flatten);
311
312	let is_dm = services
313		.state_accessor
314		.is_direct(room_id, sender_user)
315		.map(|is_dm| is_dm.then_some(is_dm));
316
317	join3(room_name, room_avatar, is_dm)
318}
319
320fn member_counts_future<'a>(
321	services: &'a Services,
322	room_id: &'a RoomId,
323) -> impl Future<Output = (Option<UInt>, Option<UInt>)> + Send + 'a {
324	let joined_count = services
325		.state_cache
326		.room_joined_count(room_id)
327		.map_ok(TryInto::try_into)
328		.map_ok(Result::ok)
329		.map(FlatOk::flat_ok);
330
331	let invited_count = services
332		.state_cache
333		.room_invited_count(room_id)
334		.map_ok(TryInto::try_into)
335		.map_ok(Result::ok)
336		.map(FlatOk::flat_ok);
337
338	join(joined_count, invited_count)
339}
340
341fn notification_counts_future<'a>(
342	services: &'a Services,
343	sender_user: &'a UserId,
344	room_id: &'a RoomId,
345) -> impl Future<Output = (Option<UInt>, Option<UInt>, Result<u64>, ThreadCounts)> + Send + 'a {
346	let highlight_count = services
347		.pusher
348		.highlight_count(sender_user, room_id)
349		.map(TryInto::try_into)
350		.map(Result::ok);
351
352	let notification_count = services
353		.pusher
354		.notification_count(sender_user, room_id)
355		.map(TryInto::try_into)
356		.map(Result::ok);
357
358	let last_read_count = services
359		.pusher
360		.last_notification_read(sender_user, room_id);
361
362	let thread_counts = services
363		.pusher
364		.thread_notification_counts(sender_user, room_id);
365
366	join4(highlight_count, notification_count, last_read_count, thread_counts)
367}
368
369// MSC3771/MSC3773: SSS v5 has no per-thread bucket; fold into the room total.
370fn merge_unread_notifications(
371	highlight_count: Option<UInt>,
372	notification_count: Option<UInt>,
373	thread_counts: &ThreadCounts,
374) -> UnreadNotificationsCount {
375	let (thread_notifications, thread_highlights) = thread_counts
376		.values()
377		.fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
378			(n.saturating_add(notifs), h.saturating_add(hl))
379		});
380
381	let merge = |total: u64| {
382		move |count: UInt| count.saturating_add(UInt::try_from(total).unwrap_or_default())
383	};
384
385	UnreadNotificationsCount {
386		highlight_count: highlight_count.map(merge(thread_highlights)),
387		notification_count: notification_count.map(merge(thread_notifications)),
388	}
389}
390
391async fn collect_required_state(
392	services: &Services,
393	sender_user: &UserId,
394	room_id: &RoomId,
395	required_state: &[(StateEventType, StateKey)],
396	timeline_pdus: &[(PduCount, PduEvent)],
397	encrypted: bool,
398) -> Vec<Raw<AnySyncStateEvent>> {
399	let lazy = required_state
400		.iter()
401		.any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into())));
402
403	let timeline_senders = timeline_pdus
404		.iter()
405		.filter(|_| lazy)
406		.map(ref_at!(1))
407		.map(Event::sender)
408		.map(UserId::as_str);
409
410	let timeline_member_targets = timeline_pdus
411		.iter()
412		.filter(|_| lazy)
413		.map(ref_at!(1))
414		.filter(|event| *event.event_type() == TimelineEventType::RoomMember)
415		.filter_map(Event::state_key);
416
417	let timeline_senders = timeline_senders
418		.chain(timeline_member_targets)
419		.sorted_unstable()
420		.dedup()
421		.map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender)))
422		.collect::<Vec<_>>();
423
424	let wildcard_types: Vec<StateEventType> = required_state
425		.iter()
426		.filter(|(_, state_key)| state_key == "*")
427		.map(|(event_type, _)| event_type.clone())
428		.collect();
429
430	let wildcard_state: Vec<(StateEventType, StateKey)> = wildcard_types
431		.into_iter()
432		.stream()
433		.broad_then(|event_type| wildcard_state_keys(services, room_id, event_type))
434		.concat()
435		.await;
436
437	required_state
438		.iter()
439		.cloned()
440		.stream()
441		.chain(wildcard_state.into_iter().stream())
442		.chain(timeline_senders.into_iter().stream())
443		.broad_filter_map(async |state| {
444			let state_key: StateKey = match state.1.as_str() {
445				| "$LAZY" | "*" => return None,
446				| "$ME" => sender_user.as_str().into(),
447				| _ => state.1.clone(),
448			};
449
450			let mut pdu = services
451				.state_accessor
452				.room_state_get(room_id, &state.0, &state_key)
453				.map_ok(Event::into_pdu)
454				.ok()
455				.await?;
456
457			annotate_membership(services, &mut pdu, sender_user, encrypted).await;
458
459			Some(Event::into_format(pdu))
460		})
461		.collect()
462		.await
463}
464
465async fn wildcard_state_keys(
466	services: &Services,
467	room_id: &RoomId,
468	event_type: StateEventType,
469) -> Vec<(StateEventType, StateKey)> {
470	services
471		.state_accessor
472		.room_state_keys(room_id, &event_type)
473		.ready_filter_map(Result::ok)
474		.map(|state_key| (event_type.clone(), state_key))
475		.collect()
476		.await
477}