Skip to main content

tuwunel_service/federation/
peer.rs

//! Per-server reachability store backed by the `servername_status` CF.
//!
//! Bucket key layout: `servername || u64_be(now.as_secs() / window_secs)`. The
//! one-byte value is the [`Classification`]. Failures within the same window
//! deliberately collide on the same key: the window is the coalescing quantum,
//! so the storage layout itself does the batching and no separate write
//! coalescing is needed.
//!
//! `window_secs` is sourced from `sender_timeout` at service build time so the
//! peer-status curve does not drift from the sender's existing quadratic
//! backoff when both observe the same peer.
11
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use futures::{Stream, StreamExt};
15use http::StatusCode;
16use ruma::ServerName;
17use tuwunel_core::{
18	Error, implement,
19	utils::{stream::TryIgnore, time::now_secs},
20};
21
22/// Backoff ceiling, matching `sender_retry_backoff_limit`'s 24h default.
23pub(super) const MAX_BACKOFF: Duration = Duration::from_hours(24);
24
/// Permanence classification supplied alongside a failure.
///
/// Persisted as the single value byte of a `servername_status` bucket:
/// `0` for `Transient`, `1` for `Permanent`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Classification {
	/// Worth retrying after backoff; also the fallback for unknown bytes.
	#[default]
	Transient,
	/// A permanent verdict; `should_attempt` reports a retry no earlier than
	/// `MAX_BACKOFF` past the start of the failing bucket.
	Permanent,
}

impl Classification {
	/// Decodes a stored byte. Only the `Permanent` encoding escalates; any
	/// other byte (including one written by a future encoding) reads as
	/// `Transient`, so an old binary can soften a verdict but never wrongly
	/// escalate one.
	#[inline]
	#[must_use]
	fn from_byte(byte: u8) -> Self {
		if byte == u8::from(Self::Permanent) {
			Self::Permanent
		} else {
			Self::Transient
		}
	}
}

impl From<Classification> for u8 {
	/// Encodes a classification as its stored byte.
	#[inline]
	fn from(classification: Classification) -> Self {
		match classification {
			| Classification::Permanent => 1,
			| Classification::Transient => 0,
		}
	}
}
55
/// Outcome of [`Service::should_attempt`] for a single peer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShouldAttempt {
	/// Nothing recorded for the current window blocks an attempt.
	Yes,

	/// Skip the peer for now.
	No {
		/// Wall-clock instant before which another attempt is not advised.
		earliest_retry: SystemTime,
	},

	/// Eligible, but should be sorted to the back of any candidate list
	/// rather than skipped outright.
	// `should_attempt` in this file never produces this variant; presumably
	// the allow keeps it available without a dead-code warning until a
	// producer exists — confirm before removing.
	#[allow(dead_code)]
	Deprioritize,
}
69
70#[implement(super::Service)]
71pub fn record_success(&self, server: &ServerName) {
72	self.statuses.del((server, self.current_bucket()));
73}
74
75#[implement(super::Service)]
76pub fn record_failure(&self, server: &ServerName, classification: Classification) {
77	self.statuses
78		.put_raw((server, self.current_bucket()), [u8::from(classification)]);
79}
80
81#[implement(super::Service)]
82#[tracing::instrument(skip(self), fields(%server), level = "trace")]
83pub async fn should_attempt(&self, server: &ServerName) -> ShouldAttempt {
84	let now_bucket = self.current_bucket();
85
86	let Ok(handle) = self.statuses.qry(&(server, now_bucket)).await else {
87		return ShouldAttempt::Yes;
88	};
89
90	if matches!(classify(handle.as_ref()), Classification::Permanent) {
91		return ShouldAttempt::No {
92			earliest_retry: self
93				.bucket_start(now_bucket)
94				.checked_add(MAX_BACKOFF)
95				.unwrap_or_else(SystemTime::now),
96		};
97	}
98
99	// streak walks back until the first gap; async `contains` predicate
100	// forces an imperative loop rather than `take_while`.
101	let mut streak: u32 = 1;
102	while streak < self.n_max {
103		let prior = now_bucket.saturating_sub(u64::from(streak));
104		if !self.statuses.contains(&(server, prior)).await {
105			break;
106		}
107		streak = streak.saturating_add(1);
108	}
109
110	ShouldAttempt::No {
111		earliest_retry: self.earliest_retry(now_bucket, streak),
112	}
113}
114
115/// Yields one tuple per populated bucket, ordered by `(server, bucket_start)`.
116/// The admin/metrics consumer groups adjacent rows per server to reconstruct
117/// streak and latest-failure information.
118#[implement(super::Service)]
119pub fn peer_snapshot(
120	&self,
121) -> impl Stream<Item = (&ServerName, SystemTime, Classification)> + Send + '_ {
122	self.statuses.stream().ignore_err().map(
123		move |((server, bucket), value): ((&ServerName, u64), &[u8])| {
124			(server, self.bucket_start(bucket), classify(value))
125		},
126	)
127}
128
129#[implement(super::Service)]
130#[inline]
131#[must_use]
132fn current_bucket(&self) -> u64 {
133	now_secs()
134		.checked_div(self.window_secs.max(1))
135		.unwrap_or(0)
136}
137
138/// Wall-clock instant at the start of `bucket`.
139#[implement(super::Service)]
140#[inline]
141#[must_use]
142fn bucket_start(&self, bucket: u64) -> SystemTime {
143	let offset = bucket.saturating_mul(self.window_secs);
144
145	UNIX_EPOCH
146		.checked_add(Duration::from_secs(offset))
147		.unwrap_or(UNIX_EPOCH)
148}
149
150#[implement(super::Service)]
151#[inline]
152#[must_use]
153fn earliest_retry(&self, current_bucket: u64, streak: u32) -> SystemTime {
154	let window = Duration::from_secs(self.window_secs);
155	let delay = window
156		.saturating_mul(streak)
157		.saturating_mul(streak)
158		.min(MAX_BACKOFF);
159
160	self.bucket_start(current_bucket)
161		.checked_add(delay)
162		.unwrap_or_else(SystemTime::now)
163}
164
165#[inline]
166#[must_use]
167fn classify(bytes: &[u8]) -> Classification {
168	bytes
169		.first()
170		.copied()
171		.map_or(Classification::Transient, Classification::from_byte)
172}
173
174/// Classifies a failed federation attempt for the peer-reachability store, or
175/// `None` when it carries no reachability signal. An HTTP response proves the
176/// peer reachable, so a content-level 4xx (a forbidden invite, a 403 backfill)
177/// must not count against it; only 5xx or an explicit rate-limit (429) records
178/// `Transient`. A 410 is the exception: a Matrix server never returns it for
179/// one endpoint and not another, so a received 410 is a proxy operator
180/// deliberately signaling the peer is gone, and records `Permanent`. Transport
181/// failures carry no response and are always transient.
182#[must_use]
183pub(super) fn classify_error(error: &Error) -> Option<Classification> {
184	let Error::Federation(_, response) = error else {
185		return Some(Classification::Transient);
186	};
187
188	let status = response.status_code;
189
190	match status {
191		| _ if status == StatusCode::GONE => Some(Classification::Permanent),
192		| _ if status.is_server_error() || status == StatusCode::TOO_MANY_REQUESTS =>
193			Some(Classification::Transient),
194		| _ => None,
195	}
196}