Skip to main content

tuwunel_admin/debug/
commands.rs

1use std::{
2	collections::HashMap,
3	fmt::Write,
4	iter::once,
5	path::Path,
6	str::FromStr,
7	time::{Instant, SystemTime},
8};
9
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::try_join3};
11use ruma::{
12	CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
13	OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
14	api::federation::{discovery::get_server_version, event::get_room_state},
15	events::AnyStateEvent,
16	serde::Raw,
17};
18use serde::Serialize;
19use tokio::io::AsyncWriteExt;
20use tracing_subscriber::EnvFilter;
21use tuwunel_core::{
22	Err, Error, Result, debug_error, err, info, jwt,
23	matrix::{
24		Event,
25		pdu::{PduEvent, PduId, RawPduId},
26	},
27	tokio_metrics::TaskMonitor,
28	trace, utils,
29	utils::{
30		math::Expected,
31		stream::{IterStream, ReadyExt, TryReadyExt},
32		string::EMPTY,
33		time::now_secs,
34	},
35	warn,
36};
37use tuwunel_service::rooms::{short::ShortRoomId, state_compressor::HashSetCompressStateEvent};
38
39use crate::admin_command;
40
41#[admin_command]
42pub(super) async fn echo(&self, message: Vec<String>) -> Result {
43	let message = message.join(" ");
44	self.write_str(&message).await
45}
46
47#[admin_command]
48pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
49	let Ok(event) = self
50		.services
51		.timeline
52		.get_pdu_json(&event_id)
53		.await
54	else {
55		return Err!("Event not found.");
56	};
57
58	let room_id_str = event
59		.get("room_id")
60		.and_then(CanonicalJsonValue::as_str)
61		.ok_or_else(|| err!(Database("Invalid event in database")))?;
62
63	let room_id = <&RoomId>::try_from(room_id_str)
64		.map_err(|_| err!(Database("Invalid room id field in event in database")))?;
65
66	let room_version = self
67		.services
68		.state
69		.get_room_version(room_id)
70		.await?;
71
72	let start = Instant::now();
73	let count = self
74		.services
75		.auth_chain
76		.event_ids_iter(room_id, &room_version, once(event_id.as_ref()))
77		.ready_filter_map(Result::ok)
78		.count()
79		.await;
80
81	let elapsed = start.elapsed();
82	let out = format!("Loaded auth chain with length {count} in {elapsed:?}");
83
84	self.write_str(&out).await
85}
86
87#[admin_command]
88pub(super) async fn parse_pdu(&self) -> Result {
89	if self.body.len() < 2
90		|| !self.body[0].trim().starts_with("```")
91		|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
92	{
93		return Err!("Expected code block in command body. Add --help for details.");
94	}
95
96	let string = self.body[1..self.body.len().saturating_sub(1)].join("\n");
97	let rules = RoomVersionId::V6
98		.rules()
99		.expect("rules for V6 rooms");
100
101	let value =
102		serde_json::from_str(&string).map_err(|e| err!("Invalid json in command body: {e}"))?;
103
104	let hash = ruma::signatures::reference_hash(&value, &rules)
105		.map_err(|e| err!("Could not parse PDU JSON: {e:?}"))?;
106
107	let event_id = OwnedEventId::parse(format!("${hash}"));
108
109	let value = serde_json::to_value(value)?;
110
111	match serde_json::from_value::<PduEvent>(value) {
112		| Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"),
113		| Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
114	}
115	.await
116}
117
118#[admin_command]
119pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
120	let mut outlier = false;
121	let mut pdu_json = self
122		.services
123		.timeline
124		.get_non_outlier_pdu_json(&event_id)
125		.await;
126
127	if pdu_json.is_err() {
128		outlier = true;
129		pdu_json = self
130			.services
131			.timeline
132			.get_pdu_json(&event_id)
133			.await;
134	}
135
136	let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
137
138	let text = serde_json::to_string_pretty(&json)?;
139	let msg = if outlier {
140		"Outlier (Rejected / Soft Failed) PDU found in our database"
141	} else {
142		"PDU found in our database"
143	};
144
145	self.write_str(&format!("{msg}\n```json\n{text}\n```"))
146		.await
147}
148
149#[admin_command]
150pub(super) async fn get_short_pdu(&self, shortroomid: ShortRoomId, count: i64) -> Result {
151	let pdu_id: RawPduId = PduId { shortroomid, count: count.into() }.into();
152
153	let pdu_json = self
154		.services
155		.timeline
156		.get_pdu_json_from_id(&pdu_id)
157		.await;
158
159	let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
160
161	let json_text = serde_json::to_string_pretty(&json)?;
162
163	self.write_str(&format!("```json\n{json_text}\n```"))
164		.await
165}
166
167#[admin_command]
168pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: bool) -> Result {
169	if !self.services.server.config.allow_federation {
170		return Err!("Federation is disabled on this homeserver.",);
171	}
172
173	if server == self.services.globals.server_name() {
174		return Err!(
175			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
176			 fetching local PDUs from the database.",
177		);
178	}
179
180	if self.body.len() < 2
181		|| !self.body[0].trim().starts_with("```")
182		|| self.body.last().unwrap_or(&EMPTY).trim() != "```"
183	{
184		return Err!("Expected code block in command body. Add --help for details.",);
185	}
186
187	let list = self
188		.body
189		.iter()
190		.collect::<Vec<_>>()
191		.drain(1..self.body.len().saturating_sub(1))
192		.filter_map(|pdu| EventId::parse(pdu).ok())
193		.collect::<Vec<_>>();
194
195	let mut failed_count: usize = 0;
196	let mut success_count: usize = 0;
197
198	for event_id in list {
199		let result = self
200			.get_remote_pdu(event_id, server.clone())
201			.await;
202
203		if !force {
204			result?;
205		} else if let Err(e) = result {
206			warn!("Failed to get remote PDU, ignoring error: {e}");
207			failed_count = failed_count.saturating_add(1);
208			continue;
209		}
210
211		success_count = success_count.saturating_add(1);
212	}
213
214	let out =
215		format!("Fetched {success_count} remote PDUs successfully with {failed_count} failures");
216
217	self.write_str(&out).await
218}
219
220#[admin_command]
221pub(super) async fn get_remote_pdu(
222	&self,
223	event_id: OwnedEventId,
224	server: OwnedServerName,
225) -> Result {
226	if !self.services.server.config.allow_federation {
227		return Err!("Federation is disabled on this homeserver.");
228	}
229
230	if server == self.services.globals.server_name() {
231		return Err!(
232			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
233			 fetching local PDUs.",
234		);
235	}
236
237	let response = self
238		.services
239		.federation
240		.execute(&server, ruma::api::federation::event::get_event::v1::Request {
241			event_id: event_id.clone(),
242		})
243		.await
244		.map_err(|e| {
245			err!("Remote server did not have PDU or failed sending request to remote server: {e}")
246		})?;
247
248	let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| {
249		warn!(
250			"Requested event ID {event_id} from server but failed to convert from RawValue to \
251			 CanonicalJsonObject (malformed event/response?): {e}"
252		);
253		err!(Request(Unknown("Received response from server but failed to parse PDU")))
254	})?;
255
256	trace!("Attempting to parse PDU: {:?}", &response.pdu);
257	let (room_id, ..) = self
258		.services
259		.event_handler
260		.parse_incoming_pdu(&response.pdu)
261		.boxed()
262		.await
263		.map_err(|e| {
264			warn!("Failed to parse PDU: {e}");
265			info!("Full PDU: {:?}", &response.pdu);
266			err!("Failed to parse PDU remote server {server} sent us: {e}")
267		})?;
268
269	info!("Attempting to handle event ID {event_id} as backfilled PDU");
270	self.services
271		.timeline
272		.backfill_pdu(&room_id, &server, response.pdu)
273		.await?;
274
275	let text = serde_json::to_string_pretty(&json)?;
276	let msg = "Got PDU from specified server and handled as backfilled";
277	self.write_str(&format!("{msg}. Event body:\n```json\n{text}\n```"))
278		.await
279}
280
281#[admin_command]
282pub(super) async fn get_room_state(
283	&self,
284	room: OwnedRoomOrAliasId,
285	kind: Option<String>,
286	state_key: Option<String>,
287) -> Result {
288	let room_id = self.services.alias.maybe_resolve(&room).await?;
289
290	if state_key.is_none()
291		&& let Some(kind) = kind.clone().map(Into::into)
292	{
293		return self
294			.services
295			.state_accessor
296			.room_state_type_pdus(&room_id, &kind)
297			.map_ok(Event::into_format)
298			.ready_and_then(|event: Raw<AnyStateEvent>| {
299				serde_json::to_value(&event).map_err(Error::from)
300			})
301			.ready_and_then(|event| serde_json::to_string_pretty(&event).map_err(Error::from))
302			.try_for_each(|json| self.write_string(format!("```json\n{json}\n```\n")))
303			.await;
304	}
305
306	if let Some(state_key) = state_key
307		&& let Some(kind) = kind.map(Into::into)
308	{
309		let event: Raw<AnyStateEvent> = self
310			.services
311			.state_accessor
312			.room_state_get(&room_id, &kind, &state_key)
313			.await?
314			.into_format();
315
316		let value = serde_json::to_value(&event)?;
317		let json = serde_json::to_string_pretty(&value)?;
318		return self
319			.write_string(format!("```json\n{json}\n```\n"))
320			.await;
321	}
322
323	self.services
324		.state_accessor
325		.room_state_full_pdus(&room_id)
326		.map_ok(Event::into_format)
327		.ready_and_then(|event: Raw<AnyStateEvent>| {
328			serde_json::to_value(&event).map_err(Error::from)
329		})
330		.ready_and_then(|event| serde_json::to_string_pretty(&event).map_err(Error::from))
331		.try_for_each(|json| self.write_string(format!("```json\n{json}\n```\n")))
332		.await
333}
334
335#[admin_command]
336pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
337	let timer = tokio::time::Instant::now();
338
339	let response = self
340		.services
341		.federation
342		.execute(&server, get_server_version::v1::Request {})
343		.await
344		.map_err(|e| err!("Failed sending federation request to specified server:\n\n{e}"))?;
345
346	let ping_time = timer.elapsed();
347
348	let out = if let Ok(json) = serde_json::to_string_pretty(&response.server) {
349		format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
350	} else {
351		format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
352	};
353
354	self.write_str(&out).await
355}
356
357#[admin_command]
358pub(super) async fn force_device_list_updates(&self) -> Result {
359	// Force E2EE device list updates for all users
360	self.services
361		.users
362		.stream()
363		.for_each(|user_id| {
364			self.services
365				.users
366				.mark_device_key_update(user_id)
367		})
368		.await;
369
370	write!(self, "Marked all devices for all users as having new keys to update").await
371}
372
373#[admin_command]
374pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result {
375	let handles = &["console"];
376
377	let filter = reset
378		.then_some(&self.services.config.log)
379		.or(filter.as_ref())
380		.ok_or_else(|| err!("No log level was specified."))?;
381
382	let filter_layer = EnvFilter::try_new(filter).map_err(|e| {
383		let source = if !reset { "specified" } else { "found in config" };
384		err!("Invalid log level filter {source}: {e}")
385	})?;
386
387	self.services
388		.server
389		.log
390		.reload
391		.reload(&filter_layer, Some(handles))
392		.map_err(|e| err!("Failed to modify and reload the global tracing log level: {e}"))?;
393
394	self.write_str(&format!("Successfully changed log level to {filter}"))
395		.await
396}
397
398#[admin_command]
399pub(super) async fn sign_json(&self) -> Result {
400	if self.body.len() < 2
401		|| !self.body[0].trim().starts_with("```")
402		|| self.body.last().unwrap_or(&"").trim() != "```"
403	{
404		return Err!("Expected code block in command body. Add --help for details.");
405	}
406
407	let string = self.body[1..self.body.len().expected_sub(1)].join("\n");
408	let mut value = serde_json::from_str(&string).map_err(|e| err!("Invalid json: {e}"))?;
409
410	self.services.server_keys.sign_json(&mut value)?;
411
412	let json_text = serde_json::to_string_pretty(&value)?;
413	self.write_str(&json_text).await
414}
415
416#[admin_command]
417pub(super) async fn verify_json(&self) -> Result {
418	if self.body.len() < 2
419		|| !self.body[0].trim().starts_with("```")
420		|| self.body.last().unwrap_or(&"").trim() != "```"
421	{
422		return Err!("Expected code block in command body. Add --help for details.");
423	}
424
425	let string = self.body[1..self.body.len().expected_sub(1)].join("\n");
426
427	let value = serde_json::from_str::<CanonicalJsonObject>(&string)
428		.map_err(|e| err!("Invalid json: {e}"))?;
429
430	self.services
431		.server_keys
432		.verify_json(&value, None)
433		.await
434		.map_err(|e| err!("Signature verification failed: {e}"))?;
435
436	self.write_str("Signature correct").await
437}
438
439#[admin_command]
440pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
441	use ruma::signatures::Verified;
442
443	let mut event = self
444		.services
445		.timeline
446		.get_pdu_json(&event_id)
447		.await?;
448
449	event.remove("event_id");
450	let msg = match self
451		.services
452		.server_keys
453		.verify_event(&event, None)
454		.await?
455	{
456		| Verified::Signatures => "signatures OK, but content hash failed (redaction).",
457		| Verified::All => "signatures and hashes OK.",
458	};
459
460	self.write_str(msg).await
461}
462
463#[admin_command]
464#[tracing::instrument(skip(self))]
465pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
466	if !self
467		.services
468		.state_cache
469		.server_in_room(&self.services.server.name, &room_id)
470		.await
471	{
472		return Err!("We are not participating in the room / we don't know about the room ID.",);
473	}
474
475	let first_pdu = self
476		.services
477		.timeline
478		.first_pdu_in_room(&room_id)
479		.await
480		.map_err(|_| err!(Database("Failed to find the first PDU in database")))?;
481
482	let out = format!("{first_pdu:?}");
483	self.write_str(&out).await
484}
485
486#[admin_command]
487#[tracing::instrument(skip(self))]
488pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
489	if !self
490		.services
491		.state_cache
492		.server_in_room(&self.services.server.name, &room_id)
493		.await
494	{
495		return Err!("We are not participating in the room / we don't know about the room ID.");
496	}
497
498	let latest_pdu = self
499		.services
500		.timeline
501		.latest_pdu_in_room(&room_id)
502		.await
503		.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
504
505	let out = format!("{latest_pdu:?}");
506	self.write_str(&out).await
507}
508
509#[admin_command]
510#[tracing::instrument(skip(self))]
511pub(super) async fn force_set_room_state_from_server(
512	&self,
513	room_id: OwnedRoomId,
514	server_name: OwnedServerName,
515) -> Result {
516	// TODO: diverged from join remote
517
518	if !self
519		.services
520		.state_cache
521		.server_in_room(&self.services.server.name, &room_id)
522		.await
523	{
524		return Err!("We are not participating in the room / we don't know about the room ID.");
525	}
526
527	let first_pdu = self
528		.services
529		.timeline
530		.latest_pdu_in_room(&room_id)
531		.await
532		.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
533
534	let room_version = self
535		.services
536		.state
537		.get_room_version(&room_id)
538		.await?;
539
540	let mut state: HashMap<u64, OwnedEventId> = HashMap::new();
541
542	let remote_state_response = self
543		.services
544		.federation
545		.execute(&server_name, get_room_state::v1::Request {
546			room_id: room_id.clone(),
547			event_id: first_pdu.event_id().to_owned(),
548		})
549		.await?;
550
551	for pdu in remote_state_response.pdus.clone() {
552		match self
553			.services
554			.event_handler
555			.parse_incoming_pdu(&pdu)
556			.await
557		{
558			| Ok(t) => t,
559			| Err(e) => {
560				warn!("Could not parse PDU, ignoring: {e}");
561				continue;
562			},
563		};
564	}
565
566	info!("Going through room_state response PDUs");
567	for result in remote_state_response.pdus.iter().map(|pdu| {
568		self.services
569			.server_keys
570			.validate_and_add_event_id(pdu, &room_version)
571	}) {
572		let Ok((event_id, mut value)) = result.await else {
573			continue;
574		};
575
576		let invalid_pdu_err = |e| {
577			debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
578			err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
579		};
580
581		let pdu = if value["type"] == "m.room.create" {
582			PduEvent::from_object_and_roomid_and_eventid(&room_id, &event_id, value.clone())
583				.map_err(invalid_pdu_err)?
584		} else {
585			PduEvent::from_object_and_eventid(&event_id, value.clone())
586				.map_err(invalid_pdu_err)?
587		};
588
589		if !value.contains_key("room_id") {
590			let room_id = CanonicalJsonValue::String(room_id.as_str().into());
591			value.insert("room_id".into(), room_id);
592		}
593
594		self.services
595			.timeline
596			.add_pdu_outlier(&event_id, &value);
597
598		if let Some(state_key) = &pdu.state_key {
599			let shortstatekey = self
600				.services
601				.short
602				.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
603				.await;
604
605			state.insert(shortstatekey, pdu.event_id.clone());
606		}
607	}
608
609	info!("Going through auth_chain response");
610	for result in remote_state_response
611		.auth_chain
612		.iter()
613		.map(|pdu| {
614			self.services
615				.server_keys
616				.validate_and_add_event_id(pdu, &room_version)
617		}) {
618		let Ok((event_id, value)) = result.await else {
619			continue;
620		};
621
622		self.services
623			.timeline
624			.add_pdu_outlier(&event_id, &value);
625	}
626
627	let new_room_state = self
628		.services
629		.event_handler
630		.resolve_state(&room_id, &room_version, state)
631		.await?;
632
633	info!("Forcing new room state");
634	let HashSetCompressStateEvent {
635		shortstatehash: short_state_hash,
636		added,
637		removed,
638	} = self
639		.services
640		.state_compressor
641		.save_state(room_id.clone().as_ref(), new_room_state)
642		.await?;
643
644	let state_lock = self.services.state.mutex.lock(&*room_id).await;
645
646	self.services
647		.state
648		.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
649		.await?;
650
651	info!(
652		"Updating joined counts for room just in case (e.g. we may have found a difference in \
653		 the room's m.room.member state"
654	);
655	self.services
656		.state_cache
657		.update_joined_count(&room_id)
658		.await;
659
660	self.write_str("Successfully forced the room state from the requested remote server.")
661		.await
662}
663
664#[admin_command]
665pub(super) async fn get_signing_keys(
666	&self,
667	server_name: Option<OwnedServerName>,
668	notary: Option<OwnedServerName>,
669	query: bool,
670) -> Result {
671	let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
672
673	if let Some(notary) = notary {
674		let signing_keys = self
675			.services
676			.server_keys
677			.notary_request(&notary, &server_name)
678			.await?;
679
680		let out = format!("```rs\n{signing_keys:#?}\n```");
681		return self.write_str(&out).await;
682	}
683
684	let signing_keys = if query {
685		self.services
686			.server_keys
687			.server_request(&server_name)
688			.await?
689	} else {
690		self.services
691			.server_keys
692			.signing_keys_for(&server_name)
693			.await?
694	};
695
696	let out = format!("```rs\n{signing_keys:#?}\n```");
697	self.write_str(&out).await
698}
699
700#[admin_command]
701pub(super) async fn get_verify_keys(&self, server_name: Option<OwnedServerName>) -> Result {
702	let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
703
704	let keys = self
705		.services
706		.server_keys
707		.verify_keys_for(&server_name)
708		.await;
709
710	let mut out = String::new();
711	writeln!(out, "| Key ID | Public Key |")?;
712	writeln!(out, "| --- | --- |")?;
713	for (key_id, key) in keys {
714		writeln!(out, "| {key_id} | {key:?} |")?;
715	}
716
717	self.write_str(&out).await
718}
719
720#[admin_command]
721pub(super) async fn resolve_true_destination(
722	&self,
723	server_name: OwnedServerName,
724	no_cache: bool,
725) -> Result {
726	if !self.services.server.config.allow_federation {
727		return Err!("Federation is disabled on this homeserver.",);
728	}
729
730	if server_name == self.services.server.name {
731		return Err!(
732			"Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
733			 fetching local PDUs.",
734		);
735	}
736
737	let actual = self
738		.services
739		.resolver
740		.resolve_actual_dest(&server_name, !no_cache)
741		.await?;
742
743	let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host);
744	self.write_str(&msg).await
745}
746
747#[admin_command]
748pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result {
749	const OPTS: &str = "abcdefghijklmnopqrstuvwxyz";
750
751	let opts: String = OPTS
752		.chars()
753		.filter(|&c| {
754			let allow_any = opts.as_ref().is_some_and(|opts| opts == "*");
755
756			let allow = allow_any || opts.as_ref().is_some_and(|opts| opts.contains(c));
757
758			!allow
759		})
760		.collect();
761
762	let stats = tuwunel_core::alloc::memory_stats(&opts).unwrap_or_default();
763
764	self.write_str("```\n").await?;
765	self.write_str(&stats).await?;
766	self.write_str("\n```").await?;
767	Ok(())
768}
769
/// Prints the worker count, alive task count and global queue depth from the
/// runtime metrics, or a notice when no runtime metrics are available.
#[cfg(tokio_unstable)]
#[admin_command]
pub(super) async fn runtime_metrics(&self) -> Result {
	let out = match self.services.server.metrics.runtime_metrics() {
		| Some(metrics) => format!(
			"```rs\nnum_workers: {}\nnum_alive_tasks: {}\nglobal_queue_depth: {}\n```",
			metrics.num_workers(),
			metrics.num_alive_tasks(),
			metrics.global_queue_depth()
		),
		| None => "Runtime metrics are not available.".to_owned(),
	};

	self.write_str(&out).await
}
792
793#[cfg(not(tokio_unstable))]
794#[admin_command]
795pub(super) async fn runtime_metrics(&self) -> Result {
796	self.write_str("Runtime metrics require building with `tokio_unstable`.")
797		.await
798}
799
/// Prints the debug form of the runtime interval metrics, or a notice when
/// no runtime metrics are available.
#[cfg(tokio_unstable)]
#[admin_command]
pub(super) async fn runtime_interval(&self) -> Result {
	let out = match self.services.server.metrics.runtime_interval() {
		| Some(metrics) => format!("```rs\n{metrics:#?}\n```"),
		| None => "Runtime metrics are not available.".to_owned(),
	};

	self.write_str(&out).await
}
815
816#[cfg(not(tokio_unstable))]
817#[admin_command]
818pub(super) async fn runtime_interval(&self) -> Result {
819	self.write_str("Runtime metrics require building with `tokio_unstable`.")
820		.await
821}
822
823#[admin_command]
824pub(super) async fn task_metrics(&self) -> Result {
825	let out = self
826		.services
827		.server
828		.metrics
829		.task_metrics()
830		.map(TaskMonitor::cumulative)
831		.map_or_else(
832			|| "Task metrics are not available.".to_owned(),
833			|metrics| format!("```rs\n{metrics:#?}\n```"),
834		);
835
836	self.write_str(&out).await
837}
838
839#[admin_command]
840pub(super) async fn task_interval(&self) -> Result {
841	let out = self
842		.services
843		.server
844		.metrics
845		.task_interval()
846		.map_or_else(
847			|| "Task metrics are not available.".to_owned(),
848			|metrics| format!("```rs\n{metrics:#?}\n```"),
849		);
850
851	self.write_str(&out).await
852}
853
854#[admin_command]
855pub(super) async fn time(&self) -> Result {
856	let now = SystemTime::now();
857	let now = utils::time::format(now, "%+");
858
859	self.write_str(&now).await
860}
861
862#[admin_command]
863pub(super) async fn list_dependencies(&self, names: bool) -> Result {
864	if names {
865		let out = info::cargo::dependencies_names().join(" ");
866		return self.write_str(&out).await;
867	}
868
869	let mut out = String::new();
870	let deps = info::cargo::dependencies();
871	writeln!(out, "| name | version | features |")?;
872	writeln!(out, "| ---- | ------- | -------- |")?;
873	for (name, dep) in deps {
874		let version = dep.try_req().unwrap_or("*");
875		let feats = dep.req_features();
876		let feats = if !feats.is_empty() {
877			feats.join(" ")
878		} else {
879			String::new()
880		};
881
882		writeln!(out, "| {name} | {version} | {feats} |")?;
883	}
884
885	self.write_str(&out).await
886}
887
888#[admin_command]
889pub(super) async fn database_stats(
890	&self,
891	property: Option<String>,
892	map: Option<String>,
893) -> Result {
894	let map_name = map.as_ref().map_or(EMPTY, String::as_str);
895	let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
896	self.services
897		.db
898		.iter()
899		.filter(|&(&name, _)| map_name.is_empty() || map_name == name)
900		.try_stream()
901		.try_for_each(|(&name, map)| {
902			let res = map.property(&property).expect("invalid property");
903			writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
904		})
905		.await
906}
907
908#[admin_command]
909pub(super) async fn database_files(&self, map: Option<String>, level: Option<i32>) -> Result {
910	let mut files: Vec<_> = self
911		.services
912		.db
913		.engine
914		.file_list()
915		.collect::<Result<_>>()?;
916
917	files.sort_by_key(|f| f.name.clone());
918
919	writeln!(self, "| lev  | sst  | keys | dels | size | column |").await?;
920	writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :---   |").await?;
921	files
922		.into_iter()
923		.filter(|file| {
924			map.as_deref()
925				.is_none_or(|map| map == file.column_family_name)
926		})
927		.filter(|file| {
928			level
929				.as_ref()
930				.is_none_or(|&level| level == file.level)
931		})
932		.try_stream()
933		.try_for_each(|file| {
934			writeln!(
935				self,
936				"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
937				file.level,
938				file.name,
939				file.num_entries,
940				file.num_deletions,
941				file.size,
942				file.column_family_name,
943			)
944		})
945		.await
946}
947
948#[admin_command]
949pub(super) async fn trim_memory(&self) -> Result {
950	tuwunel_core::alloc::trim(None)?;
951
952	writeln!(self, "done").await
953}
954
955#[admin_command]
956pub(super) async fn create_jwt(
957	&self,
958	user: String,
959	exp_from_now: Option<u64>,
960	nbf_from_now: Option<u64>,
961	issuer: Option<String>,
962	audience: Option<String>,
963) -> Result {
964	use jwt::{Algorithm, EncodingKey, Header, encode};
965
966	#[derive(Serialize)]
967	struct Claim {
968		sub: String,
969		iss: Option<String>,
970		aud: Option<String>,
971		exp: Option<usize>,
972		nbf: Option<usize>,
973	}
974
975	let config = &self.services.config.jwt;
976	if config.format.as_str() != "HMAC" {
977		return Err!("This command only supports HMAC key format, not {}.", config.format);
978	}
979
980	let key = EncodingKey::from_secret(config.key.as_ref());
981	let alg = Algorithm::from_str(config.algorithm.as_str()).map_err(|e| {
982		err!(Config("jwt.algorithm", "JWT algorithm is not recognized or configured {e}"))
983	})?;
984
985	let header = Header { alg, ..Default::default() };
986	let claim = Claim {
987		sub: user,
988
989		iss: issuer,
990
991		aud: audience,
992
993		exp: exp_from_now
994			.and_then(|val| now_secs().checked_add(val))
995			.map(TryInto::try_into)
996			.and_then(Result::ok),
997
998		nbf: nbf_from_now
999			.and_then(|val| now_secs().checked_add(val))
1000			.map(TryInto::try_into)
1001			.and_then(Result::ok),
1002	};
1003
1004	encode(&header, &claim, &key)
1005		.map_err(|e| err!("Failed to encode JWT: {e}"))
1006		.map(async |token| self.write_str(&token).await)?
1007		.await
1008}
1009
1010#[admin_command]
1011pub(super) async fn resync_database(&self) -> Result {
1012	if !self.services.db.is_secondary() {
1013		return Err!("Not a secondary instance.");
1014	}
1015
1016	self.services
1017		.db
1018		.engine
1019		.update()
1020		.map_err(|e| err!("Failed to update from primary: {e:?}"))
1021}
1022
1023#[admin_command]
1024pub(super) async fn get_retained_pdu(&self, event_id: OwnedEventId) -> Result {
1025	let pdu = self
1026		.services
1027		.retention
1028		.get_original_pdu_json(&event_id)
1029		.await?;
1030
1031	let text = serde_json::to_string_pretty(&pdu)?;
1032
1033	self.write_str(&format!("Original PDU:\n```json\n{text}\n```"))
1034		.await?;
1035
1036	Ok(())
1037}
1038
1039#[admin_command]
1040pub(super) async fn dump_pdus(&self, dir: String) -> Result {
1041	use tokio::fs;
1042
1043	let dir_path = Path::new(&dir);
1044	fs::create_dir_all(dir_path).await?;
1045
1046	let normal = dumper(dir_path, "normal", self.services.timeline.pdus_raw());
1047	let outlier = dumper(dir_path, "outliers", self.services.timeline.outlier_pdus_raw());
1048	let retained = dumper(dir_path, "retaineds", self.services.retention.retained_pdus_raw());
1049	try_join3(normal, outlier, retained).await?;
1050
1051	Ok(())
1052}
1053
1054async fn dumper<'a, S>(dir: &Path, name: &str, stream: S) -> Result
1055where
1056	S: Stream<Item = Result<&'a [u8]>> + Send,
1057{
1058	use tokio::fs::OpenOptions;
1059
1060	let mut fopts = OpenOptions::new();
1061	fopts.write(true);
1062	fopts.create(true);
1063	fopts.truncate(true);
1064
1065	let path = dir.join(name);
1066	let file = fopts.open(path).await?;
1067	stream
1068		.try_fold(file, async |mut file, data| {
1069			file.write_all(data).await?;
1070
1071			Ok(file)
1072		})
1073		.and_then(async |mut file| Ok(file.shutdown().await?))
1074		.await
1075}