Skip to main content

tuwunel_service/sync/
watch.rs

1use futures::{
2	Future, FutureExt, Stream, StreamExt, future::BoxFuture, pin_mut, stream::FuturesUnordered,
3};
4use ruma::{DeviceId, RoomId, UserId};
5use tuwunel_core::{implement, trace};
6use tuwunel_database::{Interfix, Separator, serialize_key};
7
8/// Register all sync watchers for the given user, device, and rooms eagerly,
9/// then return a future that resolves when any watcher fires.
10///
11/// The outer `await` completes once registration is done. Callers must drive
12/// it before sampling state, so a write between the registration window and
13/// the long-poll await cannot be missed; that race had `MustSyncUntil` calls
14/// hang for the full timeout when fast-path federation invites landed during
15/// the gap.
16///
17/// Two-phase by design: outer await registers, inner future awaits a hit.
18#[implement(super::Service)]
19#[tracing::instrument(skip(self, rooms), level = "debug")]
20#[expect(clippy::async_yields_async)]
21pub async fn watch<'a, Rooms>(
22	&'a self,
23	user_id: &'a UserId,
24	device_id: Option<&'a DeviceId>,
25	rooms: Rooms,
26) -> impl Future<Output = ()> + Send + 'a
27where
28	Rooms: Stream<Item = &'a RoomId> + Send + 'a,
29{
30	let userid_prefix =
31		serialize_key((user_id, Interfix)).expect("failed to serialize watch prefix");
32
33	let mut futures: FuturesUnordered<BoxFuture<'a, ()>> = [
34		self.db
35			.userroomid_joined
36			.watch_raw_prefix(&userid_prefix)
37			.boxed(),
38		self.db
39			.userroomid_invitestate
40			.watch_raw_prefix(&userid_prefix)
41			.boxed(),
42		self.db
43			.userroomid_leftstate
44			.watch_raw_prefix(&userid_prefix)
45			.boxed(),
46		self.db
47			.userroomid_knockedstate
48			.watch_raw_prefix(&userid_prefix)
49			.boxed(),
50		self.db
51			.userroomid_notificationcount
52			.watch_raw_prefix(&userid_prefix)
53			.boxed(),
54		self.db
55			.userroomid_highlightcount
56			.watch_raw_prefix(&userid_prefix)
57			.boxed(),
58		self.db
59			.roomusertype_roomuserdataid
60			.watch_prefix((Separator, user_id, Interfix))
61			.boxed(),
62		// More key changes (used when user is not joined to any rooms)
63		self.db
64			.keychangeid_userid
65			.watch_raw_prefix(&userid_prefix)
66			.boxed(),
67		// One time keys
68		self.db
69			.userid_lastonetimekeyupdate
70			.watch_raw_prefix(user_id)
71			.boxed(),
72		// User account data
73		self.db
74			.roomuserdataid_accountdata
75			.watch_prefix((Option::<&RoomId>::None, user_id, Interfix))
76			.boxed(),
77	]
78	.into_iter()
79	.collect();
80
81	if let Some(device_id) = device_id {
82		// Return when *any* user changed their key
83		// TODO: only send for user they share a room with
84		futures.push(
85			self.db
86				.todeviceid_events
87				.watch_prefix((user_id, device_id, Interfix))
88				.boxed(),
89		);
90	}
91
92	// Drive the rooms stream during phase 1 so per-room watchers register
93	// before this fn returns. Stream items are not retained across cursor
94	// advances; the rocksdb slice contract forbids stashing them.
95	pin_mut!(rooms);
96	while let Some(room_id) = rooms.next().await {
97		let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else {
98			continue;
99		};
100
101		// Notification clearance
102		futures.push(
103			self.db
104				.roomuserid_lastnotificationread
105				.watch_prefix((room_id, user_id))
106				.boxed(),
107		);
108		// Key changes
109		futures.push(
110			self.db
111				.keychangeid_userid
112				.watch_prefix((room_id, Interfix))
113				.boxed(),
114		);
115		// Room account data
116		futures.push(
117			self.db
118				.roomusertype_roomuserdataid
119				.watch_prefix((room_id, user_id))
120				.boxed(),
121		);
122		// PDUs
123		futures.push(
124			self.db
125				.pduid_pdu
126				.watch_prefix(short_roomid)
127				.boxed(),
128		);
129		// EDUs
130		futures.push(
131			self.db
132				.readreceiptid_readreceipt
133				.watch_prefix((room_id, Interfix))
134				.boxed(),
135		);
136		// Typing: subscribe synchronously so the receiver is registered before
137		// this fn returns; `wait_for_update` would defer until poll.
138		let mut typing_rx = self
139			.services
140			.typing
141			.typing_update_sender
142			.subscribe();
143
144		let typing_room_id = room_id.to_owned();
145		futures.push(
146			async move {
147				while let Ok(next) = typing_rx.recv().await {
148					if next == typing_room_id {
149						break;
150					}
151				}
152			}
153			.boxed(),
154		);
155	}
156
157	// Server shutdown
158	futures.push(self.services.server.until_shutdown().boxed());
159
160	async move {
161		if !self.services.server.is_running() {
162			return;
163		}
164
165		trace!(futures = futures.len(), "watch started");
166		futures.next().await;
167		trace!(futures = futures.len(), "watch finished");
168	}
169}