1use std::{
2 collections::{BTreeMap, HashMap, HashSet, btree_map::Entry},
3 fmt::Debug,
4 sync::{
5 Arc,
6 atomic::{AtomicU64, AtomicUsize, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
12use futures::{
13 FutureExt, StreamExt, TryFutureExt,
14 future::{BoxFuture, join3, try_join3},
15 pin_mut,
16 stream::FuturesUnordered,
17};
18use ruma::{
19 MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
20 UserId,
21 api::{
22 appservice::event::push_events::v1::{EphemeralData, Request as PushEventsRequest},
23 client::push::Pusher,
24 federation::transactions::{
25 edu::{
26 DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
27 ReceiptData, ReceiptMap,
28 },
29 send_transaction_message,
30 },
31 },
32 device_id,
33 events::{
34 AnySyncEphemeralRoomEvent, GlobalAccountDataEventType, push_rules::PushRulesEvent,
35 receipt::ReceiptType,
36 },
37 presence::PresenceState,
38 push,
39 serde::Raw,
40 uint,
41};
42use tuwunel_core::{
43 Error, Event, Result, debug, err, error, extract_variant,
44 result::LogErr,
45 smallvec::SmallVec,
46 trace,
47 utils::{
48 BoolExt, ReadyExt, calculate_hash, continue_exponential_backoff_secs,
49 future::TryExtExt,
50 stream::{BroadbandExt, IterStream, WidebandExt},
51 },
52 warn,
53};
54
55use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
56use crate::{federation::ShouldAttempt, rooms::timeline::RawPduId};
57
/// State of the transaction to a single destination, as tracked by a sender
/// worker in its `CurTransactionStatus` map.
#[derive(Debug)]
enum TransactionStatus {
    /// A transaction to the destination has been started and no failure has
    /// been recorded for it since.
    Running,
    /// The last attempt failed. Carries the number of tries so far and the
    /// instant of the failure, which feeds the backoff decision.
    Failed(u32, Instant),
    /// A retry is pending or under way. Carries the number of tries so far.
    Retrying(u32),
}
67
/// A failed send: the destination it was addressed to and the error.
type SendingError = (Destination, Error);
/// Outcome of one send. Both the success and the failure side carry the
/// destination so the worker can update its bookkeeping.
type SendingResult = Result<Destination, SendingError>;
/// Boxed future resolving to the outcome of one send.
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
/// The set of sends a worker currently has in flight.
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
/// Per-destination transaction state kept by a worker.
type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
73
/// Receipts collected for one user in one room, in the order they were read
/// from the receipt stream.
type UserReceipts = SmallVec<[ReceiptData; 1]>;

/// Receipt maps for one room, indexed by rank: the map at index `n` holds the
/// `n`-th collected receipt of every user that has at least `n + 1` receipts.
type RankedReceipts = SmallVec<[ReceiptMap; 1]>;

/// Ranked receipts paired with the room they belong to.
type RoomReceipts = SmallVec<[(OwnedRoomId, RankedReceipts); 1]>;
88
/// Maximum number of users whose presence is gathered for one selection.
const SELECT_PRESENCE_LIMIT: usize = 256;
/// Once this many per-room user entries have been counted during one receipt
/// selection, a room stops reading further receipts.
const SELECT_RECEIPT_LIMIT: usize = 256;
/// Maximum number of queued requests promoted to active after a successful
/// send.
const DEQUEUE_LIMIT: usize = 48;

/// NOTE(review): not referenced in this part of the file; presumably the
/// maximum number of PDUs per transaction — confirm against callers.
pub const PDU_LIMIT: usize = 50;
/// Maximum number of EDUs selected for one transaction. The device-list,
/// receipt and presence selectors share a single counter against this limit.
pub const EDU_LIMIT: usize = 100;
95
96impl Service {
97 #[tracing::instrument(skip(self), level = "debug")]
98 pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
99 let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
100 let mut futures: SendingFutures<'_> = FuturesUnordered::new();
101
102 self.startup_netburst(id, &mut futures, &mut statuses)
103 .boxed()
104 .await;
105
106 self.work_loop(id, &mut futures, &mut statuses)
107 .await;
108
109 if !futures.is_empty() {
110 self.finish_responses(&mut futures).boxed().await;
111 }
112
113 Ok(())
114 }
115
    /// Main loop of a sender worker.
    ///
    /// Alternates between completed sends and new messages arriving on this
    /// worker's channel, until the channel is closed or receiving fails.
    ///
    /// # Panics
    ///
    /// Panics if no channel exists for `id`.
    #[tracing::instrument(
        name = "work",
        level = "trace",
        skip_all,
        fields(
            futures = %futures.len(),
            statuses = %statuses.len(),
        ),
    )]
    async fn work_loop<'a>(
        &'a self,
        id: usize,
        futures: &mut SendingFutures<'a>,
        statuses: &mut CurTransactionStatus,
    ) {
        let receiver = self
            .channels
            .get(id)
            .map(|(_, receiver)| receiver.clone())
            .expect("Missing channel for sender worker");

        while !receiver.is_closed() {
            tokio::select! {
                // A send finished: record the outcome and possibly start the
                // next batch for that destination.
                Some(response) = futures.next() => {
                    self.handle_response(response, futures, statuses).await;
                },
                // A new message arrived; a receive error ends the loop.
                request = receiver.recv_async() => match request {
                    Ok(request) => self.handle_request(request, futures, statuses).await,
                    Err(_) => return,
                },
            }
        }
    }
