Skip to main content

tuwunel_api/client/sync/
v3.rs

1use std::{
2	collections::{BTreeMap, HashMap, HashSet},
3	time::Duration,
4};
5
6use axum::extract::State;
7use futures::{
8	FutureExt, StreamExt, TryFutureExt,
9	future::{join, join3, join4, join5},
10	pin_mut,
11};
12use ruma::{
13	DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
14	api::client::{
15		filter::FilterDefinition,
16		sync::sync_events::{
17			self, DeviceLists, UnreadNotificationsCount,
18			v3::{
19				Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
20				KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms,
21				State as RoomState, StateEvents, Timeline, ToDevice,
22			},
23		},
24	},
25	events::{
26		AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
27		SyncEphemeralRoomEvent,
28		TimelineEventType::*,
29		presence::{PresenceEvent, PresenceEventContent},
30		room::member::{MembershipState, RoomMemberEventContent},
31		typing::TypingEventContent,
32	},
33	serde::Raw,
34	uint,
35};
36use tokio::time;
37use tuwunel_core::{
38	Result, at,
39	debug::INFO_SPAN_LEVEL,
40	debug_error, err,
41	error::{inspect_debug_log, inspect_log},
42	extract_variant, is_equal_to, is_false, is_true,
43	matrix::{
44		Event,
45		event::Matches,
46		pdu::{EventHash, PduCount, PduEvent},
47	},
48	pair_of, ref_at,
49	result::FlatOk,
50	trace,
51	utils::{
52		self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
53		future::{OptionStream, ReadyBoolExt},
54		math::ruma_from_u64,
55		option::OptionExt,
56		result::MapExpect,
57		stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
58	},
59};
60use tuwunel_service::{
61	Services,
62	rooms::{
63		lazy_loading,
64		lazy_loading::{Options, Witness},
65		short::{ShortEventId, ShortStateHash, ShortStateKey},
66	},
67};
68
69use super::{load_timeline, share_encrypted_room};
70use crate::{
71	ClientIp, Ruma,
72	client::{ignored_filter, is_empty_account_data_event, with_membership},
73};
74
75#[derive(Default)]
76struct StateChanges {
77	heroes: Option<Vec<OwnedUserId>>,
78	joined_member_count: Option<u64>,
79	invited_member_count: Option<u64>,
80	state_events: Vec<PduEvent>,
81}
82
83type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
84
85/// # `GET /_matrix/client/r0/sync`
86///
87/// Synchronize the client's state with the latest state on the server.
88///
89/// - This endpoint takes a `since` parameter which should be the `next_batch`
90///   value from a previous request for incremental syncs.
91///
92/// Calling this endpoint without a `since` parameter returns:
93/// - Some of the most recent events of each timeline
94/// - Notification counts for each room
95/// - Joined and invited member counts, heroes
96/// - All state events
97///
98/// Calling this endpoint with a `since` parameter from a previous `next_batch`
99/// returns: For joined rooms:
100/// - Some of the most recent events of each timeline that happened after since
101/// - If user joined the room after since: All state events (unless lazy loading
102///   is activated) and all device list updates in that room
103/// - If the user was already in the room: A list of all events that are in the
104///   state now, but were not in the state at `since`
105/// - If the state we send contains a member event: Joined and invited member
106///   counts, heroes
107/// - Device list updates that happened after `since`
108/// - If there are events in the timeline we send or the user send updated his
109///   read mark: Notification counts
110/// - EDUs that are active now (read receipts, typing updates, presence)
111/// - TODO: Allow multiple sync streams to support Pantalaimon
112///
113/// For invited rooms:
114/// - If the user was invited after `since`: A subset of the state of the room
115///   at the point of the invite
116///
117/// For left rooms:
118/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
119///   subset of the state at the point of the leave)
120#[tracing::instrument(
121	name = "sync",
122	level = "debug",
123	skip_all,
124	fields(
125		user_id = %body.sender_user(),
126		device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
127    )
128)]
129pub(crate) async fn sync_events_route(
130	State(services): State<crate::State>,
131	ClientIp(client): ClientIp,
132	body: Ruma<sync_events::v3::Request>,
133) -> Result<sync_events::v3::Response> {
134	let sender_user = body.sender_user();
135	let sender_device = body.sender_device.as_deref();
136
137	let filter = body
138		.body
139		.filter
140		.as_ref()
141		.map_async(async |filter| match filter {
142			| Filter::FilterDefinition(filter) => filter.clone(),
143			| Filter::FilterId(filter_id) => services
144				.users
145				.get_filter(sender_user, filter_id)
146				.await
147				.unwrap_or_default(),
148		});
149
150	let filter = filter.map(Option::unwrap_or_default);
151	let full_state = body.body.full_state;
152	let set_presence = &body.body.set_presence;
153	let use_state_after = body.body.use_state_after;
154	let ping_presence = services
155		.presence
156		.maybe_ping_presence(
157			sender_user,
158			body.sender_device.as_deref(),
159			Some(client),
160			set_presence,
161		)
162		.inspect_err(inspect_log)
163		.ok();
164
165	// Record user as actively syncing for push suppression heuristic.
166	let note_sync = services.presence.note_sync(sender_user);
167
168	let (filter, ..) = join3(filter, ping_presence, note_sync).await;
169
170	let mut since = body
171		.body
172		.since
173		.as_deref()
174		.map(str::parse)
175		.flat_ok()
176		.unwrap_or(0);
177
178	let timeout = body
179		.body
180		.timeout
181		.as_ref()
182		.map(Duration::as_millis)
183		.map(TryInto::try_into)
184		.flat_ok()
185		.unwrap_or(services.config.client_sync_timeout_default)
186		.max(services.config.client_sync_timeout_min)
187		.min(services.config.client_sync_timeout_max);
188
189	let stop_at = time::Instant::now()
190		.checked_add(Duration::from_millis(timeout))
191		.expect("configuration must limit maximum timeout");
192
193	loop {
194		let watch_rooms = services
195			.state_cache
196			.rooms_joined(sender_user)
197			.chain(services.state_cache.rooms_invited(sender_user));
198
199		let watchers = services
200			.sync
201			.watch(sender_user, sender_device, watch_rooms)
202			.await;
203
204		let next_batch = services.globals.wait_pending().await?;
205		if since > next_batch {
206			debug_error!(since, next_batch, "received since > next_batch, clamping");
207			since = next_batch;
208		}
209
210		if since < next_batch || full_state {
211			let response = build_sync_events(
212				&services,
213				sender_user,
214				sender_device,
215				since,
216				next_batch,
217				full_state,
218				use_state_after,
219				&filter,
220			)
221			.await?;
222
223			let empty = response.rooms.is_empty()
224				&& response.presence.is_empty()
225				&& response.account_data.is_empty()
226				&& response.device_lists.is_empty()
227				&& response.to_device.is_empty();
228
229			if !empty || full_state {
230				return Ok(response);
231			}
232		}
233
234		// Wait for activity
235		if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
236			let response =
237				build_empty_response(&services, sender_user, sender_device, next_batch).await;
238
239			trace!(since, next_batch, "empty response");
240			return Ok(response);
241		}
242
243		trace!(
244			since,
245			last_batch = ?next_batch,
246			count = ?services.globals.pending_count(),
247			stop_at = ?stop_at,
248			"notified by watcher"
249		);
250
251		since = next_batch;
252	}
253}
254
255async fn build_empty_response(
256	services: &Services,
257	sender_user: &UserId,
258	sender_device: Option<&DeviceId>,
259	next_batch: u64,
260) -> sync_events::v3::Response {
261	let device_one_time_keys_count = sender_device.map_async(|sender_device| {
262		services
263			.users
264			.count_one_time_keys(sender_user, sender_device)
265	});
266
267	let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
268		services
269			.users
270			.unused_fallback_key_algorithms(sender_user, sender_device)
271			.collect::<Vec<_>>()
272	});
273
274	let (device_one_time_keys_count, device_unused_fallback_key_types) =
275		join(device_one_time_keys_count, device_unused_fallback_key_types).await;
276
277	sync_events::v3::Response {
278		device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
279		device_unused_fallback_key_types,
280		..sync_events::v3::Response::new(next_batch.to_string())
281	}
282}
283
284#[tracing::instrument(
285	name = "build",
286	level = INFO_SPAN_LEVEL,
287	skip_all,
288	fields(
289		%since,
290		%next_batch,
291		count = ?services.globals.pending_count(),
292    )
293)]
294#[expect(clippy::too_many_arguments)]
295async fn build_sync_events(
296	services: &Services,
297	sender_user: &UserId,
298	sender_device: Option<&DeviceId>,
299	since: u64,
300	next_batch: u64,
301	full_state: bool,
302	use_state_after: bool,
303	filter: &FilterDefinition,
304) -> Result<sync_events::v3::Response> {
305	let joined_rooms = services
306		.state_cache
307		.rooms_joined(sender_user)
308		.ready_filter(|&room_id| filter.room.matches(room_id))
309		.map(ToOwned::to_owned)
310		.broad_filter_map(|room_id| {
311			load_joined_room(
312				services,
313				sender_user,
314				sender_device,
315				room_id.clone(),
316				since,
317				next_batch,
318				full_state,
319				use_state_after,
320				filter,
321			)
322			.map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
323			.ok()
324		})
325		.ready_fold(
326			(BTreeMap::new(), HashSet::new(), HashSet::new()),
327			|(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
328			 (room_id, joined_room, dlu, leu)| {
329				device_list_updates.extend(dlu);
330				left_encrypted_users.extend(leu);
331				if !joined_room.is_empty() {
332					joined_rooms.insert(room_id, joined_room);
333				}
334
335				(joined_rooms, device_list_updates, left_encrypted_users)
336			},
337		);
338
339	let left_rooms = services
340		.state_cache
341		.rooms_left_state(sender_user)
342		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
343		.broad_filter_map(|(room_id, _)| {
344			handle_left_room(
345				services,
346				since,
347				room_id.clone(),
348				sender_user,
349				next_batch,
350				full_state,
351				use_state_after,
352				filter,
353			)
354			.map_ok(move |left_room| (room_id, left_room))
355			.ok()
356		})
357		.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
358		.collect();
359
360	let invited_rooms = services
361		.state_cache
362		.rooms_invited_state(sender_user)
363		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
364		.fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| {
365			let invite_count = services
366				.state_cache
367				.get_invite_count(&room_id, sender_user)
368				.await
369				.ok();
370
371			// Invited before last sync
372			if Some(since) >= invite_count || Some(next_batch) < invite_count {
373				return invited_rooms;
374			}
375
376			let invited_room = InvitedRoom {
377				invite_state: InviteState { events: invite_state },
378			};
379
380			invited_rooms.insert(room_id, invited_room);
381			invited_rooms
382		});
383
384	let knocked_rooms = services
385		.state_cache
386		.rooms_knocked_state(sender_user)
387		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
388		.fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| {
389			let knock_count = services
390				.state_cache
391				.get_knock_count(&room_id, sender_user)
392				.await
393				.ok();
394
395			// Knocked before last sync; or after the cutoff for this sync
396			if Some(since) >= knock_count || Some(next_batch) < knock_count {
397				return knocked_rooms;
398			}
399
400			let knocked_room = KnockedRoom {
401				knock_state: KnockState { events: knock_state },
402			};
403
404			knocked_rooms.insert(room_id, knocked_room);
405			knocked_rooms
406		});
407
408	let presence_updates = services
409		.config
410		.allow_local_presence
411		.then_async(|| {
412			process_presence_updates(services, since, next_batch, sender_user, filter)
413		});
414
415	let account_data = services
416		.account_data
417		.changes_since(None, sender_user, since, Some(next_batch))
418		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
419		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
420		.collect();
421
422	// Look for device list updates of this account
423	let keys_changed = services
424		.users
425		.keys_changed(sender_user, since, Some(next_batch))
426		.map(ToOwned::to_owned)
427		.collect::<HashSet<_>>();
428
429	let to_device_events = sender_device.map_async(|sender_device| {
430		services
431			.users
432			.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
433			.map(at!(1))
434			.collect::<Vec<_>>()
435	});
436
437	let device_one_time_keys_count = sender_device.map_async(|sender_device| {
438		services
439			.users
440			.count_one_time_keys(sender_user, sender_device)
441	});
442
443	let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
444		services
445			.users
446			.unused_fallback_key_algorithms(sender_user, sender_device)
447			.collect::<Vec<_>>()
448	});
449
450	// Remove all to-device events the device received *last time*
451	let remove_to_device_events = sender_device.map_async(|sender_device| {
452		services
453			.users
454			.remove_to_device_events(sender_user, sender_device, since)
455	});
456
457	let (
458		account_data,
459		keys_changed,
460		presence_updates,
461		(_, to_device_events, device_one_time_keys_count, device_unused_fallback_key_types),
462		(
463			(joined_rooms, mut device_list_updates, left_encrypted_users),
464			left_rooms,
465			invited_rooms,
466			knocked_rooms,
467		),
468	) = join5(
469		account_data,
470		keys_changed,
471		presence_updates,
472		join4(
473			remove_to_device_events,
474			to_device_events,
475			device_one_time_keys_count,
476			device_unused_fallback_key_types,
477		),
478		join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms),
479	)
480	.boxed()
481	.await;
482
483	device_list_updates.extend(keys_changed);
484
485	// If the user doesn't share an encrypted room with the target anymore, we need
486	// to tell them
487	let device_list_left = left_encrypted_users
488		.into_iter()
489		.stream()
490		.broad_filter_map(async |user_id: OwnedUserId| {
491			share_encrypted_room(services, sender_user, &user_id, None)
492				.await
493				.eq(&false)
494				.then_some(user_id)
495		})
496		.collect()
497		.await;
498
499	let presence_events = presence_updates
500		.into_iter()
501		.flat_map(IntoIterator::into_iter)
502		.map(|(sender, content)| PresenceEvent { content, sender })
503		.map(|ref event| Raw::new(event))
504		.filter_map(Result::ok)
505		.collect();
506
507	Ok(sync_events::v3::Response {
508		account_data: GlobalAccountData { events: account_data },
509		device_lists: DeviceLists {
510			left: device_list_left,
511			changed: device_list_updates.into_iter().collect(),
512		},
513		device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
514		device_unused_fallback_key_types,
515		next_batch: next_batch.to_string(),
516		presence: Presence { events: presence_events },
517		rooms: Rooms {
518			leave: left_rooms,
519			join: joined_rooms,
520			invite: invited_rooms,
521			knock: knocked_rooms,
522		},
523		to_device: ToDevice {
524			events: to_device_events.unwrap_or_default(),
525		},
526	})
527}
528
529#[tracing::instrument(name = "presence", level = "debug", skip_all)]
530async fn process_presence_updates(
531	services: &Services,
532	since: u64,
533	next_batch: u64,
534	syncing_user: &UserId,
535	filter: &FilterDefinition,
536) -> PresenceUpdates {
537	services
538		.presence
539		.presence_since(since, Some(next_batch))
540		.ready_filter(|(user_id, ..)| filter.presence.matches(user_id))
541		.filter(|(user_id, ..)| {
542			services
543				.state_cache
544				.user_sees_user(syncing_user, user_id)
545		})
546		.filter_map(|(user_id, _, presence_bytes)| {
547			services
548				.presence
549				.from_json_bytes_to_event(presence_bytes, user_id)
550				.map_ok(move |event| (user_id, event))
551				.ok()
552		})
553		.map(|(user_id, event)| (user_id.to_owned(), event.content))
554		.collect()
555		.boxed()
556		.await
557}
558
559#[tracing::instrument(
560	name = "left",
561	level = "debug",
562	skip_all,
563	fields(
564		room_id = %room_id,
565		full = %full_state,
566	),
567)]
568#[expect(clippy::too_many_arguments)]
569async fn handle_left_room(
570	services: &Services,
571	since: u64,
572	ref room_id: OwnedRoomId,
573	sender_user: &UserId,
574	next_batch: u64,
575	full_state: bool,
576	use_state_after: bool,
577	filter: &FilterDefinition,
578) -> Result<Option<LeftRoom>> {
579	let left_count = services
580		.state_cache
581		.get_left_count(room_id, sender_user)
582		.await
583		.unwrap_or(0);
584
585	if left_count == 0 || left_count > next_batch {
586		return Ok(None);
587	}
588
589	let include_leave = filter.room.include_leave;
590	if since == 0 && !include_leave {
591		return Ok(None);
592	}
593
594	// Cannot sync unless the event falls within the snapshot. The room is only
595	// sync'ed once to the client, after that it's too late.
596	if since != 0 && left_count <= since {
597		return Ok(None);
598	}
599
600	let is_not_found = services.metadata.exists(room_id).is_false();
601
602	let is_disabled = services.metadata.is_disabled(room_id);
603
604	let is_banned = services.metadata.is_banned(room_id);
605
606	pin_mut!(is_not_found, is_disabled, is_banned);
607	if is_not_found.or(is_disabled).or(is_banned).await {
608		// For rejected invites, deleted, missing, or broken room state this is the last
609		// resort to convey a the minimum of information to the client.
610		let event = PduEvent {
611			event_id: EventId::new_v1(services.globals.server_name()),
612			origin_server_ts: utils::millis_since_unix_epoch().try_into()?,
613			kind: RoomMember,
614			state_key: Some(sender_user.as_str().into()),
615			sender: sender_user.to_owned(),
616			content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
617			// The following keys are dropped on conversion
618			room_id: room_id.clone(),
619			depth: uint!(1),
620			origin: None,
621			unsigned: None,
622			redacts: None,
623			hashes: EventHash::default(),
624			auth_events: Default::default(),
625			prev_events: Default::default(),
626			signatures: None,
627		};
628
629		let state = StateEvents { events: vec![event.into_format()] };
630
631		let state = if use_state_after {
632			RoomState::After(state)
633		} else {
634			RoomState::Before(state)
635		};
636
637		return Ok(Some(LeftRoom {
638			account_data: RoomAccountData::default(),
639			state,
640			timeline: Timeline {
641				limited: false,
642				events: Default::default(),
643				prev_batch: Some(left_count.to_string()),
644			},
645		}));
646	}
647
648	load_left_room(
649		services,
650		sender_user,
651		room_id,
652		since,
653		left_count,
654		full_state,
655		use_state_after,
656		filter,
657	)
658	.await
659}
660
661#[tracing::instrument(name = "load", level = "debug", skip_all)]
662#[expect(clippy::too_many_arguments)]
663async fn load_left_room(
664	services: &Services,
665	sender_user: &UserId,
666	room_id: &RoomId,
667	since: u64,
668	left_count: u64,
669	full_state: bool,
670	use_state_after: bool,
671	filter: &FilterDefinition,
672) -> Result<Option<LeftRoom>> {
673	let initial = since == 0;
674	let timeline_limit: usize = filter
675		.room
676		.timeline
677		.limit
678		.map(TryInto::try_into)
679		.map_expect("UInt to usize")
680		.unwrap_or(10)
681		.min(100);
682
683	let (timeline_pdus, limited, _) = load_timeline(
684		services,
685		sender_user,
686		room_id,
687		PduCount::Normal(since),
688		Some(PduCount::Normal(left_count)),
689		timeline_limit.max(1),
690	)
691	.await
692	.unwrap_or_default();
693
694	let since_shortstatehash = services
695		.timeline
696		.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
697		.ok();
698
699	let horizon_shortstatehash = timeline_pdus
700		.first()
701		.map(at!(0))
702		.map_async(|count| {
703			services
704				.timeline
705				.get_shortstatehash(room_id, count)
706				.inspect_err(inspect_debug_log)
707				.ok()
708		});
709
710	// MSC4222 `state_after`: state at the leave (end of timeline). The
711	// stored shortstatehash at the leave PDU is state-before-leave, so
712	// step to the next PDU; if no event followed, the room's current
713	// shortstatehash is the post-leave state.
714	let after_shortstatehash = use_state_after.then_async(|| {
715		services
716			.timeline
717			.next_shortstatehash(room_id, PduCount::Normal(left_count))
718			.or_else(|_| services.state.get_room_shortstatehash(room_id))
719			.inspect_err(inspect_debug_log)
720	});
721
722	let left_shortstatehash = services
723		.timeline
724		.get_shortstatehash(room_id, PduCount::Normal(left_count))
725		.inspect_err(inspect_debug_log)
726		.or_else(|_| services.state.get_room_shortstatehash(room_id))
727		.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
728
729	let (since_shortstatehash, horizon_shortstatehash, after_shortstatehash, left_shortstatehash) =
730		join4(
731			since_shortstatehash,
732			horizon_shortstatehash,
733			after_shortstatehash,
734			left_shortstatehash,
735		)
736		.boxed()
737		.await;
738
739	let StateChanges { state_events, .. } = calculate_state_changes(
740		services,
741		sender_user,
742		room_id,
743		full_state || initial,
744		use_state_after,
745		since_shortstatehash,
746		horizon_shortstatehash.flatten(),
747		after_shortstatehash.flat_ok(),
748		left_shortstatehash?,
749		false,
750		None,
751	)
752	.boxed()
753	.await?;
754
755	let is_sender_membership = |event: &PduEvent| {
756		*event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str())
757	};
758
759	let timeline_sender_member = timeline_limit
760		.eq(&0)
761		.then(|| timeline_pdus.last().map(ref_at!(1)).cloned())
762		.into_iter()
763		.flat_map(Option::into_iter);
764
765	let encrypted = services
766		.state_accessor
767		.is_encrypted_room(room_id)
768		.await;
769
770	let state_events = state_events
771		.into_iter()
772		.filter(|pdu| filter.room.state.matches(pdu))
773		.filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu))
774		.chain(timeline_sender_member)
775		.stream()
776		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
777		.map(Event::into_format)
778		.collect();
779
780	let left_prev_batch = timeline_limit
781		.eq(&0)
782		.then_some(left_count)
783		.map(PduCount::Normal);
784
785	let prev_batch = timeline_pdus
786		.first()
787		.filter(|_| timeline_limit > 0)
788		.map(at!(0))
789		.or(left_prev_batch)
790		.as_ref()
791		.map(ToString::to_string);
792
793	let timeline_events = timeline_pdus
794		.into_iter()
795		.stream()
796		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
797		.map(at!(1))
798		.ready_filter(|pdu| filter.room.timeline.matches(pdu))
799		.take(timeline_limit)
800		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
801		.collect::<Vec<_>>();
802
803	let account_data_events = services
804		.account_data
805		.changes_since(Some(room_id), sender_user, since, None)
806		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
807		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
808		.collect();
809
810	let (state_events, account_data_events, timeline_events) =
811		join3(state_events, account_data_events, timeline_events)
812			.boxed()
813			.await;
814
815	let state = StateEvents { events: state_events };
816
817	let state = if use_state_after {
818		RoomState::After(state)
819	} else {
820		RoomState::Before(state)
821	};
822
823	Ok(Some(LeftRoom {
824		account_data: RoomAccountData { events: account_data_events },
825		state,
826		timeline: Timeline {
827			prev_batch,
828			limited: limited || timeline_limit == 0,
829			events: timeline_events
830				.into_iter()
831				.map(Event::into_format)
832				.collect(),
833		},
834	}))
835}
836
837#[tracing::instrument(
838	name = "joined",
839	level = "debug",
840	skip_all,
841	fields(
842		room_id = ?room_id,
843	),
844)]
845#[expect(clippy::too_many_arguments)]
846async fn load_joined_room(
847	services: &Services,
848	sender_user: &UserId,
849	sender_device: Option<&DeviceId>,
850	ref room_id: OwnedRoomId,
851	since: u64,
852	next_batch: u64,
853	full_state: bool,
854	use_state_after: bool,
855	filter: &FilterDefinition,
856) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
857	let initial = since == 0;
858	let timeline_limit: usize = filter
859		.room
860		.timeline
861		.limit
862		.map(TryInto::try_into)
863		.map_expect("UInt to usize")
864		.unwrap_or(10)
865		.min(100);
866
867	let (timeline_pdus, limited, last_timeline_count) = load_timeline(
868		services,
869		sender_user,
870		room_id,
871		PduCount::Normal(since),
872		Some(PduCount::Normal(next_batch)),
873		timeline_limit,
874	)
875	.await?;
876
877	let timeline_changed = last_timeline_count.into_unsigned() > since;
878	debug_assert!(
879		timeline_pdus.is_empty() || timeline_changed,
880		"if timeline events, last_timeline_count must be in the since window."
881	);
882
883	let since_shortstatehash = timeline_changed.then_async(|| {
884		services
885			.timeline
886			.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
887			.ok()
888	});
889
890	let horizon_shortstatehash = timeline_pdus
891		.first()
892		.map(at!(0))
893		.map_async(|count| {
894			services
895				.timeline
896				.get_shortstatehash(room_id, count)
897				.inspect_err(inspect_debug_log)
898		});
899
900	// MSC4222 `state_after` semantics: state at the *end* of the timeline
901	// window. `next_shortstatehash` reads state-before the next PDU, which
902	// equals state-after our last PDU; falling back to the room's current
903	// state covers the case where our window already touches HEAD.
904	let after_shortstatehash = use_state_after.then_async(|| {
905		services
906			.timeline
907			.next_shortstatehash(room_id, last_timeline_count)
908			.or_else(|_| services.state.get_room_shortstatehash(room_id))
909			.inspect_err(inspect_debug_log)
910	});
911
912	let current_shortstatehash = timeline_changed.then_async(|| {
913		services
914			.timeline
915			.get_shortstatehash(room_id, last_timeline_count)
916			.inspect_err(inspect_debug_log)
917			.or_else(|_| services.state.get_room_shortstatehash(room_id))
918			.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
919	});
920
921	let encrypted_room =
922		timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id));
923
924	let receipt_events = services
925		.read_receipt
926		.readreceipts_since(room_id, since, Some(next_batch))
927		.filter_map(async |(read_user, _, edu)| {
928			services
929				.users
930				.user_is_ignored(read_user, sender_user)
931				.await
932				.or_some((read_user.to_owned(), edu))
933		})
934		.collect::<Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>>();
935
936	let (
937		(
938			since_shortstatehash,
939			horizon_shortstatehash,
940			after_shortstatehash,
941			current_shortstatehash,
942		),
943		receipt_events,
944		encrypted_room,
945	) = join3(
946		join4(
947			since_shortstatehash,
948			horizon_shortstatehash,
949			after_shortstatehash,
950			current_shortstatehash,
951		),
952		receipt_events,
953		encrypted_room,
954	)
955	.map(|((since, horizon, after, current), receipt, encrypted_room)| -> Result<_> {
956		Ok((
957			(since.flatten(), horizon.flat_ok(), after.flat_ok(), current.transpose()?),
958			receipt,
959			encrypted_room,
960		))
961	})
962	.boxed()
963	.await?;
964
965	let lazy_load_options =
966		[&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
967
968	let lazy_loading_enabled = encrypted_room.is_some_and(is_false!())
969		&& lazy_load_options
970			.iter()
971			.any(|opts| opts.is_enabled());
972
973	let lazy_loading_context = &lazy_loading::Context {
974		user_id: sender_user,
975		device_id: sender_device,
976		room_id,
977		token: Some(since),
978		options: Some(&filter.room.state.lazy_load_options),
979		mode: lazy_loading::Mode::Update,
980	};
981
982	// Reset lazy loading because this is an initial sync
983	let lazy_load_reset =
984		initial.then_async(|| services.lazy_loading.reset(lazy_loading_context));
985
986	lazy_load_reset.await;
987	let witness = lazy_loading_enabled.then_async(|| {
988		let witness: Witness = timeline_pdus
989			.iter()
990			.map(ref_at!(1))
991			.map(Event::sender)
992			.map(Into::into)
993			.chain(receipt_events.iter().map(ref_at!(0)).cloned())
994			.collect();
995
996		services
997			.lazy_loading
998			.witness_retain(witness, lazy_loading_context)
999	});
1000
1001	let sender_joined_count = timeline_changed.then_async(|| {
1002		services
1003			.state_cache
1004			.get_joined_count(room_id, sender_user)
1005			.unwrap_or(0)
1006	});
1007
1008	let since_encryption = since_shortstatehash.map_async(|shortstatehash| {
1009		services
1010			.state_accessor
1011			.state_get(shortstatehash, &StateEventType::RoomEncryption, "")
1012	});
1013
1014	let last_notification_read = timeline_pdus.is_empty().then_async(|| {
1015		services
1016			.pusher
1017			.last_notification_read(sender_user, room_id)
1018			.ok()
1019	});
1020
1021	let thread_last_reads = timeline_pdus.is_empty().then_async(|| {
1022		services
1023			.pusher
1024			.thread_last_notification_reads(sender_user, room_id)
1025	});
1026
1027	let last_privateread_update = services
1028		.read_receipt
1029		.last_privateread_update(sender_user, room_id);
1030
1031	let (
1032		(last_privateread_update, last_notification_read, thread_last_reads),
1033		(sender_joined_count, since_encryption),
1034		witness,
1035	) = join3(
1036		join3(last_privateread_update, last_notification_read, thread_last_reads),
1037		join(sender_joined_count, since_encryption),
1038		witness,
1039	)
1040	.await;
1041
1042	let _encrypted_since_last_sync =
1043		!initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none();
1044
1045	let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
1046
1047	let state_changes = current_shortstatehash.map_async(|current_shortstatehash| {
1048		calculate_state_changes(
1049			services,
1050			sender_user,
1051			room_id,
1052			full_state || initial,
1053			use_state_after,
1054			since_shortstatehash,
1055			horizon_shortstatehash,
1056			after_shortstatehash,
1057			current_shortstatehash,
1058			joined_since_last_sync,
1059			witness.as_ref(),
1060		)
1061	});
1062
1063	let StateChanges {
1064		heroes,
1065		joined_member_count,
1066		invited_member_count,
1067		mut state_events,
1068	} = state_changes
1069		.await
1070		.transpose()?
1071		.unwrap_or_default();
1072
1073	let is_sender_membership = |event: &PduEvent| {
1074		*event.event_type() == StateEventType::RoomMember.into()
1075			&& event
1076				.state_key()
1077				.is_some_and(is_equal_to!(sender_user.as_str()))
1078	};
1079
1080	let joined_sender_member: Option<_> =
1081		(joined_since_last_sync && timeline_pdus.is_empty() && !initial)
1082			.then(|| {
1083				state_events
1084					.iter()
1085					.position(is_sender_membership)
1086					.map(|pos| state_events.swap_remove(pos))
1087			})
1088			.flatten();
1089
1090	let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
1091		joined_sender_member
1092			.is_some()
1093			.then_some(since)
1094			.map(Into::into)
1095	});
1096
1097	let in_window = |count: u64| count > since && count <= next_batch;
1098
1099	let send_main_counts = last_notification_read
1100		.flatten()
1101		.is_none_or(in_window);
1102
1103	let send_thread_counts = thread_last_reads
1104		.as_ref()
1105		.is_none_or(|reads| reads.values().copied().any(in_window));
1106
1107	// Send room-level counts when either the main read cursor or any thread
1108	// cursor advanced within the window. Thread-only resets do not bump the
1109	// main cursor, so without the thread leg they would never reach the
1110	// client.
1111	let send_notification_counts = send_main_counts || send_thread_counts;
1112
1113	let send_notification_resets = last_notification_read
1114		.flatten()
1115		.is_some_and(|last_count| last_count > since);
1116
1117	let send_notification_count_filter =
1118		|count: &UInt| *count != uint!(0) || send_notification_resets;
1119
1120	// Fetch main counts whenever either cursor advanced: a thread-only reset
1121	// leaves the main count unchanged but still shifts the merged total that
1122	// clients without `unread_thread_notifications` receive.
1123	let notification_count = send_notification_counts.then_async(|| {
1124		services
1125			.pusher
1126			.notification_count(sender_user, room_id)
1127			.map(TryInto::try_into)
1128			.unwrap_or(uint!(0))
1129	});
1130
1131	let highlight_count = send_notification_counts.then_async(|| {
1132		services
1133			.pusher
1134			.highlight_count(sender_user, room_id)
1135			.map(TryInto::try_into)
1136			.unwrap_or(uint!(0))
1137	});
1138
1139	// MSC3773: per-thread counts. Filtered downstream by per-thread last-read
1140	// so quiet threads are omitted on rounds where the main cursor advanced.
1141	let thread_counts = send_notification_counts.then_async(|| {
1142		services
1143			.pusher
1144			.thread_notification_counts(sender_user, room_id)
1145	});
1146
1147	let private_read_events = last_privateread_update.gt(&since).then_async(|| {
1148		services
1149			.read_receipt
1150			.private_read_get(room_id, sender_user)
1151			.unwrap_or_default()
1152	});
1153
1154	let typing_events = services
1155		.typing
1156		.last_typing_update(room_id)
1157		.and_then(async |count| {
1158			if count <= since {
1159				return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
1160			}
1161
1162			let typings = typings_event_for_user(services, room_id, sender_user).await?;
1163
1164			Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
1165		})
1166		.unwrap_or(Vec::new());
1167
1168	let keys_changed = services
1169		.users
1170		.room_keys_changed(room_id, since, Some(next_batch))
1171		.map(|(user_id, _)| user_id)
1172		.map(ToOwned::to_owned)
1173		.collect::<Vec<_>>();
1174
1175	let extract_membership = |event: &PduEvent| {
1176		let content: RoomMemberEventContent = event.get_content().ok()?;
1177		let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
1178
1179		Some((content.membership, user_id))
1180	};
1181
1182	let timeline_membership_changes = timeline_pdus
1183		.iter()
1184		.filter(|_| !initial)
1185		.map(ref_at!(1))
1186		.filter_map(extract_membership)
1187		.collect::<Vec<_>>();
1188
1189	let device_list_updates = state_events
1190		.iter()
1191		.stream()
1192		.ready_filter(|_| !initial)
1193		.ready_filter(|state_event| *state_event.event_type() == RoomMember)
1194		.ready_filter_map(extract_membership)
1195		.chain(timeline_membership_changes.stream())
1196		.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| {
1197			use MembershipState::*;
1198
1199			let requires_update = async |user_id| {
1200				!share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
1201			};
1202
1203			match membership {
1204				| Join if requires_update(&user_id).await => dlu.insert(user_id),
1205				| Leave => leu.insert(user_id),
1206				| _ => false,
1207			};
1208
1209			(dlu, leu)
1210		})
1211		.then(async |(mut dlu, leu)| {
1212			dlu.extend(keys_changed.await);
1213			(dlu, leu)
1214		});
1215
1216	let include_in_timeline = |event: &PduEvent| {
1217		let filter = &filter.room.timeline;
1218		filter.matches(event)
1219	};
1220
1221	// `encrypted_room` is `Some` whenever timeline or state events are emitted.
1222	let encrypted = encrypted_room.unwrap_or(false);
1223
1224	let room_events = timeline_pdus
1225		.into_iter()
1226		.stream()
1227		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
1228		.map(at!(1))
1229		.chain(joined_sender_member.into_iter().stream())
1230		.ready_filter(include_in_timeline)
1231		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1232		.collect::<Vec<_>>();
1233
1234	let account_data_events = services
1235		.account_data
1236		.changes_since(Some(room_id), sender_user, since, None)
1237		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
1238		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
1239		.collect();
1240
1241	let (
1242		(room_events, account_data_events),
1243		(typing_events, private_read_events),
1244		(notification_count, highlight_count, thread_counts),
1245		(device_list_updates, left_encrypted_users),
1246	) = join4(
1247		join(room_events, account_data_events),
1248		join(typing_events, private_read_events),
1249		join3(notification_count, highlight_count, thread_counts),
1250		device_list_updates,
1251	)
1252	.boxed()
1253	.await;
1254
1255	let is_in_timeline = |event: &PduEvent| {
1256		room_events
1257			.iter()
1258			.map(Event::event_id)
1259			.any(is_equal_to!(event.event_id()))
1260	};
1261
1262	// MSC4222: when the client opts into `state_after`, state events that
1263	// took effect within the timeline appear in both the timeline and the
1264	// state section, so the in-timeline exclusion is bypassed.
1265	let include_in_state = |event: &PduEvent| {
1266		let filter = &filter.room.state;
1267		filter.matches(event) && (full_state || use_state_after || !is_in_timeline(event))
1268	};
1269
1270	let state_events: Vec<_> = state_events
1271		.into_iter()
1272		.filter(include_in_state)
1273		.stream()
1274		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1275		.map(Event::into_format)
1276		.collect()
1277		.await;
1278
1279	let heroes = heroes
1280		.into_iter()
1281		.flatten()
1282		.map(TryInto::try_into)
1283		.filter_map(Result::ok)
1284		.collect();
1285
1286	let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
1287		.into_iter()
1288		.map(at!(1))
1289		.chain(typing_events)
1290		.chain(private_read_events.into_iter().flatten())
1291		.collect();
1292
1293	let thread_counts = thread_counts.unwrap_or_default();
1294
1295	let (thread_total_notifications, thread_total_highlights) = thread_counts
1296		.values()
1297		.fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
1298			(n.saturating_add(notifs), h.saturating_add(hl))
1299		});
1300
1301	// MSC3773: when the client opts in via the timeline filter, partition
1302	// notification counts per thread. Otherwise sum into the room total.
1303	let want_thread_unread = filter.room.timeline.unread_thread_notifications;
1304
1305	let merge_total = |total: u64| {
1306		move |count: UInt| {
1307			want_thread_unread
1308				.is_false()
1309				.then(|| count.saturating_add(UInt::try_from(total).unwrap_or_default()))
1310				.unwrap_or(count)
1311		}
1312	};
1313
1314	let unread_notifications = UnreadNotificationsCount {
1315		highlight_count: highlight_count
1316			.map(merge_total(thread_total_highlights))
1317			.filter(send_notification_count_filter),
1318		notification_count: notification_count
1319			.map(merge_total(thread_total_notifications))
1320			.filter(send_notification_count_filter),
1321	};
1322
1323	// On quiet rounds (timeline empty) `thread_last_reads` is `Some`; emit
1324	// only threads whose read cursor advanced within the window. When the
1325	// timeline carried events `thread_last_reads` is `None`; emit all.
1326	// Initial sync (since == 0) is a full snapshot; bypass the gate so
1327	// clients with no prior cursor still see existing thread counts.
1328	let advanced_in_window = |root: &EventId| {
1329		initial
1330			|| thread_last_reads
1331				.as_ref()
1332				.is_none_or(|reads| reads.get(root).copied().is_some_and(in_window))
1333	};
1334
1335	let unread_thread_notifications = thread_counts
1336		.into_iter()
1337		.filter(|_| want_thread_unread)
1338		.filter(|(root, _)| advanced_in_window(root))
1339		.map(|(root, (notifications, highlights))| {
1340			let counts = UnreadNotificationsCount {
1341				notification_count: UInt::try_from(notifications).ok(),
1342				highlight_count: UInt::try_from(highlights).ok(),
1343			};
1344
1345			(root, counts)
1346		})
1347		.collect();
1348
1349	let state = StateEvents { events: state_events };
1350
1351	let state = if use_state_after {
1352		RoomState::After(state)
1353	} else {
1354		RoomState::Before(state)
1355	};
1356
1357	let joined_room = JoinedRoom {
1358		account_data: RoomAccountData { events: account_data_events },
1359		ephemeral: Ephemeral { events: edus },
1360		state,
1361		summary: RoomSummary {
1362			joined_member_count: joined_member_count.map(ruma_from_u64),
1363			invited_member_count: invited_member_count.map(ruma_from_u64),
1364			heroes,
1365		},
1366		timeline: Timeline {
1367			limited: limited || joined_since_last_sync,
1368			prev_batch: prev_batch.as_ref().map(ToString::to_string),
1369			events: room_events
1370				.into_iter()
1371				.map(Event::into_format)
1372				.collect(),
1373		},
1374		unread_notifications,
1375		unread_thread_notifications,
1376	};
1377
1378	Ok((joined_room, device_list_updates, left_encrypted_users))
1379}
1380
1381#[tracing::instrument(
1382	name = "state",
1383	level = "trace",
1384	skip_all,
1385	fields(
1386	    full = %full_state,
1387	    after = %use_state_after,
1388	    ss = ?since_shortstatehash,
1389	    hs = ?horizon_shortstatehash,
1390	    as = ?after_shortstatehash,
1391	    cs = %current_shortstatehash,
1392    )
1393)]
1394#[expect(clippy::too_many_arguments)]
1395async fn calculate_state_changes<'a>(
1396	services: &Services,
1397	sender_user: &UserId,
1398	room_id: &RoomId,
1399	full_state: bool,
1400	use_state_after: bool,
1401	since_shortstatehash: Option<ShortStateHash>,
1402	horizon_shortstatehash: Option<ShortStateHash>,
1403	after_shortstatehash: Option<ShortStateHash>,
1404	current_shortstatehash: ShortStateHash,
1405	joined_since_last_sync: bool,
1406	witness: Option<&'a Witness>,
1407) -> Result<StateChanges> {
1408	let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some();
1409
1410	// MSC4222: `state_after` requests need state at the *end* of the
1411	// timeline; legacy `state` requests need state at the *start*. Pick
1412	// the right delta endpoint, falling back to the room's current
1413	// shortstatehash when the preferred lookup is unavailable.
1414	let horizon_shortstatehash = use_state_after
1415		.then_some(after_shortstatehash)
1416		.unwrap_or(horizon_shortstatehash)
1417		.unwrap_or(current_shortstatehash);
1418
1419	let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash);
1420
1421	let state_get_shorteventid = |user_id: &'a UserId| {
1422		services
1423			.state_accessor
1424			.state_get_shortid(
1425				horizon_shortstatehash,
1426				&StateEventType::RoomMember,
1427				user_id.as_str(),
1428			)
1429			.ok()
1430	};
1431
1432	let lazy_state_ids = witness.map_async(|witness| {
1433		witness
1434			.iter()
1435			.stream()
1436			.ready_filter(|&user_id| user_id != sender_user)
1437			.broad_filter_map(|user_id| state_get_shorteventid(user_id))
1438			.into_future()
1439	});
1440
1441	let state_diff_ids = incremental.then_async(|| {
1442		services
1443			.state_accessor
1444			.state_added((since_shortstatehash, horizon_shortstatehash))
1445			.boxed()
1446			.into_future()
1447	});
1448
1449	let current_state_ids = (!incremental).then_async(|| {
1450		services
1451			.state_accessor
1452			.state_full_shortids(horizon_shortstatehash)
1453			.expect_ok()
1454			.boxed()
1455			.into_future()
1456	});
1457
1458	let state_events = current_state_ids
1459		.stream()
1460		.chain(state_diff_ids.stream())
1461		.broad_filter_map(async |(shortstatekey, shorteventid)| {
1462			lazy_filter(services, sender_user, witness, shortstatekey, shorteventid).await
1463		})
1464		.chain(lazy_state_ids.stream())
1465		.broad_filter_map(|shorteventid| {
1466			services
1467				.timeline
1468				.get_pdu_from_shorteventid(shorteventid)
1469				.ok()
1470		})
1471		.collect::<Vec<_>>()
1472		.await;
1473
1474	let send_member_counts = state_events
1475		.iter()
1476		.any(|event| *event.kind() == RoomMember);
1477
1478	let member_counts =
1479		send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user));
1480
1481	let (joined_member_count, invited_member_count, heroes) =
1482		member_counts.await.unwrap_or((None, None, None));
1483
1484	Ok(StateChanges {
1485		heroes,
1486		joined_member_count,
1487		invited_member_count,
1488		state_events,
1489	})
1490}
1491
1492async fn lazy_filter(
1493	services: &Services,
1494	sender_user: &UserId,
1495	witness: Option<&Witness>,
1496	shortstatekey: ShortStateKey,
1497	shorteventid: ShortEventId,
1498) -> Option<ShortEventId> {
1499	if witness.is_none() {
1500		return Some(shorteventid);
1501	}
1502
1503	let (event_type, state_key) = services
1504		.short
1505		.get_statekey_from_short(shortstatekey)
1506		.await
1507		.ok()?;
1508
1509	(event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
1510		.then_some(shorteventid)
1511}
1512
1513async fn calculate_counts(
1514	services: &Services,
1515	room_id: &RoomId,
1516	sender_user: &UserId,
1517) -> (Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>) {
1518	let joined_member_count = services
1519		.state_cache
1520		.room_joined_count(room_id)
1521		.unwrap_or(0);
1522
1523	let invited_member_count = services
1524		.state_cache
1525		.room_invited_count(room_id)
1526		.unwrap_or(0);
1527
1528	let (joined_member_count, invited_member_count) =
1529		join(joined_member_count, invited_member_count).await;
1530
1531	let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
1532
1533	let heroes = services
1534		.config
1535		.calculate_heroes
1536		.and_is(small_room)
1537		.then_async(|| calculate_heroes(services, room_id, sender_user));
1538
1539	(Some(joined_member_count), Some(invited_member_count), heroes.await)
1540}
1541
1542pub(crate) async fn calculate_heroes(
1543	services: &Services,
1544	room_id: &RoomId,
1545	sender_user: &UserId,
1546) -> Vec<OwnedUserId> {
1547	const LIMIT: usize = 5;
1548
1549	services
1550		.state_accessor
1551		.room_state_type_pdus(room_id, &StateEventType::RoomMember)
1552		.ready_filter_map(Result::ok)
1553		.filter_map(|pdu| filter_hero(services, room_id, sender_user, pdu))
1554		.take(LIMIT)
1555		.collect::<Vec<_>>()
1556		.await
1557}
1558
1559async fn filter_hero<Pdu: Event>(
1560	services: &Services,
1561	room_id: &RoomId,
1562	sender_user: &UserId,
1563	pdu: Pdu,
1564) -> Option<OwnedUserId> {
1565	let user_id = pdu.state_key().map(TryInto::try_into).flat_ok()?;
1566
1567	if user_id == sender_user {
1568		return None;
1569	}
1570
1571	let Ok(content): Result<RoomMemberEventContent, _> = pdu.get_content() else {
1572		return None;
1573	};
1574
1575	// The membership was and still is invite or join
1576	if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
1577		return None;
1578	}
1579
1580	let (is_invited, is_joined) = join(
1581		services.state_cache.is_invited(user_id, room_id),
1582		services.state_cache.is_joined(user_id, room_id),
1583	)
1584	.await;
1585
1586	if !is_joined && is_invited {
1587		return None;
1588	}
1589
1590	Some(user_id.to_owned())
1591}
1592
1593async fn typings_event_for_user(
1594	services: &Services,
1595	room_id: &RoomId,
1596	sender_user: &UserId,
1597) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
1598	Ok(SyncEphemeralRoomEvent {
1599		content: TypingEventContent {
1600			user_ids: services
1601				.typing
1602				.typing_users_for_user(room_id, sender_user)
1603				.await?,
1604		},
1605	})
1606}