Skip to main content

tuwunel_api/client/sync/v5/
rooms.rs

1mod bump_stamp;
2mod heroes;
3
4use std::collections::{BTreeMap, HashSet};
5
6use futures::{
7	FutureExt, StreamExt, TryFutureExt, TryStreamExt,
8	future::{join, join3, join4},
9};
10use ruma::{
11	JsOption, OwnedRoomId, UserId,
12	api::client::sync::sync_events::{UnreadNotificationsCount, v5::response},
13	events::{StateEventType, TimelineEventType, room::member::MembershipState},
14};
15use tuwunel_core::{
16	Result, at, err, error, is_equal_to,
17	itertools::Itertools,
18	matrix::{Event, StateKey, pdu::PduCount},
19	ref_at,
20	utils::{
21		BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::usize_from_ruma, result::FlatOk,
22		stream::BroadbandExt,
23	},
24};
25use tuwunel_service::sync::Room;
26
27use self::{bump_stamp::room_bump_stamp, heroes::calculate_heroes};
28use super::{super::load_timeline, Connection, SyncInfo, Window, WindowRoom};
29use crate::client::{annotate_membership, ignored_filter, with_membership};
30
31#[tracing::instrument(
32    name = "rooms",
33    level = "debug",
34    skip_all,
35    fields(
36        next_batch = conn.next_batch,
37        window = window.len(),
38    )
39)]
40pub(super) async fn handle(
41	sync_info: SyncInfo<'_>,
42	conn: &Connection,
43	window: &Window,
44) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
45	window
46		.iter()
47		.stream()
48		.broad_filter_map(async |(room_id, room)| {
49			handle_room(sync_info, conn, room)
50				.map_ok(move |room| (room_id.clone(), room))
51				.inspect_err(|e| error!(?room_id, "sync handler: {e:?}"))
52				.await
53				.ok()
54		})
55		.collect()
56		.map(Ok)
57		.await
58}
59
60#[tracing::instrument(
61	name = "room",
62	level = "debug",
63	skip_all,
64	fields(room_id, roomsince)
65)]
66async fn handle_room(
67	SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
68	conn: &Connection,
69	WindowRoom {
70		lists, membership, room_id, last_count, ..
71	}: &WindowRoom,
72) -> Result<response::Room> {
73	let &Room { roomsince, .. } = conn
74		.rooms
75		.get(room_id)
76		.ok_or_else(|| err!("Missing connection state for {room_id}"))?;
77
78	debug_assert!(
79		*last_count > roomsince || *last_count == 0 || roomsince == 0,
80		"Stale room shouldn't be in the window"
81	);
82
83	if matches!(*membership, Some(MembershipState::Leave | MembershipState::Ban)) {
84		return Ok(response::Room {
85			initial: roomsince.eq(&0).then_some(true),
86			lists: lists.clone(),
87			membership: membership.clone(),
88			prev_batch: Some(conn.next_batch.to_string().into()),
89			limited: true,
90			required_state: vec![
91				services
92					.state_accessor
93					.room_state_get(room_id, &StateEventType::RoomMember, sender_user.as_str())
94					.map_ok(Event::into_format)
95					.await?,
96			],
97
98			..Default::default()
99		});
100	}
101
102	let is_invite = *membership == Some(MembershipState::Invite);
103
104	let encrypted = services
105		.state_accessor
106		.is_encrypted_room(room_id)
107		.await;
108
109	let default_details = (0_usize, HashSet::new());
110	let (timeline_limit, required_state) = lists
111		.iter()
112		.filter_map(|list_id| conn.lists.get(list_id))
113		.map(|list| &list.room_details)
114		.chain(conn.subscriptions.get(room_id).into_iter())
115		.fold(default_details, |(mut timeline_limit, mut required_state), config| {
116			let limit = usize_from_ruma(config.timeline_limit);
117
118			timeline_limit = timeline_limit.max(limit);
119			required_state.extend(config.required_state.clone());
120
121			(timeline_limit, required_state)
122		});
123
124	let timeline = is_invite.is_false().then_async(|| {
125		load_timeline(
126			services,
127			sender_user,
128			room_id,
129			PduCount::Normal(roomsince),
130			Some(PduCount::from(conn.next_batch)),
131			timeline_limit,
132		)
133	});
134
135	let (timeline_pdus, limited, last_timeline_count) = timeline
136		.await
137		.flat_ok()
138		.unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
139
140	let required_state = required_state
141		.into_iter()
142		.filter(|_| !timeline_pdus.is_empty())
143		.collect::<Vec<_>>();
144
145	let prev_batch = timeline_pdus
146		.first()
147		.map(at!(0))
148		.map(PduCount::into_unsigned)
149		.as_ref()
150		.map(ToString::to_string);
151
152	let bump_stamp = room_bump_stamp(
153		services,
154		sender_user,
155		room_id,
156		PduCount::Normal(roomsince),
157		PduCount::from(conn.next_batch),
158		last_timeline_count,
159	)
160	.await;
161
162	let num_live = roomsince
163		.ne(&0)
164		.and_is(limited || timeline_pdus.len() >= timeline_limit)
165		.then_async(|| {
166			services
167				.timeline
168				.pdus(None, room_id, Some(roomsince.into()))
169				.count()
170				.map(TryInto::try_into)
171				.map(Result::ok)
172		});
173
174	let lazy = required_state
175		.iter()
176		.any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into())));
177
178	let timeline_senders = timeline_pdus
179		.iter()
180		.filter(|_| lazy)
181		.map(ref_at!(1))
182		.map(Event::sender)
183		.map(UserId::as_str);
184
185	let timeline_member_targets = timeline_pdus
186		.iter()
187		.filter(|_| lazy)
188		.map(ref_at!(1))
189		.filter(|event| *event.event_type() == TimelineEventType::RoomMember)
190		.filter_map(Event::state_key);
191
192	let timeline_senders: Vec<_> = timeline_senders
193		.chain(timeline_member_targets)
194		.sorted_unstable()
195		.dedup()
196		.collect();
197
198	let timeline_senders = timeline_senders
199		.iter()
200		.map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender)))
201		.stream();
202
203	let wildcard_state = required_state
204		.iter()
205		.filter(|(_, state_key)| state_key == "*")
206		.map(|(event_type, _)| {
207			services
208				.state_accessor
209				.room_state_keys(room_id, event_type)
210				.map_ok(|state_key| (event_type.clone(), state_key))
211				.ready_filter_map(Result::ok)
212		})
213		.stream()
214		.flatten();
215
216	let required_state = required_state
217		.iter()
218		.cloned()
219		.stream()
220		.chain(wildcard_state)
221		.chain(timeline_senders)
222		.broad_filter_map(async |state| {
223			let state_key: StateKey = match state.1.as_str() {
224				| "$LAZY" | "*" => return None,
225				| "$ME" => sender_user.as_str().into(),
226				| _ => state.1.clone(),
227			};
228
229			let mut pdu = services
230				.state_accessor
231				.room_state_get(room_id, &state.0, &state_key)
232				.map_ok(Event::into_pdu)
233				.ok()
234				.await?;
235
236			annotate_membership(services, &mut pdu, sender_user, encrypted).await;
237
238			Some(Event::into_format(pdu))
239		})
240		.collect();
241
242	// TODO: figure out a timestamp we can use for remote invites
243	let invite_state = is_invite.then_async(|| {
244		services
245			.state_cache
246			.invite_state(sender_user, room_id)
247			.ok()
248	});
249
250	let room_name = services
251		.state_accessor
252		.get_name(room_id)
253		.map_ok(Into::into)
254		.map(Result::ok);
255
256	let room_avatar = services
257		.state_accessor
258		.get_avatar(room_id)
259		.map_ok(|content| content.url)
260		.ok()
261		.map(Option::flatten);
262
263	let highlight_count = services
264		.pusher
265		.highlight_count(sender_user, room_id)
266		.map(TryInto::try_into)
267		.map(Result::ok);
268
269	let notification_count = services
270		.pusher
271		.notification_count(sender_user, room_id)
272		.map(TryInto::try_into)
273		.map(Result::ok);
274
275	let joined_count = services
276		.state_cache
277		.room_joined_count(room_id)
278		.map_ok(TryInto::try_into)
279		.map_ok(Result::ok)
280		.map(FlatOk::flat_ok);
281
282	let invited_count = services
283		.state_cache
284		.room_invited_count(room_id)
285		.map_ok(TryInto::try_into)
286		.map_ok(Result::ok)
287		.map(FlatOk::flat_ok);
288
289	let is_dm = services
290		.state_accessor
291		.is_direct(room_id, sender_user)
292		.map(|is_dm| is_dm.then_some(is_dm));
293
294	let last_read_count = services
295		.pusher
296		.last_notification_read(sender_user, room_id);
297
298	let timeline = timeline_pdus
299		.iter()
300		.stream()
301		.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
302		.map(at!(1))
303		.broad_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
304		.map(Event::into_format)
305		.collect();
306
307	let meta = join3(room_name, room_avatar, is_dm);
308	let events = join4(timeline, num_live, required_state, invite_state);
309	let member_counts = join(joined_count, invited_count);
310	let notification_counts = join3(highlight_count, notification_count, last_read_count);
311	let (
312		(room_name, room_avatar, is_dm),
313		(timeline, num_live, required_state, invite_state),
314		(joined_count, invited_count),
315		(highlight_count, notification_count, _last_notification_read),
316	) = join4(meta, events, member_counts, notification_counts)
317		.boxed()
318		.await;
319
320	let heroes = services
321		.config
322		.calculate_heroes
323		.then_async(|| {
324			calculate_heroes(
325				services,
326				sender_user,
327				room_id,
328				room_name.as_ref(),
329				room_avatar.as_deref(),
330			)
331		})
332		.await
333		.unwrap_or_default();
334
335	let (heroes, heroes_name, heroes_avatar) = heroes;
336
337	Ok(response::Room {
338		initial: roomsince.eq(&0).then_some(true),
339		lists: lists.clone(),
340		membership: membership.clone(),
341		name: room_name.or(heroes_name),
342		avatar: JsOption::from_option(room_avatar.or(heroes_avatar)),
343		is_dm,
344		heroes,
345		required_state,
346		invite_state: invite_state.flatten(),
347		prev_batch: prev_batch.as_deref().map(Into::into),
348		num_live: num_live.flatten(),
349		limited,
350		timeline,
351		bump_stamp,
352		joined_count,
353		invited_count,
354		unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
355	})
356}