Skip to main content

tuwunel_service/sending/
sender.rs

1use std::{
2	collections::{BTreeMap, HashMap, HashSet, btree_map::Entry},
3	fmt::Debug,
4	sync::{
5		Arc,
6		atomic::{AtomicU64, AtomicUsize, Ordering},
7	},
8	time::{Duration, Instant},
9};
10
11use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
12use futures::{
13	FutureExt, StreamExt, TryFutureExt,
14	future::{BoxFuture, join3, try_join3},
15	pin_mut,
16	stream::FuturesUnordered,
17};
18use ruma::{
19	MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
20	UserId,
21	api::{
22		appservice::event::push_events::v1::{EphemeralData, Request as PushEventsRequest},
23		client::push::Pusher,
24		federation::transactions::{
25			edu::{
26				DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
27				ReceiptData, ReceiptMap,
28			},
29			send_transaction_message,
30		},
31	},
32	device_id,
33	events::{
34		AnySyncEphemeralRoomEvent, GlobalAccountDataEventType, push_rules::PushRulesEvent,
35		receipt::ReceiptType,
36	},
37	presence::PresenceState,
38	push,
39	serde::Raw,
40	uint,
41};
42use tuwunel_core::{
43	Error, Event, Result, debug, err, error, extract_variant,
44	result::LogErr,
45	smallvec::SmallVec,
46	trace,
47	utils::{
48		BoolExt, ReadyExt, calculate_hash, continue_exponential_backoff_secs,
49		future::TryExtExt,
50		stream::{BroadbandExt, IterStream, WidebandExt},
51	},
52	warn,
53};
54
55use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
56use crate::{federation::ShouldAttempt, rooms::timeline::RawPduId};
57
/// In-flight bookkeeping for one `Destination`. Cross-attempt backoff lives
/// in `peer_status` (federation only); appservice/push paths keep their own
/// status because they are not server-keyed.
#[derive(Debug)]
enum TransactionStatus {
	/// A transaction for this destination is currently in flight.
	Running,
	/// Push backoff: number of tries so far and the instant of the last
	/// failure. Only push destinations are put into this state.
	Failed(u32, Instant),
	/// Carries the number of times failed. For push destinations this marks
	/// a retry that is already in flight; for other destinations it marks a
	/// failed transaction waiting to be retried by the next selection.
	Retrying(u32),
}
67
/// Failure outcome of a send: the destination that failed and the error.
type SendingError = (Destination, Error);
/// Outcome of one send attempt; both arms carry the `Destination`.
type SendingResult = Result<Destination, SendingError>;
/// Boxed future resolving to a [`SendingResult`].
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
/// Set of in-flight [`SendingFuture`]s owned by one sender worker.
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
/// Per-worker map from `Destination` to its [`TransactionStatus`].
type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
73
/// Per-(room, user) bucket of [`ReceiptData`]. MSC3771 allows one receipt
/// per thread context per user per EDU window; the dominant case is
/// still a single receipt, so the inline capacity of one avoids a heap
/// allocation.
type UserReceipts = SmallVec<[ReceiptData; 1]>;

/// Per-rank slice of receipt EDU output. Each entry becomes one
/// `Edu::Receipt` buffer; rank 0 carries each user's earliest receipt
/// in the window, rank 1 the next, and so on. Most windows produce a
/// single rank.
type RankedReceipts = SmallVec<[ReceiptMap; 1]>;

/// Per-room [`RankedReceipts`] gathered for one federation EDU window. The
/// common case is a single room, so the inline capacity of one avoids a
/// heap allocation.
type RoomReceipts = SmallVec<[(OwnedRoomId, RankedReceipts); 1]>;
88
/// Maximum number of distinct users collected into one presence EDU.
const SELECT_PRESENCE_LIMIT: usize = 256;
/// Budget of distinct receipt users gathered for one EDU selection window.
const SELECT_RECEIPT_LIMIT: usize = 256;
/// Maximum number of queued requests promoted to active after a successful
/// transaction.
const DEQUEUE_LIMIT: usize = 48;

