tuwunel_service/fetcher/inflight.rs
1//! Single-flight bookkeeping for one in-flight fetch.
2//!
3//! [`Key`] is the dedup key the worker's in-flight map is keyed on;
4//! [`Inflight`] is the worker-owned entry every coalesced caller subscribes to;
5//! [`SharedResult`] is the broadcast outcome and [`Subscription`] the caller's
6//! handle, whose liveness token cancels the fetch on drop.
7
8use std::{
9 collections::hash_map::DefaultHasher,
10 hash::{Hash, Hasher},
11 sync::{Arc, Weak},
12};
13
14use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, api::Direction};
15use tokio::sync::watch::{Receiver, Sender};
16use tuwunel_core::smallvec::SmallVec;
17
18use super::{Failure, Op, Opts, Outcome};
19
20/// Borrowed window ids gathered for sorting before hashing; inline-sized for
21/// the common single-prev case.
22type WindowRefs<'a> = SmallVec<[&'a OwnedEventId; 1]>;
23
24/// Single-flight dedup key. `MissingEvents` keys on a content hash of its
25/// request window instead of a single event_id, so two callers asking for the
26/// same window coalesce regardless of event order; see [`window_hash`].
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub(super) struct Key {
29 /// Endpoint class; two ops over the same event do not coalesce.
30 pub(super) op: Op,
31
32 /// Room the event belongs to, or `None` for an unscoped fetch.
33 pub(super) room_id: Option<OwnedRoomId>,
34
35 /// Sought event, or `None` for ops that do not key on one.
36 pub(super) event_id: Option<OwnedEventId>,
37
38 /// Content hash of the [`Op::MissingEvents`] window; `None` for every other
39 /// op, so their coalescing is byte-identical to before.
40 pub(super) window_hash: Option<u64>,
41
42 /// [`Op::TimestampToEvent`] search timestamp; `None` for every other op, so
43 /// distinct-timestamp queries for one room do not coalesce.
44 pub(super) ts: Option<MilliSecondsSinceUnixEpoch>,
45
46 /// [`Op::TimestampToEvent`] search direction; `None` for every other op, so
47 /// opposite-direction queries for one room do not coalesce.
48 pub(super) dir: Option<Direction>,
49}
50
51impl Hash for Key {
52 fn hash<H: Hasher>(&self, state: &mut H) {
53 self.op.hash(state);
54 self.room_id.hash(state);
55 self.event_id.hash(state);
56 self.window_hash.hash(state);
57 self.ts.hash(state);
58 self.dir
59 .map(|dir| matches!(dir, Direction::Forward))
60 .hash(state);
61 }
62}
63
64/// Outcome shared by every caller coalesced onto one fetch. Cheap to clone so
65/// the worker can broadcast it down each subscriber's channel.
66pub(super) type SharedResult = Result<Arc<Outcome>, Failure>;
67
68/// Reply handed to a caller: the channel it awaits the outcome on, plus the
69/// sole strong liveness token whose drop cancels the in-flight fetch.
70pub(super) type Subscription = (Receiver<Option<SharedResult>>, Arc<()>);
71
72/// One in-flight fetch, owned by the worker. The worker is the sole mutator, so
73/// no lock guards it; coalesced callers reach it only through their channels.
74pub(super) struct Inflight {
75 /// Result channel. Coalesced callers subscribe to await the outcome.
76 pub(super) tx: Sender<Option<SharedResult>>,
77
78 /// Liveness signal. The strong token rides to the callers; the worker holds
79 /// this weak ref and the fetch bails once it can no longer upgrade it.
80 pub(super) interest: Weak<()>,
81
82 /// Retained (shared) so a re-armed key re-dispatches without re-cloning it.
83 pub(super) opts: Arc<Opts>,
84}
85
86impl Key {
87 /// Derive the single-flight key from a request's [`Opts`].
88 pub(super) fn new(opts: &Opts) -> Self {
89 Self {
90 op: opts.op,
91 room_id: opts.room_id.clone(),
92 event_id: opts.event_id.clone(),
93 window_hash: matches!(opts.op, Op::MissingEvents).then(|| window_hash(opts)),
94 ts: opts.ts,
95 dir: opts.dir,
96 }
97 }
98}
99
100/// Order-independent content hash of an [`Op::MissingEvents`] window: the
101/// sorted `latest_events` and `earliest_events` sets plus the batch limit.
102/// Sorting before hashing folds request-order permutations onto one key; the
103/// two sets hash as distinct sequences so swapping them does not collide.
104fn window_hash(opts: &Opts) -> u64 {
105 let mut latest: WindowRefs<'_> = opts.latest_events.iter().collect();
106 let mut earliest: WindowRefs<'_> = opts.earliest_events.iter().collect();
107 latest.sort_unstable();
108 earliest.sort_unstable();
109
110 let mut hasher = DefaultHasher::new();
111 latest.hash(&mut hasher);
112 earliest.hash(&mut hasher);
113 opts.backfill_limit.hash(&mut hasher);
114
115 hasher.finish()
116}