tuwunel_service/rooms/read_receipt/
data.rs1use std::sync::Arc;
2
3use futures::{Stream, StreamExt};
4use ruma::{
5 CanonicalJsonObject, RoomId, UserId,
6 events::{
7 AnySyncEphemeralRoomEvent,
8 receipt::{ReceiptEvent, ReceiptThread},
9 },
10 serde::Raw,
11};
12use tuwunel_core::{
13 Result, err, is_equal_to, trace,
14 utils::{ReadyExt, stream::TryIgnore},
15};
16use tuwunel_database::{Deserialized, Interfix, Json, Map, serialize_key};
17
18use super::ThreadKind;
19
20pub(super) struct Data {
21 roomuserid_privateread: Arc<Map>,
22 roomuserid_lastprivatereadupdate: Arc<Map>,
23 services: Arc<crate::services::OnceServices>,
24 readreceiptid_readreceipt: Arc<Map>,
25}
26
27pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
28
29impl Data {
30 pub(super) fn new(args: &crate::Args<'_>) -> Self {
31 let db = &args.db;
32 Self {
33 roomuserid_privateread: db["roomuserid_privateread"].clone(),
34 roomuserid_lastprivatereadupdate: db["roomuserid_lastprivatereadupdate"].clone(),
35 readreceiptid_readreceipt: db["readreceiptid_readreceipt"].clone(),
36 services: args.services.clone(),
37 }
38 }
39
40 #[inline]
41 pub(super) async fn readreceipt_update(
42 &self,
43 user_id: &UserId,
44 room_id: &RoomId,
45 event: &ReceiptEvent,
46 ) {
47 let thread_kind = event_thread_kind(event);
48 let suffix = serialize_key((user_id, thread_kind))
55 .expect("failed to serialize receipt key suffix");
56
57 let user_id_bytes = user_id.as_bytes();
58 let legacy_match = thread_kind.is_empty();
59
60 let last_possible_key = (room_id, u64::MAX);
61 self.readreceiptid_readreceipt
62 .rev_keys_from_raw(&last_possible_key)
63 .ignore_err()
64 .ready_take_while(|key| key.starts_with(room_id.as_bytes()))
65 .ready_filter_map(|key| {
66 (key.ends_with(suffix.as_slice())
67 || (legacy_match && key.ends_with(user_id_bytes)))
68 .then_some(key)
69 })
70 .ready_for_each(|key| self.readreceiptid_readreceipt.del(key))
71 .await;
72
73 let count = self.services.globals.next_count();
74 let latest_id = (room_id, *count, user_id, thread_kind);
75 self.readreceiptid_readreceipt
76 .put(latest_id, Json(event));
77 }
78
79 #[inline]
80 pub(super) fn readreceipts_since<'a>(
81 &'a self,
82 room_id: &'a RoomId,
83 since: u64,
84 to: Option<u64>,
85 ) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
86 type Key<'a> = (&'a RoomId, u64, &'a UserId, &'a str);
88 type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
89
90 let after_since = since.saturating_add(1); let first_possible_edu = (room_id, after_since);
92
93 self.readreceiptid_readreceipt
94 .stream_from(&first_possible_edu)
95 .ignore_err()
96 .ready_take_while(move |((r, c, ..), _): &KeyVal<'_>| {
97 *r == room_id && to.is_none_or(|to| *c <= to)
98 })
99 .map(move |((_, count, user_id, _), mut json): KeyVal<'_>| {
100 json.remove("room_id");
101
102 let event = serde_json::value::to_raw_value(&json)?;
103
104 Ok((user_id, count, Raw::from_json(event)))
105 })
106 .ignore_err()
107 }
108
109 #[inline]
110 pub(super) async fn last_receipt_count<'a>(
111 &'a self,
112 room_id: &'a RoomId,
113 since: Option<u64>,
114 user_id: Option<&'a UserId>,
115 ) -> Result<u64> {
116 type Key<'a> = (&'a RoomId, u64, &'a UserId, &'a str);
118
119 let key = (room_id, u64::MAX);
120 self.readreceiptid_readreceipt
121 .rev_keys_prefix(&key)
122 .ignore_err()
123 .ready_take_while(|(_, c, u, _): &Key<'_>| {
124 since.is_none_or(|since| since > *c) && user_id.is_none_or(is_equal_to!(*u))
125 })
126 .map(|(_, c, ..): Key<'_>| c)
127 .boxed()
128 .next()
129 .await
130 .ok_or_else(|| err!(Request(NotFound("No receipts found in room"))))
131 }
132
133 #[inline]
143 pub(super) async fn private_read_set(
144 &self,
145 room_id: &RoomId,
146 user_id: &UserId,
147 pdu_count: u64,
148 thread: &ReceiptThread,
149 ) {
150 let next_count = self.services.globals.next_count();
151
152 let lastupdate_key = (room_id, user_id);
153 self.roomuserid_lastprivatereadupdate
154 .put(lastupdate_key, *next_count);
155
156 match thread.as_str() {
157 | Some(thread_kind) if !thread_kind.is_empty() => {
158 let key = (room_id, user_id, thread_kind);
159 self.roomuserid_privateread.put(key, pdu_count);
160 },
161 | _ => {
162 self.clear_thread_private_reads(room_id, user_id)
163 .await;
164
165 let key = (room_id, user_id);
166 self.roomuserid_privateread.put(key, pdu_count);
167 },
168 }
169 }
170
171 #[inline]
173 pub(super) async fn private_read_get_count(
174 &self,
175 room_id: &RoomId,
176 user_id: &UserId,
177 ) -> Result<u64> {
178 let key = (room_id, user_id);
179 self.roomuserid_privateread
180 .qry(&key)
181 .await
182 .deserialized()
183 }
184
185 #[inline]
190 pub(super) fn private_read_threaded_stream<'a>(
191 &'a self,
192 room_id: &'a RoomId,
193 user_id: &'a UserId,
194 ) -> impl Stream<Item = (ThreadKind, u64)> + Send + 'a {
195 type ThreadKv<'a> = ((&'a RoomId, &'a UserId, &'a str), u64);
196
197 let prefix = (room_id, user_id, Interfix);
198 self.roomuserid_privateread
199 .stream_prefix(&prefix)
200 .ignore_err()
201 .map(|((_, _, kind), count): ThreadKv<'_>| (ThreadKind::from(kind), count))
202 }
203
204 #[inline]
205 async fn clear_thread_private_reads(&self, room_id: &RoomId, user_id: &UserId) {
206 let prefix = (room_id, user_id, Interfix);
207 self.roomuserid_privateread
208 .keys_prefix_raw(&prefix)
209 .ignore_err()
210 .ready_for_each(|key| {
211 self.roomuserid_privateread.remove(key);
212 })
213 .await;
214 }
215
216 #[inline]
217 pub(super) async fn last_privateread_update(
218 &self,
219 user_id: &UserId,
220 room_id: &RoomId,
221 ) -> u64 {
222 let key = (room_id, user_id);
223 self.roomuserid_lastprivatereadupdate
224 .qry(&key)
225 .await
226 .deserialized()
227 .unwrap_or(0)
228 }
229
230 #[inline]
231 pub(super) async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result {
232 let prefix = (room_id, Interfix);
233
234 self.roomuserid_privateread
235 .keys_prefix_raw(&prefix)
236 .ignore_err()
237 .ready_for_each(|key| {
238 trace!("Removing key: {key:?}");
239 self.roomuserid_privateread.remove(key);
240 })
241 .await;
242
243 self.roomuserid_lastprivatereadupdate
244 .keys_prefix_raw(&prefix)
245 .ignore_err()
246 .ready_for_each(|key| {
247 trace!("Removing key: {key:?}");
248 self.roomuserid_lastprivatereadupdate.remove(key);
249 })
250 .await;
251
252 self.readreceiptid_readreceipt
253 .keys_prefix_raw(&prefix)
254 .ignore_err()
255 .ready_for_each(|key| {
256 trace!("Removing key: {key:?}");
257 self.readreceiptid_readreceipt.remove(key);
258 })
259 .await;
260
261 Ok(())
262 }
263}
264
265fn event_thread_kind(event: &ReceiptEvent) -> &str {
279 debug_assert!(
280 event
281 .content
282 .values()
283 .all(|by_type| by_type.len() == 1
284 && by_type.values().all(|by_user| by_user.len() == 1))
285 && event.content.len() == 1,
286 "receipt event must carry exactly one (event_id, type, user) triple"
287 );
288
289 event
290 .content
291 .values()
292 .next()
293 .and_then(|by_type| by_type.values().next())
294 .and_then(|by_user| by_user.values().next())
295 .and_then(|receipt| receipt.thread.as_str())
296 .unwrap_or_default()
297}