tuwunel_service/rooms/event_handler/
backoff.rs1use std::{ops::Range, time::Duration};
2
3use ruma::EventId;
4use tuwunel_core::{
5 implement,
6 utils::{
7 continue_exponential_backoff,
8 stream::{ReadyExt, TryIgnore},
9 time::now_secs,
10 },
11};
12use tuwunel_database::{Ignore, Interfix};
13
/// Divisor applied to `now_secs()` by `current_bucket()` to derive the bucket
/// component of a backoff key; with the clock counted in seconds, one bucket
/// spans 60 seconds.
const QUANTUM: u64 = 60;
17
/// Number of pending records an event must have accumulated before
/// `is_suppressed()` applies backoff to an event whose most recent record is
/// still pending.
const SUPPRESS_AFTER: u32 = 3;
20
21#[derive(Clone, Copy)]
23pub(super) enum Context {
24 Fetch = 0,
25 Auth = 1,
26 Upgrade = 2,
27}
28
29impl From<Context> for u8 {
30 #[inline]
31 fn from(context: Context) -> Self {
32 match context {
33 | Context::Fetch => 0,
34 | Context::Auth => 1,
35 | Context::Upgrade => 2,
36 }
37 }
38}
39
40#[derive(Clone, Copy, Default)]
45pub(super) enum Disposition {
46 #[default]
47 Pending = 0,
48 Transient = 1,
49 Permanent = 2,
50}
51
52pub(super) enum Suppression {
54 Allow,
55 Deny,
56}
57
58#[derive(Default)]
59struct Summary {
60 total: u32,
61 pending: u32,
62 latest_secs: u64,
63 latest_class: Disposition,
64}
65
66impl From<u64> for Disposition {
67 #[inline]
68 fn from(disc: u64) -> Self {
69 match disc {
70 | 1 => Self::Transient,
71 | 2 => Self::Permanent,
72 | _ => Self::Pending,
73 }
74 }
75}
76
77impl From<Disposition> for u64 {
78 #[inline]
79 fn from(disposition: Disposition) -> Self {
80 match disposition {
81 | Disposition::Pending => 0,
82 | Disposition::Transient => 1,
83 | Disposition::Permanent => 2,
84 }
85 }
86}
87
88impl Suppression {
89 #[inline]
90 pub(super) fn is_deny(&self) -> bool { matches!(self, Self::Deny) }
91}
92
93impl Summary {
94 fn tally(mut self, (_, (class, secs)): (Ignore, (u64, u64))) -> Self {
95 let class = Disposition::from(class);
96
97 self.total = self.total.saturating_add(1);
98 if matches!(class, Disposition::Pending) {
99 self.pending = self.pending.saturating_add(1);
100 }
101
102 if secs >= self.latest_secs {
103 self.latest_secs = secs;
104 self.latest_class = class;
105 }
106
107 self
108 }
109}
110
111#[implement(super::Service)]
114pub(super) fn record_attempt(&self, ctx: Context, event_id: &EventId) {
115 self.record_outcome(ctx, event_id, Disposition::Pending);
116}
117
118#[implement(super::Service)]
119pub(super) fn record_outcome(&self, ctx: Context, event_id: &EventId, disposition: Disposition) {
120 self.db.eventid_backoff.put(
121 (u8::from(ctx), event_id, current_bucket()),
122 (u64::from(disposition), now_secs()),
123 );
124}
125
126#[implement(super::Service)]
127pub(super) async fn record_success(&self, ctx: Context, event_id: &EventId) {
128 self.db
129 .eventid_backoff
130 .del_prefix(&(u8::from(ctx), event_id, Interfix))
131 .await;
132}
133
134#[implement(super::Service)]
135pub(super) async fn is_suppressed(
136 &self,
137 ctx: Context,
138 event_id: &EventId,
139 range: Range<Duration>,
140) -> Suppression {
141 let summary = self
142 .db
143 .eventid_backoff
144 .stream_prefix::<Ignore, (u64, u64), _>(&(u8::from(ctx), event_id, Interfix))
145 .ignore_err()
146 .ready_fold(Summary::default(), Summary::tally)
147 .await;
148
149 if summary.total == 0 {
150 return Suppression::Allow;
151 }
152
153 if matches!(summary.latest_class, Disposition::Permanent) {
154 return Suppression::Deny;
155 }
156
157 let elapsed = Duration::from_secs(now_secs().saturating_sub(summary.latest_secs));
158 let (tries, rate_ok) = match summary.latest_class {
159 | Disposition::Pending => (summary.pending, summary.pending >= SUPPRESS_AFTER),
160 | _ => (summary.total, true),
161 };
162
163 (rate_ok && continue_exponential_backoff(range.start, range.end, elapsed, tries))
164 .then_some(Suppression::Deny)
165 .unwrap_or(Suppression::Allow)
166}
167
168fn current_bucket() -> u32 { u32::try_from(now_secs() / QUANTUM).unwrap_or(u32::MAX) }