Skip to main content

tuwunel_service/rooms/event_handler/
policy_server.rs

1use std::{
2	collections::BTreeMap,
3	time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
6use ruma::{
7	CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedServerName, RoomId, RoomVersionId,
8	ServerName, SigningKeyAlgorithm,
9	api::{
10		error::{ErrorKind, RetryAfter},
11		federation::policy::sign_event::v1 as sign_event,
12	},
13	events::{StateEventType, room::policy::RoomPolicyEventContent},
14	serde::Base64,
15	signatures::{to_canonical_json_string_for_signing, verify_canonical_json_bytes},
16};
17use serde::{Deserialize, Serialize};
18use serde_json::value::to_raw_value;
19use tuwunel_core::{
20	Err, Result, at, debug, implement,
21	matrix::{Event, pdu::into_outgoing_federation, room_version},
22	trace,
23	utils::time::now_secs,
24	warn,
25};
26use tuwunel_database::{Cbor, Deserialized};
27
/// State event type used by the unstable MSC4284 proposal. The merged spec
/// stabilised this to `m.room.policy`, but the reference policy server (and
/// Element's default deployments as of 2026-05) still write the unstable type
/// with the singular `public_key` field; reading both keeps the gate live for
/// those rooms.
const UNSTABLE_POLICY_TYPE: &str = "org.matrix.msc4284.policy";
33
/// Outcome of an inbound policy-server signature check.
///
/// Every variant is a plain tag without payload, so the type is cheap to copy
/// and can be compared with `==` by callers and tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PolicyCheck {
	/// No policy server is configured for this room (or feature is off, or
	/// the event is the policy state event itself). The caller should not
	/// modify its soft-fail decision based on policy considerations.
	NotApplicable,

	/// Policy server signature is present and verifies cleanly.
	Pass,

	/// Policy server signature is absent. Per MSC4284, the homeserver SHOULD
	/// either fetch one from the policy server or soft-fail.
	Missing,

	/// Policy server signature is present but failed cryptographic
	/// verification. Soft-fail.
	Invalid,
}
53
/// Outcome of a `/sign` round-trip to the policy server, as classified by
/// `fetch_policy_signature`. Each caller maps these onto its own result type.
#[derive(Debug)]
enum FetchOutcome {
	/// The response carried an ed25519 signature for the policy server; the
	/// payload is that signature string.
	/// NOTE(review): nothing in this file verifies the returned signature
	/// against the room's policy key before it is folded in — confirm whether
	/// that is checked elsewhere.
	Signed(String),

	/// Network error, timeout, or any other unclassified failure; the caller
	/// should fail open.
	FailOpen,

	/// Policy server explicitly refused (Forbidden), or returned 200 OK
	/// with no signature for our `via` (the MSC4284 unstable refusal).
	Refused,

	/// Policy server returned `M_LIMIT_EXCEEDED` with a usable retry-after
	/// hint. The caller should record the unix-secs deadline so subsequent
	/// attempts before then are short-circuited.
	RateLimited {
		// Unix timestamp in seconds before which `/sign` should not be retried.
		until_secs: u64,
	},
}
74
/// Persisted per-event policy-server outcome in `eventid_policysigstate`.
/// Absence of a row means "no prior decision recorded; proceed with `/sign`".
///
/// NOTE(review): rows are written through `Cbor`, so variant and field names
/// are presumably part of the stored encoding — confirm before renaming.
#[derive(Debug, Serialize, Deserialize)]
enum PolicySigState {
	/// Policy server already refused this event; do not retry.
	Refused,

	/// Policy server is rate-limiting; do not retry before this unix-secs
	/// deadline.
	BackoffUntil {
		// Unix timestamp in seconds; readers compare it against `now_secs()`.
		until_secs: u64,
	},
}
88
/// Lenient deserialiser that accepts either the stable
/// `public_keys: { ed25519: ... }` shape or the MSC4284 unstable singular
/// `public_key: <ed25519>` shape, and folds the latter into the former.
/// Use `into_stable` to obtain the stable `RoomPolicyEventContent`.
#[derive(Deserialize)]
struct UnstablePolicyContent {
	// Server name of the policy server; required in both shapes.
	via: OwnedServerName,

