tuwunel_service/sync/
watch.rs1use futures::{
2 Future, FutureExt, Stream, StreamExt, future::BoxFuture, pin_mut, stream::FuturesUnordered,
3};
4use ruma::{DeviceId, RoomId, UserId};
5use tuwunel_core::{implement, trace};
6use tuwunel_database::{Interfix, Separator, serialize_key};
7
8#[implement(super::Service)]
19#[tracing::instrument(skip(self, rooms), level = "debug")]
20#[expect(clippy::async_yields_async)]
21pub async fn watch<'a, Rooms>(
22 &'a self,
23 user_id: &'a UserId,
24 device_id: Option<&'a DeviceId>,
25 rooms: Rooms,
26) -> impl Future<Output = ()> + Send + 'a
27where
28 Rooms: Stream<Item = &'a RoomId> + Send + 'a,
29{
30 let userid_prefix =
31 serialize_key((user_id, Interfix)).expect("failed to serialize watch prefix");
32
33 let mut futures: FuturesUnordered<BoxFuture<'a, ()>> = [
34 self.db
35 .userroomid_joined
36 .watch_raw_prefix(&userid_prefix)
37 .boxed(),
38 self.db
39 .userroomid_invitestate
40 .watch_raw_prefix(&userid_prefix)
41 .boxed(),
42 self.db
43 .userroomid_leftstate
44 .watch_raw_prefix(&userid_prefix)
45 .boxed(),
46 self.db
47 .userroomid_knockedstate
48 .watch_raw_prefix(&userid_prefix)
49 .boxed(),
50 self.db
51 .userroomid_notificationcount
52 .watch_raw_prefix(&userid_prefix)
53 .boxed(),
54 self.db
55 .userroomid_highlightcount
56 .watch_raw_prefix(&userid_prefix)
57 .boxed(),
58 self.db
59 .roomusertype_roomuserdataid
60 .watch_prefix((Separator, user_id, Interfix))
61 .boxed(),
62 self.db
64 .keychangeid_userid
65 .watch_raw_prefix(&userid_prefix)
66 .boxed(),
67 self.db
69 .userid_lastonetimekeyupdate
70 .watch_raw_prefix(user_id)
71 .boxed(),
72 self.db
74 .roomuserdataid_accountdata
75 .watch_prefix((Option::<&RoomId>::None, user_id, Interfix))
76 .boxed(),
77 ]
78 .into_iter()
79 .collect();
80
81 if let Some(device_id) = device_id {
82 futures.push(
85 self.db
86 .todeviceid_events
87 .watch_prefix((user_id, device_id, Interfix))
88 .boxed(),
89 );
90 }
91
92 pin_mut!(rooms);
96 while let Some(room_id) = rooms.next().await {
97 let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else {
98 continue;
99 };
100
101 futures.push(
103 self.db
104 .roomuserid_lastnotificationread
105 .watch_prefix((room_id, user_id))
106 .boxed(),
107 );
108 futures.push(
110 self.db
111 .keychangeid_userid
112 .watch_prefix((room_id, Interfix))
113 .boxed(),
114 );
115 futures.push(
117 self.db
118 .roomusertype_roomuserdataid
119 .watch_prefix((room_id, user_id))
120 .boxed(),
121 );
122 futures.push(
124 self.db
125 .pduid_pdu
126 .watch_prefix(short_roomid)
127 .boxed(),
128 );
129 futures.push(
131 self.db
132 .readreceiptid_readreceipt
133 .watch_prefix((room_id, Interfix))
134 .boxed(),
135 );
136 let mut typing_rx = self
139 .services
140 .typing
141 .typing_update_sender
142 .subscribe();
143
144 let typing_room_id = room_id.to_owned();
145 futures.push(
146 async move {
147 while let Ok(next) = typing_rx.recv().await {
148 if next == typing_room_id {
149 break;
150 }
151 }
152 }
153 .boxed(),
154 );
155 }
156
157 futures.push(self.services.server.until_shutdown().boxed());
159
160 async move {
161 if !self.services.server.is_running() {
162 return;
163 }
164
165 trace!(futures = futures.len(), "watch started");
166 futures.next().await;
167 trace!(futures = futures.len(), "watch finished");
168 }
169}