149
150 #[tracing::instrument(name = "response", level = "debug", skip_all)]
151 async fn handle_response<'a>(
152 &'a self,
153 response: SendingResult,
154 futures: &mut SendingFutures<'a>,
155 statuses: &mut CurTransactionStatus,
156 ) {
157 match response {
158 | Err((dest, e)) => Self::handle_response_err(dest, statuses, &e),
159 | Ok(dest) =>
160 self.handle_response_ok(&dest, futures, statuses)
161 .await,
162 }
163 }
164
165 fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) {
166 debug!(?dest, "{e:?}");
167 let push = matches!(dest, Destination::Push(..));
169
170 statuses.entry(dest).and_modify(|e| {
171 let tries = match e {
172 | TransactionStatus::Running => 1,
173 | TransactionStatus::Failed(n, _) | TransactionStatus::Retrying(n) =>
174 n.saturating_add(1),
175 };
176
177 *e = if push {
178 TransactionStatus::Failed(tries, Instant::now())
179 } else {
180 TransactionStatus::Retrying(tries)
181 };
182 });
183 }
184
185 #[expect(clippy::needless_pass_by_ref_mut)]
186 async fn handle_response_ok<'a>(
187 &'a self,
188 dest: &Destination,
189 futures: &mut SendingFutures<'a>,
190 statuses: &mut CurTransactionStatus,
191 ) {
192 let _cork = self.db.db.cork();
193 self.db.delete_all_active_requests_for(dest).await;
194
195 let new_events = self
197 .db
198 .queued_requests(dest)
199 .take(DEQUEUE_LIMIT)
200 .collect::<Vec<_>>()
201 .await;
202
203 if !new_events.is_empty() {
205 self.db.mark_as_active(new_events.iter());
206
207 let new_events_vec = new_events
208 .into_iter()
209 .map(|(_, event)| event)
210 .collect();
211
212 futures.push(self.send_events(dest.clone(), new_events_vec));
213 } else {
214 statuses.remove(dest);
215 }
216 }
217
218 #[expect(clippy::needless_pass_by_ref_mut)]
219 #[tracing::instrument(name = "request", level = "debug", skip_all)]
220 async fn handle_request<'a>(
221 &'a self,
222 msg: Msg,
223 futures: &mut SendingFutures<'a>,
224 statuses: &mut CurTransactionStatus,
225 ) {
226 let iv = vec![(msg.queue_id, msg.event)];
227 if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
228 if !events.is_empty() {
229 futures.push(self.send_events(msg.dest, events));
230 } else {
231 statuses.remove(&msg.dest);
232 }
233 }
234 }
235
    /// Waits for in-flight sends to finish.
    ///
    /// Runs until every future has completed or the configured
    /// `sender_shutdown_timeout` (in seconds) has elapsed, whichever comes
    /// first. Active requests are deleted for each destination whose send
    /// succeeded; failed sends are ignored.
    #[tracing::instrument(
        name = "finish",
        level = "info",
        skip_all,
        fields(futures = %futures.len()),
    )]
    async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>) {
        // Note: `Instant` below is tokio's, shadowing the std import.
        use tokio::{
            select,
            time::{Instant, sleep_until},
        };

        let timeout = self.server.config.sender_shutdown_timeout;
        let timeout = Duration::from_secs(timeout);
        let now = Instant::now();
        // If adding the timeout overflows, the deadline is "now".
        let deadline = now.checked_add(timeout).unwrap_or(now);
        loop {
            trace!("Waiting for {} requests to complete...", futures.len());
            select! {
                () = sleep_until(deadline) => return,
                response = futures.next() => match response {
                    Some(Ok(dest)) => self.db.delete_all_active_requests_for(&dest).await,
                    Some(_) => continue,
                    // All futures have completed.
                    None => return,
                },
            }
        }
    }
