Skip to main content

tuwunel_service/fetcher/
inflight.rs

1//! Single-flight bookkeeping for one in-flight fetch.
2//!
3//! [`Key`] is the dedup key the worker's in-flight map is keyed on;
4//! [`Inflight`] is the worker-owned entry every coalesced caller subscribes to;
5//! [`SharedResult`] is the broadcast outcome and [`Subscription`] the caller's
6//! handle, whose liveness token cancels the fetch on drop.
7
8use std::{
9	collections::hash_map::DefaultHasher,
10	hash::{Hash, Hasher},
11	sync::{Arc, Weak},
12};
13
14use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, api::Direction};
15use tokio::sync::watch::{Receiver, Sender};
16use tuwunel_core::smallvec::SmallVec;
17
18use super::{Failure, Op, Opts, Outcome};
19
20/// Borrowed window ids gathered for sorting before hashing; inline-sized for
21/// the common single-prev case.
22type WindowRefs<'a> = SmallVec<[&'a OwnedEventId; 1]>;
23
24/// Single-flight dedup key. `MissingEvents` keys on a content hash of its
25/// request window instead of a single event_id, so two callers asking for the
26/// same window coalesce regardless of event order; see [`window_hash`].
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub(super) struct Key {
29	/// Endpoint class; two ops over the same event do not coalesce.
30	pub(super) op: Op,
31
32	/// Room the event belongs to, or `None` for an unscoped fetch.
33	pub(super) room_id: Option<OwnedRoomId>,
34
35	/// Sought event, or `None` for ops that do not key on one.
36	pub(super) event_id: Option<OwnedEventId>,
37
38	/// Content hash of the [`Op::MissingEvents`] window; `None` for every other
39	/// op, so their coalescing is byte-identical to before.
40	pub(super) window_hash: Option<u64>,
41
42	/// [`Op::TimestampToEvent`] search timestamp; `None` for every other op, so
43	/// distinct-timestamp queries for one room do not coalesce.
44	pub(super) ts: Option<MilliSecondsSinceUnixEpoch>,
45
46	/// [`Op::TimestampToEvent`] search direction; `None` for every other op, so
47	/// opposite-direction queries for one room do not coalesce.
48	pub(super) dir: Option<Direction>,
49}
50
51impl Hash for Key {
52	fn hash<H: Hasher>(&self, state: &mut H) {
53		self.op.hash(state);
54		self.room_id.hash(state);
55		self.event_id.hash(state);
56		self.window_hash.hash(state);
57		self.ts.hash(state);
58		self.dir
59			.map(|dir| matches!(dir, Direction::Forward))
60			.hash(state);
61	}
62}
63
64/// Outcome shared by every caller coalesced onto one fetch. Cheap to clone so
65/// the worker can broadcast it down each subscriber's channel.
66pub(super) type SharedResult = Result<Arc<Outcome>, Failure>;
67
68/// Reply handed to a caller: the channel it awaits the outcome on, plus the
69/// sole strong liveness token whose drop cancels the in-flight fetch.
70pub(super) type Subscription = (Receiver<Option<SharedResult>>, Arc<()>);
71
72/// One in-flight fetch, owned by the worker. The worker is the sole mutator, so
73/// no lock guards it; coalesced callers reach it only through their channels.
74pub(super) struct Inflight {
75	/// Result channel. Coalesced callers subscribe to await the outcome.
76	pub(super) tx: Sender<Option<SharedResult>>,
77
78	/// Liveness signal. The strong token rides to the callers; the worker holds
79	/// this weak ref and the fetch bails once it can no longer upgrade it.
80	pub(super) interest: Weak<()>,
81
82	/// Retained (shared) so a re-armed key re-dispatches without re-cloning it.
83	pub(super) opts: Arc<Opts>,
84}
85
86impl Key {
87	/// Derive the single-flight key from a request's [`Opts`].
88	pub(super) fn new(opts: &Opts) -> Self {
89		Self {
90			op: opts.op,
91			room_id: opts.room_id.clone(),
92			event_id: opts.event_id.clone(),
93			window_hash: matches!(opts.op, Op::MissingEvents).then(|| window_hash(opts)),
94			ts: opts.ts,
95			dir: opts.dir,
96		}
97	}
98}
99
100/// Order-independent content hash of an [`Op::MissingEvents`] window: the
101/// sorted `latest_events` and `earliest_events` sets plus the batch limit.
102/// Sorting before hashing folds request-order permutations onto one key; the
103/// two sets hash as distinct sequences so swapping them does not collide.
104fn window_hash(opts: &Opts) -> u64 {
105	let mut latest: WindowRefs<'_> = opts.latest_events.iter().collect();
106	let mut earliest: WindowRefs<'_> = opts.earliest_events.iter().collect();
107	latest.sort_unstable();
108	earliest.sort_unstable();
109
110	let mut hasher = DefaultHasher::new();
111	latest.hash(&mut hasher);
112	earliest.hash(&mut hasher);
113	opts.backfill_limit.hash(&mut hasher);
114
115	hasher.finish()
116}