Skip to main content

tuwunel_service/pusher/
suppressed.rs

1//! Deferred push suppression queues.
2//!
3//! Stores suppressed push events in memory until they can be flushed. This is
4//! intentionally in-memory only: suppressed events are discarded on restart.
5
6use std::{
7	collections::{HashMap, VecDeque},
8	sync::Mutex,
9};
10
11use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
12use tuwunel_core::{debug, implement, trace, utils};
13
14use crate::rooms::timeline::RawPduId;
15
/// Cap on queued events for a single room under one pushkey. When a room
/// queue reaches this size, its oldest events are dropped to make space.
const SUPPRESSED_MAX_EVENTS_PER_ROOM: usize = 512;
/// Cap on queued events for one pushkey, summed across all of its rooms.
const SUPPRESSED_MAX_EVENTS_PER_PUSHKEY: usize = 4096;
/// Cap on the number of distinct rooms tracked for one pushkey.
const SUPPRESSED_MAX_ROOMS_PER_PUSHKEY: usize = 256;

/// Suppressed PDU ids grouped by room.
type SuppressedRooms = Vec<(OwnedRoomId, Vec<RawPduId>)>;
/// Suppressed rooms grouped by pushkey (the `String` is the pushkey).
type SuppressedPushes = Vec<(String, SuppressedRooms)>;
22
/// In-memory store of suppressed push events, guarded by a single mutex.
#[derive(Default)]
pub(super) struct SuppressedQueue {
	/// Nested map: user id -> pushkey -> queued events for that pushkey.
	inner: Mutex<HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>>,
}
27
/// Suppressed events held for one pushkey, grouped by room.
#[derive(Default)]
struct PushkeyQueue {
	/// Per-room FIFO queues: new events are pushed at the back and the oldest
	/// are dropped from the front.
	rooms: HashMap<OwnedRoomId, VecDeque<SuppressedEvent>>,
	/// Running count of events across all room queues. It is maintained by
	/// hand alongside `rooms` and used to enforce the per-pushkey cap.
	total_events: usize,
}
33
/// A single queued push event.
#[derive(Clone, Debug)]
struct SuppressedEvent {
	/// Identifier of the PDU whose push was suppressed.
	pdu_id: RawPduId,
	/// Milliseconds since the Unix epoch at enqueue time. The leading
	/// underscore marks it as unused; nothing in this file reads it.
	_inserted_at_ms: u64,
}
39
40impl SuppressedQueue {
41	fn lock(
42		&self,
43	) -> std::sync::MutexGuard<'_, HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>> {
44		self.inner
45			.lock()
46			.unwrap_or_else(std::sync::PoisonError::into_inner)
47	}
48
49	fn drain_room(queue: VecDeque<SuppressedEvent>) -> Vec<RawPduId> {
50		queue
51			.into_iter()
52			.map(|event| event.pdu_id)
53			.collect()
54	}
55
56	fn drop_one_front(queue: &mut VecDeque<SuppressedEvent>, total_events: &mut usize) -> bool {
57		if queue.pop_front().is_some() {
58			*total_events = total_events.saturating_sub(1);
59			return true;
60		}
61
62		false
63	}
64}
65
66/// Enqueue a PDU for later push delivery when suppression is active.
67#[implement(super::Service)]
68pub fn queue_suppressed_push(
69	&self,
70	user_id: &UserId,
71	pushkey: &str,
72	room_id: &RoomId,
73	pdu_id: RawPduId,
74) -> bool {
75	let mut inner = self.suppressed.lock();
76	let user_entry = inner.entry(user_id.to_owned()).or_default();
77	let push_entry = user_entry.entry(pushkey.to_owned()).or_default();
78
79	if !push_entry.rooms.contains_key(room_id)
80		&& push_entry.rooms.len() >= SUPPRESSED_MAX_ROOMS_PER_PUSHKEY
81	{
82		debug!(
83			?user_id,
84			?room_id,
85			pushkey,
86			max_rooms = SUPPRESSED_MAX_ROOMS_PER_PUSHKEY,
87			"Suppressed push queue full (rooms); dropping event"
88		);
89		return false;
90	}
91
92	let queue = push_entry
93		.rooms
94		.entry(room_id.to_owned())
95		.or_default();
96
97	if queue
98		.back()
99		.is_some_and(|event| event.pdu_id == pdu_id)
100	{
101		trace!(?user_id, ?room_id, pushkey, "Suppressed push event is duplicate; skipping");
102		return false;
103	}
104
105	if push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY && queue.is_empty() {
106		debug!(
107			?user_id,
108			?room_id,
109			pushkey,
110			max_events = SUPPRESSED_MAX_EVENTS_PER_PUSHKEY,
111			"Suppressed push queue full (total); dropping event"
112		);
113		return false;
114	}
115
116	while queue.len() >= SUPPRESSED_MAX_EVENTS_PER_ROOM
117		|| push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY
118	{
119		if !SuppressedQueue::drop_one_front(queue, &mut push_entry.total_events) {
120			break;
121		}
122	}
123
124	queue.push_back(SuppressedEvent {
125		pdu_id,
126		_inserted_at_ms: utils::millis_since_unix_epoch(),
127	});
128	push_entry.total_events = push_entry.total_events.saturating_add(1);
129
130	true
131}
132
133/// Take and remove all suppressed PDUs for a given user + pushkey.
134#[implement(super::Service)]
135pub fn take_suppressed_for_pushkey(
136	&self,
137	user_id: &UserId,
138	pushkey: &str,
139) -> Vec<(OwnedRoomId, Vec<RawPduId>)> {
140	let mut inner = self.suppressed.lock();
141	let Some(user_entry) = inner.get_mut(user_id) else {
142		return Vec::new();
143	};
144
145	let Some(push_entry) = user_entry.remove(pushkey) else {
146		return Vec::new();
147	};
148
149	if user_entry.is_empty() {
150		inner.remove(user_id);
151	}
152
153	push_entry
154		.rooms
155		.into_iter()
156		.map(|(room_id, queue)| (room_id, SuppressedQueue::drain_room(queue)))
157		.collect()
158}
159
160/// Take and remove all suppressed PDUs for a given user across all pushkeys.
161#[implement(super::Service)]
162pub fn take_suppressed_for_user(&self, user_id: &UserId) -> SuppressedPushes {
163	let mut inner = self.suppressed.lock();
164	let Some(user_entry) = inner.remove(user_id) else {
165		return Vec::new();
166	};
167
168	user_entry
169		.into_iter()
170		.map(|(pushkey, queue)| {
171			let rooms = queue
172				.rooms
173				.into_iter()
174				.map(|(room_id, q)| (room_id, SuppressedQueue::drain_room(q)))
175				.collect();
176			(pushkey, rooms)
177		})
178		.collect()
179}
180
181/// Clear suppressed PDUs for a specific room (across all pushkeys).
182#[implement(super::Service)]
183pub fn clear_suppressed_room(&self, user_id: &UserId, room_id: &RoomId) -> usize {
184	let mut inner = self.suppressed.lock();
185	let Some(user_entry) = inner.get_mut(user_id) else {
186		return 0;
187	};
188
189	let mut removed: usize = 0;
190	user_entry.retain(|_, push_entry| {
191		if let Some(queue) = push_entry.rooms.remove(room_id) {
192			removed = removed.saturating_add(queue.len());
193			push_entry.total_events = push_entry
194				.total_events
195				.saturating_sub(queue.len());
196		}
197
198		!push_entry.rooms.is_empty()
199	});
200
201	if user_entry.is_empty() {
202		inner.remove(user_id);
203	}
204
205	removed
206}
207
208/// Clear suppressed PDUs for a specific pushkey.
209#[implement(super::Service)]
210pub fn clear_suppressed_pushkey(&self, user_id: &UserId, pushkey: &str) -> usize {
211	let mut inner = self.suppressed.lock();
212	let Some(user_entry) = inner.get_mut(user_id) else {
213		return 0;
214	};
215
216	let removed = user_entry
217		.remove(pushkey)
218		.map(|queue| queue.total_events)
219		.unwrap_or(0);
220
221	if user_entry.is_empty() {
222		inner.remove(user_id);
223	}
224
225	removed
226}