264
265 #[tracing::instrument(
266 name = "netburst",
267 level = "debug",
268 skip_all,
269 fields(futures = %futures.len()),
270 )]
271 #[expect(clippy::needless_pass_by_ref_mut)]
272 async fn startup_netburst<'a>(
273 &'a self,
274 id: usize,
275 futures: &mut SendingFutures<'a>,
276 statuses: &mut CurTransactionStatus,
277 ) {
278 let keep =
279 usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
280
281 let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
282 let active = self.db.active_requests();
283
284 pin_mut!(active);
285 while let Some((key, event, dest)) = active.next().await {
286 if self.shard_id(&dest) != id {
287 continue;
288 }
289
290 let entry = txns.entry(dest.clone()).or_default();
291 if self.server.config.startup_netburst_keep >= 0 && entry.len() >= keep {
292 warn!("Dropping unsent event {dest:?} {:?}", String::from_utf8_lossy(&key));
293 self.db.delete_active_request(&key);
294 } else {
295 entry.push(event);
296 }
297 }
298
299 for (dest, events) in txns {
300 if self.server.config.startup_netburst && !events.is_empty() {
301 statuses.insert(dest.clone(), TransactionStatus::Running);
302 futures.push(self.send_events(dest.clone(), events));
303 }
304 }
305 }
306
307 #[tracing::instrument(
308 name = "select",
309 level = "debug",
310 skip_all,
311 fields(
312 ?dest,
313 new_events = %new_events.len(),
314 ),
315 )]
316 async fn select_events(
317 &self,
318 dest: &Destination,
319 new_events: Vec<QueueItem>, statuses: &mut CurTransactionStatus,
321 ) -> Result<Option<Vec<SendingEvent>>> {
322 let (allow, retry) = self.select_events_current(dest, statuses).await?;
323
324 if !allow {
326 return Ok(None);
327 }
328
329 let mut events = Vec::new();
330
331 if retry {
333 self.db
334 .active_requests_for(dest)
335 .ready_for_each(|(_, e)| events.push(e))
336 .await;
337
338 return Ok(Some(events));
339 }
340
341 let _cork = self.db.db.cork();
343 if !new_events.is_empty() {
344 self.db.mark_as_active(new_events.iter());
345 for (_, e) in new_events {
346 events.push(e);
347 }
348 }
349
350 if let Destination::Federation(server_name) = dest
352 && let Ok((select_edus, last_count)) = self.select_edus(server_name).await
353 {
354 debug_assert!(select_edus.len() <= EDU_LIMIT, "exceeded edus limit");
355 let select_edus = select_edus.into_iter().map(SendingEvent::Edu);
356
357 events.extend(select_edus);
358 self.db
359 .set_latest_educount(server_name, last_count);
360 }
361
362 Ok(Some(events))
363 }
364
    /// Consults and updates the transaction status of `dest`.
    ///
    /// Returns `(allow, retry)`: `allow` says whether a transaction may be
    /// started now, `retry` whether it should resend the already active
    /// requests. A destination without a status entry gets `Running` and
    /// yields `(true, false)`.
    async fn select_events_current(
        &self,
        dest: &Destination,
        statuses: &mut CurTransactionStatus,
    ) -> Result<(bool, bool)> {
        if let Destination::Federation(server) = dest {
            // The federation service can veto an attempt to this server.
            let should_attempt = self
                .services
                .federation
                .should_attempt(server)
                .await;

            if matches!(should_attempt, ShouldAttempt::No { .. }) {
                return Ok((false, false));
            }
        }

        let (mut allow, mut retry) = (true, false);
        statuses
            .entry(dest.clone())
            .and_modify(|e| match e {
                | TransactionStatus::Running => {
                    // A transaction is already under way.
                    allow = false;
                },
                | TransactionStatus::Failed(tries, time) => {
                    let min = self.server.config.sender_timeout;
                    let max = self.server.config.sender_retry_backoff_limit;
                    if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) {
                        // Still backing off.
                        allow = false;
                    } else {
                        retry = true;
                        *e = TransactionStatus::Retrying(*tries);
                    }
                },
                | TransactionStatus::Retrying(_) if matches!(dest, Destination::Push(..)) => {
                    // NOTE(review): for push destinations `Retrying` seems to
                    // mean a retry is already in flight — confirm.
                    allow = false;
                },
                | TransactionStatus::Retrying(_) => {
                    retry = true;
                    *e = TransactionStatus::Running;
                },
            })
            .or_insert(TransactionStatus::Running);

        Ok((allow, retry))
    }
414
    /// Selects the EDUs to send to `server_name`.
    ///
    /// Covers the count range from the server's stored latest EDU count up to
    /// the current global count. Device-list changes are always selected;
    /// receipts and presence only when `allow_outgoing_read_receipts` and
    /// `allow_outgoing_presence` are set. The three selectors run
    /// concurrently and share one counter against `EDU_LIMIT`.
    ///
    /// Returns the EDUs together with the highest count any selector
    /// observed (never lower than the stored count).
    #[tracing::instrument(name = "edus", level = "debug", skip_all)]
    async fn select_edus(&self, server_name: &ServerName) -> Result<(EduVec, u64)> {
        let since = self.db.get_latest_educount(server_name).await;
        let since_upper = self.services.globals.current_count();
        let batch = (since, since_upper);
        debug_assert!(batch.0 <= batch.1, "since range must not be negative");

        // Shared between the selectors below.
        let events_len = AtomicUsize::default();
        let max_edu_count = AtomicU64::new(since);
        let device_changes =
            self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);

        let receipts = self
            .server
            .config
            .allow_outgoing_read_receipts
            .then_async(|| {
                self.select_edus_receipts(server_name, batch, &max_edu_count, &events_len)
            });

        let presence = self
            .server
            .config
            .allow_outgoing_presence
            .then_async(|| {
                self.select_edus_presence(server_name, batch, &max_edu_count, &events_len)
            });

        let (device_changes, receipts, presence) =
            join3(device_changes, receipts, presence).await;

        // Output order: device-list updates, then presence, then receipts.
        let mut events = device_changes;

        events.extend(presence.into_iter().flatten());
        events.extend(receipts.into_iter().flatten());

        Ok((events, max_edu_count.load(Ordering::Acquire)))
    }
