Skip to main content

tuwunel_service/pusher/
append.rs

1use std::{collections::HashSet, sync::Arc};
2
3use futures::{
4	FutureExt, StreamExt,
5	future::{join, join4},
6};
7use ruma::{
8	RoomId, UserId,
9	api::client::push::ProfileTag,
10	events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent},
11	push::{Action, Actions, Ruleset, Tweak},
12};
13use serde::{Deserialize, Serialize};
14use tuwunel_core::{
15	Result, implement,
16	matrix::{
17		event::Event,
18		pdu::{Count, Pdu, PduId, RawPduId},
19	},
20	utils::{BoolExt, ReadyExt, future::TryExtExt, option::OptionExt, time::now_millis},
21};
22use tuwunel_database::{Deserialized, Json, Map};
23
24use crate::rooms::short::ShortRoomId;
25
26/// Succinct version of Ruma's Notification. Appended to the database when the
27/// user is notified. The PduCount is part of the database key so only the
28/// shortroomid is included. Together they  make the PduId.
29#[derive(Clone, Debug, Deserialize, Serialize)]
30pub struct Notified {
31	/// Milliseconds time at which the event notification was sent.
32	pub ts: u64,
33
34	/// ShortRoomId
35	pub sroomid: ShortRoomId,
36
37	/// The profile tag of the rule that matched this event.
38	#[serde(skip_serializing_if = "Option::is_none")]
39	pub tag: Option<ProfileTag>,
40
41	/// Actions vector
42	pub actions: Actions,
43}
44
45/// Called by timeline append_pdu.
46#[implement(super::Service)]
47#[tracing::instrument(name = "append", level = "debug", skip_all)]
48pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
49	// Don't notify the sender of their own events, and dont send from ignored users
50	let push_target = self
51		.services
52		.state_cache
53		.active_local_users_in_room(pdu.room_id())
54		.map(ToOwned::to_owned)
55		.ready_filter(|user| *user != pdu.sender())
56		.filter_map(async |recipient_user| {
57			self.services
58				.users
59				.user_is_ignored(pdu.sender(), &recipient_user)
60				.await
61				.is_false()
62				.then_some(recipient_user)
63		})
64		.collect::<HashSet<_>>();
65
66	let power_levels = self
67		.services
68		.state_accessor
69		.get_power_levels(pdu.room_id())
70		.ok();
71
72	let (mut push_target, power_levels) = join(push_target, power_levels).boxed().await;
73
74	if *pdu.kind() == TimelineEventType::RoomMember
75		&& let Some(Ok(target_user_id)) = pdu.state_key().map(UserId::parse)
76		&& self
77			.services
78			.users
79			.is_active_local(&target_user_id)
80			.await
81	{
82		push_target.insert(target_user_id);
83	}
84
85	let serialized = pdu.to_format();
86	let thread_root = self.services.threads.get_thread_id(pdu).await;
87	let _cork = self.db.db.cork();
88	for user in &push_target {
89		let rules_for_user = self
90			.services
91			.account_data
92			.get_global(user, GlobalAccountDataEventType::PushRules)
93			.await
94			.map_or_else(
95				|_| Ruleset::server_default(user),
96				|ev: PushRulesEvent| ev.content.global,
97			);
98
99		let actions = self
100			.services
101			.pusher
102			.get_actions(user, &rules_for_user, power_levels.as_ref(), &serialized, pdu.room_id())
103			.await;
104
105		let notify = actions
106			.iter()
107			.any(|action| matches!(action, Action::Notify));
108
109		let highlight = actions.iter().any(|action| {
110			matches!(
111				action,
112				Action::SetTweak(Tweak::Highlight(ruma::push::HighlightTweakValue::Yes))
113			)
114		});
115
116		// Mutually-exclusive partition: each notify (and each highlight)
117		// lands in either the room-level or thread bucket, never both.
118		let main_notify = (notify && thread_root.is_none())
119			.then_async(|| self.increment_notificationcount(pdu.room_id(), user));
120
121		let main_highlight = (highlight && thread_root.is_none())
122			.then_async(|| self.increment_highlightcount(pdu.room_id(), user));
123
124		let thread_notify = thread_root
125			.as_deref()
126			.filter(|_| notify)
127			.map_async(|root| self.increment_thread_notificationcount(pdu.room_id(), user, root));
128
129		let thread_highlight = thread_root
130			.as_deref()
131			.filter(|_| highlight)
132			.map_async(|root| self.increment_thread_highlightcount(pdu.room_id(), user, root));
133
134		join4(main_notify, thread_notify, main_highlight, thread_highlight).await;
135
136		if notify || highlight {
137			let id: PduId = pdu_id.into();
138			let notified = Notified {
139				ts: now_millis(),
140				sroomid: id.shortroomid,
141				tag: None,
142				actions: actions.into(),
143			};
144
145			if matches!(id.count, Count::Normal(_)) {
146				self.db
147					.useridcount_notification
148					.put((user, id.count.into_unsigned()), Json(notified));
149			}
150		}
151
152		if notify || highlight || self.services.config.push_everything {
153			self.services
154				.pusher
155				.get_pushkeys(user)
156				.map(ToOwned::to_owned)
157				.ready_for_each(|push_key| {
158					self.services
159						.sending
160						.send_pdu_push(&pdu_id, user, push_key)
161						.expect("TODO: replace with future");
162				})
163				.await;
164		}
165	}
166
167	Ok(())
168}
169
170#[implement(super::Service)]
171async fn increment_notificationcount(&self, room_id: &RoomId, user_id: &UserId) {
172	let db = &self.db.userroomid_notificationcount;
173	let key = (room_id.to_owned(), user_id.to_owned());
174	let _lock = self.notification_increment_mutex.lock(&key).await;
175
176	increment(db, (user_id, room_id)).await;
177}
178
179#[implement(super::Service)]
180async fn increment_highlightcount(&self, room_id: &RoomId, user_id: &UserId) {
181	let db = &self.db.userroomid_highlightcount;
182	let key = (room_id.to_owned(), user_id.to_owned());
183	let _lock = self.highlight_increment_mutex.lock(&key).await;
184
185	increment(db, (user_id, room_id)).await;
186}
187
188#[implement(super::Service)]
189async fn increment_thread_notificationcount(
190	&self,
191	room_id: &RoomId,
192	user_id: &UserId,
193	thread_root: &ruma::EventId,
194) {
195	let db = &self.db.userroomid_notificationcount;
196	let key = (room_id.to_owned(), user_id.to_owned());
197	let _lock = self.notification_increment_mutex.lock(&key).await;
198
199	increment_thread(db, (user_id, room_id, thread_root)).await;
200}
201
202#[implement(super::Service)]
203async fn increment_thread_highlightcount(
204	&self,
205	room_id: &RoomId,
206	user_id: &UserId,
207	thread_root: &ruma::EventId,
208) {
209	let db = &self.db.userroomid_highlightcount;
210	let key = (room_id.to_owned(), user_id.to_owned());
211	let _lock = self.highlight_increment_mutex.lock(&key).await;
212
213	increment_thread(db, (user_id, room_id, thread_root)).await;
214}
215
216async fn increment(db: &Arc<Map>, key: (&UserId, &RoomId)) {
217	let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0);
218	let new = old.saturating_add(1);
219	db.put(key, new);
220}
221
222async fn increment_thread(db: &Arc<Map>, key: (&UserId, &RoomId, &ruma::EventId)) {
223	let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0);
224	let new = old.saturating_add(1);
225	db.put(key, new);
226}