tuwunel_admin/debug/event_fetcher/
mod.rs1mod backfill;
2mod event;
3mod event_auth;
4mod missing_events;
5mod state_ids;
6
7use std::num::NonZeroUsize;
8
9use clap::Subcommand;
10use ruma::{OwnedEventId, OwnedRoomId, OwnedServerName};
11use tokio::time::Instant;
12use tuwunel_core::{Err, Result};
13use tuwunel_service::fetcher::{Op, Opts};
14
15use crate::{Context, admin_command_dispatch};
16
17#[admin_command_dispatch(handler_prefix = "event_fetcher")]
22#[derive(Debug, Subcommand)]
23pub(crate) enum EventFetcherCommand {
24 Event {
26 room_id: OwnedRoomId,
27 event_id: OwnedEventId,
28
29 #[arg(short, long)]
31 server: Option<OwnedServerName>,
32
33 #[arg(long)]
35 attempt_limit: Option<usize>,
36
37 #[arg(long)]
39 verify: bool,
40 },
41
42 EventAuth {
44 room_id: OwnedRoomId,
45 event_id: OwnedEventId,
46
47 #[arg(short, long)]
49 server: Option<OwnedServerName>,
50
51 #[arg(long)]
53 attempt_limit: Option<usize>,
54
55 #[arg(long)]
57 verify: bool,
58 },
59
60 Backfill {
62 room_id: OwnedRoomId,
63 event_id: OwnedEventId,
64
65 #[arg(short, long)]
67 server: Option<OwnedServerName>,
68
69 #[arg(long)]
71 attempt_limit: Option<usize>,
72
73 #[arg(long)]
75 limit: Option<usize>,
76
77 #[arg(long)]
79 verify: bool,
80 },
81
82 StateIds {
84 room_id: OwnedRoomId,
85 event_id: OwnedEventId,
86
87 #[arg(short, long)]
89 server: Option<OwnedServerName>,
90
91 #[arg(long)]
93 attempt_limit: Option<usize>,
94
95 #[arg(long)]
97 verify: bool,
98 },
99
100 MissingEvents {
103 room_id: OwnedRoomId,
104 event_id: OwnedEventId,
105
106 #[arg(short, long)]
108 server: Option<OwnedServerName>,
109
110 #[arg(long)]
112 attempt_limit: Option<usize>,
113
114 #[arg(long)]
116 verify: bool,
117 },
118}
119
120pub(super) fn base_opts(
123 op: Op,
124 room_id: OwnedRoomId,
125 event_id: OwnedEventId,
126 server: Option<OwnedServerName>,
127 attempt_limit: Option<usize>,
128 verify: bool,
129) -> Opts {
130 Opts {
131 hint: server,
132 attempt_limit: attempt_limit.and_then(NonZeroUsize::new),
133 check_event_id: verify,
134 check_conforms: verify,
135 check_hashes: verify,
136 check_signature: verify,
137 ..Opts::new(op, room_id).event_id(event_id)
138 }
139}
140
141pub(super) async fn run(context: &Context<'_>, opts: Opts) -> Result {
144 if !context.services.server.config.allow_federation {
145 return Err!("Federation is disabled on this homeserver.");
146 }
147
148 let timer = Instant::now();
149 let outcome = context.services.fetcher.fetch(opts).await?;
150 let elapsed = timer.elapsed();
151 let len = outcome.bytes.len();
152 let origin = &outcome.origin;
153
154 let body = match serde_json::from_slice::<serde_json::Value>(&outcome.bytes) {
155 | Ok(value) => serde_json::to_string_pretty(&value)?,
156 | Err(_) => String::from_utf8_lossy(&outcome.bytes).into_owned(),
157 };
158
159 write!(
160 context,
161 "Fetched {len} bytes from {origin} in {elapsed:?}.\n\n```json\n{body}\n```"
162 )
163 .await
164}