454
455 #[tracing::instrument(
457 name = "device_changes",
458 level = "trace",
459 skip(self, server_name, max_edu_count, events_len)
460 )]
461 async fn select_edus_device_changes(
462 &self,
463 server_name: &ServerName,
464 since: (u64, u64),
465 max_edu_count: &AtomicU64,
466 events_len: &AtomicUsize,
467 ) -> EduVec {
468 let mut events = EduVec::new();
469 let server_rooms = self
470 .services
471 .state_cache
472 .server_rooms(server_name);
473
474 pin_mut!(server_rooms);
475 let mut device_list_changes = HashSet::<OwnedUserId>::new();
476 while let Some(room_id) = server_rooms.next().await {
477 let keys_changed = self
478 .services
479 .users
480 .room_keys_changed(room_id, since.0, Some(since.1))
481 .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
482
483 pin_mut!(keys_changed);
484 while let Some((user_id, count)) = keys_changed.next().await {
485 debug_assert!(count <= since.1, "exceeds upper-bound");
486
487 max_edu_count.fetch_max(count, Ordering::Relaxed);
488 if !device_list_changes.insert(user_id.into()) {
489 continue;
490 }
491
492 let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
495 user_id: user_id.into(),
496 device_id: device_id!("placeholder").to_owned(),
497 device_display_name: Some("Placeholder".to_owned()),
498 stream_id: uint!(1),
499 prev_id: Vec::new(),
500 deleted: None,
501 keys: None,
502 });
503
504 let mut buf = EduBuf::new();
505 serde_json::to_writer(&mut buf, &edu)
506 .expect("failed to serialize device list update to JSON");
507
508 if events_len.fetch_add(1, Ordering::Relaxed) >= EDU_LIMIT {
510 return events;
511 }
512
513 events.push(buf);
514 }
515 }
516
517 events
518 }
519
    /// Selects read-receipt EDUs for `server_name`.
    ///
    /// Collects ranked receipts for every room shared with the server, then
    /// builds one Receipt EDU per rank, each combining that rank's receipts
    /// across all rooms that have it. Every EDU takes one slot from the
    /// shared `events_len` counter; output stops once `EDU_LIMIT` is reached.
    ///
    /// # Panics
    ///
    /// Panics if a Receipt EDU cannot be serialized to JSON.
    #[tracing::instrument(
        name = "receipts",
        level = "trace",
        skip(self, server_name, max_edu_count, events_len)
    )]
    async fn select_edus_receipts(
        &self,
        server_name: &ServerName,
        since: (u64, u64),
        max_edu_count: &AtomicU64,
        events_len: &AtomicUsize,
    ) -> EduVec {
        // Counter shared by all rooms of this selection.
        let num = AtomicUsize::new(0);
        let by_room: RoomReceipts = self
            .services
            .state_cache
            .server_rooms(server_name)
            .map(ToOwned::to_owned)
            .broad_filter_map(async |room_id| {
                let ranked = self
                    .select_edus_receipts_room(&room_id, since, max_edu_count, &num)
                    .await;

                // Rooms without receipts are left out.
                ranked
                    .is_empty()
                    .is_false()
                    .then_some((room_id, ranked))
            })
            .collect()
            .boxed()
            .await;

        // Highest number of ranks any room has.
        let max_rank = by_room
            .iter()
            .map(|(_, maps)| maps.len())
            .max()
            .unwrap_or(0);

        // Gathers the receipt map of the given rank from every room that has
        // one; `None` when no room does.
        let pivot_rank = |rank: usize| -> Option<BTreeMap<OwnedRoomId, ReceiptMap>> {
            let receipts: BTreeMap<_, _> = by_room
                .iter()
                .filter_map(|(room_id, maps)| {
                    maps.get(rank)
                        .cloned()
                        .map(|map| (room_id.clone(), map))
                })
                .collect();

            receipts.is_empty().is_false().then_some(receipts)
        };

        let serialize_edu = |receipts: BTreeMap<OwnedRoomId, ReceiptMap>| -> EduBuf {
            let mut buf = EduBuf::new();
            serde_json::to_writer(&mut buf, &Edu::Receipt(ReceiptContent { receipts }))
                .expect("Failed to serialize Receipt EDU to JSON vec");

            buf
        };

        // Takes one slot from the shared EDU budget; false once it is spent.
        let reserve = |_: &_| events_len.fetch_add(1, Ordering::Relaxed) < EDU_LIMIT;

        (0..max_rank)
            .filter_map(pivot_rank)
            .take_while(reserve)
            .map(serialize_edu)
            .collect()
    }
