Skip to main content

tuwunel_service/fetcher/
mod.rs

//! Coalesced, failover federation fetch of raw event bytes.
//!
//! [`Service::fetch`] is the entry point; behind it, a single worker task owns
//! every in-flight fetch and the dedup map, so no lock is needed to guard them.
//! The per-fetch work is split across the submodules: candidate selection, the
//! federation transport, and response validation.
7
8mod error;
9mod inflight;
10mod opts;
11mod select;
12mod transport;
13mod validate;
14mod worker;
15
16#[cfg(test)]
17mod tests;
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use loole::{Receiver, Sender, unbounded};
23use tokio::sync::{
24	oneshot::{self, channel},
25	watch,
26};
27use tuwunel_core::{Result, implement};
28
29pub use self::opts::{EventWindow, FanoutGrowth, Op, Opts, Outcome};
30use self::{
31	error::Failure,
32	inflight::{Key, SharedResult, Subscription},
33	select::{RoomCandidates, Select},
34	transport::{FederationTransport, Transport},
35};
36use crate::services::OnceServices;
37
/// Ceiling on how many fetches may be in flight at the same time, counted
/// across every key. Used as the service's `capacity` when it is built.
const REQUESTS_MAX: usize = 256;
40
41pub struct Service {
42	services: Arc<OnceServices>,
43	channel: (Sender<Msg>, Receiver<Msg>),
44	transport: Arc<dyn Transport>,
45	select: Arc<dyn Select>,
46	capacity: usize,
47}
48
49/// Request to the worker. The worker replies with a subscription to the
50/// coalesced result, deferring the reply under backpressure until a slot frees.
51struct Msg {
52	key: Key,
53	opts: Opts,
54	reply: oneshot::Sender<Subscription>,
55}
56
57#[async_trait]
58impl crate::Service for Service {
59	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
60		let services = args.services.clone();
61		let transport: Arc<dyn Transport> =
62			Arc::new(FederationTransport { services: services.clone() });
63
64		let select: Arc<dyn Select> = Arc::new(RoomCandidates { services: services.clone() });
65
66		Ok(Arc::new(Self {
67			services,
68			channel: unbounded(),
69			transport,
70			select,
71			capacity: REQUESTS_MAX,
72		}))
73	}
74
75	async fn worker(self: Arc<Self>) -> Result {
76		self.run_worker().await;
77		Ok(())
78	}
79
80	async fn interrupt(&self) {
81		let (sender, _) = &self.channel;
82		if !sender.is_closed() {
83			sender.close();
84		}
85	}
86
87	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
88}
89
90/// Fetch raw response bytes for an event over federation, coalescing concurrent
91/// callers for the same key onto a single network attempt. Server selection,
92/// failover, and poison detection happen internally; the future resolves only
93/// once a clean response arrives or every candidate is exhausted.
94#[implement(Service)]
95#[tracing::instrument(
96	level = "debug",
97	skip_all,
98	fields(
99		op = ?opts.op,
100		room_id = ?opts.room_id,
101		event_id = ?opts.event_id,
102	),
103)]
104pub async fn fetch(&self, opts: Opts) -> Result<Arc<Outcome>> {
105	let key = Key::new(&opts);
106	let (reply, reply_rx) = channel();
107
108	self.channel
109		.0
110		.send(Msg { key, opts, reply })
111		.map_err(|_| Failure::Cancelled)?;
112
113	// Hold the strong interest token across the wait; its drop cancels the fetch.
114	let (rx, _interest) = reply_rx.await.map_err(|_| Failure::Cancelled)?;
115
116	await_result(rx).await.map_err(Into::into)
117}
118
119async fn await_result(mut rx: watch::Receiver<Option<SharedResult>>) -> SharedResult {
120	rx.wait_for(Option::is_some)
121		.await
122		.map_or(Err(Failure::Cancelled), |value| value.clone().expect("present by predicate"))
123}