Skip to main content

tuwunel_service/fetcher/
opts.rs

1//! Caller contract and result types for a fetch: [`Opts`] in, [`Outcome`] out.
2//!
3//! [`Op`] selects the federation endpoint and folds into the single-flight
4//! dedup key; [`FanoutGrowth`] schedules the staged fan-out width.
5
6use std::num::NonZeroUsize;
7
8use bytes::Bytes;
9use ruma::{
10	MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedServerName, RoomVersionId,
11	api::Direction,
12};
13use tuwunel_core::smallvec::SmallVec;
14
15use crate::federation::Candidates;
16
17/// Event-id window for the batch ops, inline-sized for the common single-prev
18/// case and spilling to the heap past that.
19pub type EventWindow = SmallVec<[OwnedEventId; 1]>;
20
21/// Federation endpoint a fetch targets. The dedup key folds this in, so two
22/// callers asking for the same event over different endpoints do not coalesce.
23#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
24pub enum Op {
25	/// `GET /_matrix/federation/v1/event/{eventId}`
26	Event,
27
28	/// `GET /_matrix/federation/v1/event/{eventId}` for an event fetched while
29	/// reconstructing an auth chain; routed like [`Op::Event`] but pins the
30	/// room's authority server ahead of the popularity ranking.
31	AuthEvent,
32
33	/// `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
34	AuthChain,
35
36	/// `GET /_matrix/federation/v1/backfill/{roomId}`
37	Backfill,
38
39	/// `GET /_matrix/federation/v1/state_ids/{roomId}?event_id=`
40	StateIds,
41
42	/// `POST /_matrix/federation/v1/get_missing_events/{roomId}`
43	MissingEvents,
44
45	/// `GET /_matrix/federation/v1/timestamp_to_event/{roomId}?ts=&dir=`
46	TimestampToEvent,
47}
48
49/// Per-round width schedule for staged fan-out: how many candidate servers a
50/// fetch races concurrently in each escalation round, before the worker's
51/// per-round ceiling and remaining-budget clamps. `Fixed(1)` (the `Opts::new`
52/// default) reproduces strictly-sequential attempts.
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum FanoutGrowth {
55	/// Every round races the same width.
56	Fixed(NonZeroUsize),
57
58	/// `base`, `base + step`, `base + 2*step`, ...
59	Linear {
60		base: NonZeroUsize,
61		step: NonZeroUsize,
62	},
63
64	/// `base`, `base * factor`, `base * factor^2`, ...  Base 1, factor 2 is the
65	/// 1 -> 2 -> 4 -> 8 hedging ramp.
66	Geometric {
67		base: NonZeroUsize,
68		factor: NonZeroUsize,
69	},
70}
71
72impl FanoutGrowth {
73	/// Width for round `round` (0-based). Always >= 1; saturating, so a runaway
74	/// exponent cannot overflow (the candidate pool and `attempt_limit` clamp
75	/// the value to something small regardless).
76	#[must_use]
77	pub fn round_width(self, round: usize) -> usize {
78		match self {
79			| Self::Fixed(width) => width.get(),
80			| Self::Linear { base, step } => base
81				.get()
82				.saturating_add(step.get().saturating_mul(round)),
83			| Self::Geometric { base, factor } => {
84				let exp = u32::try_from(round).unwrap_or(u32::MAX);
85
86				base.get()
87					.saturating_mul(factor.get().saturating_pow(exp))
88			},
89		}
90	}
91}
92
93/// Caller contract. `event_id` is the sought datum for [`Op::Event`] /
94/// [`Op::AuthEvent`] / [`Op::AuthChain`] / [`Op::StateIds`] and a reference
95/// point for the others.
96#[derive(Clone, Debug)]
97pub struct Opts {
98	/// Federation endpoint this fetch targets.
99	pub op: Op,
100
101	/// Room the fetch is scoped to, or `None` for an unscoped id-addressed
102	/// fetch.
103	pub room_id: Option<OwnedRoomId>,
104
105	/// Event to fetch (id-addressed ops) or anchor from (room-scoped ops).
106	pub event_id: Option<OwnedEventId>,
107
108	/// Timestamp the [`Op::TimestampToEvent`] search starts from; `None` for
109	/// every other op.
110	pub ts: Option<MilliSecondsSinceUnixEpoch>,
111
112	/// Direction the [`Op::TimestampToEvent`] search runs; `None` for every
113	/// other op.
114	pub dir: Option<Direction>,
115
116	/// Boundary events the requester already holds; an [`Op::MissingEvents`]
117	/// window stops its backward walk here. Empty for every other op.
118	pub earliest_events: EventWindow,
119
120	/// Frontier events an [`Op::MissingEvents`] window fills the predecessors
121	/// of. Empty for every other op.
122	pub latest_events: EventWindow,
123
124	/// Server to try ahead of the ranked candidates.
125	pub hint: Option<OwnedServerName>,
126
127	/// Caller-supplied candidate pool tried in place of the room-derived
128	/// ranking; empty defers to the room-derived candidates.
129	pub candidates: Candidates,
130
131	/// Room version governing id and signature checks; `None` assumes V11.
132	pub room_version: Option<RoomVersionId>,
133
134	/// Cap on candidate servers tried; `None` tries every candidate.
135	pub attempt_limit: Option<NonZeroUsize>,
136
137	/// Event count requested per [`Op::Backfill`] / [`Op::MissingEvents`] batch
138	/// response; defaults to 10.
139	pub backfill_limit: Option<NonZeroUsize>,
140
141	/// Per-round width curve for staged fan-out. `Fixed(1)` is sequential.
142	pub fanout_growth: FanoutGrowth,
143
144	/// Per-round concurrency ceiling. `None` lets the curve run free, clamped
145	/// only by the candidate pool and `attempt_limit`; `Some(n)` caps each
146	/// round at `n`.
147	pub fanout_max_width: Option<NonZeroUsize>,
148
149	/// Cap on escalation rounds before giving up. `None` runs until exhaustion.
150	pub fanout_rounds: Option<NonZeroUsize>,
151
152	/// Reject a response whose event does not hash to the requested id.
153	pub check_event_id: bool,
154
155	/// Reject a response that is not well-formed JSON.
156	pub check_conforms: bool,
157
158	/// Reject a response that fails content-hash verification.
159	pub check_hashes: bool,
160
161	/// Accepted but not yet consulted; redaction-aware hash verification is
162	/// unimplemented.
163	pub authoritative_redaction: bool,
164
165	/// Reject a response that fails signature verification.
166	pub check_signature: bool,
167}
168
169impl Opts {
170	/// Scope a fetch to a room.
171	#[must_use]
172	pub fn new(op: Op, room_id: OwnedRoomId) -> Self { Self::with_room_id(op, Some(room_id)) }
173
174	/// A fetch with no room scope, for id-addressed callers such as
175	/// `get-remote-pdu`; room-derived candidate ranking is skipped, leaving the
176	/// hint, the caller-supplied pool, and the event id's origin.
177	#[must_use]
178	pub fn unscoped(op: Op) -> Self { Self::with_room_id(op, None) }
179
180	/// All validation toggles default on; the caller relaxes them per request.
181	fn with_room_id(op: Op, room_id: Option<OwnedRoomId>) -> Self {
182		Self {
183			op,
184			room_id,
185			event_id: None,
186			ts: None,
187			dir: None,
188			earliest_events: EventWindow::new(),
189			latest_events: EventWindow::new(),
190			hint: None,
191			candidates: Candidates::new(),
192			room_version: None,
193			attempt_limit: None,
194			backfill_limit: None,
195			fanout_growth: FanoutGrowth::Fixed(NonZeroUsize::MIN),
196			fanout_max_width: None,
197			fanout_rounds: None,
198			check_event_id: true,
199			check_conforms: true,
200			check_hashes: true,
201			authoritative_redaction: true,
202			check_signature: true,
203		}
204	}
205
206	/// Set the target event; required for the id-addressed ops.
207	#[must_use]
208	pub fn event_id(self, event_id: OwnedEventId) -> Self {
209		Self { event_id: Some(event_id), ..self }
210	}
211
212	/// Set the timestamp the [`Op::TimestampToEvent`] search starts from.
213	#[must_use]
214	pub fn ts(self, ts: MilliSecondsSinceUnixEpoch) -> Self { Self { ts: Some(ts), ..self } }
215
216	/// Set the direction the [`Op::TimestampToEvent`] search runs.
217	#[must_use]
218	pub fn dir(self, dir: Direction) -> Self { Self { dir: Some(dir), ..self } }
219
220	/// Set the boundary the [`Op::MissingEvents`] backward walk stops at.
221	#[must_use]
222	pub fn earliest_events<I>(self, earliest_events: I) -> Self
223	where
224		I: IntoIterator<Item = OwnedEventId>,
225	{
226		Self {
227			earliest_events: earliest_events.into_iter().collect(),
228			..self
229		}
230	}
231
232	/// Set the frontier an [`Op::MissingEvents`] window fills behind.
233	#[must_use]
234	pub fn latest_events<I>(self, latest_events: I) -> Self
235	where
236		I: IntoIterator<Item = OwnedEventId>,
237	{
238		Self {
239			latest_events: latest_events.into_iter().collect(),
240			..self
241		}
242	}
243
244	/// Try the named server ahead of the ranked candidates.
245	#[must_use]
246	pub fn hint(self, hint: OwnedServerName) -> Self { Self { hint: Some(hint), ..self } }
247
248	/// Supply the candidate pool verbatim, bypassing the room-derived ranking.
249	#[must_use]
250	pub fn candidates<I>(self, candidates: I) -> Self
251	where
252		I: IntoIterator<Item = OwnedServerName>,
253	{
254		Self {
255			candidates: candidates.into_iter().collect(),
256			..self
257		}
258	}
259
260	/// Room version for [`Op::Event`] id and signature checks. `None` keeps the
261	/// V11 default, so callers on a non-V11 room must name it to avoid a
262	/// spurious rejection.
263	#[must_use]
264	pub fn room_version(self, room_version: RoomVersionId) -> Self {
265		Self { room_version: Some(room_version), ..self }
266	}
267
268	/// Cap the number of candidate servers tried.
269	#[must_use]
270	pub fn attempt_limit(self, attempt_limit: NonZeroUsize) -> Self {
271		Self {
272			attempt_limit: Some(attempt_limit),
273			..self
274		}
275	}
276
277	/// Set the events requested per [`Op::Backfill`] / [`Op::MissingEvents`]
278	/// batch.
279	#[must_use]
280	pub fn backfill_limit(self, backfill_limit: NonZeroUsize) -> Self {
281		Self {
282			backfill_limit: Some(backfill_limit),
283			..self
284		}
285	}
286
287	/// Set the per-round fan-out width schedule.
288	#[must_use]
289	pub fn fanout(self, growth: FanoutGrowth) -> Self { Self { fanout_growth: growth, ..self } }
290
291	/// Cap the per-round fan-out concurrency.
292	#[must_use]
293	pub fn fanout_max_width(self, max_width: NonZeroUsize) -> Self {
294		Self {
295			fanout_max_width: Some(max_width),
296			..self
297		}
298	}
299
300	/// Cap the number of escalation rounds.
301	#[must_use]
302	pub fn fanout_rounds(self, rounds: NonZeroUsize) -> Self {
303		Self { fanout_rounds: Some(rounds), ..self }
304	}
305
306	/// Apply the op's advised staged-fan-out ramp. `Opts::new` is otherwise
307	/// dark on every op, so a callsite opts in by chaining this; the generic
308	/// and single-shot-batch ops keep the sequential default.
309	#[must_use]
310	pub fn fanout_for_op(self) -> Self {
311		use FanoutGrowth::{Geometric, Linear};
312
313		const ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
314		const TWO: NonZeroUsize = NonZeroUsize::new(2).unwrap();
315		const THREE: NonZeroUsize = NonZeroUsize::new(3).unwrap();
316		const FOUR: NonZeroUsize = NonZeroUsize::new(4).unwrap();
317		const FIVE: NonZeroUsize = NonZeroUsize::new(5).unwrap();
318
319		match self.op {
320			| Op::AuthEvent => self
321				.fanout(Geometric { base: ONE, factor: TWO })
322				.fanout_max_width(FOUR)
323				.fanout_rounds(FIVE),
324			| Op::AuthChain => self
325				.fanout(Linear { base: ONE, step: ONE })
326				.fanout_max_width(TWO)
327				.fanout_rounds(TWO),
328			| Op::StateIds => self
329				.fanout(Linear { base: ONE, step: ONE })
330				.fanout_max_width(THREE)
331				.fanout_rounds(THREE),
332			| Op::MissingEvents => self
333				.fanout(Geometric { base: ONE, factor: TWO })
334				.fanout_rounds(THREE),
335			| Op::Event | Op::Backfill | Op::TimestampToEvent => self,
336		}
337	}
338
339	/// Toggle every validation gate at once. Callers that re-validate
340	/// downstream pass `false` to fetch raw bytes without rejecting non-V11
341	/// events.
342	#[must_use]
343	pub fn checks(self, enabled: bool) -> Self {
344		Self {
345			check_event_id: enabled,
346			check_conforms: enabled,
347			check_hashes: enabled,
348			check_signature: enabled,
349			..self
350		}
351	}
352}
353
354/// Raw response body plus the server that answered. `bytes` is ref-counted so
355/// concurrent callers coalesced onto one fetch share a single buffer.
356#[derive(Debug)]
357pub struct Outcome {
358	pub bytes: Bytes,
359	pub origin: OwnedServerName,
360}