1use std::{collections::HashSet, sync::Arc};
2
3use futures::{
4 FutureExt, StreamExt,
5 future::{join, join4},
6};
7use ruma::{
8 RoomId, UserId,
9 api::client::push::ProfileTag,
10 events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent},
11 push::{Action, Actions, Ruleset, Tweak},
12};
13use serde::{Deserialize, Serialize};
14use tuwunel_core::{
15 Result, implement,
16 matrix::{
17 event::Event,
18 pdu::{Count, Pdu, PduId, RawPduId},
19 },
20 utils::{BoolExt, ReadyExt, future::TryExtExt, option::OptionExt, time::now_millis},
21};
22use tuwunel_database::{Deserialized, Json, Map};
23
24use crate::rooms::short::ShortRoomId;
25
26#[derive(Clone, Debug, Deserialize, Serialize)]
30pub struct Notified {
31 pub ts: u64,
33
34 pub sroomid: ShortRoomId,
36
37 #[serde(skip_serializing_if = "Option::is_none")]
39 pub tag: Option<ProfileTag>,
40
41 pub actions: Actions,
43}
44
45#[implement(super::Service)]
47#[tracing::instrument(name = "append", level = "debug", skip_all)]
48pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
49 let push_target = self
51 .services
52 .state_cache
53 .active_local_users_in_room(pdu.room_id())
54 .map(ToOwned::to_owned)
55 .ready_filter(|user| *user != pdu.sender())
56 .filter_map(async |recipient_user| {
57 self.services
58 .users
59 .user_is_ignored(pdu.sender(), &recipient_user)
60 .await
61 .is_false()
62 .then_some(recipient_user)
63 })
64 .collect::<HashSet<_>>();
65
66 let power_levels = self
67 .services
68 .state_accessor
69 .get_power_levels(pdu.room_id())
70 .ok();
71
72 let (mut push_target, power_levels) = join(push_target, power_levels).boxed().await;
73
74 if *pdu.kind() == TimelineEventType::RoomMember
75 && let Some(Ok(target_user_id)) = pdu.state_key().map(UserId::parse)
76 && self
77 .services
78 .users
79 .is_active_local(&target_user_id)
80 .await
81 {
82 push_target.insert(target_user_id);
83 }
84
85 let serialized = pdu.to_format();
86 let thread_root = self.services.threads.get_thread_id(pdu).await;
87 let _cork = self.db.db.cork();
88 for user in &push_target {
89 let rules_for_user = self
90 .services
91 .account_data
92 .get_global(user, GlobalAccountDataEventType::PushRules)
93 .await
94 .map_or_else(
95 |_| Ruleset::server_default(user),
96 |ev: PushRulesEvent| ev.content.global,
97 );
98
99 let actions = self
100 .services
101 .pusher
102 .get_actions(user, &rules_for_user, power_levels.as_ref(), &serialized, pdu.room_id())
103 .await;
104
105 let notify = actions
106 .iter()
107 .any(|action| matches!(action, Action::Notify));
108
109 let highlight = actions.iter().any(|action| {
110 matches!(
111 action,
112 Action::SetTweak(Tweak::Highlight(ruma::push::HighlightTweakValue::Yes))
113 )
114 });
115
116 let main_notify = (notify && thread_root.is_none())
119 .then_async(|| self.increment_notificationcount(pdu.room_id(), user));
120
121 let main_highlight = (highlight && thread_root.is_none())
122 .then_async(|| self.increment_highlightcount(pdu.room_id(), user));
123
124 let thread_notify = thread_root
125 .as_deref()
126 .filter(|_| notify)
127 .map_async(|root| self.increment_thread_notificationcount(pdu.room_id(), user, root));
128
129 let thread_highlight = thread_root
130 .as_deref()
131 .filter(|_| highlight)
132 .map_async(|root| self.increment_thread_highlightcount(pdu.room_id(), user, root));
133
134 join4(main_notify, thread_notify, main_highlight, thread_highlight).await;
135
136 if notify || highlight {
137 let id: PduId = pdu_id.into();
138 let notified = Notified {
139 ts: now_millis(),
140 sroomid: id.shortroomid,
141 tag: None,
142 actions: actions.into(),
143 };
144
145 if matches!(id.count, Count::Normal(_)) {
146 self.db
147 .useridcount_notification
148 .put((user, id.count.into_unsigned()), Json(notified));
149 }
150 }
151
152 if notify || highlight || self.services.config.push_everything {
153 self.services
154 .pusher
155 .get_pushkeys(user)
156 .map(ToOwned::to_owned)
157 .ready_for_each(|push_key| {
158 self.services
159 .sending
160 .send_pdu_push(&pdu_id, user, push_key)
161 .expect("TODO: replace with future");
162 })
163 .await;
164 }
165 }
166
167 Ok(())
168}
169
170#[implement(super::Service)]
171async fn increment_notificationcount(&self, room_id: &RoomId, user_id: &UserId) {
172 let db = &self.db.userroomid_notificationcount;
173 let key = (room_id.to_owned(), user_id.to_owned());
174 let _lock = self.notification_increment_mutex.lock(&key).await;
175
176 increment(db, (user_id, room_id)).await;
177}
178
179#[implement(super::Service)]
180async fn increment_highlightcount(&self, room_id: &RoomId, user_id: &UserId) {
181 let db = &self.db.userroomid_highlightcount;
182 let key = (room_id.to_owned(), user_id.to_owned());
183 let _lock = self.highlight_increment_mutex.lock(&key).await;
184
185 increment(db, (user_id, room_id)).await;
186}
187
188#[implement(super::Service)]
189async fn increment_thread_notificationcount(
190 &self,
191 room_id: &RoomId,
192 user_id: &UserId,
193 thread_root: &ruma::EventId,
194) {
195 let db = &self.db.userroomid_notificationcount;
196 let key = (room_id.to_owned(), user_id.to_owned());
197 let _lock = self.notification_increment_mutex.lock(&key).await;
198
199 increment_thread(db, (user_id, room_id, thread_root)).await;
200}
201
202#[implement(super::Service)]
203async fn increment_thread_highlightcount(
204 &self,
205 room_id: &RoomId,
206 user_id: &UserId,
207 thread_root: &ruma::EventId,
208) {
209 let db = &self.db.userroomid_highlightcount;
210 let key = (room_id.to_owned(), user_id.to_owned());
211 let _lock = self.highlight_increment_mutex.lock(&key).await;
212
213 increment_thread(db, (user_id, room_id, thread_root)).await;
214}
215
216async fn increment(db: &Arc<Map>, key: (&UserId, &RoomId)) {
217 let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0);
218 let new = old.saturating_add(1);
219 db.put(key, new);
220}
221
222async fn increment_thread(db: &Arc<Map>, key: (&UserId, &RoomId, &ruma::EventId)) {
223 let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0);
224 let new = old.saturating_add(1);
225 db.put(key, new);
226}