597
    /// Collects the read receipts of local users in one room, ranked.
    ///
    /// Reads the room's receipts within the `since` range `(lower, upper)`,
    /// raising `max_edu_count` to the highest count seen. Receipts are
    /// grouped per user; the result has one `ReceiptMap` per rank, where rank
    /// `n` contains the `n`-th receipt of every user that has one. Reading
    /// stops once the shared `num` counter has reached
    /// `SELECT_RECEIPT_LIMIT`.
    ///
    /// # Panics
    ///
    /// Panics if a stored receipt event has no entry, lacks the `Read`
    /// receipt type, or lacks the entry of the user it is stored under.
    #[tracing::instrument(
        name = "receipts",
        level = "trace",
        skip(self, since, max_edu_count)
    )]
    async fn select_edus_receipts_room(
        &self,
        room_id: &RoomId,
        since: (u64, u64),
        max_edu_count: &AtomicU64,
        num: &AtomicUsize,
    ) -> RankedReceipts {
        let receipts =
            self.services
                .read_receipt
                .readreceipts_since(room_id, since.0, Some(since.1));

        pin_mut!(receipts);
        let mut by_user = BTreeMap::<OwnedUserId, UserReceipts>::new();
        while let Some((user_id, count, read_receipt)) = receipts.next().await {
            debug_assert!(count <= since.1, "exceeds upper-bound");

            // The count is recorded even for receipts skipped below.
            max_edu_count.fetch_max(count, Ordering::Relaxed);
            if !self.services.globals.user_is_local(user_id) {
                continue;
            }

            let Ok(event) = serde_json::from_str(read_receipt.json().get()) else {
                error!(?user_id, ?count, ?read_receipt, "Invalid edu event in read_receipts.");
                continue;
            };

            let AnySyncEphemeralRoomEvent::Receipt(r) = event else {
                error!(?user_id, ?count, ?event, "Invalid event type in read_receipts");
                continue;
            };

            // Only the first event entry of the receipt is used.
            let (event_id, mut receipt) = r
                .content
                .0
                .into_iter()
                .next()
                .expect("we only use one event per read receipt");

            let receipt = receipt
                .remove(&ReceiptType::Read)
                .expect("our read receipts always set this")
                .remove(user_id)
                .expect("our read receipts always have the user here");

            let receipt_data = ReceiptData { data: receipt, event_ids: vec![event_id] };

            match by_user.entry(user_id.to_owned()) {
                | Entry::Vacant(slot) => {
                    slot.insert(SmallVec::from_buf([receipt_data]));
                    // Only a user's first receipt counts against the limit.
                    let num = num.fetch_add(1, Ordering::Relaxed);
                    if num >= SELECT_RECEIPT_LIMIT {
                        break;
                    }
                },
                | Entry::Occupied(mut slot) => {
                    slot.get_mut().push(receipt_data);
                },
            }
        }

        // Pivot from per-user lists to per-rank maps.
        by_user
            .into_iter()
            .fold(RankedReceipts::new(), |mut acc, (user_id, receipts)| {
                for (rank, receipt_data) in receipts.into_iter().enumerate() {
                    if rank >= acc.len() {
                        acc.push(ReceiptMap { read: BTreeMap::new() });
                    }

                    acc[rank]
                        .read
                        .insert(user_id.clone(), receipt_data);
                }

                acc
            })
    }
691
    /// Selects at most one Presence EDU for `server_name`.
    ///
    /// Reads presence updates within the `since` range `(lower, upper)`,
    /// raising `max_edu_count` to the highest count seen. Only local users
    /// that the server can see are included, one update per user, up to
    /// `SELECT_PRESENCE_LIMIT` users. Returns `None` when there is nothing to
    /// send or the shared `events_len` counter has reached `EDU_LIMIT`.
    ///
    /// # Panics
    ///
    /// Panics if the Presence EDU cannot be serialized to JSON.
    #[tracing::instrument(
        name = "presence",
        level = "trace",
        skip(self, server_name, max_edu_count, events_len)
    )]
    async fn select_edus_presence(
        &self,
        server_name: &ServerName,
        since: (u64, u64),
        max_edu_count: &AtomicU64,
        events_len: &AtomicUsize,
    ) -> Option<EduBuf> {
        let presence_since = self
            .services
            .presence
            .presence_since(since.0, Some(since.1));

        pin_mut!(presence_since);
        let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
        while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
            debug_assert!(count <= since.1, "exceeded upper-bound");

            // The count is recorded even for updates skipped below.
            max_edu_count.fetch_max(count, Ordering::Relaxed);
            if !self.services.globals.user_is_local(user_id) {
                continue;
            }

            if !self
                .services
                .state_cache
                .server_sees_user(server_name, user_id)
                .await
            {
                continue;
            }

            // Undecodable presence is logged and skipped.
            let Ok(presence_event) = self
                .services
                .presence
                .from_json_bytes_to_event(presence_bytes, user_id)
                .await
                .log_err()
            else {
                continue;
            };

            let update = PresenceUpdate {
                user_id: user_id.into(),
                presence: presence_event.content.presence,
                currently_active: presence_event
                    .content
                    .currently_active
                    .unwrap_or(false),
                status_msg: presence_event.content.status_msg,
                last_active_ago: presence_event
                    .content
                    .last_active_ago
                    .unwrap_or_else(|| uint!(0)),
            };

            // A later update for the same user replaces the earlier one.
            presence_updates.insert(user_id.into(), update);
            if presence_updates.len() >= SELECT_PRESENCE_LIMIT {
                break;
            }
        }

        if presence_updates.is_empty() {
            return None;
        }

        // Take one slot from the shared EDU budget.
        if events_len.fetch_add(1, Ordering::Relaxed) >= EDU_LIMIT {
            return None;
        }

        let presence_content = Edu::Presence(PresenceContent {
            push: presence_updates.into_values().collect(),
        });

        let mut buf = EduBuf::new();
        serde_json::to_writer(&mut buf, &presence_content)
            .expect("failed to serialize Presence EDU to JSON");

        Some(buf)
    }
