1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 time::Duration,
4};
5
6use axum::extract::State;
7use futures::{
8 FutureExt, StreamExt, TryFutureExt,
9 future::{join, join3, join4, join5},
10 pin_mut,
11};
12use ruma::{
13 DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
14 api::client::{
15 filter::FilterDefinition,
16 sync::sync_events::{
17 self, DeviceLists, UnreadNotificationsCount,
18 v3::{
19 Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
20 KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms,
21 State as RoomState, StateEvents, Timeline, ToDevice,
22 },
23 },
24 },
25 events::{
26 AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
27 AnySyncEphemeralRoomEvent, AnySyncStateEvent, StateEventType, SyncEphemeralRoomEvent,
28 TimelineEventType::*,
29 presence::{PresenceEvent, PresenceEventContent},
30 room::member::{MembershipState, RoomMemberEventContent},
31 typing::TypingEventContent,
32 },
33 serde::Raw,
34 uint,
35};
36use tokio::time;
37use tuwunel_core::{
38 Result, at,
39 debug::INFO_SPAN_LEVEL,
40 debug_error, err,
41 error::{inspect_debug_log, inspect_log},
42 extract_variant, is_equal_to, is_false, is_true,
43 matrix::{
44 Event,
45 event::{Matches, trim_event_fields},
46 pdu::{EventHash, PduCount, PduEvent},
47 },
48 pair_of, ref_at,
49 result::FlatOk,
50 trace,
51 utils::{
52 self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
53 future::{OptionStream, ReadyBoolExt},
54 math::ruma_from_u64,
55 option::OptionExt,
56 result::MapExpect,
57 stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
58 },
59};
60use tuwunel_service::{
61 Services,
62 rooms::{
63 lazy_loading,
64 lazy_loading::{Options, Witness},
65 read_receipt::PrivateReadEvents,
66 short::{ShortEventId, ShortStateHash, ShortStateKey},
67 },
68};
69
70use super::{load_timeline, share_encrypted_room};
71use crate::{
72 ClientIp, Ruma,
73 client::{ignored_filter, is_empty_account_data_event, with_membership},
74};
75
/// Selects how the room state section of a sync response is wrapped.
///
/// Built from the request's `use_state_after` / `use_state_after_unstable`
/// flags through the `From<(bool, bool)>` impl.
#[derive(Clone, Copy, Debug)]
enum StateAfter {
    /// Neither flag was set; state is wrapped as `RoomState::Before`.
    Off,
    /// State is wrapped as `RoomState::After`.
    Stable,
    /// State is wrapped as `RoomState::AfterUnstable`.
    Unstable,
}
83
84impl StateAfter {
85 fn requested(self) -> bool { !matches!(self, Self::Off) }
86
87 fn wrap(self, events: StateEvents) -> RoomState {
88 match self {
89 | Self::Off => RoomState::Before(events),
90 | Self::Stable => RoomState::After(events),
91 | Self::Unstable => RoomState::AfterUnstable(events),
92 }
93 }
94}
95
96impl From<(bool, bool)> for StateAfter {
97 fn from((stable, unstable): (bool, bool)) -> Self {
98 match (stable, unstable) {
100 | (_, true) => Self::Unstable,
101 | (true, _) => Self::Stable,
102 | _ => Self::Off,
103 }
104 }
105}
106
/// Per-room result of `calculate_state_changes`.
///
/// The `Default` value is what `compute_join_state_changes` falls back to
/// when a joined room has no current state hash.
#[derive(Default)]
struct StateChanges {
    /// Hero user ids, when they were computed.
    heroes: Option<Vec<OwnedUserId>>,
    /// Joined member count, when it was computed.
    joined_member_count: Option<u64>,
    /// Invited member count, when it was computed.
    invited_member_count: Option<u64>,
    /// State events selected for the response's state section.
    state_events: Vec<PduEvent>,
}
114
/// Presence content keyed by user id, as collected by
/// `process_presence_updates`.
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
116
/// Per-room values that `load_joined_room` destructures from the result of
/// `gather_room_metadata` (defined outside this chunk).
struct RoomMetadata {
    // The four state hashes below are forwarded to
    // `compute_join_state_changes`; how each one is resolved is decided by
    // `gather_room_metadata`.
    since_shortstatehash: Option<ShortStateHash>,
    horizon_shortstatehash: Option<ShortStateHash>,
    after_shortstatehash: Option<ShortStateHash>,
    current_shortstatehash: Option<ShortStateHash>,
    /// Receipt events paired with a user id.
    receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
    /// `None` is treated as "not encrypted" by `load_joined_room`.
    encrypted_room: Option<bool>,
}
125
/// Per-user values that `load_joined_room` destructures from the result of
/// `gather_user_metadata` (defined outside this chunk).
struct UserMetadata {
    /// Lazy-loading witness, forwarded to `compute_join_state_changes`.
    witness: Option<Witness>,
    /// Forwarded to `compute_notification_gates`. The nested `Option` is
    /// deliberate, hence the lint expectation; the meaning of each level is
    /// defined by the producer.
    #[expect(clippy::option_option)]
    last_notification_read: Option<Option<u64>>,
    /// Per-thread values keyed by event id, forwarded to
    /// `compute_notification_gates` and `finalize_joined_room`.
    thread_last_reads: Option<BTreeMap<OwnedEventId, u64>>,
    /// Compared against `since` to decide whether private read events are
    /// fetched.
    last_privateread_update: u64,
    /// Forwarded to state calculation and to the final room assembly.
    joined_since_last_sync: bool,
}
134
/// Result of `compute_notification_gates` as destructured in
/// `load_joined_room`; `F` is whatever filter type that helper returns.
struct NotificationGates<F> {
    /// Passed on to `await_join_aggregates`.
    send_notification_counts: bool,
    /// Passed on to `finalize_joined_room`.
    send_notification_count_filter: F,
}
139
/// Bundle of everything that makes up one joined room in a sync response.
///
/// NOTE(review): no use of this struct is visible in this chunk; presumably
/// it is consumed by a joined-room builder defined further down — confirm.
struct BuildJoinedRoom {
    // Ephemeral data.
    receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
    typing_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    private_read_events: Option<PrivateReadEvents>,
    // State, account data and timeline.
    state_events: Vec<Raw<AnySyncStateEvent>>,
    account_data_events: Vec<Raw<AnyRoomAccountDataEvent>>,
    room_events: Vec<PduEvent>,
    // Summary values.
    heroes: Option<Vec<OwnedUserId>>,
    joined_member_count: Option<u64>,
    invited_member_count: Option<u64>,
    // Notification counts, room-wide and keyed by event id.
    unread_notifications: UnreadNotificationsCount,
    unread_thread_notifications: BTreeMap<OwnedEventId, UnreadNotificationsCount>,
    // Flags and pagination.
    state_after: StateAfter,
    limited: bool,
    joined_since_last_sync: bool,
    prev_batch: Option<PduCount>,
}
157
/// Route handler for `sync_events::v3` requests.
///
/// Resolves the request filter, pings presence and notes the sync, then
/// loops: it registers watchers, reads the current `next_batch` and, when
/// there is a window between `since` and `next_batch` (or `full_state` is
/// set), builds a response. A non-empty response, or any response when
/// `full_state` is set, is returned immediately. Otherwise the handler waits
/// on the watchers; when the timeout deadline passes or the server is
/// stopping it returns an empty response, and when a watcher fires it
/// advances `since` to `next_batch` and tries again.
///
/// # Errors
///
/// Propagates errors from `wait_pending` and `build_sync_events`.
#[tracing::instrument(
    name = "sync",
    level = "debug",
    skip_all,
    fields(
        user_id = %body.sender_user(),
        device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
    )
)]
pub(crate) async fn sync_events_route(
    State(services): State<crate::State>,
    ClientIp(client): ClientIp,
    body: Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response> {
    let sender_user = body.sender_user();
    let sender_device = body.sender_device.as_deref();

    // An inline filter definition is cloned; a filter id is looked up for the
    // user, falling back to the default definition when the lookup fails.
    let filter = body
        .body
        .filter
        .as_ref()
        .map_async(async |filter| match filter {
            | Filter::FilterDefinition(filter) => filter.clone(),
            | Filter::FilterId(filter_id) => services
                .users
                .get_filter(sender_user, filter_id)
                .await
                .unwrap_or_default(),
        });

    // No filter in the request also means the default definition.
    let filter = filter.map(Option::unwrap_or_default);
    let full_state = body.body.full_state;
    let set_presence = &body.body.set_presence;
    let state_after =
        StateAfter::from((body.body.use_state_after, body.body.use_state_after_unstable));

    // A presence ping failure is logged and otherwise ignored.
    let ping_presence = services
        .presence
        .maybe_ping_presence(
            sender_user,
            body.sender_device.as_deref(),
            Some(client),
            set_presence,
        )
        .inspect_err(inspect_log)
        .ok();

    let note_sync = services.presence.note_sync(sender_user);

    // Drive the three preparatory futures together; only the filter is kept.
    let (filter, ..) = join3(filter, ping_presence, note_sync).await;

    // A missing or unparsable `since` token is treated as 0.
    let mut since = body
        .body
        .since
        .as_deref()
        .map(str::parse)
        .flat_ok()
        .unwrap_or(0);

    // Requested timeout in milliseconds, defaulting from the config and
    // clamped to the configured minimum and maximum.
    let timeout = body
        .body
        .timeout
        .as_ref()
        .map(Duration::as_millis)
        .map(TryInto::try_into)
        .flat_ok()
        .unwrap_or(services.config.client_sync_timeout_default)
        .max(services.config.client_sync_timeout_min)
        .min(services.config.client_sync_timeout_max);

    let stop_at = time::Instant::now()
        .checked_add(Duration::from_millis(timeout))
        .expect("configuration must limit maximum timeout");

    loop {
        // The watchers are set up before `next_batch` is read; presumably so
        // that activity arriving in between is not missed.
        let watch_rooms = services
            .state_cache
            .rooms_joined(sender_user)
            .chain(services.state_cache.rooms_invited(sender_user));

        let watchers = services
            .sync
            .watch(sender_user, sender_device, watch_rooms)
            .await;

        let next_batch = services.globals.wait_pending().await?;
        if since > next_batch {
            debug_error!(since, next_batch, "received since > next_batch, clamping");
            since = next_batch;
        }

        if since < next_batch || full_state {
            let response = build_sync_events(
                &services,
                sender_user,
                sender_device,
                since,
                next_batch,
                full_state,
                state_after,
                &filter,
            )
            .await?;

            let empty = response.rooms.is_empty()
                && response.presence.is_empty()
                && response.account_data.is_empty()
                && response.device_lists.is_empty()
                && response.to_device.is_empty();

            // A `full_state` request is answered even when nothing changed.
            if !empty || full_state {
                return Ok(response);
            }
        }

        // Nothing to report yet: wait for a watcher until the deadline. On
        // timeout, or when the server is stopping, answer with an empty
        // response carrying the current `next_batch`.
        if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
            let response =
                build_empty_response(&services, sender_user, sender_device, next_batch).await;

            trace!(since, next_batch, "empty response");
            return Ok(response);
        }

        trace!(
            since,
            last_batch = ?next_batch,
            count = ?services.globals.pending_count(),
            stop_at = ?stop_at,
            "notified by watcher"
        );

        // The window up to `next_batch` was already examined; continue from
        // there on the next iteration.
        since = next_batch;
    }
}
329
330async fn build_empty_response(
331 services: &Services,
332 sender_user: &UserId,
333 sender_device: Option<&DeviceId>,
334 next_batch: u64,
335) -> sync_events::v3::Response {
336 let device_one_time_keys_count = sender_device.map_async(|sender_device| {
337 services
338 .users
339 .count_one_time_keys(sender_user, sender_device)
340 });
341
342 let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
343 services
344 .users
345 .unused_fallback_key_algorithms(sender_user, sender_device)
346 .collect::<Vec<_>>()
347 });
348
349 let (device_one_time_keys_count, device_unused_fallback_key_types) =
350 join(device_one_time_keys_count, device_unused_fallback_key_types).await;
351
352 sync_events::v3::Response {
353 device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
354 device_unused_fallback_key_types,
355 ..sync_events::v3::Response::new(next_batch.to_string())
356 }
357}
358
/// Assembles a complete sync response for the window between `since` and
/// `next_batch`.
///
/// Apart from the up-front `invites_blocked` lookup, every section (rooms by
/// membership, presence, global account data, device key changes, to-device
/// events and key counts) is prepared as a future and all of them are driven
/// together by one `join5`. The device-list "left" set is computed after
/// that, from the users reported by the joined-room pass.
///
/// # Errors
///
/// The signature is fallible, but no fallible operation is visible in this
/// body; it always ends in `Ok`.
#[tracing::instrument(
    name = "build",
    level = INFO_SPAN_LEVEL,
    skip_all,
    fields(
        %since,
        %next_batch,
        count = ?services.globals.pending_count(),
    )
)]
#[expect(clippy::too_many_arguments)]
async fn build_sync_events(
    services: &Services,
    sender_user: &UserId,
    sender_device: Option<&DeviceId>,
    since: u64,
    next_batch: u64,
    full_state: bool,
    state_after: StateAfter,
    filter: &FilterDefinition,
) -> Result<sync_events::v3::Response> {
    // Needed as a plain value by the invite collector, so resolved first.
    let invites_blocked = services.users.invites_blocked(sender_user).await;

    let joined_rooms = collect_joined_rooms(
        services,
        sender_user,
        sender_device,
        since,
        next_batch,
        full_state,
        state_after,
        filter,
    );

    let left_rooms = collect_left_rooms(
        services,
        sender_user,
        since,
        next_batch,
        full_state,
        state_after,
        filter,
    );

    let invited_rooms =
        collect_invited_rooms(services, sender_user, since, next_batch, filter, invites_blocked);

    let knocked_rooms = collect_knocked_rooms(services, sender_user, since, next_batch, filter);

    // Presence is only collected when local presence is enabled in the
    // config; otherwise this resolves to `None`.
    let presence_updates = services
        .config
        .allow_local_presence
        .then_async(|| {
            process_presence_updates(services, since, next_batch, sender_user, filter)
        });

    let account_data = collect_global_account_data(services, sender_user, since, next_batch);

    let keys_changed = services
        .users
        .keys_changed(sender_user, since, Some(next_batch))
        .map(ToOwned::to_owned)
        .collect::<HashSet<_>>();

    // The four device-scoped futures below resolve to `None` when the
    // request carries no device.
    let to_device_events = sender_device.map_async(|sender_device| {
        services
            .users
            .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
            .map(at!(1))
            .collect::<Vec<_>>()
    });

    let device_one_time_keys_count = sender_device.map_async(|sender_device| {
        services
            .users
            .count_one_time_keys(sender_user, sender_device)
    });

    let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
        services
            .users
            .unused_fallback_key_algorithms(sender_user, sender_device)
            .collect::<Vec<_>>()
    });

    // Removal is keyed on `since`; presumably this drops to-device events the
    // client has acknowledged by presenting that token — confirm in the
    // users service.
    let remove_to_device_events = sender_device.map_async(|sender_device| {
        services
            .users
            .remove_to_device_events(sender_user, sender_device, since)
    });

    // The tuple pattern mirrors the nesting of the joins below: the first
    // `join4` holds the device-scoped futures (the removal result is
    // discarded), the second holds the four room collectors.
    let (
        account_data,
        keys_changed,
        presence_updates,
        (_, to_device_events, device_one_time_keys_count, device_unused_fallback_key_types),
        (
            (joined_rooms, mut device_list_updates, left_encrypted_users),
            left_rooms,
            invited_rooms,
            knocked_rooms,
        ),
    ) = join5(
        account_data,
        keys_changed,
        presence_updates,
        join4(
            remove_to_device_events,
            to_device_events,
            device_one_time_keys_count,
            device_unused_fallback_key_types,
        ),
        join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms),
    )
    .boxed()
    .await;

    // Users with changed keys are merged into the set reported by the
    // joined-room pass.
    device_list_updates.extend(keys_changed);

    let device_list_left =
        collect_device_list_left(services, sender_user, left_encrypted_users).await;

    let presence_events = build_presence_events(presence_updates);

    Ok(sync_events::v3::Response {
        account_data: GlobalAccountData { events: account_data },
        device_lists: DeviceLists {
            left: device_list_left,
            changed: device_list_updates.into_iter().collect(),
        },
        device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
        device_unused_fallback_key_types,
        next_batch: next_batch.to_string(),
        presence: Presence { events: presence_events },
        rooms: Rooms {
            leave: left_rooms,
            join: joined_rooms,
            invite: invited_rooms,
            knock: knocked_rooms,
        },
        to_device: ToDevice {
            events: to_device_events.unwrap_or_default(),
        },
    })
}
507
508#[expect(clippy::too_many_arguments)]
509fn collect_joined_rooms<'a>(
510 services: &'a Services,
511 sender_user: &'a UserId,
512 sender_device: Option<&'a DeviceId>,
513 since: u64,
514 next_batch: u64,
515 full_state: bool,
516 state_after: StateAfter,
517 filter: &'a FilterDefinition,
518) -> impl Future<
519 Output = (BTreeMap<OwnedRoomId, JoinedRoom>, HashSet<OwnedUserId>, HashSet<OwnedUserId>),
520> + Send
521+ 'a {
522 services
523 .state_cache
524 .rooms_joined(sender_user)
525 .ready_filter(|&room_id| filter.room.matches(room_id))
526 .map(ToOwned::to_owned)
527 .broad_filter_map(move |room_id| {
528 load_joined_room(
529 services,
530 sender_user,
531 sender_device,
532 room_id.clone(),
533 since,
534 next_batch,
535 full_state,
536 state_after,
537 filter,
538 )
539 .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
540 .ok()
541 })
542 .ready_fold(
543 (BTreeMap::new(), HashSet::new(), HashSet::new()),
544 |(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
545 (room_id, joined_room, dlu, leu)| {
546 device_list_updates.extend(dlu);
547 left_encrypted_users.extend(leu);
548 if !joined_room.is_empty() {
549 joined_rooms.insert(room_id, joined_room);
550 }
551
552 (joined_rooms, device_list_updates, left_encrypted_users)
553 },
554 )
555}
556
557fn collect_left_rooms<'a>(
558 services: &'a Services,
559 sender_user: &'a UserId,
560 since: u64,
561 next_batch: u64,
562 full_state: bool,
563 state_after: StateAfter,
564 filter: &'a FilterDefinition,
565) -> impl Future<Output = BTreeMap<OwnedRoomId, LeftRoom>> + Send + 'a {
566 services
567 .state_cache
568 .rooms_left_state(sender_user)
569 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
570 .broad_filter_map(move |(room_id, _)| {
571 handle_left_room(
572 services,
573 since,
574 room_id.clone(),
575 sender_user,
576 next_batch,
577 full_state,
578 state_after,
579 filter,
580 )
581 .map_ok(move |left_room| (room_id, left_room))
582 .ok()
583 })
584 .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
585 .collect()
586}
587
588async fn collect_invited_rooms<'a>(
589 services: &'a Services,
590 sender_user: &'a UserId,
591 since: u64,
592 next_batch: u64,
593 filter: &'a FilterDefinition,
594 invites_blocked: bool,
595) -> BTreeMap<OwnedRoomId, InvitedRoom> {
596 services
597 .state_cache
598 .rooms_invited_state(sender_user)
599 .ready_filter(move |_| !invites_blocked)
600 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
601 .fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| {
602 let invite_count = services
603 .state_cache
604 .get_invite_count(&room_id, sender_user)
605 .await
606 .ok();
607
608 if Some(since) >= invite_count || Some(next_batch) < invite_count {
610 return invited_rooms;
611 }
612
613 let invited_room = InvitedRoom {
614 invite_state: InviteState { events: invite_state },
615 };
616
617 invited_rooms.insert(room_id, invited_room);
618 invited_rooms
619 })
620 .await
621}
622
623async fn collect_knocked_rooms<'a>(
624 services: &'a Services,
625 sender_user: &'a UserId,
626 since: u64,
627 next_batch: u64,
628 filter: &'a FilterDefinition,
629) -> BTreeMap<OwnedRoomId, KnockedRoom> {
630 services
631 .state_cache
632 .rooms_knocked_state(sender_user)
633 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
634 .fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| {
635 let knock_count = services
636 .state_cache
637 .get_knock_count(&room_id, sender_user)
638 .await
639 .ok();
640
641 if Some(since) >= knock_count || Some(next_batch) < knock_count {
643 return knocked_rooms;
644 }
645
646 let knocked_room = KnockedRoom {
647 knock_state: KnockState { events: knock_state },
648 };
649
650 knocked_rooms.insert(room_id, knocked_room);
651 knocked_rooms
652 })
653 .await
654}
655
656fn collect_global_account_data<'a>(
657 services: &'a Services,
658 sender_user: &'a UserId,
659 since: u64,
660 next_batch: u64,
661) -> impl Future<Output = Vec<Raw<AnyGlobalAccountDataEvent>>> + Send + 'a {
662 services
663 .account_data
664 .changes_since(None, sender_user, since, Some(next_batch))
665 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
666 .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
667 .collect()
668}
669
670fn collect_device_list_left<'a>(
671 services: &'a Services,
672 sender_user: &'a UserId,
673 left_encrypted_users: HashSet<OwnedUserId>,
674) -> impl Future<Output = Vec<OwnedUserId>> + Send + 'a {
675 left_encrypted_users
676 .into_iter()
677 .stream()
678 .broad_filter_map(async |user_id: OwnedUserId| {
679 share_encrypted_room(services, sender_user, &user_id, None)
680 .await
681 .eq(&false)
682 .then_some(user_id)
683 })
684 .collect()
685}
686
687fn build_presence_events(presence_updates: Option<PresenceUpdates>) -> Vec<Raw<PresenceEvent>> {
688 presence_updates
689 .into_iter()
690 .flat_map(IntoIterator::into_iter)
691 .map(|(sender, content)| PresenceEvent { content, sender })
692 .map(|ref event| Raw::new(event))
693 .filter_map(Result::ok)
694 .collect()
695}
696
/// Collects presence updates between `since` and `next_batch` that are
/// relevant to `syncing_user`.
///
/// An update is kept when its user passes the presence filter, when
/// `user_sees_user` holds for the syncing user and that user, and when the
/// stored presence bytes can be turned into an event. The event content is
/// returned keyed by user id.
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
async fn process_presence_updates(
    services: &Services,
    since: u64,
    next_batch: u64,
    syncing_user: &UserId,
    filter: &FilterDefinition,
) -> PresenceUpdates {
    services
        .presence
        .presence_since(since, Some(next_batch))
        // The synchronous filter check comes before the asynchronous lookups.
        .ready_filter(|(user_id, ..)| filter.presence.matches(user_id))
        .filter(|(user_id, ..)| {
            services
                .state_cache
                .user_sees_user(syncing_user, user_id)
        })
        // Entries whose bytes fail to convert are dropped.
        .filter_map(|(user_id, _, presence_bytes)| {
            services
                .presence
                .from_json_bytes_to_event(presence_bytes, user_id)
                .map_ok(move |event| (user_id, event))
                .ok()
        })
        .map(|(user_id, event)| (user_id.to_owned(), event.content))
        .collect()
        .boxed()
        .await
}
726
/// Decides whether a left room belongs in this sync and, if so, builds its
/// `LeftRoom` entry.
///
/// Returns `Ok(None)` when:
/// - the left count is unknown (read as 0) or greater than `next_batch`;
/// - this is an initial sync (`since == 0`) and the filter does not set
///   `include_leave`;
/// - this is an incremental sync and the leave is not newer than `since`.
///
/// When the room does not exist, is disabled, or is banned, a minimal entry
/// is synthesized with a single leave membership event for the sender.
/// Otherwise the work is delegated to `load_left_room`.
///
/// # Errors
///
/// Fails when the synthetic event cannot be built (timestamp conversion or
/// content parsing) or when `load_left_room` fails.
#[tracing::instrument(
    name = "left",
    level = "debug",
    skip_all,
    fields(
        room_id = %room_id,
        full = %full_state,
    ),
)]
#[expect(clippy::too_many_arguments)]
async fn handle_left_room(
    services: &Services,
    since: u64,
    ref room_id: OwnedRoomId,
    sender_user: &UserId,
    next_batch: u64,
    full_state: bool,
    state_after: StateAfter,
    filter: &FilterDefinition,
) -> Result<Option<LeftRoom>> {
    // A failed lookup is folded into 0 and rejected by the check below.
    let left_count = services
        .state_cache
        .get_left_count(room_id, sender_user)
        .await
        .unwrap_or(0);

    if left_count == 0 || left_count > next_batch {
        return Ok(None);
    }

    let include_leave = filter.room.include_leave;
    if since == 0 && !include_leave {
        return Ok(None);
    }

    if since != 0 && left_count <= since {
        return Ok(None);
    }

    let is_not_found = services.metadata.exists(room_id).is_false();

    let is_disabled = services.metadata.is_disabled(room_id);

    let is_banned = services.metadata.is_banned(room_id);

    pin_mut!(is_not_found, is_disabled, is_banned);
    if is_not_found.or(is_disabled).or(is_banned).await {
        // Build a stand-in leave event for the sender. Its event id is newly
        // generated here and its hashes, auth events and prev events are
        // left at their defaults.
        let event = PduEvent {
            event_id: EventId::new_v1(services.globals.server_name()),
            origin_server_ts: utils::millis_since_unix_epoch().try_into()?,
            kind: RoomMember,
            state_key: Some(sender_user.as_str().into()),
            sender: sender_user.to_owned(),
            content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
            room_id: room_id.clone(),
            depth: uint!(1),
            origin: None,
            unsigned: None,
            redacts: None,
            hashes: EventHash::default(),
            auth_events: Default::default(),
            prev_events: Default::default(),
        };

        let state = state_after.wrap(StateEvents {
            events: vec![trim_event_fields(event.into_format(), filter.event_fields.as_deref())],
        });

        return Ok(Some(LeftRoom {
            account_data: RoomAccountData::default(),
            state,
            timeline: Timeline {
                limited: false,
                events: Default::default(),
                prev_batch: Some(left_count.to_string()),
            },
        }));
    }

    load_left_room(
        services,
        sender_user,
        room_id,
        since,
        left_count,
        full_state,
        state_after,
        filter,
    )
    .await
}
823
/// Builds the `LeftRoom` entry for a room the sender left at `left_count`.
///
/// Loads the timeline between `since` and `left_count`, resolves the state
/// hashes around that window, computes the state delta and assembles state,
/// timeline and room account data. State, account data and timeline events
/// are produced concurrently.
///
/// With a timeline limit of 0 the timeline is returned empty and marked
/// limited, `prev_batch` is `left_count`, and the last loaded timeline PDU
/// is appended to the state events in place of any sender membership event
/// from the computed state.
///
/// # Errors
///
/// Fails when no state hash can be resolved for the room or when
/// `calculate_state_changes` fails. A timeline load failure is not an
/// error; it is treated as an empty timeline.
#[tracing::instrument(name = "load", level = "debug", skip_all)]
#[expect(clippy::too_many_arguments)]
async fn load_left_room(
    services: &Services,
    sender_user: &UserId,
    room_id: &RoomId,
    since: u64,
    left_count: u64,
    full_state: bool,
    state_after: StateAfter,
    filter: &FilterDefinition,
) -> Result<Option<LeftRoom>> {
    let initial = since == 0;
    // Filter limit, defaulting to 10 and capped at 100.
    let timeline_limit: usize = filter
        .room
        .timeline
        .limit
        .map(TryInto::try_into)
        .map_expect("UInt to usize")
        .unwrap_or(10)
        .min(100);

    // At least one PDU is requested even for a limit of 0, because the last
    // PDU is needed further down in that case.
    let (timeline_pdus, limited, _) = load_timeline(
        services,
        sender_user,
        room_id,
        PduCount::Normal(since),
        Some(PduCount::Normal(left_count)),
        timeline_limit.max(1),
    )
    .await
    .unwrap_or_default();

    let since_shortstatehash = services
        .timeline
        .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
        .ok();

    // State hash at the first loaded timeline PDU, if there is one.
    let horizon_shortstatehash = timeline_pdus
        .first()
        .map(at!(0))
        .map_async(|count| {
            services
                .timeline
                .get_shortstatehash(room_id, count)
                .inspect_err(inspect_debug_log)
                .ok()
        });

    // Only resolved when state-after was requested; falls back to the
    // room's current state hash.
    let after_shortstatehash = state_after.requested().then_async(|| {
        services
            .timeline
            .next_shortstatehash(room_id, PduCount::Normal(left_count))
            .or_else(|_| services.state.get_room_shortstatehash(room_id))
            .inspect_err(inspect_debug_log)
    });

    // State hash at the leave itself, falling back to the room's current
    // state hash. Having neither is the one hard error in this function.
    let left_shortstatehash = services
        .timeline
        .get_shortstatehash(room_id, PduCount::Normal(left_count))
        .inspect_err(inspect_debug_log)
        .or_else(|_| services.state.get_room_shortstatehash(room_id))
        .map_err(|_| err!(Database(error!("Room {room_id} has no state"))));

    let (since_shortstatehash, horizon_shortstatehash, after_shortstatehash, left_shortstatehash) =
        join4(
            since_shortstatehash,
            horizon_shortstatehash,
            after_shortstatehash,
            left_shortstatehash,
        )
        .boxed()
        .await;

    // An initial sync is treated like a full-state request. Only the state
    // events of the result are used here.
    let StateChanges { state_events, .. } = calculate_state_changes(
        services,
        sender_user,
        room_id,
        full_state || initial,
        state_after,
        since_shortstatehash,
        horizon_shortstatehash.flatten(),
        after_shortstatehash.flat_ok(),
        left_shortstatehash?,
        false,
        None,
    )
    .boxed()
    .await?;

    let is_sender_membership = |event: &PduEvent| {
        *event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str())
    };

    // With a limit of 0, the last loaded timeline PDU is carried over into
    // the state section (presumably the sender's leave event — confirm).
    let timeline_sender_member = timeline_limit
        .eq(&0)
        .then(|| timeline_pdus.last().map(ref_at!(1)).cloned())
        .into_iter()
        .flat_map(Option::into_iter);

    let encrypted = services
        .state_accessor
        .is_encrypted_room(room_id)
        .await;

    let event_fields = filter.event_fields.as_deref();

    // With a limit of 0, sender membership events from the computed state
    // are dropped in favour of the PDU chained in from the timeline.
    let state_events = state_events
        .into_iter()
        .filter(|pdu| filter.room.state.matches(pdu))
        .filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu))
        .chain(timeline_sender_member)
        .stream()
        .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
        .map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
        .collect();

    let left_prev_batch = timeline_limit
        .eq(&0)
        .then_some(left_count)
        .map(PduCount::Normal);

    // Count of the first timeline PDU when a timeline is returned;
    // `left_count` when the limit is 0; otherwise none.
    let prev_batch = timeline_pdus
        .first()
        .filter(|_| timeline_limit > 0)
        .map(at!(0))
        .or(left_prev_batch)
        .as_ref()
        .map(ToString::to_string);

    // `take(timeline_limit)` empties the timeline when the limit is 0, even
    // though one PDU was loaded above.
    let timeline_events = timeline_pdus
        .into_iter()
        .stream()
        .wide_filter_map(|item| ignored_filter(services, item, sender_user))
        .map(at!(1))
        .ready_filter(|pdu| filter.room.timeline.matches(pdu))
        .take(timeline_limit)
        .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
        .wide_then(|pdu| {
            services
                .pdu_metadata
                .bundle_aggregations(sender_user, pdu)
        })
        .collect::<Vec<_>>();

    // Room account data changed since `since`, with no upper bound passed.
    // Empty events are omitted on an initial sync.
    let account_data_events = services
        .account_data
        .changes_since(Some(room_id), sender_user, since, None)
        .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
        .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
        .collect();

    let (state_events, account_data_events, timeline_events) =
        join3(state_events, account_data_events, timeline_events)
            .boxed()
            .await;

    let state = state_after.wrap(StateEvents { events: state_events });

    Ok(Some(LeftRoom {
        account_data: RoomAccountData { events: account_data_events },
        state,
        timeline: Timeline {
            prev_batch,
            // A limit of 0 always reports the timeline as limited.
            limited: limited || timeline_limit == 0,
            events: timeline_events
                .into_iter()
                .map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
                .collect(),
        },
    }))
}
1000
/// Builds the sync entry for one joined room.
///
/// Runs a sequence of helpers: load the timeline, gather room and user
/// metadata, compute the state delta, pull the sender's membership event out
/// of the state where applicable, derive `prev_batch` and the notification
/// gates, gather the remaining aggregates and assemble the final
/// `JoinedRoom`.
///
/// Returns the room together with two user sets, which the caller merges
/// into the device-list `changed` set and the candidates for the
/// device-list `left` set.
///
/// # Errors
///
/// Propagates failures from the timeline load, the room metadata gathering
/// and the state calculation.
#[tracing::instrument(
    name = "joined",
    level = "debug",
    skip_all,
    fields(
        room_id = ?room_id,
    ),
)]
#[expect(clippy::too_many_arguments)]
async fn load_joined_room(
    services: &Services,
    sender_user: &UserId,
    sender_device: Option<&DeviceId>,
    ref room_id: OwnedRoomId,
    since: u64,
    next_batch: u64,
    full_state: bool,
    state_after: StateAfter,
    filter: &FilterDefinition,
) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
    let initial = since == 0;
    let (timeline_pdus, limited, last_timeline_count) =
        load_join_timeline(services, sender_user, room_id, since, next_batch, filter).await?;

    // The timeline counts as changed when its last count lies past `since`.
    let timeline_changed = last_timeline_count.into_unsigned() > since;
    debug_assert!(
        timeline_pdus.is_empty() || timeline_changed,
        "if timeline events, last_timeline_count must be in the since window."
    );

    let RoomMetadata {
        since_shortstatehash,
        horizon_shortstatehash,
        after_shortstatehash,
        current_shortstatehash,
        receipt_events,
        encrypted_room,
    } = gather_room_metadata(
        services,
        sender_user,
        room_id,
        since,
        next_batch,
        &timeline_pdus,
        last_timeline_count,
        timeline_changed,
        state_after,
    )
    .boxed()
    .await?;

    let UserMetadata {
        witness,
        last_notification_read,
        thread_last_reads,
        last_privateread_update,
        joined_since_last_sync,
    } = gather_user_metadata(
        services,
        sender_user,
        sender_device,
        room_id,
        filter,
        &timeline_pdus,
        &receipt_events,
        since,
        initial,
        timeline_changed,
        encrypted_room,
        since_shortstatehash,
    )
    .boxed()
    .await;

    // An initial sync is treated like a full-state request for the state
    // calculation.
    let StateChanges {
        heroes,
        joined_member_count,
        invited_member_count,
        mut state_events,
    } = compute_join_state_changes(
        services,
        sender_user,
        room_id,
        full_state || initial,
        state_after,
        since_shortstatehash,
        horizon_shortstatehash,
        after_shortstatehash,
        current_shortstatehash,
        joined_since_last_sync,
        witness.as_ref(),
    )
    .await?;

    // May take an event out of `state_events`; the exact conditions live in
    // the helper.
    let joined_sender_member = take_sender_membership_for_join(
        &mut state_events,
        sender_user,
        joined_since_last_sync,
        timeline_pdus.is_empty(),
        initial,
    );

    let prev_batch =
        compute_join_prev_batch(&timeline_pdus, joined_sender_member.as_ref(), since);
    // A count is in the window when it is after `since` and not after
    // `next_batch`.
    let in_window = |count: u64| count > since && count <= next_batch;

    let NotificationGates {
        send_notification_counts,
        send_notification_count_filter,
    } = compute_notification_gates(
        last_notification_read,
        thread_last_reads.as_ref(),
        since,
        in_window,
    );

    // Unknown encryption status is treated as not encrypted.
    let encrypted = encrypted_room.unwrap_or(false);

    // `timeline_pdus` and `joined_sender_member` are moved into this call.
    let aggregates = await_join_aggregates(
        services,
        sender_user,
        room_id,
        &state_events,
        timeline_pdus,
        joined_sender_member,
        encrypted,
        initial,
        since,
        next_batch,
        last_privateread_update,
        send_notification_counts,
        filter,
    )
    .await;

    let (joined_room, device_list_updates, left_encrypted_users) = finalize_joined_room(
        services,
        sender_user,
        filter,
        state_events,
        aggregates,
        receipt_events,
        heroes,
        joined_member_count,
        invited_member_count,
        thread_last_reads.as_ref(),
        send_notification_count_filter,
        FinalizeJoinFlags {
            encrypted,
            full_state,
            state_after,
            limited,
            joined_since_last_sync,
            initial,
        },
        in_window,
        prev_batch,
    )
    .await;

    Ok((joined_room, device_list_updates, left_encrypted_users))
}
1164
1165#[expect(clippy::too_many_arguments)]
1166async fn compute_join_state_changes(
1167 services: &Services,
1168 sender_user: &UserId,
1169 room_id: &RoomId,
1170 full_state: bool,
1171 state_after: StateAfter,
1172 since_shortstatehash: Option<ShortStateHash>,
1173 horizon_shortstatehash: Option<ShortStateHash>,
1174 after_shortstatehash: Option<ShortStateHash>,
1175 current_shortstatehash: Option<ShortStateHash>,
1176 joined_since_last_sync: bool,
1177 witness: Option<&Witness>,
1178) -> Result<StateChanges> {
1179 current_shortstatehash
1180 .map_async(|current_shortstatehash| {
1181 calculate_state_changes(
1182 services,
1183 sender_user,
1184 room_id,
1185 full_state,
1186 state_after,
1187 since_shortstatehash,
1188 horizon_shortstatehash,
1189 after_shortstatehash,
1190 current_shortstatehash,
1191 joined_since_last_sync,
1192 witness,
1193 )
1194 })
1195 .await
1196 .transpose()
1197 .map(Option::unwrap_or_default)
1198}
1199
1200fn compute_join_prev_batch(
1201 timeline_pdus: &[(PduCount, PduEvent)],
1202 joined_sender_member: Option<&PduEvent>,
1203 since: u64,
1204) -> Option<PduCount> {
1205 timeline_pdus.first().map(at!(0)).or_else(|| {
1206 joined_sender_member
1207 .is_some()
1208 .then_some(since)
1209 .map(Into::into)
1210 })
1211}
1212
1213#[expect(clippy::too_many_arguments)]
1214async fn assemble_join_state_events(
1215 services: &Services,
1216 state_events: Vec<PduEvent>,
1217 sender_user: &UserId,
1218 encrypted: bool,
1219 room_events: &[PduEvent],
1220 filter: &FilterDefinition,
1221 full_state: bool,
1222 state_after: StateAfter,
1223) -> Vec<Raw<AnySyncStateEvent>> {
1224 let is_in_timeline = |event: &PduEvent| {
1225 room_events
1226 .iter()
1227 .map(Event::event_id)
1228 .any(is_equal_to!(event.event_id()))
1229 };
1230
1231 let include_in_state = |event: &PduEvent| {
1235 let filter = &filter.room.state;
1236 filter.matches(event) && (full_state || state_after.requested() || !is_in_timeline(event))
1237 };
1238
1239 assemble_state_events(
1240 services,
1241 state_events,
1242 sender_user,
1243 encrypted,
1244 include_in_state,
1245 filter.event_fields.as_deref(),
1246 )
1247 .await
1248}
1249
1250async fn load_join_timeline(
1251 services: &Services,
1252 sender_user: &UserId,
1253 room_id: &RoomId,
1254 since: u64,
1255 next_batch: u64,
1256 filter: &FilterDefinition,
1257) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount)> {
1258 let timeline_limit: usize = filter
1259 .room
1260 .timeline
1261 .limit
1262 .map(TryInto::try_into)
1263 .map_expect("UInt to usize")
1264 .unwrap_or(10)
1265 .min(100);
1266
1267 load_timeline(
1268 services,
1269 sender_user,
1270 room_id,
1271 PduCount::Normal(since),
1272 Some(PduCount::Normal(next_batch)),
1273 timeline_limit,
1274 )
1275 .await
1276}
1277
/// Results gathered concurrently by `await_join_aggregates` for one joined
/// room, later consumed by `finalize_joined_room`.
struct JoinAggregates {
    // Timeline events produced by `collect_room_events`.
    room_events: Vec<PduEvent>,
    // Room account data produced by `collect_room_account_data`.
    account_data_events: Vec<Raw<AnyRoomAccountDataEvent>>,
    // Typing EDUs produced by `gather_typing_events`.
    typing_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    // `None` when the private read receipt lookup was skipped.
    private_read_events: Option<PrivateReadEvents>,
    // The three count fields are `None` when notification counts were not requested.
    notification_count: Option<UInt>,
    highlight_count: Option<UInt>,
    // Keyed by event ID; the pair is read as (notifications, highlights).
    thread_counts: Option<BTreeMap<OwnedEventId, (u64, u64)>>,
    // First element of the pair returned by `gather_device_list_updates`.
    device_list_updates: HashSet<OwnedUserId>,
    // Second element of the pair returned by `gather_device_list_updates`.
    left_encrypted_users: HashSet<OwnedUserId>,
}
1289
1290#[expect(clippy::too_many_arguments)]
1291async fn await_join_aggregates(
1292 services: &Services,
1293 sender_user: &UserId,
1294 room_id: &RoomId,
1295 state_events: &[PduEvent],
1296 timeline_pdus: Vec<(PduCount, PduEvent)>,
1297 joined_sender_member: Option<PduEvent>,
1298 encrypted: bool,
1299 initial: bool,
1300 since: u64,
1301 next_batch: u64,
1302 last_privateread_update: u64,
1303 send_notification_counts: bool,
1304 filter: &FilterDefinition,
1305) -> JoinAggregates {
1306 let (notification_count, highlight_count, thread_counts) =
1307 notification_count_futures(services, sender_user, room_id, send_notification_counts);
1308
1309 let private_read_events = last_privateread_update.gt(&since).then_async(|| {
1310 services
1311 .read_receipt
1312 .private_read_get(room_id, sender_user)
1313 .unwrap_or_default()
1314 });
1315
1316 let typing_events = gather_typing_events(services, room_id, sender_user, since);
1317
1318 let device_list_updates = gather_device_list_updates(
1319 services,
1320 sender_user,
1321 room_id,
1322 timeline_membership_changes(&timeline_pdus, initial),
1323 state_events,
1324 initial,
1325 since,
1326 next_batch,
1327 );
1328
1329 let room_events = collect_room_events(
1330 services,
1331 sender_user,
1332 timeline_pdus,
1333 joined_sender_member,
1334 encrypted,
1335 filter,
1336 );
1337
1338 let account_data_events = collect_room_account_data(services, sender_user, room_id, since);
1339
1340 let (
1341 (room_events, account_data_events),
1342 (typing_events, private_read_events),
1343 (notification_count, highlight_count, thread_counts),
1344 (device_list_updates, left_encrypted_users),
1345 ) = join4(
1346 join(room_events, account_data_events),
1347 join(typing_events, private_read_events),
1348 join3(notification_count, highlight_count, thread_counts),
1349 device_list_updates,
1350 )
1351 .boxed()
1352 .await;
1353
1354 JoinAggregates {
1355 room_events,
1356 account_data_events,
1357 typing_events,
1358 private_read_events,
1359 notification_count,
1360 highlight_count,
1361 thread_counts,
1362 device_list_updates,
1363 left_encrypted_users,
1364 }
1365}
1366
/// Boolean switches and the state mode passed to `finalize_joined_room`,
/// bundled so that function takes one argument instead of six.
struct FinalizeJoinFlags {
    // Forwarded to `assemble_join_state_events`.
    encrypted: bool,
    // When set, state events are kept even if they also appear in the timeline.
    full_state: bool,
    // Used both for the state inclusion rule and to wrap the state events.
    state_after: StateAfter,
    // OR-ed with `joined_since_last_sync` to form the timeline's `limited` flag.
    limited: bool,
    joined_since_last_sync: bool,
    // Forwarded to `assemble_unread_notifications`.
    initial: bool,
}
1375
1376#[expect(clippy::too_many_arguments)]
1377async fn finalize_joined_room(
1378 services: &Services,
1379 sender_user: &UserId,
1380 filter: &FilterDefinition,
1381 state_events: Vec<PduEvent>,
1382 aggregates: JoinAggregates,
1383 receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
1384 heroes: Option<Vec<OwnedUserId>>,
1385 joined_member_count: Option<u64>,
1386 invited_member_count: Option<u64>,
1387 thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1388 send_notification_count_filter: impl Fn(&UInt) -> bool,
1389 flags: FinalizeJoinFlags,
1390 in_window: impl Fn(u64) -> bool,
1391 prev_batch: Option<PduCount>,
1392) -> (JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>) {
1393 let JoinAggregates {
1394 room_events,
1395 account_data_events,
1396 typing_events,
1397 private_read_events,
1398 notification_count,
1399 highlight_count,
1400 thread_counts,
1401 device_list_updates,
1402 left_encrypted_users,
1403 } = aggregates;
1404
1405 let FinalizeJoinFlags {
1406 encrypted,
1407 full_state,
1408 state_after,
1409 limited,
1410 joined_since_last_sync,
1411 initial,
1412 } = flags;
1413
1414 let state_events = assemble_join_state_events(
1415 services,
1416 state_events,
1417 sender_user,
1418 encrypted,
1419 &room_events,
1420 filter,
1421 full_state,
1422 state_after,
1423 )
1424 .await;
1425
1426 let (unread_notifications, unread_thread_notifications) = assemble_unread_notifications(
1427 notification_count,
1428 highlight_count,
1429 thread_counts,
1430 thread_last_reads,
1431 send_notification_count_filter,
1432 filter.room.timeline.unread_thread_notifications,
1433 initial,
1434 in_window,
1435 );
1436
1437 let joined_room = build_joined_room(
1438 BuildJoinedRoom {
1439 receipt_events,
1440 typing_events,
1441 private_read_events,
1442 state_events,
1443 account_data_events,
1444 room_events,
1445 heroes,
1446 joined_member_count,
1447 invited_member_count,
1448 unread_notifications,
1449 unread_thread_notifications,
1450 state_after,
1451 limited,
1452 joined_since_last_sync,
1453 prev_batch,
1454 },
1455 filter.event_fields.as_deref(),
1456 );
1457
1458 (joined_room, device_list_updates, left_encrypted_users)
1459}
1460
1461fn build_joined_room(args: BuildJoinedRoom, event_fields: Option<&[String]>) -> JoinedRoom {
1462 let BuildJoinedRoom {
1463 receipt_events,
1464 typing_events,
1465 private_read_events,
1466 state_events,
1467 account_data_events,
1468 room_events,
1469 heroes,
1470 joined_member_count,
1471 invited_member_count,
1472 unread_notifications,
1473 unread_thread_notifications,
1474 state_after,
1475 limited,
1476 joined_since_last_sync,
1477 prev_batch,
1478 } = args;
1479
1480 let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
1481 .into_iter()
1482 .map(at!(1))
1483 .chain(typing_events)
1484 .chain(private_read_events.into_iter().flatten())
1485 .collect();
1486
1487 let state = state_after.wrap(StateEvents { events: state_events });
1488
1489 let heroes = heroes
1490 .into_iter()
1491 .flatten()
1492 .map(TryInto::try_into)
1493 .filter_map(Result::ok)
1494 .collect();
1495
1496 JoinedRoom {
1497 account_data: RoomAccountData { events: account_data_events },
1498 ephemeral: Ephemeral { events: edus },
1499 state,
1500 summary: RoomSummary {
1501 joined_member_count: joined_member_count.map(ruma_from_u64),
1502 invited_member_count: invited_member_count.map(ruma_from_u64),
1503 heroes,
1504 },
1505 timeline: Timeline {
1506 limited: limited || joined_since_last_sync,
1507 prev_batch: prev_batch.as_ref().map(ToString::to_string),
1508 events: room_events
1509 .into_iter()
1510 .map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
1511 .collect(),
1512 },
1513 unread_notifications,
1514 unread_thread_notifications,
1515 }
1516}
1517
1518#[expect(clippy::too_many_arguments)]
1519async fn gather_room_metadata(
1520 services: &Services,
1521 sender_user: &UserId,
1522 room_id: &RoomId,
1523 since: u64,
1524 next_batch: u64,
1525 timeline_pdus: &[(PduCount, PduEvent)],
1526 last_timeline_count: PduCount,
1527 timeline_changed: bool,
1528 state_after: StateAfter,
1529) -> Result<RoomMetadata> {
1530 let since_shortstatehash = timeline_changed.then_async(|| {
1531 services
1532 .timeline
1533 .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
1534 .ok()
1535 });
1536
1537 let horizon_shortstatehash = timeline_pdus
1538 .first()
1539 .map(at!(0))
1540 .map_async(|count| {
1541 services
1542 .timeline
1543 .get_shortstatehash(room_id, count)
1544 .inspect_err(inspect_debug_log)
1545 });
1546
1547 let after_shortstatehash = state_after.requested().then_async(|| {
1552 services
1553 .timeline
1554 .next_shortstatehash(room_id, last_timeline_count)
1555 .or_else(|_| services.state.get_room_shortstatehash(room_id))
1556 .inspect_err(inspect_debug_log)
1557 });
1558
1559 let current_shortstatehash = timeline_changed.then_async(|| {
1560 services
1561 .timeline
1562 .get_shortstatehash(room_id, last_timeline_count)
1563 .inspect_err(inspect_debug_log)
1564 .or_else(|_| services.state.get_room_shortstatehash(room_id))
1565 .map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
1566 });
1567
1568 let encrypted_room =
1569 timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id));
1570
1571 let receipt_events = services
1572 .read_receipt
1573 .readreceipts_since(room_id, since, Some(next_batch))
1574 .filter_map(async |(read_user, _, edu)| {
1575 services
1576 .users
1577 .user_is_ignored(read_user, sender_user)
1578 .await
1579 .or_some((read_user.to_owned(), edu))
1580 })
1581 .collect::<Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>>();
1582
1583 let (
1584 (
1585 since_shortstatehash,
1586 horizon_shortstatehash,
1587 after_shortstatehash,
1588 current_shortstatehash,
1589 ),
1590 receipt_events,
1591 encrypted_room,
1592 ) = join3(
1593 join4(
1594 since_shortstatehash,
1595 horizon_shortstatehash,
1596 after_shortstatehash,
1597 current_shortstatehash,
1598 ),
1599 receipt_events,
1600 encrypted_room,
1601 )
1602 .boxed()
1603 .await;
1604
1605 Ok(RoomMetadata {
1606 since_shortstatehash: since_shortstatehash.flatten(),
1607 horizon_shortstatehash: horizon_shortstatehash.flat_ok(),
1608 after_shortstatehash: after_shortstatehash.flat_ok(),
1609 current_shortstatehash: current_shortstatehash.transpose()?,
1610 receipt_events,
1611 encrypted_room,
1612 })
1613}
1614
1615fn collect_room_events<'a>(
1616 services: &'a Services,
1617 sender_user: &'a UserId,
1618 timeline_pdus: Vec<(PduCount, PduEvent)>,
1619 joined_sender_member: Option<PduEvent>,
1620 encrypted: bool,
1621 filter: &'a FilterDefinition,
1622) -> impl Future<Output = Vec<PduEvent>> + Send + 'a {
1623 let include_in_timeline = |event: &PduEvent| filter.room.timeline.matches(event);
1624 timeline_pdus
1625 .into_iter()
1626 .stream()
1627 .wide_filter_map(|item| ignored_filter(services, item, sender_user))
1628 .map(at!(1))
1629 .chain(joined_sender_member.into_iter().stream())
1630 .ready_filter(include_in_timeline)
1631 .wide_then(move |pdu| with_membership(services, pdu, sender_user, encrypted))
1632 .wide_then(move |pdu| {
1633 services
1634 .pdu_metadata
1635 .bundle_aggregations(sender_user, pdu)
1636 })
1637 .collect::<Vec<_>>()
1638}
1639
1640fn collect_room_account_data<'a>(
1641 services: &'a Services,
1642 sender_user: &'a UserId,
1643 room_id: &'a RoomId,
1644 since: u64,
1645) -> impl Future<Output = Vec<Raw<AnyRoomAccountDataEvent>>> + Send + 'a {
1646 services
1647 .account_data
1648 .changes_since(Some(room_id), sender_user, since, None)
1649 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
1650 .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
1651 .collect()
1652}
1653
1654#[expect(clippy::type_complexity)]
1655fn notification_count_futures<'a>(
1656 services: &'a Services,
1657 sender_user: &'a UserId,
1658 room_id: &'a RoomId,
1659 send: bool,
1660) -> (
1661 impl Future<Output = Option<UInt>> + Send + 'a,
1662 impl Future<Output = Option<UInt>> + Send + 'a,
1663 impl Future<Output = Option<BTreeMap<OwnedEventId, (u64, u64)>>> + Send + 'a,
1664) {
1665 let notification_count = send.then_async(move || {
1666 services
1667 .pusher
1668 .notification_count(sender_user, room_id)
1669 .map(TryInto::try_into)
1670 .unwrap_or(uint!(0))
1671 });
1672
1673 let highlight_count = send.then_async(move || {
1674 services
1675 .pusher
1676 .highlight_count(sender_user, room_id)
1677 .map(TryInto::try_into)
1678 .unwrap_or(uint!(0))
1679 });
1680
1681 let thread_counts = send.then_async(move || {
1684 services
1685 .pusher
1686 .thread_notification_counts(sender_user, room_id)
1687 });
1688
1689 (notification_count, highlight_count, thread_counts)
1690}
1691
1692fn take_sender_membership_for_join(
1693 state_events: &mut Vec<PduEvent>,
1694 sender_user: &UserId,
1695 joined_since_last_sync: bool,
1696 timeline_empty: bool,
1697 initial: bool,
1698) -> Option<PduEvent> {
1699 if !(joined_since_last_sync && timeline_empty && !initial) {
1700 return None;
1701 }
1702
1703 let is_sender_membership = |event: &PduEvent| {
1704 *event.event_type() == StateEventType::RoomMember.into()
1705 && event
1706 .state_key()
1707 .is_some_and(is_equal_to!(sender_user.as_str()))
1708 };
1709
1710 state_events
1711 .iter()
1712 .position(is_sender_membership)
1713 .map(|pos| state_events.swap_remove(pos))
1714}
1715
1716#[expect(clippy::too_many_arguments)]
1717async fn gather_user_metadata(
1718 services: &Services,
1719 sender_user: &UserId,
1720 sender_device: Option<&DeviceId>,
1721 room_id: &RoomId,
1722 filter: &FilterDefinition,
1723 timeline_pdus: &[(PduCount, PduEvent)],
1724 receipt_events: &[(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)],
1725 since: u64,
1726 initial: bool,
1727 timeline_changed: bool,
1728 encrypted_room: Option<bool>,
1729 since_shortstatehash: Option<ShortStateHash>,
1730) -> UserMetadata {
1731 let lazy_load_options =
1732 [&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
1733
1734 let lazy_loading_enabled = encrypted_room.is_some_and(is_false!())
1735 && lazy_load_options
1736 .iter()
1737 .any(|opts| opts.is_enabled());
1738
1739 let lazy_loading_context = &lazy_loading::Context {
1740 user_id: sender_user,
1741 device_id: sender_device,
1742 room_id,
1743 token: Some(since),
1744 options: Some(&filter.room.state.lazy_load_options),
1745 mode: lazy_loading::Mode::Update,
1746 };
1747
1748 let lazy_load_reset =
1750 initial.then_async(|| services.lazy_loading.reset(lazy_loading_context));
1751
1752 lazy_load_reset.await;
1753 let witness = lazy_loading_enabled.then_async(|| {
1754 let witness: Witness = timeline_pdus
1755 .iter()
1756 .map(ref_at!(1))
1757 .map(Event::sender)
1758 .map(Into::into)
1759 .chain(receipt_events.iter().map(ref_at!(0)).cloned())
1760 .collect();
1761
1762 services
1763 .lazy_loading
1764 .witness_retain(witness, lazy_loading_context)
1765 });
1766
1767 let sender_joined_count = timeline_changed.then_async(|| {
1768 services
1769 .state_cache
1770 .get_joined_count(room_id, sender_user)
1771 .unwrap_or(0)
1772 });
1773
1774 let since_encryption = since_shortstatehash.map_async(|shortstatehash| {
1775 services
1776 .state_accessor
1777 .state_get(shortstatehash, &StateEventType::RoomEncryption, "")
1778 });
1779
1780 let last_notification_read = timeline_pdus.is_empty().then_async(|| {
1781 services
1782 .pusher
1783 .last_notification_read(sender_user, room_id)
1784 .ok()
1785 });
1786
1787 let thread_last_reads = timeline_pdus.is_empty().then_async(|| {
1788 services
1789 .pusher
1790 .thread_last_notification_reads(sender_user, room_id)
1791 });
1792
1793 let last_privateread_update = services
1794 .read_receipt
1795 .last_privateread_update(sender_user, room_id);
1796
1797 let (
1798 (last_privateread_update, last_notification_read, thread_last_reads),
1799 (sender_joined_count, since_encryption),
1800 witness,
1801 ) = join3(
1802 join3(last_privateread_update, last_notification_read, thread_last_reads),
1803 join(sender_joined_count, since_encryption),
1804 witness,
1805 )
1806 .await;
1807
1808 let _encrypted_since_last_sync =
1809 !initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none();
1810
1811 let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
1812
1813 UserMetadata {
1814 witness,
1815 last_notification_read,
1816 thread_last_reads,
1817 last_privateread_update,
1818 joined_since_last_sync,
1819 }
1820}
1821
1822#[expect(clippy::option_option)]
1823fn compute_notification_gates(
1824 last_notification_read: Option<Option<u64>>,
1825 thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1826 since: u64,
1827 in_window: impl Fn(u64) -> bool,
1828) -> NotificationGates<impl Fn(&UInt) -> bool> {
1829 let send_main_counts = last_notification_read
1830 .flatten()
1831 .is_none_or(&in_window);
1832
1833 let send_thread_counts =
1834 thread_last_reads.is_none_or(|reads| reads.values().copied().any(&in_window));
1835
1836 let send_notification_counts = send_main_counts || send_thread_counts;
1841
1842 let send_notification_resets = last_notification_read
1843 .flatten()
1844 .is_some_and(|last_count| last_count > since);
1845
1846 let send_notification_count_filter =
1847 move |count: &UInt| *count != uint!(0) || send_notification_resets;
1848
1849 NotificationGates {
1850 send_notification_counts,
1851 send_notification_count_filter,
1852 }
1853}
1854
1855async fn gather_typing_events(
1856 services: &Services,
1857 room_id: &RoomId,
1858 sender_user: &UserId,
1859 since: u64,
1860) -> Vec<Raw<AnySyncEphemeralRoomEvent>> {
1861 services
1862 .typing
1863 .last_typing_update(room_id)
1864 .and_then(async |count| {
1865 if count <= since {
1866 return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
1867 }
1868
1869 let typings = typings_event_for_user(services, room_id, sender_user).await?;
1870
1871 Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
1872 })
1873 .unwrap_or(Vec::new())
1874 .await
1875}
1876
1877fn timeline_membership_changes(
1878 timeline_pdus: &[(PduCount, PduEvent)],
1879 initial: bool,
1880) -> Vec<(MembershipState, OwnedUserId)> {
1881 timeline_pdus
1882 .iter()
1883 .filter(|_| !initial)
1884 .map(ref_at!(1))
1885 .filter_map(extract_membership)
1886 .collect::<Vec<_>>()
1887}
1888
1889fn extract_membership(event: &PduEvent) -> Option<(MembershipState, OwnedUserId)> {
1890 let content: RoomMemberEventContent = event.get_content().ok()?;
1891 let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
1892
1893 Some((content.membership, user_id))
1894}
1895
1896#[expect(clippy::too_many_arguments)]
1897async fn gather_device_list_updates(
1898 services: &Services,
1899 sender_user: &UserId,
1900 room_id: &RoomId,
1901 timeline_membership_changes: Vec<(MembershipState, OwnedUserId)>,
1902 state_events: &[PduEvent],
1903 initial: bool,
1904 since: u64,
1905 next_batch: u64,
1906) -> (HashSet<OwnedUserId>, HashSet<OwnedUserId>) {
1907 let keys_changed = services
1908 .users
1909 .room_keys_changed(room_id, since, Some(next_batch))
1910 .map(|(user_id, _)| user_id)
1911 .map(ToOwned::to_owned)
1912 .collect::<Vec<_>>();
1913
1914 let (mut dlu, leu) = state_events
1915 .iter()
1916 .stream()
1917 .ready_filter(|_| !initial)
1918 .ready_filter(|state_event| *state_event.event_type() == RoomMember)
1919 .ready_filter_map(extract_membership)
1920 .chain(timeline_membership_changes.into_iter().stream())
1921 .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| {
1922 use MembershipState::*;
1923
1924 let requires_update = async |user_id| {
1925 !share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
1926 };
1927
1928 match membership {
1929 | Join if requires_update(&user_id).await => dlu.insert(user_id),
1930 | Leave => leu.insert(user_id),
1931 | _ => false,
1932 };
1933
1934 (dlu, leu)
1935 })
1936 .await;
1937
1938 dlu.extend(keys_changed.await);
1939 (dlu, leu)
1940}
1941
1942async fn assemble_state_events(
1943 services: &Services,
1944 state_events: Vec<PduEvent>,
1945 sender_user: &UserId,
1946 encrypted: bool,
1947 include_in_state: impl Fn(&PduEvent) -> bool + Send + Sync,
1948 event_fields: Option<&[String]>,
1949) -> Vec<Raw<AnySyncStateEvent>> {
1950 state_events
1951 .into_iter()
1952 .filter(include_in_state)
1953 .stream()
1954 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1955 .map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
1956 .collect()
1957 .await
1958}
1959
1960#[expect(clippy::too_many_arguments)]
1961fn assemble_unread_notifications(
1962 notification_count: Option<UInt>,
1963 highlight_count: Option<UInt>,
1964 thread_counts: Option<BTreeMap<OwnedEventId, (u64, u64)>>,
1965 thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1966 send_notification_count_filter: impl Fn(&UInt) -> bool,
1967 want_thread_unread: bool,
1968 initial: bool,
1969 in_window: impl Fn(u64) -> bool,
1970) -> (UnreadNotificationsCount, BTreeMap<OwnedEventId, UnreadNotificationsCount>) {
1971 let thread_counts = thread_counts.unwrap_or_default();
1972
1973 let (thread_total_notifications, thread_total_highlights) = thread_counts
1974 .values()
1975 .fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
1976 (n.saturating_add(notifs), h.saturating_add(hl))
1977 });
1978
1979 let merge_total = |total: u64| {
1982 move |count: UInt| {
1983 want_thread_unread
1984 .is_false()
1985 .then(|| count.saturating_add(UInt::try_from(total).unwrap_or_default()))
1986 .unwrap_or(count)
1987 }
1988 };
1989
1990 let unread_notifications = UnreadNotificationsCount {
1991 highlight_count: highlight_count
1992 .map(merge_total(thread_total_highlights))
1993 .filter(&send_notification_count_filter),
1994 notification_count: notification_count
1995 .map(merge_total(thread_total_notifications))
1996 .filter(&send_notification_count_filter),
1997 };
1998
1999 let advanced_in_window = |root: &EventId| {
2005 initial
2006 || thread_last_reads
2007 .is_none_or(|reads| reads.get(root).copied().is_some_and(&in_window))
2008 };
2009
2010 let unread_thread_notifications = thread_counts
2011 .into_iter()
2012 .filter(|_| want_thread_unread)
2013 .filter(|(root, _)| advanced_in_window(root))
2014 .map(|(root, (notifications, highlights))| {
2015 let counts = UnreadNotificationsCount {
2016 notification_count: UInt::try_from(notifications).ok(),
2017 highlight_count: UInt::try_from(highlights).ok(),
2018 };
2019
2020 (root, counts)
2021 })
2022 .collect();
2023
2024 (unread_notifications, unread_thread_notifications)
2025}
2026
2027#[tracing::instrument(
2028 name = "state",
2029 level = "trace",
2030 skip_all,
2031 fields(
2032 full = %full_state,
2033 after = ?state_after,
2034 ss = ?since_shortstatehash,
2035 hs = ?horizon_shortstatehash,
2036 as = ?after_shortstatehash,
2037 cs = %current_shortstatehash,
2038 )
2039)]
2040#[expect(clippy::too_many_arguments)]
2041async fn calculate_state_changes<'a>(
2042 services: &Services,
2043 sender_user: &UserId,
2044 room_id: &RoomId,
2045 full_state: bool,
2046 state_after: StateAfter,
2047 since_shortstatehash: Option<ShortStateHash>,
2048 horizon_shortstatehash: Option<ShortStateHash>,
2049 after_shortstatehash: Option<ShortStateHash>,
2050 current_shortstatehash: ShortStateHash,
2051 joined_since_last_sync: bool,
2052 witness: Option<&'a Witness>,
2053) -> Result<StateChanges> {
2054 let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some();
2055
2056 let horizon_shortstatehash = state_after
2061 .requested()
2062 .then_some(after_shortstatehash)
2063 .unwrap_or(horizon_shortstatehash)
2064 .unwrap_or(current_shortstatehash);
2065
2066 let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash);
2067
2068 let state_get_shorteventid = |user_id: &'a UserId| {
2069 services
2070 .state_accessor
2071 .state_get_shortid(
2072 horizon_shortstatehash,
2073 &StateEventType::RoomMember,
2074 user_id.as_str(),
2075 )
2076 .ok()
2077 };
2078
2079 let lazy_state_ids = witness.map_async(|witness| {
2080 witness
2081 .iter()
2082 .stream()
2083 .ready_filter(|&user_id| user_id != sender_user)
2084 .broad_filter_map(|user_id| state_get_shorteventid(user_id))
2085 .into_future()
2086 });
2087
2088 let state_diff_ids = incremental.then_async(|| {
2089 services
2090 .state_accessor
2091 .state_added((since_shortstatehash, horizon_shortstatehash))
2092 .boxed()
2093 .into_future()
2094 });
2095
2096 let current_state_ids = (!incremental).then_async(|| {
2097 services
2098 .state_accessor
2099 .state_full_shortids(horizon_shortstatehash)
2100 .expect_ok()
2101 .boxed()
2102 .into_future()
2103 });
2104
2105 let after = state_after.requested();
2107 let state_events = current_state_ids
2108 .stream()
2109 .map(|ids| (false, ids))
2110 .chain(
2111 state_diff_ids
2112 .stream()
2113 .map(move |ids| (after, ids)),
2114 )
2115 .broad_filter_map(async |(after, (shortstatekey, shorteventid))| {
2116 lazy_filter(services, sender_user, witness, shortstatekey, shorteventid, after).await
2117 })
2118 .chain(lazy_state_ids.stream())
2119 .broad_filter_map(|shorteventid| {
2120 services
2121 .timeline
2122 .get_pdu_from_shorteventid(shorteventid)
2123 .ok()
2124 })
2125 .collect::<Vec<_>>()
2126 .await;
2127
2128 let send_member_counts = state_events
2129 .iter()
2130 .any(|event| *event.kind() == RoomMember);
2131
2132 let member_counts =
2133 send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user));
2134
2135 let (joined_member_count, invited_member_count, heroes) =
2136 member_counts.await.unwrap_or((None, None, None));
2137
2138 Ok(StateChanges {
2139 heroes,
2140 joined_member_count,
2141 invited_member_count,
2142 state_events,
2143 })
2144}
2145
2146async fn lazy_filter(
2147 services: &Services,
2148 sender_user: &UserId,
2149 witness: Option<&Witness>,
2150 shortstatekey: ShortStateKey,
2151 shorteventid: ShortEventId,
2152 after: bool,
2153) -> Option<ShortEventId> {
2154 let Some(witness) = witness else {
2155 return Some(shorteventid);
2156 };
2157
2158 let (event_type, state_key) = services
2159 .short
2160 .get_statekey_from_short(shortstatekey)
2161 .await
2162 .ok()?;
2163
2164 let keep = event_type != StateEventType::RoomMember
2167 || state_key == sender_user.as_str()
2168 || (after && <&UserId>::try_from(state_key.as_str()).is_ok_and(|u| !witness.contains(u)));
2169
2170 keep.then_some(shorteventid)
2171}
2172
2173async fn calculate_counts(
2174 services: &Services,
2175 room_id: &RoomId,
2176 sender_user: &UserId,
2177) -> (Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>) {
2178 let joined_member_count = services
2179 .state_cache
2180 .room_joined_count(room_id)
2181 .unwrap_or(0);
2182
2183 let invited_member_count = services
2184 .state_cache
2185 .room_invited_count(room_id)
2186 .unwrap_or(0);
2187
2188 let (joined_member_count, invited_member_count) =
2189 join(joined_member_count, invited_member_count).await;
2190
2191 let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
2192
2193 let heroes = services
2194 .config
2195 .calculate_heroes
2196 .and_is(small_room)
2197 .then_async(|| calculate_heroes(services, room_id, sender_user));
2198
2199 (Some(joined_member_count), Some(invited_member_count), heroes.await)
2200}
2201
2202pub(crate) async fn calculate_heroes(
2203 services: &Services,
2204 room_id: &RoomId,
2205 sender_user: &UserId,
2206) -> Vec<OwnedUserId> {
2207 const LIMIT: usize = 5;
2208
2209 services
2210 .state_accessor
2211 .room_state_type_pdus(room_id, &StateEventType::RoomMember)
2212 .ready_filter_map(Result::ok)
2213 .filter_map(|pdu| filter_hero(services, room_id, sender_user, pdu))
2214 .take(LIMIT)
2215 .collect::<Vec<_>>()
2216 .await
2217}
2218
2219async fn filter_hero<Pdu: Event>(
2220 services: &Services,
2221 room_id: &RoomId,
2222 sender_user: &UserId,
2223 pdu: Pdu,
2224) -> Option<OwnedUserId> {
2225 let user_id = pdu.state_key().map(TryInto::try_into).flat_ok()?;
2226
2227 if user_id == sender_user {
2228 return None;
2229 }
2230
2231 let Ok(content): Result<RoomMemberEventContent, _> = pdu.get_content() else {
2232 return None;
2233 };
2234
2235 if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
2237 return None;
2238 }
2239
2240 let (is_invited, is_joined) = join(
2241 services.state_cache.is_invited(user_id, room_id),
2242 services.state_cache.is_joined(user_id, room_id),
2243 )
2244 .await;
2245
2246 if !is_joined && is_invited {
2247 return None;
2248 }
2249
2250 Some(user_id.to_owned())
2251}
2252
2253async fn typings_event_for_user(
2254 services: &Services,
2255 room_id: &RoomId,
2256 sender_user: &UserId,
2257) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
2258 Ok(SyncEphemeralRoomEvent {
2259 content: TypingEventContent {
2260 user_ids: services
2261 .typing
2262 .typing_users_for_user(room_id, sender_user)
2263 .await?,
2264 },
2265 })
2266}
2267
#[cfg(test)]
mod tests {
    use super::*;