	// Stable shape: keys indexed by signing algorithm. Defaults to an empty
	// map when the field is absent.
	#[serde(default)]
	public_keys: BTreeMap<SigningKeyAlgorithm, Base64>,

	// Unstable shape: a single ed25519 key. Defaults to `None` when absent.
	#[serde(default)]
	public_key: Option<Base64>,
}
102
103#[implement(UnstablePolicyContent)]
104fn into_stable(
105	Self { via, mut public_keys, public_key }: Self,
106) -> Option<RoomPolicyEventContent> {
107	if let Some(key) = public_key {
108		public_keys
109			.entry(SigningKeyAlgorithm::Ed25519)
110			.or_insert(key);
111	}
112
113	let ed25519 = public_keys.remove(&SigningKeyAlgorithm::Ed25519)?;
114
115	Some(RoomPolicyEventContent::new(via, ed25519))
116}
117
118#[implement(super::Service)]
119fn cache_policy_refused(&self, event_id: &EventId) {
120	self.db
121		.eventid_policysigstate
122		.raw_put(event_id.as_str(), Cbor(&PolicySigState::Refused));
123}
124
125#[implement(super::Service)]
126fn cache_policy_backoff(&self, event_id: &EventId, until_secs: u64) {
127	self.db
128		.eventid_policysigstate
129		.raw_put(event_id.as_str(), Cbor(&PolicySigState::BackoffUntil { until_secs }));
130}
131
132#[implement(super::Service)]
133async fn cached_policy_state(&self, event_id: &EventId) -> Option<PolicySigState> {
134	self.db
135		.eventid_policysigstate
136		.get(event_id.as_str())
137		.await
138		.deserialized::<Cbor<_>>()
139		.map(at!(0))
140		.ok()
141}
142
143/// Returns the room's policy event content when a policy server is in effect:
144/// state event present (stable `m.room.policy`, falling back to MSC4284's
145/// unstable `org.matrix.msc4284.policy`), parses cleanly under either the
146/// stable `public_keys` map or the unstable singular `public_key` field, and
147/// the `via` server has a joined user in the room. Any failure returns `None`,
148/// signalling "no policy server configured" so the caller skips the gate
149/// entirely.
150#[implement(super::Service)]
151pub async fn lookup_policy_server(&self, room_id: &RoomId) -> Option<RoomPolicyEventContent> {
152	let read = async |event_type: &StateEventType| {
153		self.services
154			.state_accessor
155			.room_state_get_content::<UnstablePolicyContent>(room_id, event_type, "")
156			.await
157			.ok()
158			.and_then(UnstablePolicyContent::into_stable)
159	};
160
161	let content = match read(&StateEventType::RoomPolicy).await {
162		| Some(content) => content,
163		| None => read(&StateEventType::from(UNSTABLE_POLICY_TYPE.to_owned())).await?,
164	};
165
166	self.services
167		.state_cache
168		.server_in_room(&content.via, room_id)
169		.await
170		.then_some(content)
171}
172
173/// MSC4284: ask the room's policy server to sign an outgoing event. The
174/// signature is folded into `pdu_json["signatures"]` so it persists with the
175/// event and federates transitively to other servers in the room. Returns
176/// `Forbidden` when the policy server explicitly refuses; network errors and
177/// timeouts fail open with a warn log.
178#[implement(super::Service)]
179#[tracing::instrument(name = "policy_sign", level = "debug", skip_all)]
180pub async fn sign_outgoing_pdu<E>(&self, pdu_json: &mut CanonicalJsonObject, pdu: &E) -> Result
181where
182	E: Event,
183{
184	if !self.services.server.config.enable_policy_servers {
185		return Ok(());
186	}
187
188	if is_policy_state_event(pdu) {
189		return Ok(());
190	}
191
192	let Ok(room_version) = self
193		.services
194		.state
195		.get_room_version(pdu.room_id())
196		.await
197	else {
198		return Ok(());
199	};
200
201	let Some(policy) = self.lookup_policy_server(pdu.room_id()).await else {
202		trace!(room_id = %pdu.room_id(), "no policy server configured");
203		return Ok(());
204	};
205
206	let event_id = pdu.event_id();
207	match self.cached_policy_state(event_id).await {
208		| Some(PolicySigState::Refused) =>
209			return Err!(Request(Forbidden("Event was rejected by the room's policy server."))),
210
211		| Some(PolicySigState::BackoffUntil { until_secs }) if until_secs > now_secs() => {
212			debug!(via = %policy.via, until_secs, "skipping outbound /sign during policy backoff");
213			return Ok(());
214		},
215		| _ => {},
216	}
217
218	match self
219		.fetch_policy_signature(&policy, pdu_json, &room_version)
220		.await
221	{
222		| FetchOutcome::Signed(signature) => {
223			insert_policy_signature(pdu_json, &policy.via, &signature);
224			debug!(via = %policy.via, event_id = %event_id, "folded policy server signature");
225		},
226		| FetchOutcome::Refused => {
227			self.cache_policy_refused(event_id);
228			return Err!(Request(Forbidden("Event was rejected by the room's policy server.")));
229		},
230		| FetchOutcome::RateLimited { until_secs } => {
231			self.cache_policy_backoff(event_id, until_secs);
232		},
233		| FetchOutcome::FailOpen => {},
234	}
235
236	Ok(())
237}
238
239/// Calls the policy server's `/sign` endpoint. The classification of the
240/// response (`Signed` / `Refused` / `RateLimited` / `FailOpen`) lets each
241/// caller choose its own reaction.
242#[implement(super::Service)]
243#[tracing::instrument(
244	name = "policy_fetch",
245	level = "debug",
246	skip_all,
247	fields(via = %policy.via)
248)]
249async fn fetch_policy_signature(
250	&self,
251	policy: &RoomPolicyEventContent,
252	pdu_json: &CanonicalJsonObject,
253	room_version: &RoomVersionId,
254) -> FetchOutcome {
255	let outgoing = into_outgoing_federation(pdu_json.clone(), room_version);
256	let Ok(raw) = to_raw_value(&outgoing) else {
257		warn!(via = %policy.via, "failed to serialize PDU for policy /sign; failing open");
258		return FetchOutcome::FailOpen;
259	};
260
261	let timeout = Duration::from_secs(
262		self.services
263			.server
264			.config
265			.policy_server_request_timeout,
266	);
267
268	let response = match tokio::time::timeout(
269		timeout,
270		self.services
271			.federation
272			.execute(&policy.via, sign_event::Request::new(raw)),
273	)
274	.await
275	{
276		| Ok(Ok(response)) => response,
277		| Ok(Err(error)) if error.kind() == ErrorKind::Forbidden => return FetchOutcome::Refused,
278		| Ok(Err(error)) => {
279			if let Some(until_secs) = parse_rate_limit(&error) {
280				warn!(via = %policy.via, until_secs, "policy server /sign rate-limited");
281				return FetchOutcome::RateLimited { until_secs };
282			}
283			warn!(via = %policy.via, %error, "policy server /sign failed; failing open");
284			return FetchOutcome::FailOpen;
285		},
286		| Err(elapsed) => {
287			warn!(via = %policy.via, %elapsed, "policy server /sign timed out; failing open");
288			return FetchOutcome::FailOpen;
289		},
290	};
291
292	// MSC4284 unstable: a 200 OK with no signature for `via` is also refusal.
293	response
294		.ed25519_signature(&policy.via)
295		.map(ToOwned::to_owned)
296		.map_or(FetchOutcome::Refused, FetchOutcome::Signed)
297}
298
299fn parse_rate_limit(error: &tuwunel_core::Error) -> Option<u64> {
300	let ErrorKind::LimitExceeded(data) = error.kind() else {
301		return None;
302	};
303
304	let until = match data.retry_after.as_ref()? {
305		| RetryAfter::Delay(d) => SystemTime::now().checked_add(*d)?,
306		| RetryAfter::DateTime(t) => *t,
307	};
308
309	until
310		.duration_since(UNIX_EPOCH)
311		.ok()
312		.map(|d| d.as_secs())
313}
314
315/// MSC4284: verify the inbound PDU's policy server signature; if missing, ask
316/// the policy server to sign and fold the result in. Mirrors
317/// `check_inbound_policy_signature` but upgrades `Missing` to `Pass` (fetched
318/// and verified, or fail-open on network error/timeout) or `Invalid`
319/// (refused).
320#[implement(super::Service)]
321#[tracing::instrument(name = "policy_verify_or_fetch", level = "debug", skip_all)]
322pub async fn verify_or_fetch_inbound_policy_signature<E>(
323	&self,
324	pdu_json: &mut CanonicalJsonObject,
325	pdu: &E,
326) -> PolicyCheck
327where
328	E: Event,
329{
330	match self
331		.check_inbound_policy_signature(pdu_json, pdu)
332		.await
333	{
334		| PolicyCheck::Missing =>
335			self.fetch_inbound_policy_signature(pdu_json, pdu)
336				.await,
337		| other => other,
338	}
339}
340
341/// MSC4284: when an inbound PDU has no policy server signature, ask the
342/// policy server to sign on the originator's behalf; fold the returned
343/// signature into `pdu_json` so it persists with the event and federates
344/// onward. Cached refusals short-circuit to `Invalid`; cached backoffs (or
345/// fresh 429s) fail open as `Pass` until the deadline. `Forbidden` from the
346/// policy server maps to `Invalid`. Network errors and timeouts fail open
347/// with a warn log, mapped to `Pass` since the next server in the room is
348/// likely to retry.
349#[implement(super::Service)]
350#[tracing::instrument(name = "policy_fetch_inbound", level = "debug", skip_all)]
351async fn fetch_inbound_policy_signature<E>(
352	&self,
353	pdu_json: &mut CanonicalJsonObject,
354	pdu: &E,
355) -> PolicyCheck
356where
357	E: Event,
358{
359	let Some(policy) = self.lookup_policy_server(pdu.room_id()).await else {
360		return PolicyCheck::NotApplicable;
361	};
362
363	let Ok(room_version) = self
364		.services
365		.state
366		.get_room_version(pdu.room_id())
367		.await
368	else {
369		return PolicyCheck::NotApplicable;
370	};
371
372	let event_id = pdu.event_id();
373	match self.cached_policy_state(event_id).await {
374		| Some(PolicySigState::Refused) => return PolicyCheck::Invalid,
375		| Some(PolicySigState::BackoffUntil { until_secs }) if until_secs > now_secs() => {
376			debug!(
377				until_secs,
378				via = %policy.via,
379				"policy server in backoff; failing open"
380			);
381
382			return PolicyCheck::Pass;
383		},
384		| _ => {},
385	}
386
387	match self
388		.fetch_policy_signature(&policy, pdu_json, &room_version)
389		.await
390	{
391		| FetchOutcome::Signed(signature) => {
392			debug!(
393				via = %policy.via,
394				event_id = %event_id,
395				"folded inbound policy server signature"
396			);
397
398			insert_policy_signature(pdu_json, &policy.via, &signature);
399			PolicyCheck::Pass
400		},
401		| FetchOutcome::Refused => {
402			debug!(
403				via = %policy.via,
404				event_id = %event_id,
405				"policy server refused to sign inbound PDU; soft-failing"
406			);
407
408			self.cache_policy_refused(event_id);
409			PolicyCheck::Invalid
410		},
411		| FetchOutcome::RateLimited { until_secs } => {
412			self.cache_policy_backoff(event_id, until_secs);
413			PolicyCheck::Pass
414		},
415		| FetchOutcome::FailOpen => PolicyCheck::Pass,
416	}
417}
418
419/// MSC4284: verify the policy server signature on an inbound PDU. Returns
420/// `NotApplicable` for rooms without a configured policy server (the gate is
421/// skipped); `Pass` when the signature verifies; `Missing` when no signature
422/// is present for the configured server; `Invalid` when the signature is
423/// present but cryptographic verification fails.
424#[implement(super::Service)]
425#[tracing::instrument(name = "policy_verify", level = "debug", skip_all)]
426pub async fn check_inbound_policy_signature<E>(
427	&self,
428	pdu_json: &CanonicalJsonObject,
429	pdu: &E,
430) -> PolicyCheck
431where
432	E: Event,
433{
434	if !self.services.server.config.enable_policy_servers {
435		return PolicyCheck::NotApplicable;
436	}
437
438	if is_policy_state_event(pdu) {
439		return PolicyCheck::NotApplicable;
440	}
441
442	let Some(policy) = self.lookup_policy_server(pdu.room_id()).await else {
443		return PolicyCheck::NotApplicable;
444	};
445
446	let Ok(room_version) = self
447		.services
448		.state
449		.get_room_version(pdu.room_id())
450		.await
451	else {
452		return PolicyCheck::NotApplicable;
453	};
454
455	let Ok(rules) = room_version::rules(&room_version) else {
456		return PolicyCheck::NotApplicable;
457	};
458
459	// `lookup_policy_server` already verified the ed25519 entry is present.
460	let Some(public_key) = policy
461		.public_keys
462		.get(&SigningKeyAlgorithm::Ed25519)
463	else {
464		return PolicyCheck::NotApplicable;
465	};
466
467	let Some(signature_b64) = extract_policy_signature(pdu_json, &policy.via) else {
468		return PolicyCheck::Missing;
469	};
470
471	let Ok(signature) = Base64::<ruma::serde::base64::Standard>::parse(signature_b64) else {
472		return PolicyCheck::Invalid;
473	};
474
475	let Ok(redacted) = ruma::canonical_json::redact(pdu_json.clone(), &rules.redaction, None)
476	else {
477		return PolicyCheck::Invalid;
478	};
479
480	let Ok(canonical) = to_canonical_json_string_for_signing(&redacted) else {
481		return PolicyCheck::Invalid;
482	};
483
484	verify_canonical_json_bytes(
485		&SigningKeyAlgorithm::Ed25519,
486		public_key.as_bytes(),
487		signature.as_bytes(),
488		canonical.as_bytes(),
489	)
490	.map(|()| PolicyCheck::Pass)
491	.unwrap_or_else(|error| {
492		debug!(via = %policy.via, %error, "policy server signature failed verification");
493		PolicyCheck::Invalid
494	})
495}
496
497fn is_policy_state_event<E: Event>(pdu: &E) -> bool {
498	if pdu.state_key() != Some("") {
499		return false;
500	}
501
502	let kind = pdu.kind().to_cow_str();
503
504	kind == "m.room.policy" || kind == UNSTABLE_POLICY_TYPE
505}
506
507fn extract_policy_signature<'a>(
508	pdu_json: &'a CanonicalJsonObject,
509	via: &ServerName,
510) -> Option<&'a str> {
511	let CanonicalJsonValue::Object(server_map) = pdu_json.get("signatures")? else {
512		return None;
513	};
514
515	let CanonicalJsonValue::Object(key_map) = server_map.get(via.as_str())? else {
516		return None;
517	};
518
519	let CanonicalJsonValue::String(signature) =
520		key_map.get(RoomPolicyEventContent::POLICY_SERVER_ED25519_SIGNING_KEY_ID)?
521	else {
522		return None;
523	};
524
525	Some(signature.as_str())
526}
527
528fn insert_policy_signature(
529	pdu_json: &mut CanonicalJsonObject,
530	via: &ServerName,
531	signature: &str,
532) {
533	let signatures = pdu_json
534		.entry("signatures".into())
535		.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::new()));
536
537	let CanonicalJsonValue::Object(server_map) = signatures else {
538		return;
539	};
540
541	let entry = server_map
542		.entry(via.as_str().into())
543		.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::new()));
544
545	if let CanonicalJsonValue::Object(key_map) = entry {
546		key_map.insert(
547			RoomPolicyEventContent::POLICY_SERVER_ED25519_SIGNING_KEY_ID.into(),
548			CanonicalJsonValue::String(signature.to_owned()),
549		);
550	}
551}