1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 time::Duration,
4};
5
6use axum::extract::State;
7use futures::{
8 FutureExt, StreamExt, TryFutureExt,
9 future::{join, join3, join4, join5},
10 pin_mut,
11};
12use ruma::{
13 DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
14 api::client::{
15 filter::FilterDefinition,
16 sync::sync_events::{
17 self, DeviceLists, UnreadNotificationsCount,
18 v3::{
19 Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
20 KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms,
21 State as RoomState, StateEvents, Timeline, ToDevice,
22 },
23 },
24 },
25 events::{
26 AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
27 SyncEphemeralRoomEvent,
28 TimelineEventType::*,
29 presence::{PresenceEvent, PresenceEventContent},
30 room::member::{MembershipState, RoomMemberEventContent},
31 typing::TypingEventContent,
32 },
33 serde::Raw,
34 uint,
35};
36use tokio::time;
37use tuwunel_core::{
38 Result, at,
39 debug::INFO_SPAN_LEVEL,
40 debug_error, err,
41 error::{inspect_debug_log, inspect_log},
42 extract_variant, is_equal_to, is_false, is_true,
43 matrix::{
44 Event,
45 event::Matches,
46 pdu::{EventHash, PduCount, PduEvent},
47 },
48 pair_of, ref_at,
49 result::FlatOk,
50 trace,
51 utils::{
52 self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
53 future::{OptionStream, ReadyBoolExt},
54 math::ruma_from_u64,
55 option::OptionExt,
56 result::MapExpect,
57 stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
58 },
59};
60use tuwunel_service::{
61 Services,
62 rooms::{
63 lazy_loading,
64 lazy_loading::{Options, Witness},
65 short::{ShortEventId, ShortStateHash, ShortStateKey},
66 },
67};
68
69use super::{load_timeline, share_encrypted_room};
70use crate::{
71 ClientIp, Ruma,
72 client::{ignored_filter, is_empty_account_data_event, with_membership},
73};
74
75#[derive(Default)]
76struct StateChanges {
77 heroes: Option<Vec<OwnedUserId>>,
78 joined_member_count: Option<u64>,
79 invited_member_count: Option<u64>,
80 state_events: Vec<PduEvent>,
81}
82
83type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
84
85#[tracing::instrument(
121 name = "sync",
122 level = "debug",
123 skip_all,
124 fields(
125 user_id = %body.sender_user(),
126 device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
127 )
128)]
129pub(crate) async fn sync_events_route(
130 State(services): State<crate::State>,
131 ClientIp(client): ClientIp,
132 body: Ruma<sync_events::v3::Request>,
133) -> Result<sync_events::v3::Response> {
134 let sender_user = body.sender_user();
135 let sender_device = body.sender_device.as_deref();
136
137 let filter = body
138 .body
139 .filter
140 .as_ref()
141 .map_async(async |filter| match filter {
142 | Filter::FilterDefinition(filter) => filter.clone(),
143 | Filter::FilterId(filter_id) => services
144 .users
145 .get_filter(sender_user, filter_id)
146 .await
147 .unwrap_or_default(),
148 });
149
150 let filter = filter.map(Option::unwrap_or_default);
151 let full_state = body.body.full_state;
152 let set_presence = &body.body.set_presence;
153 let use_state_after = body.body.use_state_after;
154 let ping_presence = services
155 .presence
156 .maybe_ping_presence(
157 sender_user,
158 body.sender_device.as_deref(),
159 Some(client),
160 set_presence,
161 )
162 .inspect_err(inspect_log)
163 .ok();
164
165 let note_sync = services.presence.note_sync(sender_user);
167
168 let (filter, ..) = join3(filter, ping_presence, note_sync).await;
169
170 let mut since = body
171 .body
172 .since
173 .as_deref()
174 .map(str::parse)
175 .flat_ok()
176 .unwrap_or(0);
177
178 let timeout = body
179 .body
180 .timeout
181 .as_ref()
182 .map(Duration::as_millis)
183 .map(TryInto::try_into)
184 .flat_ok()
185 .unwrap_or(services.config.client_sync_timeout_default)
186 .max(services.config.client_sync_timeout_min)
187 .min(services.config.client_sync_timeout_max);
188
189 let stop_at = time::Instant::now()
190 .checked_add(Duration::from_millis(timeout))
191 .expect("configuration must limit maximum timeout");
192
193 loop {
194 let watch_rooms = services
195 .state_cache
196 .rooms_joined(sender_user)
197 .chain(services.state_cache.rooms_invited(sender_user));
198
199 let watchers = services
200 .sync
201 .watch(sender_user, sender_device, watch_rooms)
202 .await;
203
204 let next_batch = services.globals.wait_pending().await?;
205 if since > next_batch {
206 debug_error!(since, next_batch, "received since > next_batch, clamping");
207 since = next_batch;
208 }
209
210 if since < next_batch || full_state {
211 let response = build_sync_events(
212 &services,
213 sender_user,
214 sender_device,
215 since,
216 next_batch,
217 full_state,
218 use_state_after,
219 &filter,
220 )
221 .await?;
222
223 let empty = response.rooms.is_empty()
224 && response.presence.is_empty()
225 && response.account_data.is_empty()
226 && response.device_lists.is_empty()
227 && response.to_device.is_empty();
228
229 if !empty || full_state {
230 return Ok(response);
231 }
232 }
233
234 if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
236 let response =
237 build_empty_response(&services, sender_user, sender_device, next_batch).await;
238
239 trace!(since, next_batch, "empty response");
240 return Ok(response);
241 }
242
243 trace!(
244 since,
245 last_batch = ?next_batch,
246 count = ?services.globals.pending_count(),
247 stop_at = ?stop_at,
248 "notified by watcher"
249 );
250
251 since = next_batch;
252 }
253}
254
255async fn build_empty_response(
256 services: &Services,
257 sender_user: &UserId,
258 sender_device: Option<&DeviceId>,
259 next_batch: u64,
260) -> sync_events::v3::Response {
261 let device_one_time_keys_count = sender_device.map_async(|sender_device| {
262 services
263 .users
264 .count_one_time_keys(sender_user, sender_device)
265 });
266
267 let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
268 services
269 .users
270 .unused_fallback_key_algorithms(sender_user, sender_device)
271 .collect::<Vec<_>>()
272 });
273
274 let (device_one_time_keys_count, device_unused_fallback_key_types) =
275 join(device_one_time_keys_count, device_unused_fallback_key_types).await;
276
277 sync_events::v3::Response {
278 device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
279 device_unused_fallback_key_types,
280 ..sync_events::v3::Response::new(next_batch.to_string())
281 }
282}
283
284#[tracing::instrument(
285 name = "build",
286 level = INFO_SPAN_LEVEL,
287 skip_all,
288 fields(
289 %since,
290 %next_batch,
291 count = ?services.globals.pending_count(),
292 )
293)]
294#[expect(clippy::too_many_arguments)]
295async fn build_sync_events(
296 services: &Services,
297 sender_user: &UserId,
298 sender_device: Option<&DeviceId>,
299 since: u64,
300 next_batch: u64,
301 full_state: bool,
302 use_state_after: bool,
303 filter: &FilterDefinition,
304) -> Result<sync_events::v3::Response> {
305 let joined_rooms = services
306 .state_cache
307 .rooms_joined(sender_user)
308 .ready_filter(|&room_id| filter.room.matches(room_id))
309 .map(ToOwned::to_owned)
310 .broad_filter_map(|room_id| {
311 load_joined_room(
312 services,
313 sender_user,
314 sender_device,
315 room_id.clone(),
316 since,
317 next_batch,
318 full_state,
319 use_state_after,
320 filter,
321 )
322 .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
323 .ok()
324 })
325 .ready_fold(
326 (BTreeMap::new(), HashSet::new(), HashSet::new()),
327 |(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
328 (room_id, joined_room, dlu, leu)| {
329 device_list_updates.extend(dlu);
330 left_encrypted_users.extend(leu);
331 if !joined_room.is_empty() {
332 joined_rooms.insert(room_id, joined_room);
333 }
334
335 (joined_rooms, device_list_updates, left_encrypted_users)
336 },
337 );
338
339 let left_rooms = services
340 .state_cache
341 .rooms_left_state(sender_user)
342 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
343 .broad_filter_map(|(room_id, _)| {
344 handle_left_room(
345 services,
346 since,
347 room_id.clone(),
348 sender_user,
349 next_batch,
350 full_state,
351 use_state_after,
352 filter,
353 )
354 .map_ok(move |left_room| (room_id, left_room))
355 .ok()
356 })
357 .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
358 .collect();
359
360 let invited_rooms = services
361 .state_cache
362 .rooms_invited_state(sender_user)
363 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
364 .fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| {
365 let invite_count = services
366 .state_cache
367 .get_invite_count(&room_id, sender_user)
368 .await
369 .ok();
370
371 if Some(since) >= invite_count || Some(next_batch) < invite_count {
373 return invited_rooms;
374 }
375
376 let invited_room = InvitedRoom {
377 invite_state: InviteState { events: invite_state },
378 };
379
380 invited_rooms.insert(room_id, invited_room);
381 invited_rooms
382 });
383
384 let knocked_rooms = services
385 .state_cache
386 .rooms_knocked_state(sender_user)
387 .ready_filter(|(room_id, _)| filter.room.matches(room_id))
388 .fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| {
389 let knock_count = services
390 .state_cache
391 .get_knock_count(&room_id, sender_user)
392 .await
393 .ok();
394
395 if Some(since) >= knock_count || Some(next_batch) < knock_count {
397 return knocked_rooms;
398 }
399
400 let knocked_room = KnockedRoom {
401 knock_state: KnockState { events: knock_state },
402 };
403
404 knocked_rooms.insert(room_id, knocked_room);
405 knocked_rooms
406 });
407
408 let presence_updates = services
409 .config
410 .allow_local_presence
411 .then_async(|| {
412 process_presence_updates(services, since, next_batch, sender_user, filter)
413 });
414
415 let account_data = services
416 .account_data
417 .changes_since(None, sender_user, since, Some(next_batch))
418 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
419 .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
420 .collect();
421
422 let keys_changed = services
424 .users
425 .keys_changed(sender_user, since, Some(next_batch))
426 .map(ToOwned::to_owned)
427 .collect::<HashSet<_>>();
428
429 let to_device_events = sender_device.map_async(|sender_device| {
430 services
431 .users
432 .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
433 .map(at!(1))
434 .collect::<Vec<_>>()
435 });
436
437 let device_one_time_keys_count = sender_device.map_async(|sender_device| {
438 services
439 .users
440 .count_one_time_keys(sender_user, sender_device)
441 });
442
443 let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
444 services
445 .users
446 .unused_fallback_key_algorithms(sender_user, sender_device)
447 .collect::<Vec<_>>()
448 });
449
450 let remove_to_device_events = sender_device.map_async(|sender_device| {
452 services
453 .users
454 .remove_to_device_events(sender_user, sender_device, since)
455 });
456
457 let (
458 account_data,
459 keys_changed,
460 presence_updates,
461 (_, to_device_events, device_one_time_keys_count, device_unused_fallback_key_types),
462 (
463 (joined_rooms, mut device_list_updates, left_encrypted_users),
464 left_rooms,
465 invited_rooms,
466 knocked_rooms,
467 ),
468 ) = join5(
469 account_data,
470 keys_changed,
471 presence_updates,
472 join4(
473 remove_to_device_events,
474 to_device_events,
475 device_one_time_keys_count,
476 device_unused_fallback_key_types,
477 ),
478 join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms),
479 )
480 .boxed()
481 .await;
482
483 device_list_updates.extend(keys_changed);
484
485 let device_list_left = left_encrypted_users
488 .into_iter()
489 .stream()
490 .broad_filter_map(async |user_id: OwnedUserId| {
491 share_encrypted_room(services, sender_user, &user_id, None)
492 .await
493 .eq(&false)
494 .then_some(user_id)
495 })
496 .collect()
497 .await;
498
499 let presence_events = presence_updates
500 .into_iter()
501 .flat_map(IntoIterator::into_iter)
502 .map(|(sender, content)| PresenceEvent { content, sender })
503 .map(|ref event| Raw::new(event))
504 .filter_map(Result::ok)
505 .collect();
506
507 Ok(sync_events::v3::Response {
508 account_data: GlobalAccountData { events: account_data },
509 device_lists: DeviceLists {
510 left: device_list_left,
511 changed: device_list_updates.into_iter().collect(),
512 },
513 device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
514 device_unused_fallback_key_types,
515 next_batch: next_batch.to_string(),
516 presence: Presence { events: presence_events },
517 rooms: Rooms {
518 leave: left_rooms,
519 join: joined_rooms,
520 invite: invited_rooms,
521 knock: knocked_rooms,
522 },
523 to_device: ToDevice {
524 events: to_device_events.unwrap_or_default(),
525 },
526 })
527}
528
529#[tracing::instrument(name = "presence", level = "debug", skip_all)]
530async fn process_presence_updates(
531 services: &Services,
532 since: u64,
533 next_batch: u64,
534 syncing_user: &UserId,
535 filter: &FilterDefinition,
536) -> PresenceUpdates {
537 services
538 .presence
539 .presence_since(since, Some(next_batch))
540 .ready_filter(|(user_id, ..)| filter.presence.matches(user_id))
541 .filter(|(user_id, ..)| {
542 services
543 .state_cache
544 .user_sees_user(syncing_user, user_id)
545 })
546 .filter_map(|(user_id, _, presence_bytes)| {
547 services
548 .presence
549 .from_json_bytes_to_event(presence_bytes, user_id)
550 .map_ok(move |event| (user_id, event))
551 .ok()
552 })
553 .map(|(user_id, event)| (user_id.to_owned(), event.content))
554 .collect()
555 .boxed()
556 .await
557}
558
559#[tracing::instrument(
560 name = "left",
561 level = "debug",
562 skip_all,
563 fields(
564 room_id = %room_id,
565 full = %full_state,
566 ),
567)]
568#[expect(clippy::too_many_arguments)]
569async fn handle_left_room(
570 services: &Services,
571 since: u64,
572 ref room_id: OwnedRoomId,
573 sender_user: &UserId,
574 next_batch: u64,
575 full_state: bool,
576 use_state_after: bool,
577 filter: &FilterDefinition,
578) -> Result<Option<LeftRoom>> {
579 let left_count = services
580 .state_cache
581 .get_left_count(room_id, sender_user)
582 .await
583 .unwrap_or(0);
584
585 if left_count == 0 || left_count > next_batch {
586 return Ok(None);
587 }
588
589 let include_leave = filter.room.include_leave;
590 if since == 0 && !include_leave {
591 return Ok(None);
592 }
593
594 if since != 0 && left_count <= since {
597 return Ok(None);
598 }
599
600 let is_not_found = services.metadata.exists(room_id).is_false();
601
602 let is_disabled = services.metadata.is_disabled(room_id);
603
604 let is_banned = services.metadata.is_banned(room_id);
605
606 pin_mut!(is_not_found, is_disabled, is_banned);
607 if is_not_found.or(is_disabled).or(is_banned).await {
608 let event = PduEvent {
611 event_id: EventId::new_v1(services.globals.server_name()),
612 origin_server_ts: utils::millis_since_unix_epoch().try_into()?,
613 kind: RoomMember,
614 state_key: Some(sender_user.as_str().into()),
615 sender: sender_user.to_owned(),
616 content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
617 room_id: room_id.clone(),
619 depth: uint!(1),
620 origin: None,
621 unsigned: None,
622 redacts: None,
623 hashes: EventHash::default(),
624 auth_events: Default::default(),
625 prev_events: Default::default(),
626 signatures: None,
627 };
628
629 let state = StateEvents { events: vec![event.into_format()] };
630
631 let state = if use_state_after {
632 RoomState::After(state)
633 } else {
634 RoomState::Before(state)
635 };
636
637 return Ok(Some(LeftRoom {
638 account_data: RoomAccountData::default(),
639 state,
640 timeline: Timeline {
641 limited: false,
642 events: Default::default(),
643 prev_batch: Some(left_count.to_string()),
644 },
645 }));
646 }
647
648 load_left_room(
649 services,
650 sender_user,
651 room_id,
652 since,
653 left_count,
654 full_state,
655 use_state_after,
656 filter,
657 )
658 .await
659}
660
661#[tracing::instrument(name = "load", level = "debug", skip_all)]
662#[expect(clippy::too_many_arguments)]
663async fn load_left_room(
664 services: &Services,
665 sender_user: &UserId,
666 room_id: &RoomId,
667 since: u64,
668 left_count: u64,
669 full_state: bool,
670 use_state_after: bool,
671 filter: &FilterDefinition,
672) -> Result<Option<LeftRoom>> {
673 let initial = since == 0;
674 let timeline_limit: usize = filter
675 .room
676 .timeline
677 .limit
678 .map(TryInto::try_into)
679 .map_expect("UInt to usize")
680 .unwrap_or(10)
681 .min(100);
682
683 let (timeline_pdus, limited, _) = load_timeline(
684 services,
685 sender_user,
686 room_id,
687 PduCount::Normal(since),
688 Some(PduCount::Normal(left_count)),
689 timeline_limit.max(1),
690 )
691 .await
692 .unwrap_or_default();
693
694 let since_shortstatehash = services
695 .timeline
696 .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
697 .ok();
698
699 let horizon_shortstatehash = timeline_pdus
700 .first()
701 .map(at!(0))
702 .map_async(|count| {
703 services
704 .timeline
705 .get_shortstatehash(room_id, count)
706 .inspect_err(inspect_debug_log)
707 .ok()
708 });
709
710 let after_shortstatehash = use_state_after.then_async(|| {
715 services
716 .timeline
717 .next_shortstatehash(room_id, PduCount::Normal(left_count))
718 .or_else(|_| services.state.get_room_shortstatehash(room_id))
719 .inspect_err(inspect_debug_log)
720 });
721
722 let left_shortstatehash = services
723 .timeline
724 .get_shortstatehash(room_id, PduCount::Normal(left_count))
725 .inspect_err(inspect_debug_log)
726 .or_else(|_| services.state.get_room_shortstatehash(room_id))
727 .map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
728
729 let (since_shortstatehash, horizon_shortstatehash, after_shortstatehash, left_shortstatehash) =
730 join4(
731 since_shortstatehash,
732 horizon_shortstatehash,
733 after_shortstatehash,
734 left_shortstatehash,
735 )
736 .boxed()
737 .await;
738
739 let StateChanges { state_events, .. } = calculate_state_changes(
740 services,
741 sender_user,
742 room_id,
743 full_state || initial,
744 use_state_after,
745 since_shortstatehash,
746 horizon_shortstatehash.flatten(),
747 after_shortstatehash.flat_ok(),
748 left_shortstatehash?,
749 false,
750 None,
751 )
752 .boxed()
753 .await?;
754
755 let is_sender_membership = |event: &PduEvent| {
756 *event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str())
757 };
758
759 let timeline_sender_member = timeline_limit
760 .eq(&0)
761 .then(|| timeline_pdus.last().map(ref_at!(1)).cloned())
762 .into_iter()
763 .flat_map(Option::into_iter);
764
765 let encrypted = services
766 .state_accessor
767 .is_encrypted_room(room_id)
768 .await;
769
770 let state_events = state_events
771 .into_iter()
772 .filter(|pdu| filter.room.state.matches(pdu))
773 .filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu))
774 .chain(timeline_sender_member)
775 .stream()
776 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
777 .map(Event::into_format)
778 .collect();
779
780 let left_prev_batch = timeline_limit
781 .eq(&0)
782 .then_some(left_count)
783 .map(PduCount::Normal);
784
785 let prev_batch = timeline_pdus
786 .first()
787 .filter(|_| timeline_limit > 0)
788 .map(at!(0))
789 .or(left_prev_batch)
790 .as_ref()
791 .map(ToString::to_string);
792
793 let timeline_events = timeline_pdus
794 .into_iter()
795 .stream()
796 .wide_filter_map(|item| ignored_filter(services, item, sender_user))
797 .map(at!(1))
798 .ready_filter(|pdu| filter.room.timeline.matches(pdu))
799 .take(timeline_limit)
800 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
801 .collect::<Vec<_>>();
802
803 let account_data_events = services
804 .account_data
805 .changes_since(Some(room_id), sender_user, since, None)
806 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
807 .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
808 .collect();
809
810 let (state_events, account_data_events, timeline_events) =
811 join3(state_events, account_data_events, timeline_events)
812 .boxed()
813 .await;
814
815 let state = StateEvents { events: state_events };
816
817 let state = if use_state_after {
818 RoomState::After(state)
819 } else {
820 RoomState::Before(state)
821 };
822
823 Ok(Some(LeftRoom {
824 account_data: RoomAccountData { events: account_data_events },
825 state,
826 timeline: Timeline {
827 prev_batch,
828 limited: limited || timeline_limit == 0,
829 events: timeline_events
830 .into_iter()
831 .map(Event::into_format)
832 .collect(),
833 },
834 }))
835}
836
837#[tracing::instrument(
838 name = "joined",
839 level = "debug",
840 skip_all,
841 fields(
842 room_id = ?room_id,
843 ),
844)]
845#[expect(clippy::too_many_arguments)]
846async fn load_joined_room(
847 services: &Services,
848 sender_user: &UserId,
849 sender_device: Option<&DeviceId>,
850 ref room_id: OwnedRoomId,
851 since: u64,
852 next_batch: u64,
853 full_state: bool,
854 use_state_after: bool,
855 filter: &FilterDefinition,
856) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
857 let initial = since == 0;
858 let timeline_limit: usize = filter
859 .room
860 .timeline
861 .limit
862 .map(TryInto::try_into)
863 .map_expect("UInt to usize")
864 .unwrap_or(10)
865 .min(100);
866
867 let (timeline_pdus, limited, last_timeline_count) = load_timeline(
868 services,
869 sender_user,
870 room_id,
871 PduCount::Normal(since),
872 Some(PduCount::Normal(next_batch)),
873 timeline_limit,
874 )
875 .await?;
876
877 let timeline_changed = last_timeline_count.into_unsigned() > since;
878 debug_assert!(
879 timeline_pdus.is_empty() || timeline_changed,
880 "if timeline events, last_timeline_count must be in the since window."
881 );
882
883 let since_shortstatehash = timeline_changed.then_async(|| {
884 services
885 .timeline
886 .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
887 .ok()
888 });
889
890 let horizon_shortstatehash = timeline_pdus
891 .first()
892 .map(at!(0))
893 .map_async(|count| {
894 services
895 .timeline
896 .get_shortstatehash(room_id, count)
897 .inspect_err(inspect_debug_log)
898 });
899
900 let after_shortstatehash = use_state_after.then_async(|| {
905 services
906 .timeline
907 .next_shortstatehash(room_id, last_timeline_count)
908 .or_else(|_| services.state.get_room_shortstatehash(room_id))
909 .inspect_err(inspect_debug_log)
910 });
911
912 let current_shortstatehash = timeline_changed.then_async(|| {
913 services
914 .timeline
915 .get_shortstatehash(room_id, last_timeline_count)
916 .inspect_err(inspect_debug_log)
917 .or_else(|_| services.state.get_room_shortstatehash(room_id))
918 .map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
919 });
920
921 let encrypted_room =
922 timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id));
923
924 let receipt_events = services
925 .read_receipt
926 .readreceipts_since(room_id, since, Some(next_batch))
927 .filter_map(async |(read_user, _, edu)| {
928 services
929 .users
930 .user_is_ignored(read_user, sender_user)
931 .await
932 .or_some((read_user.to_owned(), edu))
933 })
934 .collect::<Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>>();
935
936 let (
937 (
938 since_shortstatehash,
939 horizon_shortstatehash,
940 after_shortstatehash,
941 current_shortstatehash,
942 ),
943 receipt_events,
944 encrypted_room,
945 ) = join3(
946 join4(
947 since_shortstatehash,
948 horizon_shortstatehash,
949 after_shortstatehash,
950 current_shortstatehash,
951 ),
952 receipt_events,
953 encrypted_room,
954 )
955 .map(|((since, horizon, after, current), receipt, encrypted_room)| -> Result<_> {
956 Ok((
957 (since.flatten(), horizon.flat_ok(), after.flat_ok(), current.transpose()?),
958 receipt,
959 encrypted_room,
960 ))
961 })
962 .boxed()
963 .await?;
964
965 let lazy_load_options =
966 [&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
967
968 let lazy_loading_enabled = encrypted_room.is_some_and(is_false!())
969 && lazy_load_options
970 .iter()
971 .any(|opts| opts.is_enabled());
972
973 let lazy_loading_context = &lazy_loading::Context {
974 user_id: sender_user,
975 device_id: sender_device,
976 room_id,
977 token: Some(since),
978 options: Some(&filter.room.state.lazy_load_options),
979 mode: lazy_loading::Mode::Update,
980 };
981
982 let lazy_load_reset =
984 initial.then_async(|| services.lazy_loading.reset(lazy_loading_context));
985
986 lazy_load_reset.await;
987 let witness = lazy_loading_enabled.then_async(|| {
988 let witness: Witness = timeline_pdus
989 .iter()
990 .map(ref_at!(1))
991 .map(Event::sender)
992 .map(Into::into)
993 .chain(receipt_events.iter().map(ref_at!(0)).cloned())
994 .collect();
995
996 services
997 .lazy_loading
998 .witness_retain(witness, lazy_loading_context)
999 });
1000
1001 let sender_joined_count = timeline_changed.then_async(|| {
1002 services
1003 .state_cache
1004 .get_joined_count(room_id, sender_user)
1005 .unwrap_or(0)
1006 });
1007
1008 let since_encryption = since_shortstatehash.map_async(|shortstatehash| {
1009 services
1010 .state_accessor
1011 .state_get(shortstatehash, &StateEventType::RoomEncryption, "")
1012 });
1013
1014 let last_notification_read = timeline_pdus.is_empty().then_async(|| {
1015 services
1016 .pusher
1017 .last_notification_read(sender_user, room_id)
1018 .ok()
1019 });
1020
1021 let thread_last_reads = timeline_pdus.is_empty().then_async(|| {
1022 services
1023 .pusher
1024 .thread_last_notification_reads(sender_user, room_id)
1025 });
1026
1027 let last_privateread_update = services
1028 .read_receipt
1029 .last_privateread_update(sender_user, room_id);
1030
1031 let (
1032 (last_privateread_update, last_notification_read, thread_last_reads),
1033 (sender_joined_count, since_encryption),
1034 witness,
1035 ) = join3(
1036 join3(last_privateread_update, last_notification_read, thread_last_reads),
1037 join(sender_joined_count, since_encryption),
1038 witness,
1039 )
1040 .await;
1041
1042 let _encrypted_since_last_sync =
1043 !initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none();
1044
1045 let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
1046
1047 let state_changes = current_shortstatehash.map_async(|current_shortstatehash| {
1048 calculate_state_changes(
1049 services,
1050 sender_user,
1051 room_id,
1052 full_state || initial,
1053 use_state_after,
1054 since_shortstatehash,
1055 horizon_shortstatehash,
1056 after_shortstatehash,
1057 current_shortstatehash,
1058 joined_since_last_sync,
1059 witness.as_ref(),
1060 )
1061 });
1062
1063 let StateChanges {
1064 heroes,
1065 joined_member_count,
1066 invited_member_count,
1067 mut state_events,
1068 } = state_changes
1069 .await
1070 .transpose()?
1071 .unwrap_or_default();
1072
1073 let is_sender_membership = |event: &PduEvent| {
1074 *event.event_type() == StateEventType::RoomMember.into()
1075 && event
1076 .state_key()
1077 .is_some_and(is_equal_to!(sender_user.as_str()))
1078 };
1079
1080 let joined_sender_member: Option<_> =
1081 (joined_since_last_sync && timeline_pdus.is_empty() && !initial)
1082 .then(|| {
1083 state_events
1084 .iter()
1085 .position(is_sender_membership)
1086 .map(|pos| state_events.swap_remove(pos))
1087 })
1088 .flatten();
1089
1090 let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
1091 joined_sender_member
1092 .is_some()
1093 .then_some(since)
1094 .map(Into::into)
1095 });
1096
1097 let in_window = |count: u64| count > since && count <= next_batch;
1098
1099 let send_main_counts = last_notification_read
1100 .flatten()
1101 .is_none_or(in_window);
1102
1103 let send_thread_counts = thread_last_reads
1104 .as_ref()
1105 .is_none_or(|reads| reads.values().copied().any(in_window));
1106
1107 let send_notification_counts = send_main_counts || send_thread_counts;
1112
1113 let send_notification_resets = last_notification_read
1114 .flatten()
1115 .is_some_and(|last_count| last_count > since);
1116
1117 let send_notification_count_filter =
1118 |count: &UInt| *count != uint!(0) || send_notification_resets;
1119
1120 let notification_count = send_notification_counts.then_async(|| {
1124 services
1125 .pusher
1126 .notification_count(sender_user, room_id)
1127 .map(TryInto::try_into)
1128 .unwrap_or(uint!(0))
1129 });
1130
1131 let highlight_count = send_notification_counts.then_async(|| {
1132 services
1133 .pusher
1134 .highlight_count(sender_user, room_id)
1135 .map(TryInto::try_into)
1136 .unwrap_or(uint!(0))
1137 });
1138
1139 let thread_counts = send_notification_counts.then_async(|| {
1142 services
1143 .pusher
1144 .thread_notification_counts(sender_user, room_id)
1145 });
1146
1147 let private_read_events = last_privateread_update.gt(&since).then_async(|| {
1148 services
1149 .read_receipt
1150 .private_read_get(room_id, sender_user)
1151 .unwrap_or_default()
1152 });
1153
1154 let typing_events = services
1155 .typing
1156 .last_typing_update(room_id)
1157 .and_then(async |count| {
1158 if count <= since {
1159 return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
1160 }
1161
1162 let typings = typings_event_for_user(services, room_id, sender_user).await?;
1163
1164 Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
1165 })
1166 .unwrap_or(Vec::new());
1167
1168 let keys_changed = services
1169 .users
1170 .room_keys_changed(room_id, since, Some(next_batch))
1171 .map(|(user_id, _)| user_id)
1172 .map(ToOwned::to_owned)
1173 .collect::<Vec<_>>();
1174
1175 let extract_membership = |event: &PduEvent| {
1176 let content: RoomMemberEventContent = event.get_content().ok()?;
1177 let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
1178
1179 Some((content.membership, user_id))
1180 };
1181
1182 let timeline_membership_changes = timeline_pdus
1183 .iter()
1184 .filter(|_| !initial)
1185 .map(ref_at!(1))
1186 .filter_map(extract_membership)
1187 .collect::<Vec<_>>();
1188
1189 let device_list_updates = state_events
1190 .iter()
1191 .stream()
1192 .ready_filter(|_| !initial)
1193 .ready_filter(|state_event| *state_event.event_type() == RoomMember)
1194 .ready_filter_map(extract_membership)
1195 .chain(timeline_membership_changes.stream())
1196 .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| {
1197 use MembershipState::*;
1198
1199 let requires_update = async |user_id| {
1200 !share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
1201 };
1202
1203 match membership {
1204 | Join if requires_update(&user_id).await => dlu.insert(user_id),
1205 | Leave => leu.insert(user_id),
1206 | _ => false,
1207 };
1208
1209 (dlu, leu)
1210 })
1211 .then(async |(mut dlu, leu)| {
1212 dlu.extend(keys_changed.await);
1213 (dlu, leu)
1214 });
1215
1216 let include_in_timeline = |event: &PduEvent| {
1217 let filter = &filter.room.timeline;
1218 filter.matches(event)
1219 };
1220
1221 let encrypted = encrypted_room.unwrap_or(false);
1223
1224 let room_events = timeline_pdus
1225 .into_iter()
1226 .stream()
1227 .wide_filter_map(|item| ignored_filter(services, item, sender_user))
1228 .map(at!(1))
1229 .chain(joined_sender_member.into_iter().stream())
1230 .ready_filter(include_in_timeline)
1231 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1232 .collect::<Vec<_>>();
1233
1234 let account_data_events = services
1235 .account_data
1236 .changes_since(Some(room_id), sender_user, since, None)
1237 .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
1238 .ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
1239 .collect();
1240
1241 let (
1242 (room_events, account_data_events),
1243 (typing_events, private_read_events),
1244 (notification_count, highlight_count, thread_counts),
1245 (device_list_updates, left_encrypted_users),
1246 ) = join4(
1247 join(room_events, account_data_events),
1248 join(typing_events, private_read_events),
1249 join3(notification_count, highlight_count, thread_counts),
1250 device_list_updates,
1251 )
1252 .boxed()
1253 .await;
1254
1255 let is_in_timeline = |event: &PduEvent| {
1256 room_events
1257 .iter()
1258 .map(Event::event_id)
1259 .any(is_equal_to!(event.event_id()))
1260 };
1261
1262 let include_in_state = |event: &PduEvent| {
1266 let filter = &filter.room.state;
1267 filter.matches(event) && (full_state || use_state_after || !is_in_timeline(event))
1268 };
1269
1270 let state_events: Vec<_> = state_events
1271 .into_iter()
1272 .filter(include_in_state)
1273 .stream()
1274 .wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1275 .map(Event::into_format)
1276 .collect()
1277 .await;
1278
1279 let heroes = heroes
1280 .into_iter()
1281 .flatten()
1282 .map(TryInto::try_into)
1283 .filter_map(Result::ok)
1284 .collect();
1285
1286 let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
1287 .into_iter()
1288 .map(at!(1))
1289 .chain(typing_events)
1290 .chain(private_read_events.into_iter().flatten())
1291 .collect();
1292
1293 let thread_counts = thread_counts.unwrap_or_default();
1294
1295 let (thread_total_notifications, thread_total_highlights) = thread_counts
1296 .values()
1297 .fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
1298 (n.saturating_add(notifs), h.saturating_add(hl))
1299 });
1300
1301 let want_thread_unread = filter.room.timeline.unread_thread_notifications;
1304
1305 let merge_total = |total: u64| {
1306 move |count: UInt| {
1307 want_thread_unread
1308 .is_false()
1309 .then(|| count.saturating_add(UInt::try_from(total).unwrap_or_default()))
1310 .unwrap_or(count)
1311 }
1312 };
1313
1314 let unread_notifications = UnreadNotificationsCount {
1315 highlight_count: highlight_count
1316 .map(merge_total(thread_total_highlights))
1317 .filter(send_notification_count_filter),
1318 notification_count: notification_count
1319 .map(merge_total(thread_total_notifications))
1320 .filter(send_notification_count_filter),
1321 };
1322
1323 let advanced_in_window = |root: &EventId| {
1329 initial
1330 || thread_last_reads
1331 .as_ref()
1332 .is_none_or(|reads| reads.get(root).copied().is_some_and(in_window))
1333 };
1334
1335 let unread_thread_notifications = thread_counts
1336 .into_iter()
1337 .filter(|_| want_thread_unread)
1338 .filter(|(root, _)| advanced_in_window(root))
1339 .map(|(root, (notifications, highlights))| {
1340 let counts = UnreadNotificationsCount {
1341 notification_count: UInt::try_from(notifications).ok(),
1342 highlight_count: UInt::try_from(highlights).ok(),
1343 };
1344
1345 (root, counts)
1346 })
1347 .collect();
1348
1349 let state = StateEvents { events: state_events };
1350
1351 let state = if use_state_after {
1352 RoomState::After(state)
1353 } else {
1354 RoomState::Before(state)
1355 };
1356
1357 let joined_room = JoinedRoom {
1358 account_data: RoomAccountData { events: account_data_events },
1359 ephemeral: Ephemeral { events: edus },
1360 state,
1361 summary: RoomSummary {
1362 joined_member_count: joined_member_count.map(ruma_from_u64),
1363 invited_member_count: invited_member_count.map(ruma_from_u64),
1364 heroes,
1365 },
1366 timeline: Timeline {
1367 limited: limited || joined_since_last_sync,
1368 prev_batch: prev_batch.as_ref().map(ToString::to_string),
1369 events: room_events
1370 .into_iter()
1371 .map(Event::into_format)
1372 .collect(),
1373 },
1374 unread_notifications,
1375 unread_thread_notifications,
1376 };
1377
1378 Ok((joined_room, device_list_updates, left_encrypted_users))
1379}
1380
1381#[tracing::instrument(
1382 name = "state",
1383 level = "trace",
1384 skip_all,
1385 fields(
1386 full = %full_state,
1387 after = %use_state_after,
1388 ss = ?since_shortstatehash,
1389 hs = ?horizon_shortstatehash,
1390 as = ?after_shortstatehash,
1391 cs = %current_shortstatehash,
1392 )
1393)]
1394#[expect(clippy::too_many_arguments)]
1395async fn calculate_state_changes<'a>(
1396 services: &Services,
1397 sender_user: &UserId,
1398 room_id: &RoomId,
1399 full_state: bool,
1400 use_state_after: bool,
1401 since_shortstatehash: Option<ShortStateHash>,
1402 horizon_shortstatehash: Option<ShortStateHash>,
1403 after_shortstatehash: Option<ShortStateHash>,
1404 current_shortstatehash: ShortStateHash,
1405 joined_since_last_sync: bool,
1406 witness: Option<&'a Witness>,
1407) -> Result<StateChanges> {
1408 let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some();
1409
1410 let horizon_shortstatehash = use_state_after
1415 .then_some(after_shortstatehash)
1416 .unwrap_or(horizon_shortstatehash)
1417 .unwrap_or(current_shortstatehash);
1418
1419 let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash);
1420
1421 let state_get_shorteventid = |user_id: &'a UserId| {
1422 services
1423 .state_accessor
1424 .state_get_shortid(
1425 horizon_shortstatehash,
1426 &StateEventType::RoomMember,
1427 user_id.as_str(),
1428 )
1429 .ok()
1430 };
1431
1432 let lazy_state_ids = witness.map_async(|witness| {
1433 witness
1434 .iter()
1435 .stream()
1436 .ready_filter(|&user_id| user_id != sender_user)
1437 .broad_filter_map(|user_id| state_get_shorteventid(user_id))
1438 .into_future()
1439 });
1440
1441 let state_diff_ids = incremental.then_async(|| {
1442 services
1443 .state_accessor
1444 .state_added((since_shortstatehash, horizon_shortstatehash))
1445 .boxed()
1446 .into_future()
1447 });
1448
1449 let current_state_ids = (!incremental).then_async(|| {
1450 services
1451 .state_accessor
1452 .state_full_shortids(horizon_shortstatehash)
1453 .expect_ok()
1454 .boxed()
1455 .into_future()
1456 });
1457
1458 let state_events = current_state_ids
1459 .stream()
1460 .chain(state_diff_ids.stream())
1461 .broad_filter_map(async |(shortstatekey, shorteventid)| {
1462 lazy_filter(services, sender_user, witness, shortstatekey, shorteventid).await
1463 })
1464 .chain(lazy_state_ids.stream())
1465 .broad_filter_map(|shorteventid| {
1466 services
1467 .timeline
1468 .get_pdu_from_shorteventid(shorteventid)
1469 .ok()
1470 })
1471 .collect::<Vec<_>>()
1472 .await;
1473
1474 let send_member_counts = state_events
1475 .iter()
1476 .any(|event| *event.kind() == RoomMember);
1477
1478 let member_counts =
1479 send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user));
1480
1481 let (joined_member_count, invited_member_count, heroes) =
1482 member_counts.await.unwrap_or((None, None, None));
1483
1484 Ok(StateChanges {
1485 heroes,
1486 joined_member_count,
1487 invited_member_count,
1488 state_events,
1489 })
1490}
1491
1492async fn lazy_filter(
1493 services: &Services,
1494 sender_user: &UserId,
1495 witness: Option<&Witness>,
1496 shortstatekey: ShortStateKey,
1497 shorteventid: ShortEventId,
1498) -> Option<ShortEventId> {
1499 if witness.is_none() {
1500 return Some(shorteventid);
1501 }
1502
1503 let (event_type, state_key) = services
1504 .short
1505 .get_statekey_from_short(shortstatekey)
1506 .await
1507 .ok()?;
1508
1509 (event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
1510 .then_some(shorteventid)
1511}
1512
1513async fn calculate_counts(
1514 services: &Services,
1515 room_id: &RoomId,
1516 sender_user: &UserId,
1517) -> (Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>) {
1518 let joined_member_count = services
1519 .state_cache
1520 .room_joined_count(room_id)
1521 .unwrap_or(0);
1522
1523 let invited_member_count = services
1524 .state_cache
1525 .room_invited_count(room_id)
1526 .unwrap_or(0);
1527
1528 let (joined_member_count, invited_member_count) =
1529 join(joined_member_count, invited_member_count).await;
1530
1531 let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
1532
1533 let heroes = services
1534 .config
1535 .calculate_heroes
1536 .and_is(small_room)
1537 .then_async(|| calculate_heroes(services, room_id, sender_user));
1538
1539 (Some(joined_member_count), Some(invited_member_count), heroes.await)
1540}
1541
1542pub(crate) async fn calculate_heroes(
1543 services: &Services,
1544 room_id: &RoomId,
1545 sender_user: &UserId,
1546) -> Vec<OwnedUserId> {
1547 const LIMIT: usize = 5;
1548
1549 services
1550 .state_accessor
1551 .room_state_type_pdus(room_id, &StateEventType::RoomMember)
1552 .ready_filter_map(Result::ok)
1553 .filter_map(|pdu| filter_hero(services, room_id, sender_user, pdu))
1554 .take(LIMIT)
1555 .collect::<Vec<_>>()
1556 .await
1557}
1558
1559async fn filter_hero<Pdu: Event>(
1560 services: &Services,
1561 room_id: &RoomId,
1562 sender_user: &UserId,
1563 pdu: Pdu,
1564) -> Option<OwnedUserId> {
1565 let user_id = pdu.state_key().map(TryInto::try_into).flat_ok()?;
1566
1567 if user_id == sender_user {
1568 return None;
1569 }
1570
1571 let Ok(content): Result<RoomMemberEventContent, _> = pdu.get_content() else {
1572 return None;
1573 };
1574
1575 if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
1577 return None;
1578 }
1579
1580 let (is_invited, is_joined) = join(
1581 services.state_cache.is_invited(user_id, room_id),
1582 services.state_cache.is_joined(user_id, room_id),
1583 )
1584 .await;
1585
1586 if !is_joined && is_invited {
1587 return None;
1588 }
1589
1590 Some(user_id.to_owned())
1591}
1592
1593async fn typings_event_for_user(
1594 services: &Services,
1595 room_id: &RoomId,
1596 sender_user: &UserId,
1597) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
1598 Ok(SyncEphemeralRoomEvent {
1599 content: TypingEventContent {
1600 user_ids: services
1601 .typing
1602 .typing_users_for_user(room_id, sender_user)
1603 .await?,
1604 },
1605 })
1606}