tuwunel_service/federation/
peer.rs1use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use futures::{Stream, StreamExt};
15use http::StatusCode;
16use ruma::ServerName;
17use tuwunel_core::{
18 Error, implement,
19 utils::{stream::TryIgnore, time::now_secs},
20};
21
22pub(super) const MAX_BACKOFF: Duration = Duration::from_hours(24);
24
/// Category attached to a recorded peer failure.
///
/// Converts to and from a single byte: `0` is [`Classification::Transient`] and
/// `1` is [`Classification::Permanent`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Classification {
    /// The default category; also what every byte other than `1` decodes to.
    #[default]
    Transient,

    /// Encoded as the byte `1`.
    Permanent,
}

impl Classification {
    /// Decodes a stored byte into a classification.
    ///
    /// Only `1` yields `Permanent`; any other value, including bytes this
    /// module never writes, falls back to `Transient`.
    #[inline]
    #[must_use]
    fn from_byte(byte: u8) -> Self {
        if byte == 1 {
            Self::Permanent
        } else {
            Self::Transient
        }
    }
}

impl From<Classification> for u8 {
    /// Encodes a classification as its byte value (`Transient` = 0,
    /// `Permanent` = 1).
    #[inline]
    fn from(c: Classification) -> Self {
        // Kept exhaustive on purpose: a new variant must pick its own byte.
        match c {
            | Classification::Permanent => 1,
            | Classification::Transient => 0,
        }
    }
}
55
/// Verdict produced by `should_attempt` for a single peer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShouldAttempt {
    /// No failure record exists for the peer in the current bucket.
    Yes,

    /// A failure is on record. `earliest_retry` is the computed point in time
    /// that goes with it (presumably the moment before which callers should
    /// hold off; confirm against the call sites).
    No { earliest_retry: SystemTime },

    /// Never constructed by the code visible in this file, hence the lint
    /// allowance below.
    #[allow(dead_code)]
    Deprioritize,
}
69
70#[implement(super::Service)]
71pub fn record_success(&self, server: &ServerName) {
72 self.statuses.del((server, self.current_bucket()));
73}
74
75#[implement(super::Service)]
76pub fn record_failure(&self, server: &ServerName, classification: Classification) {
77 self.statuses
78 .put_raw((server, self.current_bucket()), [u8::from(classification)]);
79}
80
81#[implement(super::Service)]
82#[tracing::instrument(skip(self), fields(%server), level = "trace")]
83pub async fn should_attempt(&self, server: &ServerName) -> ShouldAttempt {
84 let now_bucket = self.current_bucket();
85
86 let Ok(handle) = self.statuses.qry(&(server, now_bucket)).await else {
87 return ShouldAttempt::Yes;
88 };
89
90 if matches!(classify(handle.as_ref()), Classification::Permanent) {
91 return ShouldAttempt::No {
92 earliest_retry: self
93 .bucket_start(now_bucket)
94 .checked_add(MAX_BACKOFF)
95 .unwrap_or_else(SystemTime::now),
96 };
97 }
98
99 let mut streak: u32 = 1;
102 while streak < self.n_max {
103 let prior = now_bucket.saturating_sub(u64::from(streak));
104 if !self.statuses.contains(&(server, prior)).await {
105 break;
106 }
107 streak = streak.saturating_add(1);
108 }
109
110 ShouldAttempt::No {
111 earliest_retry: self.earliest_retry(now_bucket, streak),
112 }
113}
114
115#[implement(super::Service)]
119pub fn peer_snapshot(
120 &self,
121) -> impl Stream<Item = (&ServerName, SystemTime, Classification)> + Send + '_ {
122 self.statuses.stream().ignore_err().map(
123 move |((server, bucket), value): ((&ServerName, u64), &[u8])| {
124 (server, self.bucket_start(bucket), classify(value))
125 },
126 )
127}
128
129#[implement(super::Service)]
130#[inline]
131#[must_use]
132fn current_bucket(&self) -> u64 {
133 now_secs()
134 .checked_div(self.window_secs.max(1))
135 .unwrap_or(0)
136}
137
138#[implement(super::Service)]
140#[inline]
141#[must_use]
142fn bucket_start(&self, bucket: u64) -> SystemTime {
143 let offset = bucket.saturating_mul(self.window_secs);
144
145 UNIX_EPOCH
146 .checked_add(Duration::from_secs(offset))
147 .unwrap_or(UNIX_EPOCH)
148}
149
150#[implement(super::Service)]
151#[inline]
152#[must_use]
153fn earliest_retry(&self, current_bucket: u64, streak: u32) -> SystemTime {
154 let window = Duration::from_secs(self.window_secs);
155 let delay = window
156 .saturating_mul(streak)
157 .saturating_mul(streak)
158 .min(MAX_BACKOFF);
159
160 self.bucket_start(current_bucket)
161 .checked_add(delay)
162 .unwrap_or_else(SystemTime::now)
163}
164
165#[inline]
166#[must_use]
167fn classify(bytes: &[u8]) -> Classification {
168 bytes
169 .first()
170 .copied()
171 .map_or(Classification::Transient, Classification::from_byte)
172}
173
174#[must_use]
183pub(super) fn classify_error(error: &Error) -> Option<Classification> {
184 let Error::Federation(_, response) = error else {
185 return Some(Classification::Transient);
186 };
187
188 let status = response.status_code;
189
190 match status {
191 | _ if status == StatusCode::GONE => Some(Classification::Permanent),
192 | _ if status.is_server_error() || status == StatusCode::TOO_MANY_REQUESTS =>
193 Some(Classification::Transient),
194 | _ => None,
195 }
196}