Skip to main content

tuwunel_service/pusher/
notification.rs

1use std::collections::BTreeMap;
2
3use futures::{StreamExt, stream::select};
4use ruma::{EventId, OwnedEventId, RoomId, UserId, events::receipt::ReceiptThread};
5use tuwunel_core::{
6	Result, implement, trace,
7	utils::stream::{ReadyExt, TryIgnore},
8};
9use tuwunel_database::{Deserialized, Ignore, Interfix};
10
11/// Per-thread unread counts: `(notification, highlight)` keyed by thread root.
12type ThreadCounts = BTreeMap<OwnedEventId, (u64, u64)>;
13
14/// Per-thread last-read counts keyed by thread root. Used by sync v3 to
15/// gate emission of `unread_thread_notifications` to threads whose read
16/// cursor advanced within the sync window.
17type ThreadLastReads = BTreeMap<OwnedEventId, u64>;
18
19#[implement(super::Service)]
20#[tracing::instrument(level = "debug", skip(self))]
21pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
22	let count = self.services.globals.next_count();
23
24	let userroom_id = (user_id, room_id);
25	self.db
26		.userroomid_highlightcount
27		.put(userroom_id, 0_u64);
28	self.db
29		.userroomid_notificationcount
30		.put(userroom_id, 0_u64);
31
32	let roomuser_id = (room_id, user_id);
33	self.db
34		.roomuserid_lastnotificationread
35		.put(roomuser_id, *count);
36
37	let removed = self.clear_suppressed_room(user_id, room_id);
38	if removed > 0 {
39		trace!(?user_id, ?room_id, removed, "Cleared suppressed push events after read");
40	}
41}
42
43/// Reset counts for a single thread within a room. Per-thread rows live in
44/// the same CFs as the main `(user, room)` rows; the trailing event id
45/// keeps them disjoint. Stamps a per-thread last-read so sync v3 can gate
46/// emission of `unread_thread_notifications` to threads that advanced
47/// within the window.
48#[implement(super::Service)]
49#[tracing::instrument(level = "debug", skip(self))]
50pub fn reset_thread_notification_counts(
51	&self,
52	user_id: &UserId,
53	room_id: &RoomId,
54	thread_root: &EventId,
55) {
56	let count = self.services.globals.next_count();
57
58	let userroom_thread = (user_id, room_id, thread_root);
59	self.db
60		.userroomid_highlightcount
61		.put(userroom_thread, 0_u64);
62	self.db
63		.userroomid_notificationcount
64		.put(userroom_thread, 0_u64);
65
66	let roomuser_thread = (room_id, user_id, thread_root);
67	self.db
68		.roomuserid_lastnotificationread
69		.put(roomuser_thread, *count);
70}
71
72/// Clear every per-thread notification, highlight, and last-read row for
73/// this user and room. The `Interfix` prefix forces a trailing separator
74/// into the scan key, so the legacy 2-tuple main row (which has no
75/// trailing separator) is excluded by construction; only 3-tuple thread
76/// rows match. Sweeps run sequentially: parallelizing them via `join`
77/// triggers a `for<'a> FnMut(&[u8])` Send-not-general-enough cascade
78/// through the route handler. Per-thread last-reads use the inverse
79/// `(room, user, ...)` order to mirror the existing sync watch prefix.
80#[implement(super::Service)]
81pub async fn clear_all_thread_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
82	let userroom_prefix = (user_id, room_id, Interfix);
83	let roomuser_prefix = (room_id, user_id, Interfix);
84
85	self.db
86		.userroomid_highlightcount
87		.keys_prefix_raw(&userroom_prefix)
88		.ignore_err()
89		.ready_for_each(|key| {
90			self.db.userroomid_highlightcount.remove(key);
91		})
92		.await;
93
94	self.db
95		.userroomid_notificationcount
96		.keys_prefix_raw(&userroom_prefix)
97		.ignore_err()
98		.ready_for_each(|key| {
99			self.db.userroomid_notificationcount.remove(key);
100		})
101		.await;
102
103	self.db
104		.roomuserid_lastnotificationread
105		.keys_prefix_raw(&roomuser_prefix)
106		.ignore_err()
107		.ready_for_each(|key| {
108			self.db
109				.roomuserid_lastnotificationread
110				.remove(key);
111		})
112		.await;
113}
114
115/// Dispatcher: route a receipt's `ReceiptThread` to the matching reset path.
116/// `Unthreaded` clears all room and thread counts; `Main` clears only the
117/// main-timeline counts; `Thread(id)` clears just that thread.
118#[implement(super::Service)]
119pub async fn reset_notification_counts_for_thread(
120	&self,
121	user_id: &UserId,
122	room_id: &RoomId,
123	thread: &ReceiptThread,
124) {
125	match thread {
126		| ReceiptThread::Main => self.reset_notification_counts(user_id, room_id),
127		| ReceiptThread::Thread(root) =>
128			self.reset_thread_notification_counts(user_id, room_id, root),
129		| _ => {
130			self.reset_notification_counts(user_id, room_id);
131			self.clear_all_thread_notification_counts(user_id, room_id)
132				.await;
133		},
134	}
135}
136
137#[implement(super::Service)]
138#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
139pub async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
140	let key = (user_id, room_id);
141	self.db
142		.userroomid_notificationcount
143		.qry(&key)
144		.await
145		.deserialized()
146		.unwrap_or(0)
147}
148
149#[implement(super::Service)]
150#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
151pub async fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
152	let key = (user_id, room_id);
153	self.db
154		.userroomid_highlightcount
155		.qry(&key)
156		.await
157		.deserialized()
158		.unwrap_or(0)
159}
160
161/// Per-thread `(notification, highlight)` counts for one room and user.
162/// `Interfix` excludes the legacy 2-tuple main row from the scan; only
163/// 3-tuple `(user, room, root)` rows match.
164#[implement(super::Service)]
165#[tracing::instrument(level = "debug", skip(self))]
166pub async fn thread_notification_counts(
167	&self,
168	user_id: &UserId,
169	room_id: &RoomId,
170) -> ThreadCounts {
171	let prefix = (user_id, room_id, Interfix);
172	let notifications = self
173		.db
174		.userroomid_notificationcount
175		.stream_prefix(&prefix)
176		.ignore_err()
177		.map(notification_kv);
178
179	let highlights = self
180		.db
181		.userroomid_highlightcount
182		.stream_prefix(&prefix)
183		.ignore_err()
184		.map(highlight_kv);
185
186	select(notifications, highlights)
187		.ready_fold(ThreadCounts::default(), merge_thread_count)
188		.await
189}
190
191fn notification_kv(
192	(key, notifications): ((&UserId, &RoomId, OwnedEventId), u64),
193) -> (OwnedEventId, (u64, u64)) {
194	(key.2, (notifications, 0))
195}
196
197fn highlight_kv(
198	(key, highlights): ((&UserId, &RoomId, OwnedEventId), u64),
199) -> (OwnedEventId, (u64, u64)) {
200	(key.2, (0, highlights))
201}
202
203fn merge_thread_count(
204	mut counts: ThreadCounts,
205	(root, (notifications, highlights)): (OwnedEventId, (u64, u64)),
206) -> ThreadCounts {
207	let entry = counts.entry(root).or_default();
208	entry.0 = entry.0.saturating_add(notifications);
209	entry.1 = entry.1.saturating_add(highlights);
210	counts
211}
212
213#[implement(super::Service)]
214#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
215pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
216	let key = (room_id, user_id);
217	self.db
218		.roomuserid_lastnotificationread
219		.qry(&key)
220		.await
221		.deserialized()
222}
223
224/// Per-thread last-read counts for one room and user. `Interfix` keeps the
225/// scan to 3-tuple `(room, user, root)` rows; the legacy 2-tuple main row
226/// is excluded by construction and lives behind `last_notification_read`.
227#[implement(super::Service)]
228#[tracing::instrument(level = "debug", skip(self))]
229pub async fn thread_last_notification_reads(
230	&self,
231	user_id: &UserId,
232	room_id: &RoomId,
233) -> ThreadLastReads {
234	let prefix = (room_id, user_id, Interfix);
235	self.db
236		.roomuserid_lastnotificationread
237		.stream_prefix(&prefix)
238		.ignore_err()
239		.map(|((_, _, root), count): ((Ignore, Ignore, OwnedEventId), u64)| (root, count))
240		.collect()
241		.await
242}
243
244#[implement(super::Service)]
245pub async fn delete_room_notification_read(&self, room_id: &RoomId) -> Result {
246	let key = (room_id, Interfix);
247	self.db
248		.roomuserid_lastnotificationread
249		.keys_prefix_raw(&key)
250		.ignore_err()
251		.ready_for_each(|key| {
252			trace!("Removing key: {key:?}");
253			self.db
254				.roomuserid_lastnotificationread
255				.remove(key);
256		})
257		.await;
258
259	Ok(())
260}