778
779 fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
780 debug_assert!(!events.is_empty(), "sending empty transaction");
781 match dest {
782 | Destination::Federation(server) => self
783 .send_events_dest_federation(server, events)
784 .boxed(),
785 | Destination::Appservice(id) => self
786 .send_events_dest_appservice(id, events)
787 .boxed(),
788 | Destination::Push(user_id, pushkey) => self
789 .send_events_dest_push(user_id, pushkey, events)
790 .boxed(),
791 }
792 }
793
    /// Sends `events` to the appservice registered under `id`.
    ///
    /// PDUs that can be loaded are included as events. EDUs are included only
    /// when the registration has `receive_ephemeral` set and the EDU parses.
    /// The transaction id is the URL-safe, unpadded base64 of a hash over the
    /// raw PDU ids and EDU bytes of `events`.
    ///
    /// Fails when the registration is missing or the request fails.
    #[tracing::instrument(
        name = "appservice",
        level = "debug",
        skip(self, events),
        fields(
            events = %events.len(),
        ),
    )]
    async fn send_events_dest_appservice(
        &self,
        id: String,
        events: Vec<SendingEvent>,
    ) -> SendingResult {
        let Some(appservice) = self
            .services
            .appservice
            .get_registration(&id)
            .await
        else {
            return Err((
                Destination::Appservice(id.clone()),
                err!(Database(debug_warn!(?id, "Missing appservice registration"))),
            ));
        };

        // Size both buffers for the number of events of each kind.
        let mut pdu_jsons = Vec::with_capacity(
            events
                .iter()
                .filter(|event| matches!(event, SendingEvent::Pdu(_)))
                .count(),
        );
        let mut edu_jsons: Vec<Raw<EphemeralData>> = Vec::with_capacity(
            events
                .iter()
                .filter(|event| matches!(event, SendingEvent::Edu(_)))
                .count(),
        );
        for event in &events {
            match event {
                | SendingEvent::Pdu(pdu_id) => {
                    // PDUs that cannot be loaded are skipped.
                    if let Ok(pdu) = self
                        .services
                        .timeline
                        .get_pdu_from_id(pdu_id)
                        .await
                    {
                        pdu_jsons.push(pdu.to_format());
                    }
                },
                | SendingEvent::Edu(edu) => {
                    if appservice.receive_ephemeral
                        && let Ok(edu) =
                            serde_json::from_slice(edu).and_then(|edu| Raw::new(&edu))
                    {
                        edu_jsons.push(edu);
                    }
                },
                // Flush events carry no payload for the request.
                | SendingEvent::Flush => {},
            }
        }

        let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
            | SendingEvent::Edu(b) => Some(b.as_ref()),
            | SendingEvent::Pdu(b) => Some(b.as_ref()),
            | SendingEvent::Flush => None,
        }));

        let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);

        match self
            .services
            .appservice
            .send_request(appservice, PushEventsRequest {
                txn_id: txn_id.into(),
                events: pdu_jsons,
                ephemeral: edu_jsons,
                // To-device messages are never included here.
                to_device: Vec::new(),
            })
            .await
        {
            | Ok(_) => Ok(Destination::Appservice(id)),
            | Err(e) => Err((Destination::Appservice(id), e)),
        }
    }
