1use std::collections::BTreeMap;
2
3use futures::{StreamExt, stream::select};
4use ruma::{EventId, OwnedEventId, RoomId, UserId, events::receipt::ReceiptThread};
5use tuwunel_core::{
6 Result, implement, trace,
7 utils::stream::{ReadyExt, TryIgnore},
8};
9use tuwunel_database::{Deserialized, Ignore, Interfix};
10
11type ThreadCounts = BTreeMap<OwnedEventId, (u64, u64)>;
13
14type ThreadLastReads = BTreeMap<OwnedEventId, u64>;
18
19#[implement(super::Service)]
20#[tracing::instrument(level = "debug", skip(self))]
21pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
22 let count = self.services.globals.next_count();
23
24 let userroom_id = (user_id, room_id);
25 self.db
26 .userroomid_highlightcount
27 .put(userroom_id, 0_u64);
28 self.db
29 .userroomid_notificationcount
30 .put(userroom_id, 0_u64);
31
32 let roomuser_id = (room_id, user_id);
33 self.db
34 .roomuserid_lastnotificationread
35 .put(roomuser_id, *count);
36
37 let removed = self.clear_suppressed_room(user_id, room_id);
38 if removed > 0 {
39 trace!(?user_id, ?room_id, removed, "Cleared suppressed push events after read");
40 }
41}
42
43#[implement(super::Service)]
49#[tracing::instrument(level = "debug", skip(self))]
50pub fn reset_thread_notification_counts(
51 &self,
52 user_id: &UserId,
53 room_id: &RoomId,
54 thread_root: &EventId,
55) {
56 let count = self.services.globals.next_count();
57
58 let userroom_thread = (user_id, room_id, thread_root);
59 self.db
60 .userroomid_highlightcount
61 .put(userroom_thread, 0_u64);
62 self.db
63 .userroomid_notificationcount
64 .put(userroom_thread, 0_u64);
65
66 let roomuser_thread = (room_id, user_id, thread_root);
67 self.db
68 .roomuserid_lastnotificationread
69 .put(roomuser_thread, *count);
70}
71
72#[implement(super::Service)]
81pub async fn clear_all_thread_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
82 let userroom_prefix = (user_id, room_id, Interfix);
83 let roomuser_prefix = (room_id, user_id, Interfix);
84
85 self.db
86 .userroomid_highlightcount
87 .keys_prefix_raw(&userroom_prefix)
88 .ignore_err()
89 .ready_for_each(|key| {
90 self.db.userroomid_highlightcount.remove(key);
91 })
92 .await;
93
94 self.db
95 .userroomid_notificationcount
96 .keys_prefix_raw(&userroom_prefix)
97 .ignore_err()
98 .ready_for_each(|key| {
99 self.db.userroomid_notificationcount.remove(key);
100 })
101 .await;
102
103 self.db
104 .roomuserid_lastnotificationread
105 .keys_prefix_raw(&roomuser_prefix)
106 .ignore_err()
107 .ready_for_each(|key| {
108 self.db
109 .roomuserid_lastnotificationread
110 .remove(key);
111 })
112 .await;
113}
114
115#[implement(super::Service)]
119pub async fn reset_notification_counts_for_thread(
120 &self,
121 user_id: &UserId,
122 room_id: &RoomId,
123 thread: &ReceiptThread,
124) {
125 match thread {
126 | ReceiptThread::Main => self.reset_notification_counts(user_id, room_id),
127 | ReceiptThread::Thread(root) =>
128 self.reset_thread_notification_counts(user_id, room_id, root),
129 | _ => {
130 self.reset_notification_counts(user_id, room_id);
131 self.clear_all_thread_notification_counts(user_id, room_id)
132 .await;
133 },
134 }
135}
136
137#[implement(super::Service)]
138#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
139pub async fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
140 let key = (user_id, room_id);
141 self.db
142 .userroomid_notificationcount
143 .qry(&key)
144 .await
145 .deserialized()
146 .unwrap_or(0)
147}
148
149#[implement(super::Service)]
150#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
151pub async fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
152 let key = (user_id, room_id);
153 self.db
154 .userroomid_highlightcount
155 .qry(&key)
156 .await
157 .deserialized()
158 .unwrap_or(0)
159}
160
161#[implement(super::Service)]
165#[tracing::instrument(level = "debug", skip(self))]
166pub async fn thread_notification_counts(
167 &self,
168 user_id: &UserId,
169 room_id: &RoomId,
170) -> ThreadCounts {
171 let prefix = (user_id, room_id, Interfix);
172 let notifications = self
173 .db
174 .userroomid_notificationcount
175 .stream_prefix(&prefix)
176 .ignore_err()
177 .map(notification_kv);
178
179 let highlights = self
180 .db
181 .userroomid_highlightcount
182 .stream_prefix(&prefix)
183 .ignore_err()
184 .map(highlight_kv);
185
186 select(notifications, highlights)
187 .ready_fold(ThreadCounts::default(), merge_thread_count)
188 .await
189}
190
191fn notification_kv(
192 (key, notifications): ((&UserId, &RoomId, OwnedEventId), u64),
193) -> (OwnedEventId, (u64, u64)) {
194 (key.2, (notifications, 0))
195}
196
197fn highlight_kv(
198 (key, highlights): ((&UserId, &RoomId, OwnedEventId), u64),
199) -> (OwnedEventId, (u64, u64)) {
200 (key.2, (0, highlights))
201}
202
203fn merge_thread_count(
204 mut counts: ThreadCounts,
205 (root, (notifications, highlights)): (OwnedEventId, (u64, u64)),
206) -> ThreadCounts {
207 let entry = counts.entry(root).or_default();
208 entry.0 = entry.0.saturating_add(notifications);
209 entry.1 = entry.1.saturating_add(highlights);
210 counts
211}
212
213#[implement(super::Service)]
214#[tracing::instrument(level = "debug", skip(self), ret(level = "trace"))]
215pub async fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
216 let key = (room_id, user_id);
217 self.db
218 .roomuserid_lastnotificationread
219 .qry(&key)
220 .await
221 .deserialized()
222}
223
224#[implement(super::Service)]
228#[tracing::instrument(level = "debug", skip(self))]
229pub async fn thread_last_notification_reads(
230 &self,
231 user_id: &UserId,
232 room_id: &RoomId,
233) -> ThreadLastReads {
234 let prefix = (room_id, user_id, Interfix);
235 self.db
236 .roomuserid_lastnotificationread
237 .stream_prefix(&prefix)
238 .ignore_err()
239 .map(|((_, _, root), count): ((Ignore, Ignore, OwnedEventId), u64)| (root, count))
240 .collect()
241 .await
242}
243
244#[implement(super::Service)]
245pub async fn delete_room_notification_read(&self, room_id: &RoomId) -> Result {
246 let key = (room_id, Interfix);
247 self.db
248 .roomuserid_lastnotificationread
249 .keys_prefix_raw(&key)
250 .ignore_err()
251 .ready_for_each(|key| {
252 trace!("Removing key: {key:?}");
253 self.db
254 .roomuserid_lastnotificationread
255 .remove(key);
256 })
257 .await;
258
259 Ok(())
260}