Skip to main content

tuwunel_service/rooms/read_receipt/
data.rs

1use std::sync::Arc;
2
3use futures::{Stream, StreamExt};
4use ruma::{
5	CanonicalJsonObject, RoomId, UserId,
6	events::{
7		AnySyncEphemeralRoomEvent,
8		receipt::{ReceiptEvent, ReceiptThread},
9	},
10	serde::Raw,
11};
12use tuwunel_core::{
13	Result, err, is_equal_to, trace,
14	utils::{ReadyExt, stream::TryIgnore},
15};
16use tuwunel_database::{Deserialized, Interfix, Json, Map, serialize_key};
17
18use super::ThreadKind;
19
20pub(super) struct Data {
21	roomuserid_privateread: Arc<Map>,
22	roomuserid_lastprivatereadupdate: Arc<Map>,
23	services: Arc<crate::services::OnceServices>,
24	readreceiptid_readreceipt: Arc<Map>,
25}
26
27pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
28
29impl Data {
30	pub(super) fn new(args: &crate::Args<'_>) -> Self {
31		let db = &args.db;
32		Self {
33			roomuserid_privateread: db["roomuserid_privateread"].clone(),
34			roomuserid_lastprivatereadupdate: db["roomuserid_lastprivatereadupdate"].clone(),
35			readreceiptid_readreceipt: db["readreceiptid_readreceipt"].clone(),
36			services: args.services.clone(),
37		}
38	}
39
40	#[inline]
41	pub(super) async fn readreceipt_update(
42		&self,
43		user_id: &UserId,
44		room_id: &RoomId,
45		event: &ReceiptEvent,
46	) {
47		let thread_kind = event_thread_kind(event);
48		// MSC3771: storage key suffix is `user_id || 0xFF || thread_kind` so
49		// each (user, thread-context) tuple lives in its own row. Pre-MSC3771
50		// rows have no kind tail; on an Unthreaded sweep also match the
51		// bare-user-id ending so legacy rows are superseded rather than
52		// orphaned. Kind tails ("main", `$root`) never end in `@user:host`,
53		// so the legacy match cannot collide with thread-aware rows.
54		let suffix = serialize_key((user_id, thread_kind))
55			.expect("failed to serialize receipt key suffix");
56
57		let user_id_bytes = user_id.as_bytes();
58		let legacy_match = thread_kind.is_empty();
59
60		let last_possible_key = (room_id, u64::MAX);
61		self.readreceiptid_readreceipt
62			.rev_keys_from_raw(&last_possible_key)
63			.ignore_err()
64			.ready_take_while(|key| key.starts_with(room_id.as_bytes()))
65			.ready_filter_map(|key| {
66				(key.ends_with(suffix.as_slice())
67					|| (legacy_match && key.ends_with(user_id_bytes)))
68				.then_some(key)
69			})
70			.ready_for_each(|key| self.readreceiptid_readreceipt.del(key))
71			.await;
72
73		let count = self.services.globals.next_count();
74		let latest_id = (room_id, *count, user_id, thread_kind);
75		self.readreceiptid_readreceipt
76			.put(latest_id, Json(event));
77	}
78
79	#[inline]
80	pub(super) fn readreceipts_since<'a>(
81		&'a self,
82		room_id: &'a RoomId,
83		since: u64,
84		to: Option<u64>,
85	) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
86		// 4-tuple key: pre-MSC3771 rows deserialize with `&str` tail empty.
87		type Key<'a> = (&'a RoomId, u64, &'a UserId, &'a str);
88		type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
89
90		let after_since = since.saturating_add(1); // +1 so we don't send the event at since
91		let first_possible_edu = (room_id, after_since);
92
93		self.readreceiptid_readreceipt
94			.stream_from(&first_possible_edu)
95			.ignore_err()
96			.ready_take_while(move |((r, c, ..), _): &KeyVal<'_>| {
97				*r == room_id && to.is_none_or(|to| *c <= to)
98			})
99			.map(move |((_, count, user_id, _), mut json): KeyVal<'_>| {
100				json.remove("room_id");
101
102				let event = serde_json::value::to_raw_value(&json)?;
103
104				Ok((user_id, count, Raw::from_json(event)))
105			})
106			.ignore_err()
107	}
108
109	#[inline]
110	pub(super) async fn last_receipt_count<'a>(
111		&'a self,
112		room_id: &'a RoomId,
113		since: Option<u64>,
114		user_id: Option<&'a UserId>,
115	) -> Result<u64> {
116		// 4-tuple key: pre-MSC3771 rows deserialize with `&str` tail empty.
117		type Key<'a> = (&'a RoomId, u64, &'a UserId, &'a str);
118
119		let key = (room_id, u64::MAX);
120		self.readreceiptid_readreceipt
121			.rev_keys_prefix(&key)
122			.ignore_err()
123			.ready_take_while(|(_, c, u, _): &Key<'_>| {
124				since.is_none_or(|since| since > *c) && user_id.is_none_or(is_equal_to!(*u))
125			})
126			.map(|(_, c, ..): Key<'_>| c)
127			.boxed()
128			.next()
129			.await
130			.ok_or_else(|| err!(Request(NotFound("No receipts found in room"))))
131	}
132
133	/// Sets the private read marker for `(room, user, thread)`.
134	///
135	/// Unthreaded writes use the legacy 2-tuple `(room, user)` key shape
136	/// and sweep any pre-existing per-thread rows so the room-wide receipt
137	/// supersedes prior thread state. Threaded writes (Main, Thread, custom)
138	/// use a 3-tuple `(room, user, thread_kind)` key disjoint from the
139	/// legacy row by trailing separator. `roomuserid_lastprivatereadupdate`
140	/// stays 2-tuple and is bumped on every write so sync gating remains a
141	/// single point query.
142	#[inline]
143	pub(super) async fn private_read_set(
144		&self,
145		room_id: &RoomId,
146		user_id: &UserId,
147		pdu_count: u64,
148		thread: &ReceiptThread,
149	) {
150		let next_count = self.services.globals.next_count();
151
152		let lastupdate_key = (room_id, user_id);
153		self.roomuserid_lastprivatereadupdate
154			.put(lastupdate_key, *next_count);
155
156		match thread.as_str() {
157			| Some(thread_kind) if !thread_kind.is_empty() => {
158				let key = (room_id, user_id, thread_kind);
159				self.roomuserid_privateread.put(key, pdu_count);
160			},
161			| _ => {
162				self.clear_thread_private_reads(room_id, user_id)
163					.await;
164
165				let key = (room_id, user_id);
166				self.roomuserid_privateread.put(key, pdu_count);
167			},
168		}
169	}
170
171	/// Latest unthreaded (legacy 2-tuple) private read PDU count.
172	#[inline]
173	pub(super) async fn private_read_get_count(
174		&self,
175		room_id: &RoomId,
176		user_id: &UserId,
177	) -> Result<u64> {
178		let key = (room_id, user_id);
179		self.roomuserid_privateread
180			.qry(&key)
181			.await
182			.deserialized()
183	}
184
185	/// Stream of `(thread_kind, pdu_count)` for the per-thread (3-tuple)
186	/// private read rows for this `(room, user)`. The legacy 2-tuple row is
187	/// excluded by the trailing-separator prefix; query it via
188	/// `private_read_get_count`.
189	#[inline]
190	pub(super) fn private_read_threaded_stream<'a>(
191		&'a self,
192		room_id: &'a RoomId,
193		user_id: &'a UserId,
194	) -> impl Stream<Item = (ThreadKind, u64)> + Send + 'a {
195		type ThreadKv<'a> = ((&'a RoomId, &'a UserId, &'a str), u64);
196
197		let prefix = (room_id, user_id, Interfix);
198		self.roomuserid_privateread
199			.stream_prefix(&prefix)
200			.ignore_err()
201			.map(|((_, _, kind), count): ThreadKv<'_>| (ThreadKind::from(kind), count))
202	}
203
204	#[inline]
205	async fn clear_thread_private_reads(&self, room_id: &RoomId, user_id: &UserId) {
206		let prefix = (room_id, user_id, Interfix);
207		self.roomuserid_privateread
208			.keys_prefix_raw(&prefix)
209			.ignore_err()
210			.ready_for_each(|key| {
211				self.roomuserid_privateread.remove(key);
212			})
213			.await;
214	}
215
216	#[inline]
217	pub(super) async fn last_privateread_update(
218		&self,
219		user_id: &UserId,
220		room_id: &RoomId,
221	) -> u64 {
222		let key = (room_id, user_id);
223		self.roomuserid_lastprivatereadupdate
224			.qry(&key)
225			.await
226			.deserialized()
227			.unwrap_or(0)
228	}
229
230	#[inline]
231	pub(super) async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result {
232		let prefix = (room_id, Interfix);
233
234		self.roomuserid_privateread
235			.keys_prefix_raw(&prefix)
236			.ignore_err()
237			.ready_for_each(|key| {
238				trace!("Removing key: {key:?}");
239				self.roomuserid_privateread.remove(key);
240			})
241			.await;
242
243		self.roomuserid_lastprivatereadupdate
244			.keys_prefix_raw(&prefix)
245			.ignore_err()
246			.ready_for_each(|key| {
247				trace!("Removing key: {key:?}");
248				self.roomuserid_lastprivatereadupdate.remove(key);
249			})
250			.await;
251
252		self.readreceiptid_readreceipt
253			.keys_prefix_raw(&prefix)
254			.ignore_err()
255			.ready_for_each(|key| {
256				trace!("Removing key: {key:?}");
257				self.readreceiptid_readreceipt.remove(key);
258			})
259			.await;
260
261		Ok(())
262	}
263}
264
265/// Tag string used in the storage key to discriminate receipts per thread.
266/// Empty for `Unthreaded`, `"main"` for `Main`, the event-id string for
267/// `Thread(...)` (event ids start with `$`, so the values are mutually
268/// exclusive). Custom variants reuse their string form; the C/S boundary
269/// rejects them, but federation receipts may still carry them through.
270///
271/// Reads only the first `(event_id, type, user)` triple. All callers
272/// build single-entry receipts (one event id, one type, one user); a
273/// debug assertion catches future regressions. An entirely empty event
274/// or one whose only receipt lacks a thread field falls back to `""`.
275///
276/// Appended to the receipt-row key as a tolerant trailing field. Pre-
277/// MSC3771 rows have no trailing kind; they round-trip as `""`.
278fn event_thread_kind(event: &ReceiptEvent) -> &str {
279	debug_assert!(
280		event
281			.content
282			.values()
283			.all(|by_type| by_type.len() == 1
284				&& by_type.values().all(|by_user| by_user.len() == 1))
285			&& event.content.len() == 1,
286		"receipt event must carry exactly one (event_id, type, user) triple"
287	);
288
289	event
290		.content
291		.values()
292		.next()
293		.and_then(|by_type| by_type.values().next())
294		.and_then(|by_user| by_user.values().next())
295		.and_then(|receipt| receipt.thread.as_str())
296		.unwrap_or_default()
297}