881
882 #[tracing::instrument(
883 name = "push",
884 level = "info",
885 skip(self, events),
886 fields(
887 events = %events.len(),
888 ),
889 )]
890 async fn send_events_dest_push(
891 &self,
892 user_id: OwnedUserId,
893 pushkey: String,
894 events: Vec<SendingEvent>,
895 ) -> SendingResult {
896 let suppressed = self.pushing_suppressed(&user_id).map(Ok);
897
898 let pusher = self
899 .services
900 .pusher
901 .get_pusher(&user_id, &pushkey)
902 .map_err(|_| {
903 (
904 Destination::Push(user_id.clone(), pushkey.clone()),
905 err!(Database(error!(?user_id, ?pushkey, "Missing pusher"))),
906 )
907 });
908
909 let rules_for_user = self
910 .services
911 .account_data
912 .get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
913 .map(|ev| {
914 ev.map_or_else(
915 |_| push::Ruleset::server_default(&user_id),
916 |ev| ev.content.global,
917 )
918 })
919 .map(Ok);
920
921 let (pusher, rules_for_user, suppressed) =
922 try_join3(pusher, rules_for_user, suppressed).await?;
923
924 if suppressed {
925 let queued = self
926 .enqueue_suppressed_push_events(&user_id, &pushkey, &events)
927 .await;
928
929 debug!(
930 ?user_id,
931 pushkey,
932 queued,
933 events = events.len(),
934 "Push suppressed; queued events"
935 );
936 return Ok(Destination::Push(user_id, pushkey));
937 }
938
939 self.schedule_flush_suppressed_for_pushkey(
940 user_id.clone(),
941 pushkey.clone(),
942 "non-suppressed push",
943 );
944
945 let _sent = events
946 .iter()
947 .stream()
948 .ready_filter_map(|event| extract_variant!(event, SendingEvent::Pdu))
949 .wide_filter_map(|pdu_id| {
950 self.services
951 .timeline
952 .get_pdu_from_id(pdu_id)
953 .ok()
954 })
955 .ready_filter(|pdu| !pdu.is_redacted())
956 .wide_filter_map(async |pdu| {
957 self.services
958 .pusher
959 .send_push_notice(&user_id, &pusher, &rules_for_user, &pdu)
960 .await
961 .map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e))
962 .ok()
963 })
964 .count()
965 .await;
966
967 Ok(Destination::Push(user_id, pushkey))
968 }
969
970 pub fn schedule_flush_suppressed_for_pushkey(
971 &self,
972 user_id: OwnedUserId,
973 pushkey: String,
974 reason: &'static str,
975 ) {
976 let sending = self.services.sending.clone();
977 let runtime = self.server.runtime();
978 runtime.spawn(async move {
979 sending
980 .flush_suppressed_for_pushkey(user_id, pushkey, reason)
981 .await;
982 });
983 }
984
985 pub fn schedule_flush_suppressed_for_user(&self, user_id: OwnedUserId, reason: &'static str) {
986 let sending = self.services.sending.clone();
987 let runtime = self.server.runtime();
988 runtime.spawn(async move {
989 sending
990 .flush_suppressed_for_user(user_id, reason)
991 .await;
992 });
993 }
994
995 async fn enqueue_suppressed_push_events(
996 &self,
997 user_id: &UserId,
998 pushkey: &str,
999 events: &[SendingEvent],
1000 ) -> usize {
1001 let mut queued = 0_usize;
1002 for event in events {
1003 let SendingEvent::Pdu(pdu_id) = event else {
1004 continue;
1005 };
1006
1007 let Ok(pdu) = self
1008 .services
1009 .timeline
1010 .get_pdu_from_id(pdu_id)
1011 .await
1012 else {
1013 debug!(?user_id, ?pdu_id, "Suppressing push but PDU is missing");
1014 continue;
1015 };
1016
1017 if pdu.is_redacted() {
1018 trace!(?user_id, ?pdu_id, "Suppressing push for redacted PDU");
1019 continue;
1020 }
1021
1022 if self.services.pusher.queue_suppressed_push(
1023 user_id,
1024 pushkey,
1025 pdu.room_id(),
1026 *pdu_id,
1027 ) {
1028 queued = queued.saturating_add(1);
1029 }
1030 }
1031
1032 queued
1033 }
1034
1035 async fn flush_suppressed_rooms(
1036 &self,
1037 user_id: &UserId,
1038 pushkey: &str,
1039 pusher: &Pusher,
1040 rules_for_user: &push::Ruleset,
1041 rooms: Vec<(OwnedRoomId, Vec<RawPduId>)>,
1042 reason: &'static str,
1043 ) {
1044 if rooms.is_empty() {
1045 return;
1046 }
1047
1048 let mut sent = 0_usize;
1049 debug!(?user_id, pushkey, rooms = rooms.len(), "Flushing suppressed pushes ({reason})");
1050
1051 for (room_id, pdu_ids) in rooms {
1052 let unread = self
1053 .services
1054 .pusher
1055 .notification_count(user_id, &room_id)
1056 .await;
1057
1058 if unread == 0 {
1059 trace!(?user_id, ?room_id, "Skipping suppressed push flush: no unread");
1060 continue;
1061 }
1062
1063 for pdu_id in pdu_ids {
1064 let Ok(pdu) = self
1065 .services
1066 .timeline
1067 .get_pdu_from_id(&pdu_id)
1068 .await
1069 else {
1070 debug!(?user_id, ?pdu_id, "Suppressed PDU missing during flush");
1071 continue;
1072 };
1073
1074 if pdu.is_redacted() {
1075 trace!(?user_id, ?pdu_id, "Suppressed PDU redacted during flush");
1076 continue;
1077 }
1078
1079 if let Err(error) = self
1080 .services
1081 .pusher
1082 .send_push_notice(user_id, pusher, rules_for_user, &pdu)
1083 .await
1084 {
1085 let requeued = self
1086 .services
1087 .pusher
1088 .queue_suppressed_push(user_id, pushkey, &room_id, pdu_id);
1089
1090 warn!(
1091 ?user_id,
1092 ?room_id,
1093 ?error,
1094 requeued,
1095 "Failed to send suppressed push notification"
1096 );
1097 } else {
1098 sent = sent.saturating_add(1);
1099 }
1100 }
1101 }
1102
1103 debug!(?user_id, pushkey, sent, "Flushed suppressed push notifications");
1104 }
1105
1106 async fn flush_suppressed_for_pushkey(
1107 &self,
1108 user_id: OwnedUserId,
1109 pushkey: String,
1110 reason: &'static str,
1111 ) {
1112 let suppressed = self
1113 .services
1114 .pusher
1115 .take_suppressed_for_pushkey(&user_id, &pushkey);
1116
1117 if suppressed.is_empty() {
1118 return;
1119 }
1120
1121 let pusher = match self
1122 .services
1123 .pusher
1124 .get_pusher(&user_id, &pushkey)
1125 .await
1126 {
1127 | Ok(pusher) => pusher,
1128 | Err(error) => {
1129 warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
1130 return;
1131 },
1132 };
1133
1134 let rules_for_user = match self
1135 .services
1136 .account_data
1137 .get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
1138 .await
1139 {
1140 | Ok(ev) => ev.content.global,
1141 | Err(_) => push::Ruleset::server_default(&user_id),
1142 };
1143
1144 self.flush_suppressed_rooms(
1145 &user_id,
1146 &pushkey,
1147 &pusher,
1148 &rules_for_user,
1149 suppressed,
1150 reason,
1151 )
1152 .await;
1153 }
1154
1155 pub async fn flush_suppressed_for_user(&self, user_id: OwnedUserId, reason: &'static str) {
1156 let suppressed = self
1157 .services
1158 .pusher
1159 .take_suppressed_for_user(&user_id);
1160
1161 if suppressed.is_empty() {
1162 return;
1163 }
1164
1165 let rules_for_user = match self
1166 .services
1167 .account_data
1168 .get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
1169 .await
1170 {
1171 | Ok(ev) => ev.content.global,
1172 | Err(_) => push::Ruleset::server_default(&user_id),
1173 };
1174
1175 for (pushkey, rooms) in suppressed {
1176 let pusher = match self
1177 .services
1178 .pusher
1179 .get_pusher(&user_id, &pushkey)
1180 .await
1181 {
1182 | Ok(pusher) => pusher,
1183 | Err(error) => {
1184 warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
1185 continue;
1186 },
1187 };
1188
1189 self.flush_suppressed_rooms(
1190 &user_id,
1191 &pushkey,
1192 &pusher,
1193 &rules_for_user,
1194 rooms,
1195 reason,
1196 )
1197 .await;
1198 }
1199 }
1200
1201 async fn pushing_suppressed(&self, user_id: &UserId) -> bool {
1204 if !self.services.config.suppress_push_when_active {
1205 debug!(?user_id, "push not suppressed: suppress_push_when_active disabled");
1206 return false;
1207 }
1208
1209 let Ok(presence) = self.services.presence.get_presence(user_id).await else {
1210 debug!(?user_id, "push not suppressed: presence unavailable");
1211 return false;
1212 };
1213
1214 if presence.content.presence != PresenceState::Online {
1215 debug!(
1216 ?user_id,
1217 presence = ?presence.content.presence,
1218 "push not suppressed: presence not online"
1219 );
1220 return false;
1221 }
1222
1223 let presence_age_ms = presence
1224 .content
1225 .last_active_ago
1226 .map(u64::from)
1227 .unwrap_or(u64::MAX);
1228
1229 if presence_age_ms >= 65_000 {
1230 debug!(?user_id, presence_age_ms, "push not suppressed: presence too old");
1231 return false;
1232 }
1233
1234 let sync_gap_ms = self
1235 .services
1236 .presence
1237 .last_sync_gap_ms(user_id)
1238 .await;
1239
1240 let considered_active = sync_gap_ms.is_some_and(|gap| gap < 32_000);
1241
1242 match sync_gap_ms {
1243 | Some(gap) if gap < 32_000 => debug!(
1244 ?user_id,
1245 presence_age_ms,
1246 sync_gap_ms = gap,
1247 "suppressing push: active heuristic"
1248 ),
1249 | Some(gap) => debug!(
1250 ?user_id,
1251 presence_age_ms,
1252 sync_gap_ms = gap,
1253 "push not suppressed: sync gap too large"
1254 ),
1255 | None => debug!(?user_id, presence_age_ms, "push not suppressed: no recent sync"),
1256 }
1257
1258 considered_active
1259 }
1260
    /// Sends `events` to the remote `server` as one federation transaction.
    ///
    /// PDUs that can be loaded are formatted for federation; EDUs that parse
    /// are included as they are. When neither remains, nothing is sent and
    /// the send counts as successful. The transaction id is the URL-safe,
    /// unpadded base64 of a hash over the PDU and EDU JSON.
    ///
    /// Per-PDU errors reported in the response are logged as warnings but do
    /// not fail the send; only a failed request does.
    async fn send_events_dest_federation(
        &self,
        server: OwnedServerName,
        events: Vec<SendingEvent>,
    ) -> SendingResult {
        let pdus: Vec<_> = events
            .iter()
            .filter_map(|event| extract_variant!(event, SendingEvent::Pdu))
            .stream()
            .wide_filter_map(|pdu_id| {
                self.services
                    .timeline
                    .get_pdu_json_from_id(pdu_id)
                    .ok()
            })
            .wide_then(|pdu| {
                self.services
                    .federation
                    .format_pdu_into(pdu, None)
            })
            .collect()
            .await;

        // EDUs that fail to parse are dropped.
        let edus: Vec<Raw<Edu>> = events
            .iter()
            .filter_map(|edu| match edu {
                | SendingEvent::Edu(edu) => Some(edu.as_ref()),
                | _ => None,
            })
            .map(serde_json::from_slice)
            .filter_map(Result::ok)
            .collect();

        if pdus.is_empty() && edus.is_empty() {
            return Ok(Destination::Federation(server));
        }

        let preimage = pdus
            .iter()
            .map(|raw| raw.get().as_bytes())
            .chain(edus.iter().map(|raw| raw.json().get().as_bytes()));

        let txn_hash = calculate_hash(preimage);
        let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
        let request = send_transaction_message::v1::Request {
            transaction_id: txn_id.into(),
            origin: self.server.name.clone(),
            origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
            pdus,
            edus,
        };

        let result = self
            .services
            .federation
            .execute_on(&self.services.client.sender, &server, request)
            .await;

        // Iterates the per-PDU results of a successful response only; a
        // failed request yields nothing here.
        for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
            if let Err(e) = result {
                warn!(
                    %txn_id, %server,
                    "error sending PDU {event_id} to remote server: {e:?}"
                );
            }
        }

        match result {
            | Ok(_) => Ok(Destination::Federation(server)),
            | Err(error) => Err((Destination::Federation(server), error)),
        }
    }
1333}