Skip to main content

tuwunel_api/client/sync/
v3.rs

1use std::{
2	collections::{BTreeMap, HashMap, HashSet},
3	time::Duration,
4};
5
6use axum::extract::State;
7use futures::{
8	FutureExt, StreamExt, TryFutureExt,
9	future::{join, join3, join4, join5},
10	pin_mut,
11};
12use ruma::{
13	DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
14	api::client::{
15		filter::FilterDefinition,
16		sync::sync_events::{
17			self, DeviceLists, UnreadNotificationsCount,
18			v3::{
19				Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
20				KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms,
21				State as RoomState, StateEvents, Timeline, ToDevice,
22			},
23		},
24	},
25	events::{
26		AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
27		AnySyncEphemeralRoomEvent, AnySyncStateEvent, StateEventType, SyncEphemeralRoomEvent,
28		TimelineEventType::*,
29		presence::{PresenceEvent, PresenceEventContent},
30		room::member::{MembershipState, RoomMemberEventContent},
31		typing::TypingEventContent,
32	},
33	serde::Raw,
34	uint,
35};
36use tokio::time;
37use tuwunel_core::{
38	Result, at,
39	debug::INFO_SPAN_LEVEL,
40	debug_error, err,
41	error::{inspect_debug_log, inspect_log},
42	extract_variant, is_equal_to, is_false, is_true,
43	matrix::{
44		Event,
45		event::{Matches, trim_event_fields},
46		pdu::{EventHash, PduCount, PduEvent},
47	},
48	pair_of, ref_at,
49	result::FlatOk,
50	trace,
51	utils::{
52		self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
53		future::{OptionStream, ReadyBoolExt},
54		math::ruma_from_u64,
55		option::OptionExt,
56		result::MapExpect,
57		stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
58	},
59};
60use tuwunel_service::{
61	Services,
62	rooms::{
63		lazy_loading,
64		lazy_loading::{Options, Witness},
65		read_receipt::PrivateReadEvents,
66		short::{ShortEventId, ShortStateHash, ShortStateKey},
67	},
68};
69
70use super::{load_timeline, share_encrypted_room};
71use crate::{
72	ClientIp, Ruma,
73	client::{ignored_filter, is_empty_account_data_event, with_membership},
74};
75
76/// MSC4222 `state_after` opt-in: which room-state field the response carries.
77#[derive(Clone, Copy, Debug)]
78enum StateAfter {
79	Off,
80	Stable,
81	Unstable,
82}
83
84impl StateAfter {
85	fn requested(self) -> bool { !matches!(self, Self::Off) }
86
87	fn wrap(self, events: StateEvents) -> RoomState {
88		match self {
89			| Self::Off => RoomState::Before(events),
90			| Self::Stable => RoomState::After(events),
91			| Self::Unstable => RoomState::AfterUnstable(events),
92		}
93	}
94}
95
96impl From<(bool, bool)> for StateAfter {
97	fn from((stable, unstable): (bool, bool)) -> Self {
98		// Unstable opt-in wins: such a client reads the unstable field name.
99		match (stable, unstable) {
100			| (_, true) => Self::Unstable,
101			| (true, _) => Self::Stable,
102			| _ => Self::Off,
103		}
104	}
105}
106
107#[derive(Default)]
108struct StateChanges {
109	heroes: Option<Vec<OwnedUserId>>,
110	joined_member_count: Option<u64>,
111	invited_member_count: Option<u64>,
112	state_events: Vec<PduEvent>,
113}
114
115type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
116
117struct RoomMetadata {
118	since_shortstatehash: Option<ShortStateHash>,
119	horizon_shortstatehash: Option<ShortStateHash>,
120	after_shortstatehash: Option<ShortStateHash>,
121	current_shortstatehash: Option<ShortStateHash>,
122	receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
123	encrypted_room: Option<bool>,
124}
125
126struct UserMetadata {
127	witness: Option<Witness>,
128	#[expect(clippy::option_option)]
129	last_notification_read: Option<Option<u64>>,
130	thread_last_reads: Option<BTreeMap<OwnedEventId, u64>>,
131	last_privateread_update: u64,
132	joined_since_last_sync: bool,
133}
134
135struct NotificationGates<F> {
136	send_notification_counts: bool,
137	send_notification_count_filter: F,
138}
139
140struct BuildJoinedRoom {
141	receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
142	typing_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
143	private_read_events: Option<PrivateReadEvents>,
144	state_events: Vec<Raw<AnySyncStateEvent>>,
145	account_data_events: Vec<Raw<AnyRoomAccountDataEvent>>,
146	room_events: Vec<PduEvent>,
147	heroes: Option<Vec<OwnedUserId>>,
148	joined_member_count: Option<u64>,
149	invited_member_count: Option<u64>,
150	unread_notifications: UnreadNotificationsCount,
151	unread_thread_notifications: BTreeMap<OwnedEventId, UnreadNotificationsCount>,
152	state_after: StateAfter,
153	limited: bool,
154	joined_since_last_sync: bool,
155	prev_batch: Option<PduCount>,
156}
157
158/// # `GET /_matrix/client/r0/sync`
159///
160/// Synchronize the client's state with the latest state on the server.
161///
162/// - This endpoint takes a `since` parameter which should be the `next_batch`
163///   value from a previous request for incremental syncs.
164///
165/// Calling this endpoint without a `since` parameter returns:
166/// - Some of the most recent events of each timeline
167/// - Notification counts for each room
168/// - Joined and invited member counts, heroes
169/// - All state events
170///
171/// Calling this endpoint with a `since` parameter from a previous `next_batch`
172/// returns: For joined rooms:
173/// - Some of the most recent events of each timeline that happened after since
174/// - If user joined the room after since: All state events (unless lazy loading
175///   is activated) and all device list updates in that room
176/// - If the user was already in the room: A list of all events that are in the
177///   state now, but were not in the state at `since`
178/// - If the state we send contains a member event: Joined and invited member
179///   counts, heroes
180/// - Device list updates that happened after `since`
181/// - If there are events in the timeline we send or the user send updated his
182///   read mark: Notification counts
183/// - EDUs that are active now (read receipts, typing updates, presence)
184/// - TODO: Allow multiple sync streams to support Pantalaimon
185///
186/// For invited rooms:
187/// - If the user was invited after `since`: A subset of the state of the room
188///   at the point of the invite
189///
190/// For left rooms:
191/// - If the user left after `since`: `prev_batch` token, empty state (TODO:
192///   subset of the state at the point of the leave)
193#[tracing::instrument(
194	name = "sync",
195	level = "debug",
196	skip_all,
197	fields(
198		user_id = %body.sender_user(),
199		device_id = %body.sender_device.as_deref().map_or("<no device>", |x| x.as_str()),
200    )
201)]
202pub(crate) async fn sync_events_route(
203	State(services): State<crate::State>,
204	ClientIp(client): ClientIp,
205	body: Ruma<sync_events::v3::Request>,
206) -> Result<sync_events::v3::Response> {
207	let sender_user = body.sender_user();
208	let sender_device = body.sender_device.as_deref();
209
210	let filter = body
211		.body
212		.filter
213		.as_ref()
214		.map_async(async |filter| match filter {
215			| Filter::FilterDefinition(filter) => filter.clone(),
216			| Filter::FilterId(filter_id) => services
217				.users
218				.get_filter(sender_user, filter_id)
219				.await
220				.unwrap_or_default(),
221		});
222
223	let filter = filter.map(Option::unwrap_or_default);
224	let full_state = body.body.full_state;
225	let set_presence = &body.body.set_presence;
226	let state_after =
227		StateAfter::from((body.body.use_state_after, body.body.use_state_after_unstable));
228
229	let ping_presence = services
230		.presence
231		.maybe_ping_presence(
232			sender_user,
233			body.sender_device.as_deref(),
234			Some(client),
235			set_presence,
236		)
237		.inspect_err(inspect_log)
238		.ok();
239
240	// Record user as actively syncing for push suppression heuristic.
241	let note_sync = services.presence.note_sync(sender_user);
242
243	let (filter, ..) = join3(filter, ping_presence, note_sync).await;
244
245	let mut since = body
246		.body
247		.since
248		.as_deref()
249		.map(str::parse)
250		.flat_ok()
251		.unwrap_or(0);
252
253	let timeout = body
254		.body
255		.timeout
256		.as_ref()
257		.map(Duration::as_millis)
258		.map(TryInto::try_into)
259		.flat_ok()
260		.unwrap_or(services.config.client_sync_timeout_default)
261		.max(services.config.client_sync_timeout_min)
262		.min(services.config.client_sync_timeout_max);
263
264	let stop_at = time::Instant::now()
265		.checked_add(Duration::from_millis(timeout))
266		.expect("configuration must limit maximum timeout");
267
268	loop {
269		let watch_rooms = services
270			.state_cache
271			.rooms_joined(sender_user)
272			.chain(services.state_cache.rooms_invited(sender_user));
273
274		let watchers = services
275			.sync
276			.watch(sender_user, sender_device, watch_rooms)
277			.await;
278
279		let next_batch = services.globals.wait_pending().await?;
280		if since > next_batch {
281			debug_error!(since, next_batch, "received since > next_batch, clamping");
282			since = next_batch;
283		}
284
285		if since < next_batch || full_state {
286			let response = build_sync_events(
287				&services,
288				sender_user,
289				sender_device,
290				since,
291				next_batch,
292				full_state,
293				state_after,
294				&filter,
295			)
296			.await?;
297
298			let empty = response.rooms.is_empty()
299				&& response.presence.is_empty()
300				&& response.account_data.is_empty()
301				&& response.device_lists.is_empty()
302				&& response.to_device.is_empty();
303
304			if !empty || full_state {
305				return Ok(response);
306			}
307		}
308
309		// Wait for activity
310		if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
311			let response =
312				build_empty_response(&services, sender_user, sender_device, next_batch).await;
313
314			trace!(since, next_batch, "empty response");
315			return Ok(response);
316		}
317
318		trace!(
319			since,
320			last_batch = ?next_batch,
321			count = ?services.globals.pending_count(),
322			stop_at = ?stop_at,
323			"notified by watcher"
324		);
325
326		since = next_batch;
327	}
328}
329
330async fn build_empty_response(
331	services: &Services,
332	sender_user: &UserId,
333	sender_device: Option<&DeviceId>,
334	next_batch: u64,
335) -> sync_events::v3::Response {
336	let device_one_time_keys_count = sender_device.map_async(|sender_device| {
337		services
338			.users
339			.count_one_time_keys(sender_user, sender_device)
340	});
341
342	let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
343		services
344			.users
345			.unused_fallback_key_algorithms(sender_user, sender_device)
346			.collect::<Vec<_>>()
347	});
348
349	let (device_one_time_keys_count, device_unused_fallback_key_types) =
350		join(device_one_time_keys_count, device_unused_fallback_key_types).await;
351
352	sync_events::v3::Response {
353		device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
354		device_unused_fallback_key_types,
355		..sync_events::v3::Response::new(next_batch.to_string())
356	}
357}
358
359#[tracing::instrument(
360	name = "build",
361	level = INFO_SPAN_LEVEL,
362	skip_all,
363	fields(
364		%since,
365		%next_batch,
366		count = ?services.globals.pending_count(),
367    )
368)]
369#[expect(clippy::too_many_arguments)]
370async fn build_sync_events(
371	services: &Services,
372	sender_user: &UserId,
373	sender_device: Option<&DeviceId>,
374	since: u64,
375	next_batch: u64,
376	full_state: bool,
377	state_after: StateAfter,
378	filter: &FilterDefinition,
379) -> Result<sync_events::v3::Response> {
380	// MSC4380: when m.invite_permission_config blocks invites, suppress stored
381	// invite events from /sync entirely; a later unblock re-exposes them.
382	let invites_blocked = services.users.invites_blocked(sender_user).await;
383
384	let joined_rooms = collect_joined_rooms(
385		services,
386		sender_user,
387		sender_device,
388		since,
389		next_batch,
390		full_state,
391		state_after,
392		filter,
393	);
394
395	let left_rooms = collect_left_rooms(
396		services,
397		sender_user,
398		since,
399		next_batch,
400		full_state,
401		state_after,
402		filter,
403	);
404
405	let invited_rooms =
406		collect_invited_rooms(services, sender_user, since, next_batch, filter, invites_blocked);
407
408	let knocked_rooms = collect_knocked_rooms(services, sender_user, since, next_batch, filter);
409
410	let presence_updates = services
411		.config
412		.allow_local_presence
413		.then_async(|| {
414			process_presence_updates(services, since, next_batch, sender_user, filter)
415		});
416
417	let account_data = collect_global_account_data(services, sender_user, since, next_batch);
418
419	let keys_changed = services
420		.users
421		.keys_changed(sender_user, since, Some(next_batch))
422		.map(ToOwned::to_owned)
423		.collect::<HashSet<_>>();
424
425	let to_device_events = sender_device.map_async(|sender_device| {
426		services
427			.users
428			.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
429			.map(at!(1))
430			.collect::<Vec<_>>()
431	});
432
433	let device_one_time_keys_count = sender_device.map_async(|sender_device| {
434		services
435			.users
436			.count_one_time_keys(sender_user, sender_device)
437	});
438
439	let device_unused_fallback_key_types = sender_device.map_async(|sender_device| {
440		services
441			.users
442			.unused_fallback_key_algorithms(sender_user, sender_device)
443			.collect::<Vec<_>>()
444	});
445
446	// Remove all to-device events the device received *last time*
447	let remove_to_device_events = sender_device.map_async(|sender_device| {
448		services
449			.users
450			.remove_to_device_events(sender_user, sender_device, since)
451	});
452
453	let (
454		account_data,
455		keys_changed,
456		presence_updates,
457		(_, to_device_events, device_one_time_keys_count, device_unused_fallback_key_types),
458		(
459			(joined_rooms, mut device_list_updates, left_encrypted_users),
460			left_rooms,
461			invited_rooms,
462			knocked_rooms,
463		),
464	) = join5(
465		account_data,
466		keys_changed,
467		presence_updates,
468		join4(
469			remove_to_device_events,
470			to_device_events,
471			device_one_time_keys_count,
472			device_unused_fallback_key_types,
473		),
474		join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms),
475	)
476	.boxed()
477	.await;
478
479	device_list_updates.extend(keys_changed);
480
481	let device_list_left =
482		collect_device_list_left(services, sender_user, left_encrypted_users).await;
483
484	let presence_events = build_presence_events(presence_updates);
485
486	Ok(sync_events::v3::Response {
487		account_data: GlobalAccountData { events: account_data },
488		device_lists: DeviceLists {
489			left: device_list_left,
490			changed: device_list_updates.into_iter().collect(),
491		},
492		device_one_time_keys_count: device_one_time_keys_count.unwrap_or_default(),
493		device_unused_fallback_key_types,
494		next_batch: next_batch.to_string(),
495		presence: Presence { events: presence_events },
496		rooms: Rooms {
497			leave: left_rooms,
498			join: joined_rooms,
499			invite: invited_rooms,
500			knock: knocked_rooms,
501		},
502		to_device: ToDevice {
503			events: to_device_events.unwrap_or_default(),
504		},
505	})
506}
507
508#[expect(clippy::too_many_arguments)]
509fn collect_joined_rooms<'a>(
510	services: &'a Services,
511	sender_user: &'a UserId,
512	sender_device: Option<&'a DeviceId>,
513	since: u64,
514	next_batch: u64,
515	full_state: bool,
516	state_after: StateAfter,
517	filter: &'a FilterDefinition,
518) -> impl Future<
519	Output = (BTreeMap<OwnedRoomId, JoinedRoom>, HashSet<OwnedUserId>, HashSet<OwnedUserId>),
520> + Send
521+ 'a {
522	services
523		.state_cache
524		.rooms_joined(sender_user)
525		.ready_filter(|&room_id| filter.room.matches(room_id))
526		.map(ToOwned::to_owned)
527		.broad_filter_map(move |room_id| {
528			load_joined_room(
529				services,
530				sender_user,
531				sender_device,
532				room_id.clone(),
533				since,
534				next_batch,
535				full_state,
536				state_after,
537				filter,
538			)
539			.map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
540			.ok()
541		})
542		.ready_fold(
543			(BTreeMap::new(), HashSet::new(), HashSet::new()),
544			|(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
545			 (room_id, joined_room, dlu, leu)| {
546				device_list_updates.extend(dlu);
547				left_encrypted_users.extend(leu);
548				if !joined_room.is_empty() {
549					joined_rooms.insert(room_id, joined_room);
550				}
551
552				(joined_rooms, device_list_updates, left_encrypted_users)
553			},
554		)
555}
556
557fn collect_left_rooms<'a>(
558	services: &'a Services,
559	sender_user: &'a UserId,
560	since: u64,
561	next_batch: u64,
562	full_state: bool,
563	state_after: StateAfter,
564	filter: &'a FilterDefinition,
565) -> impl Future<Output = BTreeMap<OwnedRoomId, LeftRoom>> + Send + 'a {
566	services
567		.state_cache
568		.rooms_left_state(sender_user)
569		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
570		.broad_filter_map(move |(room_id, _)| {
571			handle_left_room(
572				services,
573				since,
574				room_id.clone(),
575				sender_user,
576				next_batch,
577				full_state,
578				state_after,
579				filter,
580			)
581			.map_ok(move |left_room| (room_id, left_room))
582			.ok()
583		})
584		.ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
585		.collect()
586}
587
588async fn collect_invited_rooms<'a>(
589	services: &'a Services,
590	sender_user: &'a UserId,
591	since: u64,
592	next_batch: u64,
593	filter: &'a FilterDefinition,
594	invites_blocked: bool,
595) -> BTreeMap<OwnedRoomId, InvitedRoom> {
596	services
597		.state_cache
598		.rooms_invited_state(sender_user)
599		.ready_filter(move |_| !invites_blocked)
600		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
601		.fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| {
602			let invite_count = services
603				.state_cache
604				.get_invite_count(&room_id, sender_user)
605				.await
606				.ok();
607
608			// Invited before last sync
609			if Some(since) >= invite_count || Some(next_batch) < invite_count {
610				return invited_rooms;
611			}
612
613			let invited_room = InvitedRoom {
614				invite_state: InviteState { events: invite_state },
615			};
616
617			invited_rooms.insert(room_id, invited_room);
618			invited_rooms
619		})
620		.await
621}
622
623async fn collect_knocked_rooms<'a>(
624	services: &'a Services,
625	sender_user: &'a UserId,
626	since: u64,
627	next_batch: u64,
628	filter: &'a FilterDefinition,
629) -> BTreeMap<OwnedRoomId, KnockedRoom> {
630	services
631		.state_cache
632		.rooms_knocked_state(sender_user)
633		.ready_filter(|(room_id, _)| filter.room.matches(room_id))
634		.fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| {
635			let knock_count = services
636				.state_cache
637				.get_knock_count(&room_id, sender_user)
638				.await
639				.ok();
640
641			// Knocked before last sync; or after the cutoff for this sync
642			if Some(since) >= knock_count || Some(next_batch) < knock_count {
643				return knocked_rooms;
644			}
645
646			let knocked_room = KnockedRoom {
647				knock_state: KnockState { events: knock_state },
648			};
649
650			knocked_rooms.insert(room_id, knocked_room);
651			knocked_rooms
652		})
653		.await
654}
655
656fn collect_global_account_data<'a>(
657	services: &'a Services,
658	sender_user: &'a UserId,
659	since: u64,
660	next_batch: u64,
661) -> impl Future<Output = Vec<Raw<AnyGlobalAccountDataEvent>>> + Send + 'a {
662	services
663		.account_data
664		.changes_since(None, sender_user, since, Some(next_batch))
665		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
666		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
667		.collect()
668}
669
670fn collect_device_list_left<'a>(
671	services: &'a Services,
672	sender_user: &'a UserId,
673	left_encrypted_users: HashSet<OwnedUserId>,
674) -> impl Future<Output = Vec<OwnedUserId>> + Send + 'a {
675	left_encrypted_users
676		.into_iter()
677		.stream()
678		.broad_filter_map(async |user_id: OwnedUserId| {
679			share_encrypted_room(services, sender_user, &user_id, None)
680				.await
681				.eq(&false)
682				.then_some(user_id)
683		})
684		.collect()
685}
686
687fn build_presence_events(presence_updates: Option<PresenceUpdates>) -> Vec<Raw<PresenceEvent>> {
688	presence_updates
689		.into_iter()
690		.flat_map(IntoIterator::into_iter)
691		.map(|(sender, content)| PresenceEvent { content, sender })
692		.map(|ref event| Raw::new(event))
693		.filter_map(Result::ok)
694		.collect()
695}
696
697#[tracing::instrument(name = "presence", level = "debug", skip_all)]
698async fn process_presence_updates(
699	services: &Services,
700	since: u64,
701	next_batch: u64,
702	syncing_user: &UserId,
703	filter: &FilterDefinition,
704) -> PresenceUpdates {
705	services
706		.presence
707		.presence_since(since, Some(next_batch))
708		.ready_filter(|(user_id, ..)| filter.presence.matches(user_id))
709		.filter(|(user_id, ..)| {
710			services
711				.state_cache
712				.user_sees_user(syncing_user, user_id)
713		})
714		.filter_map(|(user_id, _, presence_bytes)| {
715			services
716				.presence
717				.from_json_bytes_to_event(presence_bytes, user_id)
718				.map_ok(move |event| (user_id, event))
719				.ok()
720		})
721		.map(|(user_id, event)| (user_id.to_owned(), event.content))
722		.collect()
723		.boxed()
724		.await
725}
726
727#[tracing::instrument(
728	name = "left",
729	level = "debug",
730	skip_all,
731	fields(
732		room_id = %room_id,
733		full = %full_state,
734	),
735)]
736#[expect(clippy::too_many_arguments)]
737async fn handle_left_room(
738	services: &Services,
739	since: u64,
740	ref room_id: OwnedRoomId,
741	sender_user: &UserId,
742	next_batch: u64,
743	full_state: bool,
744	state_after: StateAfter,
745	filter: &FilterDefinition,
746) -> Result<Option<LeftRoom>> {
747	let left_count = services
748		.state_cache
749		.get_left_count(room_id, sender_user)
750		.await
751		.unwrap_or(0);
752
753	if left_count == 0 || left_count > next_batch {
754		return Ok(None);
755	}
756
757	let include_leave = filter.room.include_leave;
758	if since == 0 && !include_leave {
759		return Ok(None);
760	}
761
762	// Cannot sync unless the event falls within the snapshot. The room is only
763	// sync'ed once to the client, after that it's too late.
764	if since != 0 && left_count <= since {
765		return Ok(None);
766	}
767
768	let is_not_found = services.metadata.exists(room_id).is_false();
769
770	let is_disabled = services.metadata.is_disabled(room_id);
771
772	let is_banned = services.metadata.is_banned(room_id);
773
774	pin_mut!(is_not_found, is_disabled, is_banned);
775	if is_not_found.or(is_disabled).or(is_banned).await {
776		// For rejected invites, deleted, missing, or broken room state this is the last
777		// resort to convey a the minimum of information to the client.
778		let event = PduEvent {
779			event_id: EventId::new_v1(services.globals.server_name()),
780			origin_server_ts: utils::millis_since_unix_epoch().try_into()?,
781			kind: RoomMember,
782			state_key: Some(sender_user.as_str().into()),
783			sender: sender_user.to_owned(),
784			content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
785			// The following keys are dropped on conversion
786			room_id: room_id.clone(),
787			depth: uint!(1),
788			origin: None,
789			unsigned: None,
790			redacts: None,
791			hashes: EventHash::default(),
792			auth_events: Default::default(),
793			prev_events: Default::default(),
794		};
795
796		let state = state_after.wrap(StateEvents {
797			events: vec![trim_event_fields(event.into_format(), filter.event_fields.as_deref())],
798		});
799
800		return Ok(Some(LeftRoom {
801			account_data: RoomAccountData::default(),
802			state,
803			timeline: Timeline {
804				limited: false,
805				events: Default::default(),
806				prev_batch: Some(left_count.to_string()),
807			},
808		}));
809	}
810
811	load_left_room(
812		services,
813		sender_user,
814		room_id,
815		since,
816		left_count,
817		full_state,
818		state_after,
819		filter,
820	)
821	.await
822}
823
824#[tracing::instrument(name = "load", level = "debug", skip_all)]
825#[expect(clippy::too_many_arguments)]
826async fn load_left_room(
827	services: &Services,
828	sender_user: &UserId,
829	room_id: &RoomId,
830	since: u64,
831	left_count: u64,
832	full_state: bool,
833	state_after: StateAfter,
834	filter: &FilterDefinition,
835) -> Result<Option<LeftRoom>> {
836	let initial = since == 0;
837	let timeline_limit: usize = filter
838		.room
839		.timeline
840		.limit
841		.map(TryInto::try_into)
842		.map_expect("UInt to usize")
843		.unwrap_or(10)
844		.min(100);
845
846	let (timeline_pdus, limited, _) = load_timeline(
847		services,
848		sender_user,
849		room_id,
850		PduCount::Normal(since),
851		Some(PduCount::Normal(left_count)),
852		timeline_limit.max(1),
853	)
854	.await
855	.unwrap_or_default();
856
857	let since_shortstatehash = services
858		.timeline
859		.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
860		.ok();
861
862	let horizon_shortstatehash = timeline_pdus
863		.first()
864		.map(at!(0))
865		.map_async(|count| {
866			services
867				.timeline
868				.get_shortstatehash(room_id, count)
869				.inspect_err(inspect_debug_log)
870				.ok()
871		});
872
873	// MSC4222 `state_after`: state at the leave (end of timeline). The
874	// stored shortstatehash at the leave PDU is state-before-leave, so
875	// step to the next PDU; if no event followed, the room's current
876	// shortstatehash is the post-leave state.
877	let after_shortstatehash = state_after.requested().then_async(|| {
878		services
879			.timeline
880			.next_shortstatehash(room_id, PduCount::Normal(left_count))
881			.or_else(|_| services.state.get_room_shortstatehash(room_id))
882			.inspect_err(inspect_debug_log)
883	});
884
885	let left_shortstatehash = services
886		.timeline
887		.get_shortstatehash(room_id, PduCount::Normal(left_count))
888		.inspect_err(inspect_debug_log)
889		.or_else(|_| services.state.get_room_shortstatehash(room_id))
890		.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
891
892	let (since_shortstatehash, horizon_shortstatehash, after_shortstatehash, left_shortstatehash) =
893		join4(
894			since_shortstatehash,
895			horizon_shortstatehash,
896			after_shortstatehash,
897			left_shortstatehash,
898		)
899		.boxed()
900		.await;
901
902	let StateChanges { state_events, .. } = calculate_state_changes(
903		services,
904		sender_user,
905		room_id,
906		full_state || initial,
907		state_after,
908		since_shortstatehash,
909		horizon_shortstatehash.flatten(),
910		after_shortstatehash.flat_ok(),
911		left_shortstatehash?,
912		false,
913		None,
914	)
915	.boxed()
916	.await?;
917
918	let is_sender_membership = |event: &PduEvent| {
919		*event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str())
920	};
921
922	let timeline_sender_member = timeline_limit
923		.eq(&0)
924		.then(|| timeline_pdus.last().map(ref_at!(1)).cloned())
925		.into_iter()
926		.flat_map(Option::into_iter);
927
928	let encrypted = services
929		.state_accessor
930		.is_encrypted_room(room_id)
931		.await;
932
933	let event_fields = filter.event_fields.as_deref();
934
935	let state_events = state_events
936		.into_iter()
937		.filter(|pdu| filter.room.state.matches(pdu))
938		.filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu))
939		.chain(timeline_sender_member)
940		.stream()
941		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
942		.map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
943		.collect();
944
945	let left_prev_batch = timeline_limit
946		.eq(&0)
947		.then_some(left_count)
948		.map(PduCount::Normal);
949
950	let prev_batch = timeline_pdus
951		.first()
952		.filter(|_| timeline_limit > 0)
953		.map(at!(0))
954		.or(left_prev_batch)
955		.as_ref()
956		.map(ToString::to_string);
957
958	let timeline_events = timeline_pdus
959		.into_iter()
960		.stream()
961		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
962		.map(at!(1))
963		.ready_filter(|pdu| filter.room.timeline.matches(pdu))
964		.take(timeline_limit)
965		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
966		.wide_then(|pdu| {
967			services
968				.pdu_metadata
969				.bundle_aggregations(sender_user, pdu)
970		})
971		.collect::<Vec<_>>();
972
973	let account_data_events = services
974		.account_data
975		.changes_since(Some(room_id), sender_user, since, None)
976		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
977		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
978		.collect();
979
980	let (state_events, account_data_events, timeline_events) =
981		join3(state_events, account_data_events, timeline_events)
982			.boxed()
983			.await;
984
985	let state = state_after.wrap(StateEvents { events: state_events });
986
987	Ok(Some(LeftRoom {
988		account_data: RoomAccountData { events: account_data_events },
989		state,
990		timeline: Timeline {
991			prev_batch,
992			limited: limited || timeline_limit == 0,
993			events: timeline_events
994				.into_iter()
995				.map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
996				.collect(),
997		},
998	}))
999}
1000
1001#[tracing::instrument(
1002	name = "joined",
1003	level = "debug",
1004	skip_all,
1005	fields(
1006		room_id = ?room_id,
1007	),
1008)]
1009#[expect(clippy::too_many_arguments)]
1010async fn load_joined_room(
1011	services: &Services,
1012	sender_user: &UserId,
1013	sender_device: Option<&DeviceId>,
1014	ref room_id: OwnedRoomId,
1015	since: u64,
1016	next_batch: u64,
1017	full_state: bool,
1018	state_after: StateAfter,
1019	filter: &FilterDefinition,
1020) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
1021	let initial = since == 0;
1022	let (timeline_pdus, limited, last_timeline_count) =
1023		load_join_timeline(services, sender_user, room_id, since, next_batch, filter).await?;
1024
1025	let timeline_changed = last_timeline_count.into_unsigned() > since;
1026	debug_assert!(
1027		timeline_pdus.is_empty() || timeline_changed,
1028		"if timeline events, last_timeline_count must be in the since window."
1029	);
1030
1031	let RoomMetadata {
1032		since_shortstatehash,
1033		horizon_shortstatehash,
1034		after_shortstatehash,
1035		current_shortstatehash,
1036		receipt_events,
1037		encrypted_room,
1038	} = gather_room_metadata(
1039		services,
1040		sender_user,
1041		room_id,
1042		since,
1043		next_batch,
1044		&timeline_pdus,
1045		last_timeline_count,
1046		timeline_changed,
1047		state_after,
1048	)
1049	.boxed()
1050	.await?;
1051
1052	let UserMetadata {
1053		witness,
1054		last_notification_read,
1055		thread_last_reads,
1056		last_privateread_update,
1057		joined_since_last_sync,
1058	} = gather_user_metadata(
1059		services,
1060		sender_user,
1061		sender_device,
1062		room_id,
1063		filter,
1064		&timeline_pdus,
1065		&receipt_events,
1066		since,
1067		initial,
1068		timeline_changed,
1069		encrypted_room,
1070		since_shortstatehash,
1071	)
1072	.boxed()
1073	.await;
1074
1075	let StateChanges {
1076		heroes,
1077		joined_member_count,
1078		invited_member_count,
1079		mut state_events,
1080	} = compute_join_state_changes(
1081		services,
1082		sender_user,
1083		room_id,
1084		full_state || initial,
1085		state_after,
1086		since_shortstatehash,
1087		horizon_shortstatehash,
1088		after_shortstatehash,
1089		current_shortstatehash,
1090		joined_since_last_sync,
1091		witness.as_ref(),
1092	)
1093	.await?;
1094
1095	let joined_sender_member = take_sender_membership_for_join(
1096		&mut state_events,
1097		sender_user,
1098		joined_since_last_sync,
1099		timeline_pdus.is_empty(),
1100		initial,
1101	);
1102
1103	let prev_batch =
1104		compute_join_prev_batch(&timeline_pdus, joined_sender_member.as_ref(), since);
1105	let in_window = |count: u64| count > since && count <= next_batch;
1106
1107	let NotificationGates {
1108		send_notification_counts,
1109		send_notification_count_filter,
1110	} = compute_notification_gates(
1111		last_notification_read,
1112		thread_last_reads.as_ref(),
1113		since,
1114		in_window,
1115	);
1116
1117	// `encrypted_room` is `Some` whenever timeline or state events are emitted.
1118	let encrypted = encrypted_room.unwrap_or(false);
1119
1120	let aggregates = await_join_aggregates(
1121		services,
1122		sender_user,
1123		room_id,
1124		&state_events,
1125		timeline_pdus,
1126		joined_sender_member,
1127		encrypted,
1128		initial,
1129		since,
1130		next_batch,
1131		last_privateread_update,
1132		send_notification_counts,
1133		filter,
1134	)
1135	.await;
1136
1137	let (joined_room, device_list_updates, left_encrypted_users) = finalize_joined_room(
1138		services,
1139		sender_user,
1140		filter,
1141		state_events,
1142		aggregates,
1143		receipt_events,
1144		heroes,
1145		joined_member_count,
1146		invited_member_count,
1147		thread_last_reads.as_ref(),
1148		send_notification_count_filter,
1149		FinalizeJoinFlags {
1150			encrypted,
1151			full_state,
1152			state_after,
1153			limited,
1154			joined_since_last_sync,
1155			initial,
1156		},
1157		in_window,
1158		prev_batch,
1159	)
1160	.await;
1161
1162	Ok((joined_room, device_list_updates, left_encrypted_users))
1163}
1164
1165#[expect(clippy::too_many_arguments)]
1166async fn compute_join_state_changes(
1167	services: &Services,
1168	sender_user: &UserId,
1169	room_id: &RoomId,
1170	full_state: bool,
1171	state_after: StateAfter,
1172	since_shortstatehash: Option<ShortStateHash>,
1173	horizon_shortstatehash: Option<ShortStateHash>,
1174	after_shortstatehash: Option<ShortStateHash>,
1175	current_shortstatehash: Option<ShortStateHash>,
1176	joined_since_last_sync: bool,
1177	witness: Option<&Witness>,
1178) -> Result<StateChanges> {
1179	current_shortstatehash
1180		.map_async(|current_shortstatehash| {
1181			calculate_state_changes(
1182				services,
1183				sender_user,
1184				room_id,
1185				full_state,
1186				state_after,
1187				since_shortstatehash,
1188				horizon_shortstatehash,
1189				after_shortstatehash,
1190				current_shortstatehash,
1191				joined_since_last_sync,
1192				witness,
1193			)
1194		})
1195		.await
1196		.transpose()
1197		.map(Option::unwrap_or_default)
1198}
1199
1200fn compute_join_prev_batch(
1201	timeline_pdus: &[(PduCount, PduEvent)],
1202	joined_sender_member: Option<&PduEvent>,
1203	since: u64,
1204) -> Option<PduCount> {
1205	timeline_pdus.first().map(at!(0)).or_else(|| {
1206		joined_sender_member
1207			.is_some()
1208			.then_some(since)
1209			.map(Into::into)
1210	})
1211}
1212
1213#[expect(clippy::too_many_arguments)]
1214async fn assemble_join_state_events(
1215	services: &Services,
1216	state_events: Vec<PduEvent>,
1217	sender_user: &UserId,
1218	encrypted: bool,
1219	room_events: &[PduEvent],
1220	filter: &FilterDefinition,
1221	full_state: bool,
1222	state_after: StateAfter,
1223) -> Vec<Raw<AnySyncStateEvent>> {
1224	let is_in_timeline = |event: &PduEvent| {
1225		room_events
1226			.iter()
1227			.map(Event::event_id)
1228			.any(is_equal_to!(event.event_id()))
1229	};
1230
1231	// MSC4222: when the client opts into `state_after`, state events that
1232	// took effect within the timeline appear in both the timeline and the
1233	// state section, so the in-timeline exclusion is bypassed.
1234	let include_in_state = |event: &PduEvent| {
1235		let filter = &filter.room.state;
1236		filter.matches(event) && (full_state || state_after.requested() || !is_in_timeline(event))
1237	};
1238
1239	assemble_state_events(
1240		services,
1241		state_events,
1242		sender_user,
1243		encrypted,
1244		include_in_state,
1245		filter.event_fields.as_deref(),
1246	)
1247	.await
1248}
1249
1250async fn load_join_timeline(
1251	services: &Services,
1252	sender_user: &UserId,
1253	room_id: &RoomId,
1254	since: u64,
1255	next_batch: u64,
1256	filter: &FilterDefinition,
1257) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount)> {
1258	let timeline_limit: usize = filter
1259		.room
1260		.timeline
1261		.limit
1262		.map(TryInto::try_into)
1263		.map_expect("UInt to usize")
1264		.unwrap_or(10)
1265		.min(100);
1266
1267	load_timeline(
1268		services,
1269		sender_user,
1270		room_id,
1271		PduCount::Normal(since),
1272		Some(PduCount::Normal(next_batch)),
1273		timeline_limit,
1274	)
1275	.await
1276}
1277
1278struct JoinAggregates {
1279	room_events: Vec<PduEvent>,
1280	account_data_events: Vec<Raw<AnyRoomAccountDataEvent>>,
1281	typing_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1282	private_read_events: Option<PrivateReadEvents>,
1283	notification_count: Option<UInt>,
1284	highlight_count: Option<UInt>,
1285	thread_counts: Option<BTreeMap<OwnedEventId, (u64, u64)>>,
1286	device_list_updates: HashSet<OwnedUserId>,
1287	left_encrypted_users: HashSet<OwnedUserId>,
1288}
1289
1290#[expect(clippy::too_many_arguments)]
1291async fn await_join_aggregates(
1292	services: &Services,
1293	sender_user: &UserId,
1294	room_id: &RoomId,
1295	state_events: &[PduEvent],
1296	timeline_pdus: Vec<(PduCount, PduEvent)>,
1297	joined_sender_member: Option<PduEvent>,
1298	encrypted: bool,
1299	initial: bool,
1300	since: u64,
1301	next_batch: u64,
1302	last_privateread_update: u64,
1303	send_notification_counts: bool,
1304	filter: &FilterDefinition,
1305) -> JoinAggregates {
1306	let (notification_count, highlight_count, thread_counts) =
1307		notification_count_futures(services, sender_user, room_id, send_notification_counts);
1308
1309	let private_read_events = last_privateread_update.gt(&since).then_async(|| {
1310		services
1311			.read_receipt
1312			.private_read_get(room_id, sender_user)
1313			.unwrap_or_default()
1314	});
1315
1316	let typing_events = gather_typing_events(services, room_id, sender_user, since);
1317
1318	let device_list_updates = gather_device_list_updates(
1319		services,
1320		sender_user,
1321		room_id,
1322		timeline_membership_changes(&timeline_pdus, initial),
1323		state_events,
1324		initial,
1325		since,
1326		next_batch,
1327	);
1328
1329	let room_events = collect_room_events(
1330		services,
1331		sender_user,
1332		timeline_pdus,
1333		joined_sender_member,
1334		encrypted,
1335		filter,
1336	);
1337
1338	let account_data_events = collect_room_account_data(services, sender_user, room_id, since);
1339
1340	let (
1341		(room_events, account_data_events),
1342		(typing_events, private_read_events),
1343		(notification_count, highlight_count, thread_counts),
1344		(device_list_updates, left_encrypted_users),
1345	) = join4(
1346		join(room_events, account_data_events),
1347		join(typing_events, private_read_events),
1348		join3(notification_count, highlight_count, thread_counts),
1349		device_list_updates,
1350	)
1351	.boxed()
1352	.await;
1353
1354	JoinAggregates {
1355		room_events,
1356		account_data_events,
1357		typing_events,
1358		private_read_events,
1359		notification_count,
1360		highlight_count,
1361		thread_counts,
1362		device_list_updates,
1363		left_encrypted_users,
1364	}
1365}
1366
1367struct FinalizeJoinFlags {
1368	encrypted: bool,
1369	full_state: bool,
1370	state_after: StateAfter,
1371	limited: bool,
1372	joined_since_last_sync: bool,
1373	initial: bool,
1374}
1375
1376#[expect(clippy::too_many_arguments)]
1377async fn finalize_joined_room(
1378	services: &Services,
1379	sender_user: &UserId,
1380	filter: &FilterDefinition,
1381	state_events: Vec<PduEvent>,
1382	aggregates: JoinAggregates,
1383	receipt_events: Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>,
1384	heroes: Option<Vec<OwnedUserId>>,
1385	joined_member_count: Option<u64>,
1386	invited_member_count: Option<u64>,
1387	thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1388	send_notification_count_filter: impl Fn(&UInt) -> bool,
1389	flags: FinalizeJoinFlags,
1390	in_window: impl Fn(u64) -> bool,
1391	prev_batch: Option<PduCount>,
1392) -> (JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>) {
1393	let JoinAggregates {
1394		room_events,
1395		account_data_events,
1396		typing_events,
1397		private_read_events,
1398		notification_count,
1399		highlight_count,
1400		thread_counts,
1401		device_list_updates,
1402		left_encrypted_users,
1403	} = aggregates;
1404
1405	let FinalizeJoinFlags {
1406		encrypted,
1407		full_state,
1408		state_after,
1409		limited,
1410		joined_since_last_sync,
1411		initial,
1412	} = flags;
1413
1414	let state_events = assemble_join_state_events(
1415		services,
1416		state_events,
1417		sender_user,
1418		encrypted,
1419		&room_events,
1420		filter,
1421		full_state,
1422		state_after,
1423	)
1424	.await;
1425
1426	let (unread_notifications, unread_thread_notifications) = assemble_unread_notifications(
1427		notification_count,
1428		highlight_count,
1429		thread_counts,
1430		thread_last_reads,
1431		send_notification_count_filter,
1432		filter.room.timeline.unread_thread_notifications,
1433		initial,
1434		in_window,
1435	);
1436
1437	let joined_room = build_joined_room(
1438		BuildJoinedRoom {
1439			receipt_events,
1440			typing_events,
1441			private_read_events,
1442			state_events,
1443			account_data_events,
1444			room_events,
1445			heroes,
1446			joined_member_count,
1447			invited_member_count,
1448			unread_notifications,
1449			unread_thread_notifications,
1450			state_after,
1451			limited,
1452			joined_since_last_sync,
1453			prev_batch,
1454		},
1455		filter.event_fields.as_deref(),
1456	);
1457
1458	(joined_room, device_list_updates, left_encrypted_users)
1459}
1460
1461fn build_joined_room(args: BuildJoinedRoom, event_fields: Option<&[String]>) -> JoinedRoom {
1462	let BuildJoinedRoom {
1463		receipt_events,
1464		typing_events,
1465		private_read_events,
1466		state_events,
1467		account_data_events,
1468		room_events,
1469		heroes,
1470		joined_member_count,
1471		invited_member_count,
1472		unread_notifications,
1473		unread_thread_notifications,
1474		state_after,
1475		limited,
1476		joined_since_last_sync,
1477		prev_batch,
1478	} = args;
1479
1480	let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
1481		.into_iter()
1482		.map(at!(1))
1483		.chain(typing_events)
1484		.chain(private_read_events.into_iter().flatten())
1485		.collect();
1486
1487	let state = state_after.wrap(StateEvents { events: state_events });
1488
1489	let heroes = heroes
1490		.into_iter()
1491		.flatten()
1492		.map(TryInto::try_into)
1493		.filter_map(Result::ok)
1494		.collect();
1495
1496	JoinedRoom {
1497		account_data: RoomAccountData { events: account_data_events },
1498		ephemeral: Ephemeral { events: edus },
1499		state,
1500		summary: RoomSummary {
1501			joined_member_count: joined_member_count.map(ruma_from_u64),
1502			invited_member_count: invited_member_count.map(ruma_from_u64),
1503			heroes,
1504		},
1505		timeline: Timeline {
1506			limited: limited || joined_since_last_sync,
1507			prev_batch: prev_batch.as_ref().map(ToString::to_string),
1508			events: room_events
1509				.into_iter()
1510				.map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
1511				.collect(),
1512		},
1513		unread_notifications,
1514		unread_thread_notifications,
1515	}
1516}
1517
1518#[expect(clippy::too_many_arguments)]
1519async fn gather_room_metadata(
1520	services: &Services,
1521	sender_user: &UserId,
1522	room_id: &RoomId,
1523	since: u64,
1524	next_batch: u64,
1525	timeline_pdus: &[(PduCount, PduEvent)],
1526	last_timeline_count: PduCount,
1527	timeline_changed: bool,
1528	state_after: StateAfter,
1529) -> Result<RoomMetadata> {
1530	let since_shortstatehash = timeline_changed.then_async(|| {
1531		services
1532			.timeline
1533			.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
1534			.ok()
1535	});
1536
1537	let horizon_shortstatehash = timeline_pdus
1538		.first()
1539		.map(at!(0))
1540		.map_async(|count| {
1541			services
1542				.timeline
1543				.get_shortstatehash(room_id, count)
1544				.inspect_err(inspect_debug_log)
1545		});
1546
1547	// MSC4222 `state_after` semantics: state at the *end* of the timeline
1548	// window. `next_shortstatehash` reads state-before the next PDU, which
1549	// equals state-after our last PDU; falling back to the room's current
1550	// state covers the case where our window already touches HEAD.
1551	let after_shortstatehash = state_after.requested().then_async(|| {
1552		services
1553			.timeline
1554			.next_shortstatehash(room_id, last_timeline_count)
1555			.or_else(|_| services.state.get_room_shortstatehash(room_id))
1556			.inspect_err(inspect_debug_log)
1557	});
1558
1559	let current_shortstatehash = timeline_changed.then_async(|| {
1560		services
1561			.timeline
1562			.get_shortstatehash(room_id, last_timeline_count)
1563			.inspect_err(inspect_debug_log)
1564			.or_else(|_| services.state.get_room_shortstatehash(room_id))
1565			.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
1566	});
1567
1568	let encrypted_room =
1569		timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id));
1570
1571	let receipt_events = services
1572		.read_receipt
1573		.readreceipts_since(room_id, since, Some(next_batch))
1574		.filter_map(async |(read_user, _, edu)| {
1575			services
1576				.users
1577				.user_is_ignored(read_user, sender_user)
1578				.await
1579				.or_some((read_user.to_owned(), edu))
1580		})
1581		.collect::<Vec<(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)>>();
1582
1583	let (
1584		(
1585			since_shortstatehash,
1586			horizon_shortstatehash,
1587			after_shortstatehash,
1588			current_shortstatehash,
1589		),
1590		receipt_events,
1591		encrypted_room,
1592	) = join3(
1593		join4(
1594			since_shortstatehash,
1595			horizon_shortstatehash,
1596			after_shortstatehash,
1597			current_shortstatehash,
1598		),
1599		receipt_events,
1600		encrypted_room,
1601	)
1602	.boxed()
1603	.await;
1604
1605	Ok(RoomMetadata {
1606		since_shortstatehash: since_shortstatehash.flatten(),
1607		horizon_shortstatehash: horizon_shortstatehash.flat_ok(),
1608		after_shortstatehash: after_shortstatehash.flat_ok(),
1609		current_shortstatehash: current_shortstatehash.transpose()?,
1610		receipt_events,
1611		encrypted_room,
1612	})
1613}
1614
1615fn collect_room_events<'a>(
1616	services: &'a Services,
1617	sender_user: &'a UserId,
1618	timeline_pdus: Vec<(PduCount, PduEvent)>,
1619	joined_sender_member: Option<PduEvent>,
1620	encrypted: bool,
1621	filter: &'a FilterDefinition,
1622) -> impl Future<Output = Vec<PduEvent>> + Send + 'a {
1623	let include_in_timeline = |event: &PduEvent| filter.room.timeline.matches(event);
1624	timeline_pdus
1625		.into_iter()
1626		.stream()
1627		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
1628		.map(at!(1))
1629		.chain(joined_sender_member.into_iter().stream())
1630		.ready_filter(include_in_timeline)
1631		.wide_then(move |pdu| with_membership(services, pdu, sender_user, encrypted))
1632		.wide_then(move |pdu| {
1633			services
1634				.pdu_metadata
1635				.bundle_aggregations(sender_user, pdu)
1636		})
1637		.collect::<Vec<_>>()
1638}
1639
1640fn collect_room_account_data<'a>(
1641	services: &'a Services,
1642	sender_user: &'a UserId,
1643	room_id: &'a RoomId,
1644	since: u64,
1645) -> impl Future<Output = Vec<Raw<AnyRoomAccountDataEvent>>> + Send + 'a {
1646	services
1647		.account_data
1648		.changes_since(Some(room_id), sender_user, since, None)
1649		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
1650		.ready_filter(move |e| since != 0 || !is_empty_account_data_event(e))
1651		.collect()
1652}
1653
1654#[expect(clippy::type_complexity)]
1655fn notification_count_futures<'a>(
1656	services: &'a Services,
1657	sender_user: &'a UserId,
1658	room_id: &'a RoomId,
1659	send: bool,
1660) -> (
1661	impl Future<Output = Option<UInt>> + Send + 'a,
1662	impl Future<Output = Option<UInt>> + Send + 'a,
1663	impl Future<Output = Option<BTreeMap<OwnedEventId, (u64, u64)>>> + Send + 'a,
1664) {
1665	let notification_count = send.then_async(move || {
1666		services
1667			.pusher
1668			.notification_count(sender_user, room_id)
1669			.map(TryInto::try_into)
1670			.unwrap_or(uint!(0))
1671	});
1672
1673	let highlight_count = send.then_async(move || {
1674		services
1675			.pusher
1676			.highlight_count(sender_user, room_id)
1677			.map(TryInto::try_into)
1678			.unwrap_or(uint!(0))
1679	});
1680
1681	// MSC3773: per-thread counts. Filtered downstream by per-thread last-read
1682	// so quiet threads are omitted on rounds where the main cursor advanced.
1683	let thread_counts = send.then_async(move || {
1684		services
1685			.pusher
1686			.thread_notification_counts(sender_user, room_id)
1687	});
1688
1689	(notification_count, highlight_count, thread_counts)
1690}
1691
1692fn take_sender_membership_for_join(
1693	state_events: &mut Vec<PduEvent>,
1694	sender_user: &UserId,
1695	joined_since_last_sync: bool,
1696	timeline_empty: bool,
1697	initial: bool,
1698) -> Option<PduEvent> {
1699	if !(joined_since_last_sync && timeline_empty && !initial) {
1700		return None;
1701	}
1702
1703	let is_sender_membership = |event: &PduEvent| {
1704		*event.event_type() == StateEventType::RoomMember.into()
1705			&& event
1706				.state_key()
1707				.is_some_and(is_equal_to!(sender_user.as_str()))
1708	};
1709
1710	state_events
1711		.iter()
1712		.position(is_sender_membership)
1713		.map(|pos| state_events.swap_remove(pos))
1714}
1715
1716#[expect(clippy::too_many_arguments)]
1717async fn gather_user_metadata(
1718	services: &Services,
1719	sender_user: &UserId,
1720	sender_device: Option<&DeviceId>,
1721	room_id: &RoomId,
1722	filter: &FilterDefinition,
1723	timeline_pdus: &[(PduCount, PduEvent)],
1724	receipt_events: &[(OwnedUserId, Raw<AnySyncEphemeralRoomEvent>)],
1725	since: u64,
1726	initial: bool,
1727	timeline_changed: bool,
1728	encrypted_room: Option<bool>,
1729	since_shortstatehash: Option<ShortStateHash>,
1730) -> UserMetadata {
1731	let lazy_load_options =
1732		[&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
1733
1734	let lazy_loading_enabled = encrypted_room.is_some_and(is_false!())
1735		&& lazy_load_options
1736			.iter()
1737			.any(|opts| opts.is_enabled());
1738
1739	let lazy_loading_context = &lazy_loading::Context {
1740		user_id: sender_user,
1741		device_id: sender_device,
1742		room_id,
1743		token: Some(since),
1744		options: Some(&filter.room.state.lazy_load_options),
1745		mode: lazy_loading::Mode::Update,
1746	};
1747
1748	// Reset lazy loading because this is an initial sync
1749	let lazy_load_reset =
1750		initial.then_async(|| services.lazy_loading.reset(lazy_loading_context));
1751
1752	lazy_load_reset.await;
1753	let witness = lazy_loading_enabled.then_async(|| {
1754		let witness: Witness = timeline_pdus
1755			.iter()
1756			.map(ref_at!(1))
1757			.map(Event::sender)
1758			.map(Into::into)
1759			.chain(receipt_events.iter().map(ref_at!(0)).cloned())
1760			.collect();
1761
1762		services
1763			.lazy_loading
1764			.witness_retain(witness, lazy_loading_context)
1765	});
1766
1767	let sender_joined_count = timeline_changed.then_async(|| {
1768		services
1769			.state_cache
1770			.get_joined_count(room_id, sender_user)
1771			.unwrap_or(0)
1772	});
1773
1774	let since_encryption = since_shortstatehash.map_async(|shortstatehash| {
1775		services
1776			.state_accessor
1777			.state_get(shortstatehash, &StateEventType::RoomEncryption, "")
1778	});
1779
1780	let last_notification_read = timeline_pdus.is_empty().then_async(|| {
1781		services
1782			.pusher
1783			.last_notification_read(sender_user, room_id)
1784			.ok()
1785	});
1786
1787	let thread_last_reads = timeline_pdus.is_empty().then_async(|| {
1788		services
1789			.pusher
1790			.thread_last_notification_reads(sender_user, room_id)
1791	});
1792
1793	let last_privateread_update = services
1794		.read_receipt
1795		.last_privateread_update(sender_user, room_id);
1796
1797	let (
1798		(last_privateread_update, last_notification_read, thread_last_reads),
1799		(sender_joined_count, since_encryption),
1800		witness,
1801	) = join3(
1802		join3(last_privateread_update, last_notification_read, thread_last_reads),
1803		join(sender_joined_count, since_encryption),
1804		witness,
1805	)
1806	.await;
1807
1808	let _encrypted_since_last_sync =
1809		!initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none();
1810
1811	let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
1812
1813	UserMetadata {
1814		witness,
1815		last_notification_read,
1816		thread_last_reads,
1817		last_privateread_update,
1818		joined_since_last_sync,
1819	}
1820}
1821
1822#[expect(clippy::option_option)]
1823fn compute_notification_gates(
1824	last_notification_read: Option<Option<u64>>,
1825	thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1826	since: u64,
1827	in_window: impl Fn(u64) -> bool,
1828) -> NotificationGates<impl Fn(&UInt) -> bool> {
1829	let send_main_counts = last_notification_read
1830		.flatten()
1831		.is_none_or(&in_window);
1832
1833	let send_thread_counts =
1834		thread_last_reads.is_none_or(|reads| reads.values().copied().any(&in_window));
1835
1836	// Send room-level counts when either the main read cursor or any thread
1837	// cursor advanced within the window. Thread-only resets do not bump the
1838	// main cursor, so without the thread leg they would never reach the
1839	// client.
1840	let send_notification_counts = send_main_counts || send_thread_counts;
1841
1842	let send_notification_resets = last_notification_read
1843		.flatten()
1844		.is_some_and(|last_count| last_count > since);
1845
1846	let send_notification_count_filter =
1847		move |count: &UInt| *count != uint!(0) || send_notification_resets;
1848
1849	NotificationGates {
1850		send_notification_counts,
1851		send_notification_count_filter,
1852	}
1853}
1854
1855async fn gather_typing_events(
1856	services: &Services,
1857	room_id: &RoomId,
1858	sender_user: &UserId,
1859	since: u64,
1860) -> Vec<Raw<AnySyncEphemeralRoomEvent>> {
1861	services
1862		.typing
1863		.last_typing_update(room_id)
1864		.and_then(async |count| {
1865			if count <= since {
1866				return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
1867			}
1868
1869			let typings = typings_event_for_user(services, room_id, sender_user).await?;
1870
1871			Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
1872		})
1873		.unwrap_or(Vec::new())
1874		.await
1875}
1876
1877fn timeline_membership_changes(
1878	timeline_pdus: &[(PduCount, PduEvent)],
1879	initial: bool,
1880) -> Vec<(MembershipState, OwnedUserId)> {
1881	timeline_pdus
1882		.iter()
1883		.filter(|_| !initial)
1884		.map(ref_at!(1))
1885		.filter_map(extract_membership)
1886		.collect::<Vec<_>>()
1887}
1888
1889fn extract_membership(event: &PduEvent) -> Option<(MembershipState, OwnedUserId)> {
1890	let content: RoomMemberEventContent = event.get_content().ok()?;
1891	let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
1892
1893	Some((content.membership, user_id))
1894}
1895
1896#[expect(clippy::too_many_arguments)]
1897async fn gather_device_list_updates(
1898	services: &Services,
1899	sender_user: &UserId,
1900	room_id: &RoomId,
1901	timeline_membership_changes: Vec<(MembershipState, OwnedUserId)>,
1902	state_events: &[PduEvent],
1903	initial: bool,
1904	since: u64,
1905	next_batch: u64,
1906) -> (HashSet<OwnedUserId>, HashSet<OwnedUserId>) {
1907	let keys_changed = services
1908		.users
1909		.room_keys_changed(room_id, since, Some(next_batch))
1910		.map(|(user_id, _)| user_id)
1911		.map(ToOwned::to_owned)
1912		.collect::<Vec<_>>();
1913
1914	let (mut dlu, leu) = state_events
1915		.iter()
1916		.stream()
1917		.ready_filter(|_| !initial)
1918		.ready_filter(|state_event| *state_event.event_type() == RoomMember)
1919		.ready_filter_map(extract_membership)
1920		.chain(timeline_membership_changes.into_iter().stream())
1921		.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| {
1922			use MembershipState::*;
1923
1924			let requires_update = async |user_id| {
1925				!share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
1926			};
1927
1928			match membership {
1929				| Join if requires_update(&user_id).await => dlu.insert(user_id),
1930				| Leave => leu.insert(user_id),
1931				| _ => false,
1932			};
1933
1934			(dlu, leu)
1935		})
1936		.await;
1937
1938	dlu.extend(keys_changed.await);
1939	(dlu, leu)
1940}
1941
1942async fn assemble_state_events(
1943	services: &Services,
1944	state_events: Vec<PduEvent>,
1945	sender_user: &UserId,
1946	encrypted: bool,
1947	include_in_state: impl Fn(&PduEvent) -> bool + Send + Sync,
1948	event_fields: Option<&[String]>,
1949) -> Vec<Raw<AnySyncStateEvent>> {
1950	state_events
1951		.into_iter()
1952		.filter(include_in_state)
1953		.stream()
1954		.wide_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
1955		.map(|pdu| trim_event_fields(pdu.into_format(), event_fields))
1956		.collect()
1957		.await
1958}
1959
1960#[expect(clippy::too_many_arguments)]
1961fn assemble_unread_notifications(
1962	notification_count: Option<UInt>,
1963	highlight_count: Option<UInt>,
1964	thread_counts: Option<BTreeMap<OwnedEventId, (u64, u64)>>,
1965	thread_last_reads: Option<&BTreeMap<OwnedEventId, u64>>,
1966	send_notification_count_filter: impl Fn(&UInt) -> bool,
1967	want_thread_unread: bool,
1968	initial: bool,
1969	in_window: impl Fn(u64) -> bool,
1970) -> (UnreadNotificationsCount, BTreeMap<OwnedEventId, UnreadNotificationsCount>) {
1971	let thread_counts = thread_counts.unwrap_or_default();
1972
1973	let (thread_total_notifications, thread_total_highlights) = thread_counts
1974		.values()
1975		.fold((0_u64, 0_u64), |(n, h), &(notifs, hl)| {
1976			(n.saturating_add(notifs), h.saturating_add(hl))
1977		});
1978
1979	// MSC3773: when the client opts in via the timeline filter, partition
1980	// notification counts per thread. Otherwise sum into the room total.
1981	let merge_total = |total: u64| {
1982		move |count: UInt| {
1983			want_thread_unread
1984				.is_false()
1985				.then(|| count.saturating_add(UInt::try_from(total).unwrap_or_default()))
1986				.unwrap_or(count)
1987		}
1988	};
1989
1990	let unread_notifications = UnreadNotificationsCount {
1991		highlight_count: highlight_count
1992			.map(merge_total(thread_total_highlights))
1993			.filter(&send_notification_count_filter),
1994		notification_count: notification_count
1995			.map(merge_total(thread_total_notifications))
1996			.filter(&send_notification_count_filter),
1997	};
1998
1999	// On quiet rounds (timeline empty) `thread_last_reads` is `Some`; emit
2000	// only threads whose read cursor advanced within the window. When the
2001	// timeline carried events `thread_last_reads` is `None`; emit all.
2002	// Initial sync (since == 0) is a full snapshot; bypass the gate so
2003	// clients with no prior cursor still see existing thread counts.
2004	let advanced_in_window = |root: &EventId| {
2005		initial
2006			|| thread_last_reads
2007				.is_none_or(|reads| reads.get(root).copied().is_some_and(&in_window))
2008	};
2009
2010	let unread_thread_notifications = thread_counts
2011		.into_iter()
2012		.filter(|_| want_thread_unread)
2013		.filter(|(root, _)| advanced_in_window(root))
2014		.map(|(root, (notifications, highlights))| {
2015			let counts = UnreadNotificationsCount {
2016				notification_count: UInt::try_from(notifications).ok(),
2017				highlight_count: UInt::try_from(highlights).ok(),
2018			};
2019
2020			(root, counts)
2021		})
2022		.collect();
2023
2024	(unread_notifications, unread_thread_notifications)
2025}
2026
2027#[tracing::instrument(
2028	name = "state",
2029	level = "trace",
2030	skip_all,
2031	fields(
2032	    full = %full_state,
2033	    after = ?state_after,
2034	    ss = ?since_shortstatehash,
2035	    hs = ?horizon_shortstatehash,
2036	    as = ?after_shortstatehash,
2037	    cs = %current_shortstatehash,
2038    )
2039)]
2040#[expect(clippy::too_many_arguments)]
2041async fn calculate_state_changes<'a>(
2042	services: &Services,
2043	sender_user: &UserId,
2044	room_id: &RoomId,
2045	full_state: bool,
2046	state_after: StateAfter,
2047	since_shortstatehash: Option<ShortStateHash>,
2048	horizon_shortstatehash: Option<ShortStateHash>,
2049	after_shortstatehash: Option<ShortStateHash>,
2050	current_shortstatehash: ShortStateHash,
2051	joined_since_last_sync: bool,
2052	witness: Option<&'a Witness>,
2053) -> Result<StateChanges> {
2054	let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some();
2055
2056	// MSC4222: `state_after` requests need state at the *end* of the
2057	// timeline; legacy `state` requests need state at the *start*. Pick
2058	// the right delta endpoint, falling back to the room's current
2059	// shortstatehash when the preferred lookup is unavailable.
2060	let horizon_shortstatehash = state_after
2061		.requested()
2062		.then_some(after_shortstatehash)
2063		.unwrap_or(horizon_shortstatehash)
2064		.unwrap_or(current_shortstatehash);
2065
2066	let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash);
2067
2068	let state_get_shorteventid = |user_id: &'a UserId| {
2069		services
2070			.state_accessor
2071			.state_get_shortid(
2072				horizon_shortstatehash,
2073				&StateEventType::RoomMember,
2074				user_id.as_str(),
2075			)
2076			.ok()
2077	};
2078
2079	let lazy_state_ids = witness.map_async(|witness| {
2080		witness
2081			.iter()
2082			.stream()
2083			.ready_filter(|&user_id| user_id != sender_user)
2084			.broad_filter_map(|user_id| state_get_shorteventid(user_id))
2085			.into_future()
2086	});
2087
2088	let state_diff_ids = incremental.then_async(|| {
2089		services
2090			.state_accessor
2091			.state_added((since_shortstatehash, horizon_shortstatehash))
2092			.boxed()
2093			.into_future()
2094	});
2095
2096	let current_state_ids = (!incremental).then_async(|| {
2097		services
2098			.state_accessor
2099			.state_full_shortids(horizon_shortstatehash)
2100			.expect_ok()
2101			.boxed()
2102			.into_future()
2103	});
2104
2105	// Full dump is strict; the delta relaxes the member filter under MSC4222.
2106	let after = state_after.requested();
2107	let state_events = current_state_ids
2108		.stream()
2109		.map(|ids| (false, ids))
2110		.chain(
2111			state_diff_ids
2112				.stream()
2113				.map(move |ids| (after, ids)),
2114		)
2115		.broad_filter_map(async |(after, (shortstatekey, shorteventid))| {
2116			lazy_filter(services, sender_user, witness, shortstatekey, shorteventid, after).await
2117		})
2118		.chain(lazy_state_ids.stream())
2119		.broad_filter_map(|shorteventid| {
2120			services
2121				.timeline
2122				.get_pdu_from_shorteventid(shorteventid)
2123				.ok()
2124		})
2125		.collect::<Vec<_>>()
2126		.await;
2127
2128	let send_member_counts = state_events
2129		.iter()
2130		.any(|event| *event.kind() == RoomMember);
2131
2132	let member_counts =
2133		send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user));
2134
2135	let (joined_member_count, invited_member_count, heroes) =
2136		member_counts.await.unwrap_or((None, None, None));
2137
2138	Ok(StateChanges {
2139		heroes,
2140		joined_member_count,
2141		invited_member_count,
2142		state_events,
2143	})
2144}
2145
2146async fn lazy_filter(
2147	services: &Services,
2148	sender_user: &UserId,
2149	witness: Option<&Witness>,
2150	shortstatekey: ShortStateKey,
2151	shorteventid: ShortEventId,
2152	after: bool,
2153) -> Option<ShortEventId> {
2154	let Some(witness) = witness else {
2155		return Some(shorteventid);
2156	};
2157
2158	let (event_type, state_key) = services
2159		.short
2160		.get_statekey_from_short(shortstatekey)
2161		.await
2162		.ok()?;
2163
2164	// An MSC4222 delta also keeps changed members the witness will not re-add
2165	// (lazy_state_ids covers witnessed ones), avoiding both a miss and a duplicate.
2166	let keep = event_type != StateEventType::RoomMember
2167		|| state_key == sender_user.as_str()
2168		|| (after && <&UserId>::try_from(state_key.as_str()).is_ok_and(|u| !witness.contains(u)));
2169
2170	keep.then_some(shorteventid)
2171}
2172
2173async fn calculate_counts(
2174	services: &Services,
2175	room_id: &RoomId,
2176	sender_user: &UserId,
2177) -> (Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>) {
2178	let joined_member_count = services
2179		.state_cache
2180		.room_joined_count(room_id)
2181		.unwrap_or(0);
2182
2183	let invited_member_count = services
2184		.state_cache
2185		.room_invited_count(room_id)
2186		.unwrap_or(0);
2187
2188	let (joined_member_count, invited_member_count) =
2189		join(joined_member_count, invited_member_count).await;
2190
2191	let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
2192
2193	let heroes = services
2194		.config
2195		.calculate_heroes
2196		.and_is(small_room)
2197		.then_async(|| calculate_heroes(services, room_id, sender_user));
2198
2199	(Some(joined_member_count), Some(invited_member_count), heroes.await)
2200}
2201
2202pub(crate) async fn calculate_heroes(
2203	services: &Services,
2204	room_id: &RoomId,
2205	sender_user: &UserId,
2206) -> Vec<OwnedUserId> {
2207	const LIMIT: usize = 5;
2208
2209	services
2210		.state_accessor
2211		.room_state_type_pdus(room_id, &StateEventType::RoomMember)
2212		.ready_filter_map(Result::ok)
2213		.filter_map(|pdu| filter_hero(services, room_id, sender_user, pdu))
2214		.take(LIMIT)
2215		.collect::<Vec<_>>()
2216		.await
2217}
2218
2219async fn filter_hero<Pdu: Event>(
2220	services: &Services,
2221	room_id: &RoomId,
2222	sender_user: &UserId,
2223	pdu: Pdu,
2224) -> Option<OwnedUserId> {
2225	let user_id = pdu.state_key().map(TryInto::try_into).flat_ok()?;
2226
2227	if user_id == sender_user {
2228		return None;
2229	}
2230
2231	let Ok(content): Result<RoomMemberEventContent, _> = pdu.get_content() else {
2232		return None;
2233	};
2234
2235	// The membership was and still is invite or join
2236	if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
2237		return None;
2238	}
2239
2240	let (is_invited, is_joined) = join(
2241		services.state_cache.is_invited(user_id, room_id),
2242		services.state_cache.is_joined(user_id, room_id),
2243	)
2244	.await;
2245
2246	if !is_joined && is_invited {
2247		return None;
2248	}
2249
2250	Some(user_id.to_owned())
2251}
2252
2253async fn typings_event_for_user(
2254	services: &Services,
2255	room_id: &RoomId,
2256	sender_user: &UserId,
2257) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
2258	Ok(SyncEphemeralRoomEvent {
2259		content: TypingEventContent {
2260			user_ids: services
2261				.typing
2262				.typing_users_for_user(room_id, sender_user)
2263				.await?,
2264		},
2265	})
2266}
2267
2268#[cfg(test)]
2269mod tests {
2270	use super::*;
2271
2272	#[test]
2273	fn state_after_wraps_into_named_variant() {
2274		let events = StateEvents::default;
2275
2276		assert!(matches!(StateAfter::Off.wrap(events()), RoomState::Before(_)));
2277		assert!(matches!(StateAfter::Stable.wrap(events()), RoomState::After(_)));
2278		assert!(matches!(StateAfter::Unstable.wrap(events()), RoomState::AfterUnstable(_)));
2279
2280		assert!(!StateAfter::Off.requested());
2281		assert!(StateAfter::Stable.requested());
2282		assert!(StateAfter::Unstable.requested());
2283	}
2284
2285	#[test]
2286	fn state_after_selects_unstable_when_both_opted_in() {
2287		// (use_state_after, use_state_after_unstable)
2288		assert!(matches!(StateAfter::from((false, false)), StateAfter::Off));
2289		assert!(matches!(StateAfter::from((true, false)), StateAfter::Stable));
2290		assert!(matches!(StateAfter::from((false, true)), StateAfter::Unstable));
2291		assert!(matches!(StateAfter::from((true, true)), StateAfter::Unstable));
2292	}
2293}