1mod bump_stamp;
2mod heroes;
3
4use std::collections::{BTreeMap, HashSet};
5
6use futures::{
7 FutureExt, StreamExt, TryFutureExt,
8 future::{join, join3, join4},
9};
10use ruma::{
11 JsOption, MxcUri, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId,
12 api::client::sync::sync_events::{
13 UnreadNotificationsCount,
14 v5::{DisplayName, response, response::Heroes},
15 },
16 events::{
17 AnySyncStateEvent, StateEventType, TimelineEventType, room::member::MembershipState,
18 },
19 serde::Raw,
20};
21use tuwunel_core::{
22 Result, at, err, error, is_equal_to,
23 itertools::Itertools,
24 matrix::{
25 Event, StateKey,
26 pdu::{PduCount, PduEvent},
27 },
28 ref_at,
29 utils::{
30 BoolExt, IterStream, ReadyExt, TryFutureExtExt,
31 math::usize_from_ruma,
32 result::FlatOk,
33 stream::{BroadbandExt, WidebandExt},
34 },
35};
36use tuwunel_service::{Services, sync::Room};
37
38use self::{bump_stamp::room_bump_stamp, heroes::calculate_heroes};
39use super::{super::load_timeline, Connection, ListIds, SyncInfo, Window, WindowRoom};
40use crate::client::{annotate_membership, ignored_filter, with_membership};
41
42type ThreadCounts = BTreeMap<OwnedEventId, (u64, u64)>;
43
44#[tracing::instrument(
45 name = "rooms",
46 level = "debug",
47 skip_all,
48 fields(
49 next_batch = conn.next_batch,
50 window = window.len(),
51 )
52)]
53pub(super) async fn handle(
54 sync_info: SyncInfo<'_>,
55 conn: &Connection,
56 window: &Window,
57) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
58 window
59 .iter()
60 .stream()
61 .broad_filter_map(async |(room_id, room)| {
62 handle_room(sync_info, conn, room)
63 .map_ok(move |room| (room_id.clone(), room))
64 .inspect_err(|e| error!(?room_id, "sync handler: {e:?}"))
65 .await
66 .ok()
67 })
68 .collect()
69 .map(Ok)
70 .await
71}
72
73#[tracing::instrument(
74 name = "room",
75 level = "debug",
76 skip_all,
77 fields(room_id, roomsince)
78)]
79async fn handle_room(
80 sync_info: SyncInfo<'_>,
81 conn: &Connection,
82 window_room: &WindowRoom,
83) -> Result<response::Room> {
84 let SyncInfo { services, sender_user, .. } = sync_info;
85 let WindowRoom {
86 lists, membership, room_id, last_count, ..
87 } = window_room;
88
89 let &Room { roomsince, .. } = conn
90 .rooms
91 .get(room_id)
92 .ok_or_else(|| err!("Missing connection state for {room_id}"))?;
93
94 debug_assert!(
95 *last_count > roomsince || *last_count == 0 || roomsince == 0,
96 "Stale room shouldn't be in the window"
97 );
98
99 if matches!(*membership, Some(MembershipState::Leave | MembershipState::Ban)) {
100 return leave_or_ban_response(sync_info, conn, window_room, roomsince).await;
101 }
102
103 let is_invite = *membership == Some(MembershipState::Invite);
104
105 let encrypted = services
106 .state_accessor
107 .is_encrypted_room(room_id)
108 .await;
109
110 let (timeline_limit, required_state) = merged_room_details(conn, lists, room_id);
111
112 let timeline = is_invite.is_false().then_async(|| {
113 load_timeline(
114 services,
115 sender_user,
116 room_id,
117 PduCount::Normal(roomsince),
118 Some(PduCount::from(conn.next_batch)),
119 timeline_limit,
120 )
121 });
122
123 let (timeline_pdus, limited, last_timeline_count) = timeline
124 .await
125 .flat_ok()
126 .unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
127
128 let required_state = required_state
129 .into_iter()
130 .filter(|_| !timeline_pdus.is_empty())
131 .collect::<Vec<_>>();
132
133 let prev_batch = timeline_pdus
134 .first()
135 .map(at!(0))
136 .map(PduCount::into_unsigned)
137 .as_ref()
138 .map(ToString::to_string);
139
140 let bump_stamp = room_bump_stamp(
141 services,
142 sender_user,
143 room_id,
144 PduCount::Normal(roomsince),
145 PduCount::from(conn.next_batch),
146 last_timeline_count,
147 )
148 .await;
149
150 let num_live = roomsince
151 .ne(&0)
152 .and_is(limited || timeline_pdus.len() >= timeline_limit)
153 .then_async(|| {
154 services
155 .timeline
156 .pdus(None, room_id, Some(roomsince.into()))
157 .count()
158 .map(TryInto::try_into)
159 .map(Result::ok)
160 });
161
162 let required_state = collect_required_state(
163 services,
164 sender_user,
165 room_id,
166 &required_state,
167 &timeline_pdus,
168 encrypted,
169 );
170
171 let invite_state = is_invite.then_async(|| {
173 services
174 .state_cache
175 .invite_state(sender_user, room_id)
176 .ok()
177 });
178
179 let timeline = timeline_pdus
180 .iter()
181 .stream()
182 .filter_map(|item| ignored_filter(services, item.clone(), sender_user))
183 .map(at!(1))
184 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
185 .wide_then(|pdu| {
186 services
187 .pdu_metadata
188 .bundle_aggregations(sender_user, pdu)
189 })
190 .map(Event::into_format)
191 .collect();
192
193 let meta = room_meta_future(services, sender_user, room_id);
194 let events = join4(timeline, num_live, required_state, invite_state);
195 let member_counts = member_counts_future(services, room_id);
196 let notification_counts = notification_counts_future(services, sender_user, room_id);
197 let (
198 (room_name, room_avatar, is_dm),
199 (timeline, num_live, required_state, invite_state),
200 (joined_count, invited_count),
201 (highlight_count, notification_count, _last_notification_read, thread_counts),
202 ) = join4(meta, events, member_counts, notification_counts)
203 .boxed()
204 .await;
205
206 let (heroes, heroes_name, heroes_avatar) = resolve_heroes(
207 services,
208 sender_user,
209 room_id,
210 room_name.as_ref(),
211 room_avatar.as_deref(),
212 )
213 .await;
214
215 Ok(response::Room {
216 initial: roomsince.eq(&0).then_some(true),
217 lists: lists.clone(),
218 membership: membership.clone(),
219 name: room_name.or(heroes_name),
220 avatar: JsOption::from_option(room_avatar.or(heroes_avatar)),
221 is_dm,
222 heroes,
223 required_state,
224 invite_state: invite_state.flatten(),
225 prev_batch: prev_batch.as_deref().map(Into::into),
226 num_live: num_live.flatten(),
227 limited,
228 timeline,
229 bump_stamp,
230 joined_count,
231 invited_count,
232 unread_notifications: merge_unread_notifications(
233 highlight_count,
234 notification_count,
235 &thread_counts,
236 ),
237 })
238}
239
240async fn leave_or_ban_response(
241 SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
242 conn: &Connection,
243 WindowRoom { lists, membership, room_id, .. }: &WindowRoom,
244 roomsince: u64,
245) -> Result<response::Room> {
246 let member_event = services
247 .state_accessor
248 .room_state_get(room_id, &StateEventType::RoomMember, sender_user.as_str())
249 .map_ok(Event::into_format)
250 .await?;
251
252 Ok(response::Room {
253 initial: roomsince.eq(&0).then_some(true),
254 lists: lists.clone(),
255 membership: membership.clone(),
256 prev_batch: Some(conn.next_batch.to_string().into()),
257 limited: true,
258 required_state: vec![member_event],
259 ..Default::default()
260 })
261}
262
263fn merged_room_details(
264 conn: &Connection,
265 lists: &ListIds,
266 room_id: &RoomId,
267) -> (usize, HashSet<(StateEventType, StateKey)>) {
268 lists
269 .iter()
270 .filter_map(|list_id| conn.lists.get(list_id))
271 .map(|list| &list.room_details)
272 .chain(conn.subscriptions.get(room_id))
273 .fold((0_usize, HashSet::new()), |(timeline_limit, mut required_state), config| {
274 required_state.extend(config.required_state.clone());
275 (timeline_limit.max(usize_from_ruma(config.timeline_limit)), required_state)
276 })
277}
278
279async fn resolve_heroes(
280 services: &Services,
281 sender_user: &UserId,
282 room_id: &RoomId,
283 room_name: Option<&DisplayName>,
284 room_avatar: Option<&MxcUri>,
285) -> (Option<Heroes>, Option<DisplayName>, Option<OwnedMxcUri>) {
286 services
287 .config
288 .calculate_heroes
289 .then_async(|| calculate_heroes(services, sender_user, room_id, room_name, room_avatar))
290 .await
291 .unwrap_or_default()
292}
293
294fn room_meta_future<'a>(
295 services: &'a Services,
296 sender_user: &'a UserId,
297 room_id: &'a RoomId,
298) -> impl Future<Output = (Option<DisplayName>, Option<OwnedMxcUri>, Option<bool>)> + Send + 'a {
299 let room_name = services
300 .state_accessor
301 .get_name(room_id)
302 .map_ok(Into::into)
303 .map(Result::ok);
304
305 let room_avatar = services
306 .state_accessor
307 .get_avatar(room_id)
308 .map_ok(|content| content.url)
309 .ok()
310 .map(Option::flatten);
311
312 let is_dm = services
313 .state_accessor
314 .is_direct(room_id, sender_user)
315 .map(|is_dm| is_dm.then_some(is_dm));
316
317 join3(room_name, room_avatar, is_dm)
318}
319
320fn member_counts_future<'a>(
321 services: &'a Services,
322 room_id: &'a RoomId,
323) -> impl Future<Output = (Option<UInt>, Option<UInt>)> + Send + 'a {
324 let joined_count = services
325 .state_cache
326 .room_joined_count(room_id)
327 .map_ok(TryInto::try_into)
328 .map_ok(Result::ok)
329 .map(FlatOk::flat_ok);
330
331 let invited_count = services
332 .state_cache
333 .room_invited_count(room_id)
334 .map_ok(TryInto::try_into)
335 .map_ok(Result::ok)
336 .map(FlatOk::flat_ok);
337
338 join(joined_count, invited_count)
339}
340
341fn notification_counts_future<'a>(
342 services: &'a Services,
343 sender_user: &'a UserId,
344 room_id: &'a RoomId,
345) -> impl Future<Output = (Option<UInt>, Option<UInt>, Result<u64>, ThreadCounts)> + Send + 'a {
346 let highlight_count = services
347 .pusher
348 .highlight_count(sender_user, room_id)
349 .map(TryInto::try_into)
350 .map(Result::ok);
351
352 let notification_count = services
353 .pusher
354 .notification_count(sender_user, room_id)
355 .map(TryInto::try_into)
356 .map(Result::ok);
357
358 let last_read_count = services
359 .pusher
360 .last_notification_read(sender_user, room_id);
361
362 let thread_counts = services
363 .pusher
364 .thread_notification_counts(sender_user, room_id);
365
366 join4(highlight_count, notification_count, last_read_count, thread_counts)
367}
368
369fn merge_unread_notifications(
371 highlight_count: Option<UInt>,
372 notification_count: Option<UInt>,
373 thread_counts: &ThreadCounts,
374) -> UnreadNotificationsCount {
375 let (thread_notifications, thread_highlights) = thread_counts
376 .values()
377 .fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
378 (n.saturating_add(notifs), h.saturating_add(hl))
379 });
380
381 let merge = |total: u64| {
382 move |count: UInt| count.saturating_add(UInt::try_from(total).unwrap_or_default())
383 };
384
385 UnreadNotificationsCount {
386 highlight_count: highlight_count.map(merge(thread_highlights)),
387 notification_count: notification_count.map(merge(thread_notifications)),
388 }
389}
390
391async fn collect_required_state(
392 services: &Services,
393 sender_user: &UserId,
394 room_id: &RoomId,
395 required_state: &[(StateEventType, StateKey)],
396 timeline_pdus: &[(PduCount, PduEvent)],
397 encrypted: bool,
398) -> Vec<Raw<AnySyncStateEvent>> {
399 let lazy = required_state
400 .iter()
401 .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into())));
402
403 let timeline_senders = timeline_pdus
404 .iter()
405 .filter(|_| lazy)
406 .map(ref_at!(1))
407 .map(Event::sender)
408 .map(UserId::as_str);
409
410 let timeline_member_targets = timeline_pdus
411 .iter()
412 .filter(|_| lazy)
413 .map(ref_at!(1))
414 .filter(|event| *event.event_type() == TimelineEventType::RoomMember)
415 .filter_map(Event::state_key);
416
417 let timeline_senders = timeline_senders
418 .chain(timeline_member_targets)
419 .sorted_unstable()
420 .dedup()
421 .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender)))
422 .collect::<Vec<_>>();
423
424 let wildcard_types: Vec<StateEventType> = required_state
425 .iter()
426 .filter(|(_, state_key)| state_key == "*")
427 .map(|(event_type, _)| event_type.clone())
428 .collect();
429
430 let wildcard_state: Vec<(StateEventType, StateKey)> = wildcard_types
431 .into_iter()
432 .stream()
433 .broad_then(|event_type| wildcard_state_keys(services, room_id, event_type))
434 .concat()
435 .await;
436
437 required_state
438 .iter()
439 .cloned()
440 .stream()
441 .chain(wildcard_state.into_iter().stream())
442 .chain(timeline_senders.into_iter().stream())
443 .broad_filter_map(async |state| {
444 let state_key: StateKey = match state.1.as_str() {
445 | "$LAZY" | "*" => return None,
446 | "$ME" => sender_user.as_str().into(),
447 | _ => state.1.clone(),
448 };
449
450 let mut pdu = services
451 .state_accessor
452 .room_state_get(room_id, &state.0, &state_key)
453 .map_ok(Event::into_pdu)
454 .ok()
455 .await?;
456
457 annotate_membership(services, &mut pdu, sender_user, encrypted).await;
458
459 Some(Event::into_format(pdu))
460 })
461 .collect()
462 .await
463}
464
465async fn wildcard_state_keys(
466 services: &Services,
467 room_id: &RoomId,
468 event_type: StateEventType,
469) -> Vec<(StateEventType, StateKey)> {
470 services
471 .state_accessor
472 .room_state_keys(room_id, &event_type)
473 .ready_filter_map(Result::ok)
474 .map(|state_key| (event_type.clone(), state_key))
475 .collect()
476 .await
477}