    /// Each mode wraps state events into its own `RoomState` variant, and
    /// only the non-`Off` modes report themselves as requested.
    #[test]
    fn state_after_wraps_into_named_variant() {
        let wrap_empty = |mode: StateAfter| mode.wrap(StateEvents::default());

        assert!(matches!(wrap_empty(StateAfter::Off), RoomState::Before(_)));
        assert!(matches!(wrap_empty(StateAfter::Stable), RoomState::After(_)));
        assert!(matches!(wrap_empty(StateAfter::Unstable), RoomState::AfterUnstable(_)));

        assert!(!StateAfter::Off.requested());
        assert!(StateAfter::Stable.requested());
        assert!(StateAfter::Unstable.requested());
    }

    /// The pair is read as (stable opt-in, unstable opt-in).
    #[test]
    fn state_after_selects_unstable_when_both_opted_in() {
        let select = |stable_opt_in: bool, unstable_opt_in: bool| {
            StateAfter::from((stable_opt_in, unstable_opt_in))
        };

        assert!(matches!(select(false, false), StateAfter::Off));
        assert!(matches!(select(true, false), StateAfter::Stable));
        assert!(matches!(select(false, true), StateAfter::Unstable));
        // Unstable takes precedence when both are requested.
        assert!(matches!(select(true, true), StateAfter::Unstable));
    }
}