tuwunel_service/pusher/
suppressed.rs1use std::{
7 collections::{HashMap, VecDeque},
8 sync::Mutex,
9};
10
11use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
12use tuwunel_core::{debug, implement, trace, utils};
13
14use crate::rooms::timeline::RawPduId;
15
/// Cap on the number of events held in one room's queue for a pushkey. When a
/// room queue is at this size, the oldest entries are dropped before a new one
/// is appended.
16const SUPPRESSED_MAX_EVENTS_PER_ROOM: usize = 512;
/// Cap on the number of events held across all rooms of one pushkey, compared
/// against `PushkeyQueue::total_events`.
17const SUPPRESSED_MAX_EVENTS_PER_PUSHKEY: usize = 4096;
/// Cap on the number of distinct rooms tracked for one pushkey; events for
/// additional rooms are rejected once this many rooms are present.
18const SUPPRESSED_MAX_ROOMS_PER_PUSHKEY: usize = 256;
19
/// Drained contents of one pushkey: each room paired with the PDU ids that
/// were queued for it.
20type SuppressedRooms = Vec<(OwnedRoomId, Vec<RawPduId>)>;
/// Drained contents of one user: each pushkey string paired with its
/// `SuppressedRooms`.
21type SuppressedPushes = Vec<(String, SuppressedRooms)>;
22
/// Mutex-guarded store of suppressed push events, organised per user and then
/// per pushkey.
23#[derive(Default)]
24pub(super) struct SuppressedQueue {
    /// Outer map is keyed by user id, inner map by pushkey string; each value
    /// is that pushkey's `PushkeyQueue`.
 25 inner: Mutex<HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>>,
26}
27
/// Queued suppressed events for a single pushkey, grouped by room.
28#[derive(Default)]
29struct PushkeyQueue {
    /// Per-room queues; new events are pushed at the back and evictions pop
    /// from the front.
 30 rooms: HashMap<OwnedRoomId, VecDeque<SuppressedEvent>>,
    /// Running count of events across all rooms, adjusted with saturating
    /// arithmetic whenever events are added or removed.
 31 total_events: usize,
32}
33
/// A single queued entry in a room's suppressed-push queue.
34#[derive(Clone, Debug)]
35struct SuppressedEvent {
    /// Id of the PDU this entry refers to; this is the value handed back when
    /// the queue is drained.
 36 pdu_id: RawPduId,
    /// Filled from `utils::millis_since_unix_epoch()` when the entry is queued.
    /// NOTE(review): not read anywhere in the code visible here; the leading
    /// underscore suggests it is intentionally unused for now - confirm.
 37 _inserted_at_ms: u64,
38}
39
40impl SuppressedQueue {
41 fn lock(
42 &self,
43 ) -> std::sync::MutexGuard<'_, HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>> {
44 self.inner
45 .lock()
46 .unwrap_or_else(std::sync::PoisonError::into_inner)
47 }
48
49 fn drain_room(queue: VecDeque<SuppressedEvent>) -> Vec<RawPduId> {
50 queue
51 .into_iter()
52 .map(|event| event.pdu_id)
53 .collect()
54 }
55
56 fn drop_one_front(queue: &mut VecDeque<SuppressedEvent>, total_events: &mut usize) -> bool {
57 if queue.pop_front().is_some() {
58 *total_events = total_events.saturating_sub(1);
59 return true;
60 }
61
62 false
63 }
64}
65
66#[implement(super::Service)]
68pub fn queue_suppressed_push(
69 &self,
70 user_id: &UserId,
71 pushkey: &str,
72 room_id: &RoomId,
73 pdu_id: RawPduId,
74) -> bool {
75 let mut inner = self.suppressed.lock();
76 let user_entry = inner.entry(user_id.to_owned()).or_default();
77 let push_entry = user_entry.entry(pushkey.to_owned()).or_default();
78
79 if !push_entry.rooms.contains_key(room_id)
80 && push_entry.rooms.len() >= SUPPRESSED_MAX_ROOMS_PER_PUSHKEY
81 {
82 debug!(
83 ?user_id,
84 ?room_id,
85 pushkey,
86 max_rooms = SUPPRESSED_MAX_ROOMS_PER_PUSHKEY,
87 "Suppressed push queue full (rooms); dropping event"
88 );
89 return false;
90 }
91
92 let queue = push_entry
93 .rooms
94 .entry(room_id.to_owned())
95 .or_default();
96
97 if queue
98 .back()
99 .is_some_and(|event| event.pdu_id == pdu_id)
100 {
101 trace!(?user_id, ?room_id, pushkey, "Suppressed push event is duplicate; skipping");
102 return false;
103 }
104
105 if push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY && queue.is_empty() {
106 debug!(
107 ?user_id,
108 ?room_id,
109 pushkey,
110 max_events = SUPPRESSED_MAX_EVENTS_PER_PUSHKEY,
111 "Suppressed push queue full (total); dropping event"
112 );
113 return false;
114 }
115
116 while queue.len() >= SUPPRESSED_MAX_EVENTS_PER_ROOM
117 || push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY
118 {
119 if !SuppressedQueue::drop_one_front(queue, &mut push_entry.total_events) {
120 break;
121 }
122 }
123
124 queue.push_back(SuppressedEvent {
125 pdu_id,
126 _inserted_at_ms: utils::millis_since_unix_epoch(),
127 });
128 push_entry.total_events = push_entry.total_events.saturating_add(1);
129
130 true
131}
132
133#[implement(super::Service)]
135pub fn take_suppressed_for_pushkey(
136 &self,
137 user_id: &UserId,
138 pushkey: &str,
139) -> Vec<(OwnedRoomId, Vec<RawPduId>)> {
140 let mut inner = self.suppressed.lock();
141 let Some(user_entry) = inner.get_mut(user_id) else {
142 return Vec::new();
143 };
144
145 let Some(push_entry) = user_entry.remove(pushkey) else {
146 return Vec::new();
147 };
148
149 if user_entry.is_empty() {
150 inner.remove(user_id);
151 }
152
153 push_entry
154 .rooms
155 .into_iter()
156 .map(|(room_id, queue)| (room_id, SuppressedQueue::drain_room(queue)))
157 .collect()
158}
159
160#[implement(super::Service)]
162pub fn take_suppressed_for_user(&self, user_id: &UserId) -> SuppressedPushes {
163 let mut inner = self.suppressed.lock();
164 let Some(user_entry) = inner.remove(user_id) else {
165 return Vec::new();
166 };
167
168 user_entry
169 .into_iter()
170 .map(|(pushkey, queue)| {
171 let rooms = queue
172 .rooms
173 .into_iter()
174 .map(|(room_id, q)| (room_id, SuppressedQueue::drain_room(q)))
175 .collect();
176 (pushkey, rooms)
177 })
178 .collect()
179}
180
181#[implement(super::Service)]
183pub fn clear_suppressed_room(&self, user_id: &UserId, room_id: &RoomId) -> usize {
184 let mut inner = self.suppressed.lock();
185 let Some(user_entry) = inner.get_mut(user_id) else {
186 return 0;
187 };
188
189 let mut removed: usize = 0;
190 user_entry.retain(|_, push_entry| {
191 if let Some(queue) = push_entry.rooms.remove(room_id) {
192 removed = removed.saturating_add(queue.len());
193 push_entry.total_events = push_entry
194 .total_events
195 .saturating_sub(queue.len());
196 }
197
198 !push_entry.rooms.is_empty()
199 });
200
201 if user_entry.is_empty() {
202 inner.remove(user_id);
203 }
204
205 removed
206}
207
208#[implement(super::Service)]
210pub fn clear_suppressed_pushkey(&self, user_id: &UserId, pushkey: &str) -> usize {
211 let mut inner = self.suppressed.lock();
212 let Some(user_entry) = inner.get_mut(user_id) else {
213 return 0;
214 };
215
216 let removed = user_entry
217 .remove(pushkey)
218 .map(|queue| queue.total_events)
219 .unwrap_or(0);
220
221 if user_entry.is_empty() {
222 inner.remove(user_id);
223 }
224
225 removed
226}