/// PDU limit per transaction.
// NOTE(review): not referenced in the visible part of this file; presumably
// enforced by callers elsewhere — confirm.
pub const PDU_LIMIT: usize = 50;
/// Maximum number of EDUs selected for a single transaction; shared as one
/// budget by the device-list, receipt and presence selectors.
pub const EDU_LIMIT: usize = 100;
95
96impl Service {
97	#[tracing::instrument(skip(self), level = "debug")]
98	pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
99		let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
100		let mut futures: SendingFutures<'_> = FuturesUnordered::new();
101
102		self.startup_netburst(id, &mut futures, &mut statuses)
103			.boxed()
104			.await;
105
106		self.work_loop(id, &mut futures, &mut statuses)
107			.await;
108
109		if !futures.is_empty() {
110			self.finish_responses(&mut futures).boxed().await;
111		}
112
113		Ok(())
114	}
115
116	#[tracing::instrument(
117		name = "work",
118		level = "trace",
119		skip_all,
120		fields(
121			futures = %futures.len(),
122			statuses = %statuses.len(),
123		),
124	)]
125	async fn work_loop<'a>(
126		&'a self,
127		id: usize,
128		futures: &mut SendingFutures<'a>,
129		statuses: &mut CurTransactionStatus,
130	) {
131		let receiver = self
132			.channels
133			.get(id)
134			.map(|(_, receiver)| receiver.clone())
135			.expect("Missing channel for sender worker");
136
137		while !receiver.is_closed() {
138			tokio::select! {
139				Some(response) = futures.next() => {
140					self.handle_response(response, futures, statuses).await;
141				},
142				request = receiver.recv_async() => match request {
143					Ok(request) => self.handle_request(request, futures, statuses).await,
144					Err(_) => return,
145				},
146			}
147		}
148	}
149
150	#[tracing::instrument(name = "response", level = "debug", skip_all)]
151	async fn handle_response<'a>(
152		&'a self,
153		response: SendingResult,
154		futures: &mut SendingFutures<'a>,
155		statuses: &mut CurTransactionStatus,
156	) {
157		match response {
158			| Err((dest, e)) => Self::handle_response_err(dest, statuses, &e),
159			| Ok(dest) =>
160				self.handle_response_ok(&dest, futures, statuses)
161					.await,
162		}
163	}
164
165	fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) {
166		debug!(?dest, "{e:?}");
167		// Push backs off locally; federation defers to peer_status, appservice retries.
168		let push = matches!(dest, Destination::Push(..));
169
170		statuses.entry(dest).and_modify(|e| {
171			let tries = match e {
172				| TransactionStatus::Running => 1,
173				| TransactionStatus::Failed(n, _) | TransactionStatus::Retrying(n) =>
174					n.saturating_add(1),
175			};
176
177			*e = if push {
178				TransactionStatus::Failed(tries, Instant::now())
179			} else {
180				TransactionStatus::Retrying(tries)
181			};
182		});
183	}
184
185	#[expect(clippy::needless_pass_by_ref_mut)]
186	async fn handle_response_ok<'a>(
187		&'a self,
188		dest: &Destination,
189		futures: &mut SendingFutures<'a>,
190		statuses: &mut CurTransactionStatus,
191	) {
192		let _cork = self.db.db.cork();
193		self.db.delete_all_active_requests_for(dest).await;
194
195		// Find events that have been added since starting the last request
196		let new_events = self
197			.db
198			.queued_requests(dest)
199			.take(DEQUEUE_LIMIT)
200			.collect::<Vec<_>>()
201			.await;
202
203		// Insert any pdus we found
204		if !new_events.is_empty() {
205			self.db.mark_as_active(new_events.iter());
206
207			let new_events_vec = new_events
208				.into_iter()
209				.map(|(_, event)| event)
210				.collect();
211
212			futures.push(self.send_events(dest.clone(), new_events_vec));
213		} else {
214			statuses.remove(dest);
215		}
216	}
217
218	#[expect(clippy::needless_pass_by_ref_mut)]
219	#[tracing::instrument(name = "request", level = "debug", skip_all)]
220	async fn handle_request<'a>(
221		&'a self,
222		msg: Msg,
223		futures: &mut SendingFutures<'a>,
224		statuses: &mut CurTransactionStatus,
225	) {
226		let iv = vec![(msg.queue_id, msg.event)];
227		if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
228			if !events.is_empty() {
229				futures.push(self.send_events(msg.dest, events));
230			} else {
231				statuses.remove(&msg.dest);
232			}
233		}
234	}
235
236	#[tracing::instrument(
237		name = "finish",
238		level = "info",
239		skip_all,
240		fields(futures = %futures.len()),
241	)]
242	async fn finish_responses<'a>(&'a self, futures: &mut SendingFutures<'a>) {
243		use tokio::{
244			select,
245			time::{Instant, sleep_until},
246		};
247
248		let timeout = self.server.config.sender_shutdown_timeout;
249		let timeout = Duration::from_secs(timeout);
250		let now = Instant::now();
251		let deadline = now.checked_add(timeout).unwrap_or(now);
252		loop {
253			trace!("Waiting for {} requests to complete...", futures.len());
254			select! {
255				() = sleep_until(deadline) => return,
256				response = futures.next() => match response {
257					Some(Ok(dest)) => self.db.delete_all_active_requests_for(&dest).await,
258					Some(_) => continue,
259					None => return,
260				},
261			}
262		}
263	}
264
265	#[tracing::instrument(
266		name = "netburst",
267		level = "debug",
268		skip_all,
269		fields(futures = %futures.len()),
270	)]
271	#[expect(clippy::needless_pass_by_ref_mut)]
272	async fn startup_netburst<'a>(
273		&'a self,
274		id: usize,
275		futures: &mut SendingFutures<'a>,
276		statuses: &mut CurTransactionStatus,
277	) {
278		let keep =
279			usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
280
281		let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
282		let active = self.db.active_requests();
283
284		pin_mut!(active);
285		while let Some((key, event, dest)) = active.next().await {
286			if self.shard_id(&dest) != id {
287				continue;
288			}
289
290			let entry = txns.entry(dest.clone()).or_default();
291			if self.server.config.startup_netburst_keep >= 0 && entry.len() >= keep {
292				warn!("Dropping unsent event {dest:?} {:?}", String::from_utf8_lossy(&key));
293				self.db.delete_active_request(&key);
294			} else {
295				entry.push(event);
296			}
297		}
298
299		for (dest, events) in txns {
300			if self.server.config.startup_netburst && !events.is_empty() {
301				statuses.insert(dest.clone(), TransactionStatus::Running);
302				futures.push(self.send_events(dest.clone(), events));
303			}
304		}
305	}
306
307	#[tracing::instrument(
308		name = "select",
309		level = "debug",
310		skip_all,
311		fields(
312			?dest,
313			new_events = %new_events.len(),
314		),
315	)]
316	async fn select_events(
317		&self,
318		dest: &Destination,
319		new_events: Vec<QueueItem>, // Events we want to send: event and full key
320		statuses: &mut CurTransactionStatus,
321	) -> Result<Option<Vec<SendingEvent>>> {
322		let (allow, retry) = self.select_events_current(dest, statuses).await?;
323
324		// Nothing can be done for this remote, bail out.
325		if !allow {
326			return Ok(None);
327		}
328
329		let mut events = Vec::new();
330
331		// Must retry any previous transaction for this remote.
332		if retry {
333			self.db
334				.active_requests_for(dest)
335				.ready_for_each(|(_, e)| events.push(e))
336				.await;
337
338			return Ok(Some(events));
339		}
340
341		// Compose the next transaction
342		let _cork = self.db.db.cork();
343		if !new_events.is_empty() {
344			self.db.mark_as_active(new_events.iter());
345			for (_, e) in new_events {
346				events.push(e);
347			}
348		}
349
350		// Add EDU's into the transaction
351		if let Destination::Federation(server_name) = dest
352			&& let Ok((select_edus, last_count)) = self.select_edus(server_name).await
353		{
354			debug_assert!(select_edus.len() <= EDU_LIMIT, "exceeded edus limit");
355			let select_edus = select_edus.into_iter().map(SendingEvent::Edu);
356
357			events.extend(select_edus);
358			self.db
359				.set_latest_educount(server_name, last_count);
360		}
361
362		Ok(Some(events))
363	}
364
365	async fn select_events_current(
366		&self,
367		dest: &Destination,
368		statuses: &mut CurTransactionStatus,
369	) -> Result<(bool, bool)> {
370		// peer_status gates federation only; appservice and push fall through.
371		if let Destination::Federation(server) = dest {
372			let should_attempt = self
373				.services
374				.federation
375				.should_attempt(server)
376				.await;
377
378			if matches!(should_attempt, ShouldAttempt::No { .. }) {
379				return Ok((false, false));
380			}
381		}
382
383		let (mut allow, mut retry) = (true, false);
384		statuses
385			.entry(dest.clone())
386			.and_modify(|e| match e {
387				| TransactionStatus::Running => {
388					allow = false; // already running
389				},
390				| TransactionStatus::Failed(tries, time) => {
391					// Push backoff: hold off until the exponential window elapses.
392					let min = self.server.config.sender_timeout;
393					let max = self.server.config.sender_retry_backoff_limit;
394					if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) {
395						allow = false;
396					} else {
397						retry = true;
398						*e = TransactionStatus::Retrying(*tries);
399					}
400				},
401				| TransactionStatus::Retrying(_) if matches!(dest, Destination::Push(..)) => {
402					allow = false; // push retry already in flight
403				},
404				| TransactionStatus::Retrying(_) => {
405					// Promote to Running so a concurrent select does not double-send.
406					retry = true;
407					*e = TransactionStatus::Running;
408				},
409			})
410			.or_insert(TransactionStatus::Running);
411
412		Ok((allow, retry))
413	}
414
415	#[tracing::instrument(name = "edus", level = "debug", skip_all)]
416	async fn select_edus(&self, server_name: &ServerName) -> Result<(EduVec, u64)> {
417		// selection window
418		let since = self.db.get_latest_educount(server_name).await;
419		let since_upper = self.services.globals.current_count();
420		let batch = (since, since_upper);
421		debug_assert!(batch.0 <= batch.1, "since range must not be negative");
422
423		let events_len = AtomicUsize::default();
424		let max_edu_count = AtomicU64::new(since);
425		let device_changes =
426			self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);
427
428		let receipts = self
429			.server
430			.config
431			.allow_outgoing_read_receipts
432			.then_async(|| {
433				self.select_edus_receipts(server_name, batch, &max_edu_count, &events_len)
434			});
435
436		let presence = self
437			.server
438			.config
439			.allow_outgoing_presence
440			.then_async(|| {
441				self.select_edus_presence(server_name, batch, &max_edu_count, &events_len)
442			});
443
444		let (device_changes, receipts, presence) =
445			join3(device_changes, receipts, presence).await;
446
447		let mut events = device_changes;
448
449		events.extend(presence.into_iter().flatten());
450		events.extend(receipts.into_iter().flatten());
451
452		Ok((events, max_edu_count.load(Ordering::Acquire)))
453	}
454
455	/// Look for device changes
456	#[tracing::instrument(
457		name = "device_changes",
458		level = "trace",
459		skip(self, server_name, max_edu_count, events_len)
460	)]
461	async fn select_edus_device_changes(
462		&self,
463		server_name: &ServerName,
464		since: (u64, u64),
465		max_edu_count: &AtomicU64,
466		events_len: &AtomicUsize,
467	) -> EduVec {
468		let mut events = EduVec::new();
469		let server_rooms = self
470			.services
471			.state_cache
472			.server_rooms(server_name);
473
474		pin_mut!(server_rooms);
475		let mut device_list_changes = HashSet::<OwnedUserId>::new();
476		while let Some(room_id) = server_rooms.next().await {
477			let keys_changed = self
478				.services
479				.users
480				.room_keys_changed(room_id, since.0, Some(since.1))
481				.ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));
482
483			pin_mut!(keys_changed);
484			while let Some((user_id, count)) = keys_changed.next().await {
485				debug_assert!(count <= since.1, "exceeds upper-bound");
486
487				max_edu_count.fetch_max(count, Ordering::Relaxed);
488				if !device_list_changes.insert(user_id.into()) {
489					continue;
490				}
491
492				// Empty prev id forces synapse to resync; because synapse resyncs,
493				// we can just insert placeholder data
494				let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
495					user_id: user_id.into(),
496					device_id: device_id!("placeholder").to_owned(),
497					device_display_name: Some("Placeholder".to_owned()),
498					stream_id: uint!(1),
499					prev_id: Vec::new(),
500					deleted: None,
501					keys: None,
502				});
503
504				let mut buf = EduBuf::new();
505				serde_json::to_writer(&mut buf, &edu)
506					.expect("failed to serialize device list update to JSON");
507
508				// Reserve before push so concurrent producers see the count first.
509				if events_len.fetch_add(1, Ordering::Relaxed) >= EDU_LIMIT {
510					return events;
511				}
512
513				events.push(buf);
514			}
515		}
516
517		events
518	}
519
520	/// Look for read receipts in this room
521	///
522	/// MSC3771 lets a user emit multiple receipts in the same EDU window, one
523	/// per thread context. The federation EDU shape allows only one
524	/// `ReceiptData` per `(room, user)` slot, so a user with N parallel
525	/// thread receipts ships across N parallel `Edu::Receipt` buffers within
526	/// the same transaction. Each buffer is shape-compliant; receivers
527	/// process them as independent receipt EDUs and our storage keeps each
528	/// thread distinct.
529	#[tracing::instrument(
530		name = "receipts",
531		level = "trace",
532		skip(self, server_name, max_edu_count, events_len)
533	)]
534	async fn select_edus_receipts(
535		&self,
536		server_name: &ServerName,
537		since: (u64, u64),
538		max_edu_count: &AtomicU64,
539		events_len: &AtomicUsize,
540	) -> EduVec {
541		let num = AtomicUsize::new(0);
542		let by_room: RoomReceipts = self
543			.services
544			.state_cache
545			.server_rooms(server_name)
546			.map(ToOwned::to_owned)
547			.broad_filter_map(async |room_id| {
548				let ranked = self
549					.select_edus_receipts_room(&room_id, since, max_edu_count, &num)
550					.await;
551
552				ranked
553					.is_empty()
554					.is_false()
555					.then_some((room_id, ranked))
556			})
557			.collect()
558			.boxed()
559			.await;
560
561		let max_rank = by_room
562			.iter()
563			.map(|(_, maps)| maps.len())
564			.max()
565			.unwrap_or(0);
566
567		let pivot_rank = |rank: usize| -> Option<BTreeMap<OwnedRoomId, ReceiptMap>> {
568			let receipts: BTreeMap<_, _> = by_room
569				.iter()
570				.filter_map(|(room_id, maps)| {
571					maps.get(rank)
572						.cloned()
573						.map(|map| (room_id.clone(), map))
574				})
575				.collect();
576
577			receipts.is_empty().is_false().then_some(receipts)
578		};
579
580		let serialize_edu = |receipts: BTreeMap<OwnedRoomId, ReceiptMap>| -> EduBuf {
581			let mut buf = EduBuf::new();
582			serde_json::to_writer(&mut buf, &Edu::Receipt(ReceiptContent { receipts }))
583				.expect("Failed to serialize Receipt EDU to JSON vec");
584
585			buf
586		};
587
588		// Reserve a slot per rank from the shared EDU budget.
589		let reserve = |_: &_| events_len.fetch_add(1, Ordering::Relaxed) < EDU_LIMIT;
590
591		(0..max_rank)
592			.filter_map(pivot_rank)
593			.take_while(reserve)
594			.map(serialize_edu)
595			.collect()
596	}
597
598	/// Look for read receipts in this room.
599	///
600	/// Returns a per-rank vector of [`ReceiptMap`]s. Each user's receipts in
601	/// the window (one per thread context, count-ordered) are placed into
602	/// successive ranks, so rank 0 carries each user's earliest receipt,
603	/// rank 1 the next, and so on. The receipt-limit budget bounds distinct
604	/// users only; subsequent thread receipts for an already-counted user do
605	/// not consume additional budget.
606	#[tracing::instrument(
607		name = "receipts",
608		level = "trace",
609		skip(self, since, max_edu_count)
610	)]
611	async fn select_edus_receipts_room(
612		&self,
613		room_id: &RoomId,
614		since: (u64, u64),
615		max_edu_count: &AtomicU64,
616		num: &AtomicUsize,
617	) -> RankedReceipts {
618		let receipts =
619			self.services
620				.read_receipt
621				.readreceipts_since(room_id, since.0, Some(since.1));
622
623		pin_mut!(receipts);
624		let mut by_user = BTreeMap::<OwnedUserId, UserReceipts>::new();
625		while let Some((user_id, count, read_receipt)) = receipts.next().await {
626			debug_assert!(count <= since.1, "exceeds upper-bound");
627
628			max_edu_count.fetch_max(count, Ordering::Relaxed);
629			if !self.services.globals.user_is_local(user_id) {
630				continue;
631			}
632
633			let Ok(event) = serde_json::from_str(read_receipt.json().get()) else {
634				error!(?user_id, ?count, ?read_receipt, "Invalid edu event in read_receipts.");
635				continue;
636			};
637
638			let AnySyncEphemeralRoomEvent::Receipt(r) = event else {
639				error!(?user_id, ?count, ?event, "Invalid event type in read_receipts");
640				continue;
641			};
642
643			let (event_id, mut receipt) = r
644				.content
645				.0
646				.into_iter()
647				.next()
648				.expect("we only use one event per read receipt");
649
650			let receipt = receipt
651				.remove(&ReceiptType::Read)
652				.expect("our read receipts always set this")
653				.remove(user_id)
654				.expect("our read receipts always have the user here");
655
656			let receipt_data = ReceiptData { data: receipt, event_ids: vec![event_id] };
657
658			match by_user.entry(user_id.to_owned()) {
659				| Entry::Vacant(slot) => {
660					slot.insert(SmallVec::from_buf([receipt_data]));
661					let num = num.fetch_add(1, Ordering::Relaxed);
662					if num >= SELECT_RECEIPT_LIMIT {
663						break;
664					}
665				},
666				| Entry::Occupied(mut slot) => {
667					slot.get_mut().push(receipt_data);
668				},
669			}
670		}
671
672		// Pivot per-user count-ordered receipts into rank-major
673		// `RankedReceipts`. Rank 0 carries each user's earliest receipt in
674		// the window, rank 1 the next, and so on.
675		by_user
676			.into_iter()
677			.fold(RankedReceipts::new(), |mut acc, (user_id, receipts)| {
678				for (rank, receipt_data) in receipts.into_iter().enumerate() {
679					if rank >= acc.len() {
680						acc.push(ReceiptMap { read: BTreeMap::new() });
681					}
682
683					acc[rank]
684						.read
685						.insert(user_id.clone(), receipt_data);
686				}
687
688				acc
689			})
690	}
691
692	/// Look for presence
693	#[tracing::instrument(
694		name = "presence",
695		level = "trace",
696		skip(self, server_name, max_edu_count, events_len)
697	)]
698	async fn select_edus_presence(
699		&self,
700		server_name: &ServerName,
701		since: (u64, u64),
702		max_edu_count: &AtomicU64,
703		events_len: &AtomicUsize,
704	) -> Option<EduBuf> {
705		let presence_since = self
706			.services
707			.presence
708			.presence_since(since.0, Some(since.1));
709
710		pin_mut!(presence_since);
711		let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
712		while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
713			debug_assert!(count <= since.1, "exceeded upper-bound");
714
715			max_edu_count.fetch_max(count, Ordering::Relaxed);
716			if !self.services.globals.user_is_local(user_id) {
717				continue;
718			}
719
720			if !self
721				.services
722				.state_cache
723				.server_sees_user(server_name, user_id)
724				.await
725			{
726				continue;
727			}
728
729			let Ok(presence_event) = self
730				.services
731				.presence
732				.from_json_bytes_to_event(presence_bytes, user_id)
733				.await
734				.log_err()
735			else {
736				continue;
737			};
738
739			let update = PresenceUpdate {
740				user_id: user_id.into(),
741				presence: presence_event.content.presence,
742				currently_active: presence_event
743					.content
744					.currently_active
745					.unwrap_or(false),
746				status_msg: presence_event.content.status_msg,
747				last_active_ago: presence_event
748					.content
749					.last_active_ago
750					.unwrap_or_else(|| uint!(0)),
751			};
752
753			presence_updates.insert(user_id.into(), update);
754			if presence_updates.len() >= SELECT_PRESENCE_LIMIT {
755				break;
756			}
757		}
758
759		if presence_updates.is_empty() {
760			return None;
761		}
762
763		// Reserve our slot in the shared transaction budget before serializing.
764		if events_len.fetch_add(1, Ordering::Relaxed) >= EDU_LIMIT {
765			return None;
766		}
767
768		let presence_content = Edu::Presence(PresenceContent {
769			push: presence_updates.into_values().collect(),
770		});
771
772		let mut buf = EduBuf::new();
773		serde_json::to_writer(&mut buf, &presence_content)
774			.expect("failed to serialize Presence EDU to JSON");
775
776		Some(buf)
777	}
778
779	fn send_events(&self, dest: Destination, events: Vec<SendingEvent>) -> SendingFuture<'_> {
780		debug_assert!(!events.is_empty(), "sending empty transaction");
781		match dest {
782			| Destination::Federation(server) => self
783				.send_events_dest_federation(server, events)
784				.boxed(),
785			| Destination::Appservice(id) => self
786				.send_events_dest_appservice(id, events)
787				.boxed(),
788			| Destination::Push(user_id, pushkey) => self
789				.send_events_dest_push(user_id, pushkey, events)
790				.boxed(),
791		}
792	}
793
794	#[tracing::instrument(
795		name = "appservice",
796		level = "debug",
797		skip(self, events),
798		fields(
799			events = %events.len(),
800		),
801	)]
802	async fn send_events_dest_appservice(
803		&self,
804		id: String,
805		events: Vec<SendingEvent>,
806	) -> SendingResult {
807		let Some(appservice) = self
808			.services
809			.appservice
810			.get_registration(&id)
811			.await
812		else {
813			//TODO: appservice queue cleanup.
814			return Err((
815				Destination::Appservice(id.clone()),
816				err!(Database(debug_warn!(?id, "Missing appservice registration"))),
817			));
818		};
819
820		let mut pdu_jsons = Vec::with_capacity(
821			events
822				.iter()
823				.filter(|event| matches!(event, SendingEvent::Pdu(_)))
824				.count(),
825		);
826		let mut edu_jsons: Vec<Raw<EphemeralData>> = Vec::with_capacity(
827			events
828				.iter()
829				.filter(|event| matches!(event, SendingEvent::Edu(_)))
830				.count(),
831		);
832		for event in &events {
833			match event {
834				| SendingEvent::Pdu(pdu_id) => {
835					if let Ok(pdu) = self
836						.services
837						.timeline
838						.get_pdu_from_id(pdu_id)
839						.await
840					{
841						pdu_jsons.push(pdu.to_format());
842					}
843				},
844				| SendingEvent::Edu(edu) => {
845					if appservice.receive_ephemeral
846						&& let Ok(edu) =
847							serde_json::from_slice(edu).and_then(|edu| Raw::new(&edu))
848					{
849						edu_jsons.push(edu);
850					}
851				},
852				| SendingEvent::Flush => {}, // flush only; no new content
853			}
854		}
855
856		let txn_hash = calculate_hash(events.iter().filter_map(|e| match e {
857			| SendingEvent::Edu(b) => Some(b.as_ref()),
858			| SendingEvent::Pdu(b) => Some(b.as_ref()),
859			| SendingEvent::Flush => None,
860		}));
861
862		let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
863
864		//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
865		// transaction");
866		match self
867			.services
868			.appservice
869			.send_request(appservice, PushEventsRequest {
870				txn_id: txn_id.into(),
871				events: pdu_jsons,
872				ephemeral: edu_jsons,
873				to_device: Vec::new(), // TODO
874			})
875			.await
876		{
877			| Ok(_) => Ok(Destination::Appservice(id)),
878			| Err(e) => Err((Destination::Appservice(id), e)),
879		}
880	}
881
882	#[tracing::instrument(
883		name = "push",
884		level = "info",
885		skip(self, events),
886		fields(
887			events = %events.len(),
888		),
889	)]
890	async fn send_events_dest_push(
891		&self,
892		user_id: OwnedUserId,
893		pushkey: String,
894		events: Vec<SendingEvent>,
895	) -> SendingResult {
896		let suppressed = self.pushing_suppressed(&user_id).map(Ok);
897
898		let pusher = self
899			.services
900			.pusher
901			.get_pusher(&user_id, &pushkey)
902			.map_err(|_| {
903				(
904					Destination::Push(user_id.clone(), pushkey.clone()),
905					err!(Database(error!(?user_id, ?pushkey, "Missing pusher"))),
906				)
907			});
908
909		let rules_for_user = self
910			.services
911			.account_data
912			.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
913			.map(|ev| {
914				ev.map_or_else(
915					|_| push::Ruleset::server_default(&user_id),
916					|ev| ev.content.global,
917				)
918			})
919			.map(Ok);
920
921		let (pusher, rules_for_user, suppressed) =
922			try_join3(pusher, rules_for_user, suppressed).await?;
923
924		if suppressed {
925			let queued = self
926				.enqueue_suppressed_push_events(&user_id, &pushkey, &events)
927				.await;
928
929			debug!(
930				?user_id,
931				pushkey,
932				queued,
933				events = events.len(),
934				"Push suppressed; queued events"
935			);
936			return Ok(Destination::Push(user_id, pushkey));
937		}
938
939		self.schedule_flush_suppressed_for_pushkey(
940			user_id.clone(),
941			pushkey.clone(),
942			"non-suppressed push",
943		);
944
945		let _sent = events
946			.iter()
947			.stream()
948			.ready_filter_map(|event| extract_variant!(event, SendingEvent::Pdu))
949			.wide_filter_map(|pdu_id| {
950				self.services
951					.timeline
952					.get_pdu_from_id(pdu_id)
953					.ok()
954			})
955			.ready_filter(|pdu| !pdu.is_redacted())
956			.wide_filter_map(async |pdu| {
957				self.services
958					.pusher
959					.send_push_notice(&user_id, &pusher, &rules_for_user, &pdu)
960					.await
961					.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e))
962					.ok()
963			})
964			.count()
965			.await;
966
967		Ok(Destination::Push(user_id, pushkey))
968	}
969
970	pub fn schedule_flush_suppressed_for_pushkey(
971		&self,
972		user_id: OwnedUserId,
973		pushkey: String,
974		reason: &'static str,
975	) {
976		let sending = self.services.sending.clone();
977		let runtime = self.server.runtime();
978		runtime.spawn(async move {
979			sending
980				.flush_suppressed_for_pushkey(user_id, pushkey, reason)
981				.await;
982		});
983	}
984
985	pub fn schedule_flush_suppressed_for_user(&self, user_id: OwnedUserId, reason: &'static str) {
986		let sending = self.services.sending.clone();
987		let runtime = self.server.runtime();
988		runtime.spawn(async move {
989			sending
990				.flush_suppressed_for_user(user_id, reason)
991				.await;
992		});
993	}
994
995	async fn enqueue_suppressed_push_events(
996		&self,
997		user_id: &UserId,
998		pushkey: &str,
999		events: &[SendingEvent],
1000	) -> usize {
1001		let mut queued = 0_usize;
1002		for event in events {
1003			let SendingEvent::Pdu(pdu_id) = event else {
1004				continue;
1005			};
1006
1007			let Ok(pdu) = self
1008				.services
1009				.timeline
1010				.get_pdu_from_id(pdu_id)
1011				.await
1012			else {
1013				debug!(?user_id, ?pdu_id, "Suppressing push but PDU is missing");
1014				continue;
1015			};
1016
1017			if pdu.is_redacted() {
1018				trace!(?user_id, ?pdu_id, "Suppressing push for redacted PDU");
1019				continue;
1020			}
1021
1022			if self.services.pusher.queue_suppressed_push(
1023				user_id,
1024				pushkey,
1025				pdu.room_id(),
1026				*pdu_id,
1027			) {
1028				queued = queued.saturating_add(1);
1029			}
1030		}
1031
1032		queued
1033	}
1034
1035	async fn flush_suppressed_rooms(
1036		&self,
1037		user_id: &UserId,
1038		pushkey: &str,
1039		pusher: &Pusher,
1040		rules_for_user: &push::Ruleset,
1041		rooms: Vec<(OwnedRoomId, Vec<RawPduId>)>,
1042		reason: &'static str,
1043	) {
1044		if rooms.is_empty() {
1045			return;
1046		}
1047
1048		let mut sent = 0_usize;
1049		debug!(?user_id, pushkey, rooms = rooms.len(), "Flushing suppressed pushes ({reason})");
1050
1051		for (room_id, pdu_ids) in rooms {
1052			let unread = self
1053				.services
1054				.pusher
1055				.notification_count(user_id, &room_id)
1056				.await;
1057
1058			if unread == 0 {
1059				trace!(?user_id, ?room_id, "Skipping suppressed push flush: no unread");
1060				continue;
1061			}
1062
1063			for pdu_id in pdu_ids {
1064				let Ok(pdu) = self
1065					.services
1066					.timeline
1067					.get_pdu_from_id(&pdu_id)
1068					.await
1069				else {
1070					debug!(?user_id, ?pdu_id, "Suppressed PDU missing during flush");
1071					continue;
1072				};
1073
1074				if pdu.is_redacted() {
1075					trace!(?user_id, ?pdu_id, "Suppressed PDU redacted during flush");
1076					continue;
1077				}
1078
1079				if let Err(error) = self
1080					.services
1081					.pusher
1082					.send_push_notice(user_id, pusher, rules_for_user, &pdu)
1083					.await
1084				{
1085					let requeued = self
1086						.services
1087						.pusher
1088						.queue_suppressed_push(user_id, pushkey, &room_id, pdu_id);
1089
1090					warn!(
1091						?user_id,
1092						?room_id,
1093						?error,
1094						requeued,
1095						"Failed to send suppressed push notification"
1096					);
1097				} else {
1098					sent = sent.saturating_add(1);
1099				}
1100			}
1101		}
1102
1103		debug!(?user_id, pushkey, sent, "Flushed suppressed push notifications");
1104	}
1105
1106	async fn flush_suppressed_for_pushkey(
1107		&self,
1108		user_id: OwnedUserId,
1109		pushkey: String,
1110		reason: &'static str,
1111	) {
1112		let suppressed = self
1113			.services
1114			.pusher
1115			.take_suppressed_for_pushkey(&user_id, &pushkey);
1116
1117		if suppressed.is_empty() {
1118			return;
1119		}
1120
1121		let pusher = match self
1122			.services
1123			.pusher
1124			.get_pusher(&user_id, &pushkey)
1125			.await
1126		{
1127			| Ok(pusher) => pusher,
1128			| Err(error) => {
1129				warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
1130				return;
1131			},
1132		};
1133
1134		let rules_for_user = match self
1135			.services
1136			.account_data
1137			.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
1138			.await
1139		{
1140			| Ok(ev) => ev.content.global,
1141			| Err(_) => push::Ruleset::server_default(&user_id),
1142		};
1143
1144		self.flush_suppressed_rooms(
1145			&user_id,
1146			&pushkey,
1147			&pusher,
1148			&rules_for_user,
1149			suppressed,
1150			reason,
1151		)
1152		.await;
1153	}
1154
1155	pub async fn flush_suppressed_for_user(&self, user_id: OwnedUserId, reason: &'static str) {
1156		let suppressed = self
1157			.services
1158			.pusher
1159			.take_suppressed_for_user(&user_id);
1160
1161		if suppressed.is_empty() {
1162			return;
1163		}
1164
1165		let rules_for_user = match self
1166			.services
1167			.account_data
1168			.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
1169			.await
1170		{
1171			| Ok(ev) => ev.content.global,
1172			| Err(_) => push::Ruleset::server_default(&user_id),
1173		};
1174
1175		for (pushkey, rooms) in suppressed {
1176			let pusher = match self
1177				.services
1178				.pusher
1179				.get_pusher(&user_id, &pushkey)
1180				.await
1181			{
1182				| Ok(pusher) => pusher,
1183				| Err(error) => {
1184					warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
1185					continue;
1186				},
1187			};
1188
1189			self.flush_suppressed_rooms(
1190				&user_id,
1191				&pushkey,
1192				&pusher,
1193				&rules_for_user,
1194				rooms,
1195				reason,
1196			)
1197			.await;
1198		}
1199	}
1200
1201	// optional suppression: heuristic combining presence age and recent sync
1202	// activity.
1203	async fn pushing_suppressed(&self, user_id: &UserId) -> bool {
1204		if !self.services.config.suppress_push_when_active {
1205			debug!(?user_id, "push not suppressed: suppress_push_when_active disabled");
1206			return false;
1207		}
1208
1209		let Ok(presence) = self.services.presence.get_presence(user_id).await else {
1210			debug!(?user_id, "push not suppressed: presence unavailable");
1211			return false;
1212		};
1213
1214		if presence.content.presence != PresenceState::Online {
1215			debug!(
1216				?user_id,
1217				presence = ?presence.content.presence,
1218				"push not suppressed: presence not online"
1219			);
1220			return false;
1221		}
1222
1223		let presence_age_ms = presence
1224			.content
1225			.last_active_ago
1226			.map(u64::from)
1227			.unwrap_or(u64::MAX);
1228
1229		if presence_age_ms >= 65_000 {
1230			debug!(?user_id, presence_age_ms, "push not suppressed: presence too old");
1231			return false;
1232		}
1233
1234		let sync_gap_ms = self
1235			.services
1236			.presence
1237			.last_sync_gap_ms(user_id)
1238			.await;
1239
1240		let considered_active = sync_gap_ms.is_some_and(|gap| gap < 32_000);
1241
1242		match sync_gap_ms {
1243			| Some(gap) if gap < 32_000 => debug!(
1244				?user_id,
1245				presence_age_ms,
1246				sync_gap_ms = gap,
1247				"suppressing push: active heuristic"
1248			),
1249			| Some(gap) => debug!(
1250				?user_id,
1251				presence_age_ms,
1252				sync_gap_ms = gap,
1253				"push not suppressed: sync gap too large"
1254			),
1255			| None => debug!(?user_id, presence_age_ms, "push not suppressed: no recent sync"),
1256		}
1257
1258		considered_active
1259	}
1260
1261	async fn send_events_dest_federation(
1262		&self,
1263		server: OwnedServerName,
1264		events: Vec<SendingEvent>,
1265	) -> SendingResult {
1266		let pdus: Vec<_> = events
1267			.iter()
1268			.filter_map(|event| extract_variant!(event, SendingEvent::Pdu))
1269			.stream()
1270			.wide_filter_map(|pdu_id| {
1271				self.services
1272					.timeline
1273					.get_pdu_json_from_id(pdu_id)
1274					.ok()
1275			})
1276			.wide_then(|pdu| {
1277				self.services
1278					.federation
1279					.format_pdu_into(pdu, None)
1280			})
1281			.collect()
1282			.await;
1283
1284		let edus: Vec<Raw<Edu>> = events
1285			.iter()
1286			.filter_map(|edu| match edu {
1287				| SendingEvent::Edu(edu) => Some(edu.as_ref()),
1288				| _ => None,
1289			})
1290			.map(serde_json::from_slice)
1291			.filter_map(Result::ok)
1292			.collect();
1293
1294		if pdus.is_empty() && edus.is_empty() {
1295			return Ok(Destination::Federation(server));
1296		}
1297
1298		let preimage = pdus
1299			.iter()
1300			.map(|raw| raw.get().as_bytes())
1301			.chain(edus.iter().map(|raw| raw.json().get().as_bytes()));
1302
1303		let txn_hash = calculate_hash(preimage);
1304		let txn_id = &*URL_SAFE_NO_PAD.encode(txn_hash);
1305		let request = send_transaction_message::v1::Request {
1306			transaction_id: txn_id.into(),
1307			origin: self.server.name.clone(),
1308			origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
1309			pdus,
1310			edus,
1311		};
1312
1313		let result = self
1314			.services
1315			.federation
1316			.execute_on(&self.services.client.sender, &server, request)
1317			.await;
1318
1319		for (event_id, result) in result.iter().flat_map(|resp| resp.pdus.iter()) {
1320			if let Err(e) = result {
1321				warn!(
1322					%txn_id, %server,
1323					"error sending PDU {event_id} to remote server: {e:?}"
1324				);
1325			}
1326		}
1327
1328		match result {
1329			| Ok(_) => Ok(Destination::Federation(server)),
1330			| Err(error) => Err((Destination::Federation(server), error)),
1331		}
1332	}
1333}