Skip to main content

tuwunel_admin/debug/event_fetcher/
mod.rs

1mod backfill;
2mod event;
3mod event_auth;
4mod missing_events;
5mod state_ids;
6
7use std::num::NonZeroUsize;
8
9use clap::Subcommand;
10use ruma::{OwnedEventId, OwnedRoomId, OwnedServerName};
11use tokio::time::Instant;
12use tuwunel_core::{Err, Result};
13use tuwunel_service::fetcher::{Op, Opts};
14
15use crate::{Context, admin_command_dispatch};
16
17/// Drive the federation event-fetcher service directly, one subcommand per
18/// federation op. Diagnostic only: validation toggles default off so a
19/// malformed or unauthenticatable response is still shown; pass `verify` to
20/// turn the full poison-detection path on.
21#[admin_command_dispatch(handler_prefix = "event_fetcher")]
22#[derive(Debug, Subcommand)]
23pub(crate) enum EventFetcherCommand {
24	/// `GET /_matrix/federation/v1/event/{eventId}`
25	Event {
26		room_id: OwnedRoomId,
27		event_id: OwnedEventId,
28
29		/// Try this server first.
30		#[arg(short, long)]
31		server: Option<OwnedServerName>,
32
33		/// Cap the number of servers tried; 0 or unset means unbounded.
34		#[arg(long)]
35		attempt_limit: Option<usize>,
36
37		/// Verify signature, content hash, event id, and conformance.
38		#[arg(long)]
39		verify: bool,
40	},
41
42	/// `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
43	EventAuth {
44		room_id: OwnedRoomId,
45		event_id: OwnedEventId,
46
47		/// Try this server first.
48		#[arg(short, long)]
49		server: Option<OwnedServerName>,
50
51		/// Cap the number of servers tried; 0 or unset means unbounded.
52		#[arg(long)]
53		attempt_limit: Option<usize>,
54
55		/// Verify signature, content hash, event id, and conformance.
56		#[arg(long)]
57		verify: bool,
58	},
59
60	/// `GET /_matrix/federation/v1/backfill/{roomId}` from `event_id`.
61	Backfill {
62		room_id: OwnedRoomId,
63		event_id: OwnedEventId,
64
65		/// Try this server first.
66		#[arg(short, long)]
67		server: Option<OwnedServerName>,
68
69		/// Cap the number of servers tried; 0 or unset means unbounded.
70		#[arg(long)]
71		attempt_limit: Option<usize>,
72
73		/// Maximum number of PDUs to request (defaults to 10).
74		#[arg(long)]
75		limit: Option<usize>,
76
77		/// Verify signature, content hash, event id, and conformance.
78		#[arg(long)]
79		verify: bool,
80	},
81
82	/// `GET /_matrix/federation/v1/state_ids/{roomId}?event_id=`
83	StateIds {
84		room_id: OwnedRoomId,
85		event_id: OwnedEventId,
86
87		/// Try this server first.
88		#[arg(short, long)]
89		server: Option<OwnedServerName>,
90
91		/// Cap the number of servers tried; 0 or unset means unbounded.
92		#[arg(long)]
93		attempt_limit: Option<usize>,
94
95		/// Verify signature, content hash, event id, and conformance.
96		#[arg(long)]
97		verify: bool,
98	},
99
100	/// `POST /_matrix/federation/v1/get_missing_events/{roomId}` with
101	/// `event_id` as the sole latest event.
102	MissingEvents {
103		room_id: OwnedRoomId,
104		event_id: OwnedEventId,
105
106		/// Try this server first.
107		#[arg(short, long)]
108		server: Option<OwnedServerName>,
109
110		/// Cap the number of servers tried; 0 or unset means unbounded.
111		#[arg(long)]
112		attempt_limit: Option<usize>,
113
114		/// Verify signature, content hash, event id, and conformance.
115		#[arg(long)]
116		verify: bool,
117	},
118}
119
120/// Validation toggles default off here, inverting `Opts::new`; only `verify`
121/// re-enables them.
122pub(super) fn base_opts(
123	op: Op,
124	room_id: OwnedRoomId,
125	event_id: OwnedEventId,
126	server: Option<OwnedServerName>,
127	attempt_limit: Option<usize>,
128	verify: bool,
129) -> Opts {
130	Opts {
131		hint: server,
132		attempt_limit: attempt_limit.and_then(NonZeroUsize::new),
133		check_event_id: verify,
134		check_conforms: verify,
135		check_hashes: verify,
136		check_signature: verify,
137		..Opts::new(op, room_id).event_id(event_id)
138	}
139}
140
141/// Prints the response as pretty JSON, falling back to lossy UTF-8 on parse
142/// failure.
143pub(super) async fn run(context: &Context<'_>, opts: Opts) -> Result {
144	if !context.services.server.config.allow_federation {
145		return Err!("Federation is disabled on this homeserver.");
146	}
147
148	let timer = Instant::now();
149	let outcome = context.services.fetcher.fetch(opts).await?;
150	let elapsed = timer.elapsed();
151	let len = outcome.bytes.len();
152	let origin = &outcome.origin;
153
154	let body = match serde_json::from_slice::<serde_json::Value>(&outcome.bytes) {
155		| Ok(value) => serde_json::to_string_pretty(&value)?,
156		| Err(_) => String::from_utf8_lossy(&outcome.bytes).into_owned(),
157	};
158
159	write!(
160		context,
161		"Fetched {len} bytes from {origin} in {elapsed:?}.\n\n```json\n{body}\n```"
162	)
163	.await
164}