Skip to main content

tuwunel_service/rooms/event_handler/
backoff.rs

1use std::{ops::Range, time::Duration};
2
3use ruma::EventId;
4use tuwunel_core::{
5	implement,
6	utils::{
7		continue_exponential_backoff,
8		stream::{ReadyExt, TryIgnore},
9		time::now_secs,
10	},
11};
12use tuwunel_database::{Ignore, Interfix};
13
14/// Bucket width in seconds. Records within one bucket collide onto a single key
15/// (`<=` the smallest call-site backoff floor), coalescing concurrent failures.
16const QUANTUM: u64 = 60;
17
18/// Accumulated `Pending` records at which the rate brake engages.
19const SUPPRESS_AFTER: u32 = 3;
20
21/// Federation step that recorded a decision; the key's leading discriminant.
22#[derive(Clone, Copy)]
23pub(super) enum Context {
24	Fetch = 0,
25	Auth = 1,
26	Upgrade = 2,
27}
28
29impl From<Context> for u8 {
30	#[inline]
31	fn from(context: Context) -> Self {
32		match context {
33			| Context::Fetch => 0,
34			| Context::Auth => 1,
35			| Context::Upgrade => 2,
36		}
37	}
38}
39
40/// Permanence of a recorded decision. Unknown discriminants decode to the
41/// weakest (`Pending`) so a future encoding can only soften, never wrongly
42/// escalate, a verdict against an old binary. `Permanent` is never written by
43/// this store.
44#[derive(Clone, Copy, Default)]
45pub(super) enum Disposition {
46	#[default]
47	Pending = 0,
48	Transient = 1,
49	Permanent = 2,
50}
51
52/// Verdict from consulting the store before a federation step.
53pub(super) enum Suppression {
54	Allow,
55	Deny,
56}
57
58#[derive(Default)]
59struct Summary {
60	total: u32,
61	pending: u32,
62	latest_secs: u64,
63	latest_class: Disposition,
64}
65
66impl From<u64> for Disposition {
67	#[inline]
68	fn from(disc: u64) -> Self {
69		match disc {
70			| 1 => Self::Transient,
71			| 2 => Self::Permanent,
72			| _ => Self::Pending,
73		}
74	}
75}
76
77impl From<Disposition> for u64 {
78	#[inline]
79	fn from(disposition: Disposition) -> Self {
80		match disposition {
81			| Disposition::Pending => 0,
82			| Disposition::Transient => 1,
83			| Disposition::Permanent => 2,
84		}
85	}
86}
87
88impl Suppression {
89	#[inline]
90	pub(super) fn is_deny(&self) -> bool { matches!(self, Self::Deny) }
91}
92
93impl Summary {
94	fn tally(mut self, (_, (class, secs)): (Ignore, (u64, u64))) -> Self {
95		let class = Disposition::from(class);
96
97		self.total = self.total.saturating_add(1);
98		if matches!(class, Disposition::Pending) {
99			self.pending = self.pending.saturating_add(1);
100		}
101
102		if secs >= self.latest_secs {
103			self.latest_secs = secs;
104			self.latest_class = class;
105		}
106
107		self
108	}
109}
110
111/// Record a federation attempt before a cancellable await, so a premature
112/// cancellation still leaves a `Pending` row behind to rate-gate against.
113#[implement(super::Service)]
114pub(super) fn record_attempt(&self, ctx: Context, event_id: &EventId) {
115	self.record_outcome(ctx, event_id, Disposition::Pending);
116}
117
118#[implement(super::Service)]
119pub(super) fn record_outcome(&self, ctx: Context, event_id: &EventId, disposition: Disposition) {
120	self.db.eventid_backoff.put(
121		(u8::from(ctx), event_id, current_bucket()),
122		(u64::from(disposition), now_secs()),
123	);
124}
125
126#[implement(super::Service)]
127pub(super) async fn record_success(&self, ctx: Context, event_id: &EventId) {
128	self.db
129		.eventid_backoff
130		.del_prefix(&(u8::from(ctx), event_id, Interfix))
131		.await;
132}
133
134#[implement(super::Service)]
135pub(super) async fn is_suppressed(
136	&self,
137	ctx: Context,
138	event_id: &EventId,
139	range: Range<Duration>,
140) -> Suppression {
141	let summary = self
142		.db
143		.eventid_backoff
144		.stream_prefix::<Ignore, (u64, u64), _>(&(u8::from(ctx), event_id, Interfix))
145		.ignore_err()
146		.ready_fold(Summary::default(), Summary::tally)
147		.await;
148
149	if summary.total == 0 {
150		return Suppression::Allow;
151	}
152
153	if matches!(summary.latest_class, Disposition::Permanent) {
154		return Suppression::Deny;
155	}
156
157	let elapsed = Duration::from_secs(now_secs().saturating_sub(summary.latest_secs));
158	let (tries, rate_ok) = match summary.latest_class {
159		| Disposition::Pending => (summary.pending, summary.pending >= SUPPRESS_AFTER),
160		| _ => (summary.total, true),
161	};
162
163	(rate_ok && continue_exponential_backoff(range.start, range.end, elapsed, tries))
164		.then_some(Suppression::Deny)
165		.unwrap_or(Suppression::Allow)
166}
167
168fn current_bucket() -> u32 { u32::try_from(now_secs() / QUANTUM).unwrap_or(u32::MAX) }