1use std::{
2 collections::HashMap,
3 fmt::Write,
4 iter::once,
5 path::Path,
6 str::FromStr,
7 time::{Instant, SystemTime},
8};
9
10use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::try_join3};
11use ruma::{
12 CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
13 OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
14 api::federation::{discovery::get_server_version, event::get_room_state},
15 events::AnyStateEvent,
16 serde::Raw,
17};
18use serde::Serialize;
19use tokio::io::AsyncWriteExt;
20use tracing_subscriber::EnvFilter;
21use tuwunel_core::{
22 Err, Error, Result, debug_error, err, info, jwt,
23 matrix::{
24 Event,
25 pdu::{PduEvent, PduId, RawPduId},
26 },
27 tokio_metrics::TaskMonitor,
28 trace, utils,
29 utils::{
30 math::Expected,
31 stream::{IterStream, ReadyExt, TryReadyExt},
32 string::EMPTY,
33 time::now_secs,
34 },
35 warn,
36};
37use tuwunel_service::rooms::{short::ShortRoomId, state_compressor::HashSetCompressStateEvent};
38
39use crate::admin_command;
40
41#[admin_command]
42pub(super) async fn echo(&self, message: Vec<String>) -> Result {
43 let message = message.join(" ");
44 self.write_str(&message).await
45}
46
47#[admin_command]
48pub(super) async fn get_auth_chain(&self, event_id: OwnedEventId) -> Result {
49 let Ok(event) = self
50 .services
51 .timeline
52 .get_pdu_json(&event_id)
53 .await
54 else {
55 return Err!("Event not found.");
56 };
57
58 let room_id_str = event
59 .get("room_id")
60 .and_then(CanonicalJsonValue::as_str)
61 .ok_or_else(|| err!(Database("Invalid event in database")))?;
62
63 let room_id = <&RoomId>::try_from(room_id_str)
64 .map_err(|_| err!(Database("Invalid room id field in event in database")))?;
65
66 let room_version = self
67 .services
68 .state
69 .get_room_version(room_id)
70 .await?;
71
72 let start = Instant::now();
73 let count = self
74 .services
75 .auth_chain
76 .event_ids_iter(room_id, &room_version, once(event_id.as_ref()))
77 .ready_filter_map(Result::ok)
78 .count()
79 .await;
80
81 let elapsed = start.elapsed();
82 let out = format!("Loaded auth chain with length {count} in {elapsed:?}");
83
84 self.write_str(&out).await
85}
86
87#[admin_command]
88pub(super) async fn parse_pdu(&self) -> Result {
89 if self.body.len() < 2
90 || !self.body[0].trim().starts_with("```")
91 || self.body.last().unwrap_or(&EMPTY).trim() != "```"
92 {
93 return Err!("Expected code block in command body. Add --help for details.");
94 }
95
96 let string = self.body[1..self.body.len().saturating_sub(1)].join("\n");
97 let rules = RoomVersionId::V6
98 .rules()
99 .expect("rules for V6 rooms");
100
101 let value =
102 serde_json::from_str(&string).map_err(|e| err!("Invalid json in command body: {e}"))?;
103
104 let hash = ruma::signatures::reference_hash(&value, &rules)
105 .map_err(|e| err!("Could not parse PDU JSON: {e:?}"))?;
106
107 let event_id = OwnedEventId::parse(format!("${hash}"));
108
109 let value = serde_json::to_value(value)?;
110
111 match serde_json::from_value::<PduEvent>(value) {
112 | Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"),
113 | Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
114 }
115 .await
116}
117
118#[admin_command]
119pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
120 let mut outlier = false;
121 let mut pdu_json = self
122 .services
123 .timeline
124 .get_non_outlier_pdu_json(&event_id)
125 .await;
126
127 if pdu_json.is_err() {
128 outlier = true;
129 pdu_json = self
130 .services
131 .timeline
132 .get_pdu_json(&event_id)
133 .await;
134 }
135
136 let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
137
138 let text = serde_json::to_string_pretty(&json)?;
139 let msg = if outlier {
140 "Outlier (Rejected / Soft Failed) PDU found in our database"
141 } else {
142 "PDU found in our database"
143 };
144
145 self.write_str(&format!("{msg}\n```json\n{text}\n```"))
146 .await
147}
148
149#[admin_command]
150pub(super) async fn get_short_pdu(&self, shortroomid: ShortRoomId, count: i64) -> Result {
151 let pdu_id: RawPduId = PduId { shortroomid, count: count.into() }.into();
152
153 let pdu_json = self
154 .services
155 .timeline
156 .get_pdu_json_from_id(&pdu_id)
157 .await;
158
159 let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
160
161 let json_text = serde_json::to_string_pretty(&json)?;
162
163 self.write_str(&format!("```json\n{json_text}\n```"))
164 .await
165}
166
167#[admin_command]
168pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: bool) -> Result {
169 if !self.services.server.config.allow_federation {
170 return Err!("Federation is disabled on this homeserver.",);
171 }
172
173 if server == self.services.globals.server_name() {
174 return Err!(
175 "Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
176 fetching local PDUs from the database.",
177 );
178 }
179
180 if self.body.len() < 2
181 || !self.body[0].trim().starts_with("```")
182 || self.body.last().unwrap_or(&EMPTY).trim() != "```"
183 {
184 return Err!("Expected code block in command body. Add --help for details.",);
185 }
186
187 let list = self
188 .body
189 .iter()
190 .collect::<Vec<_>>()
191 .drain(1..self.body.len().saturating_sub(1))
192 .filter_map(|pdu| EventId::parse(pdu).ok())
193 .collect::<Vec<_>>();
194
195 let mut failed_count: usize = 0;
196 let mut success_count: usize = 0;
197
198 for event_id in list {
199 let result = self
200 .get_remote_pdu(event_id, server.clone())
201 .await;
202
203 if !force {
204 result?;
205 } else if let Err(e) = result {
206 warn!("Failed to get remote PDU, ignoring error: {e}");
207 failed_count = failed_count.saturating_add(1);
208 continue;
209 }
210
211 success_count = success_count.saturating_add(1);
212 }
213
214 let out =
215 format!("Fetched {success_count} remote PDUs successfully with {failed_count} failures");
216
217 self.write_str(&out).await
218}
219
220#[admin_command]
221pub(super) async fn get_remote_pdu(
222 &self,
223 event_id: OwnedEventId,
224 server: OwnedServerName,
225) -> Result {
226 if !self.services.server.config.allow_federation {
227 return Err!("Federation is disabled on this homeserver.");
228 }
229
230 if server == self.services.globals.server_name() {
231 return Err!(
232 "Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
233 fetching local PDUs.",
234 );
235 }
236
237 let response = self
238 .services
239 .federation
240 .execute(&server, ruma::api::federation::event::get_event::v1::Request {
241 event_id: event_id.clone(),
242 })
243 .await
244 .map_err(|e| {
245 err!("Remote server did not have PDU or failed sending request to remote server: {e}")
246 })?;
247
248 let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| {
249 warn!(
250 "Requested event ID {event_id} from server but failed to convert from RawValue to \
251 CanonicalJsonObject (malformed event/response?): {e}"
252 );
253 err!(Request(Unknown("Received response from server but failed to parse PDU")))
254 })?;
255
256 trace!("Attempting to parse PDU: {:?}", &response.pdu);
257 let (room_id, ..) = self
258 .services
259 .event_handler
260 .parse_incoming_pdu(&response.pdu)
261 .boxed()
262 .await
263 .map_err(|e| {
264 warn!("Failed to parse PDU: {e}");
265 info!("Full PDU: {:?}", &response.pdu);
266 err!("Failed to parse PDU remote server {server} sent us: {e}")
267 })?;
268
269 info!("Attempting to handle event ID {event_id} as backfilled PDU");
270 self.services
271 .timeline
272 .backfill_pdu(&room_id, &server, response.pdu)
273 .await?;
274
275 let text = serde_json::to_string_pretty(&json)?;
276 let msg = "Got PDU from specified server and handled as backfilled";
277 self.write_str(&format!("{msg}. Event body:\n```json\n{text}\n```"))
278 .await
279}
280
281#[admin_command]
282pub(super) async fn get_room_state(
283 &self,
284 room: OwnedRoomOrAliasId,
285 kind: Option<String>,
286 state_key: Option<String>,
287) -> Result {
288 let room_id = self.services.alias.maybe_resolve(&room).await?;
289
290 if state_key.is_none()
291 && let Some(kind) = kind.clone().map(Into::into)
292 {
293 return self
294 .services
295 .state_accessor
296 .room_state_type_pdus(&room_id, &kind)
297 .map_ok(Event::into_format)
298 .ready_and_then(|event: Raw<AnyStateEvent>| {
299 serde_json::to_value(&event).map_err(Error::from)
300 })
301 .ready_and_then(|event| serde_json::to_string_pretty(&event).map_err(Error::from))
302 .try_for_each(|json| self.write_string(format!("```json\n{json}\n```\n")))
303 .await;
304 }
305
306 if let Some(state_key) = state_key
307 && let Some(kind) = kind.map(Into::into)
308 {
309 let event: Raw<AnyStateEvent> = self
310 .services
311 .state_accessor
312 .room_state_get(&room_id, &kind, &state_key)
313 .await?
314 .into_format();
315
316 let value = serde_json::to_value(&event)?;
317 let json = serde_json::to_string_pretty(&value)?;
318 return self
319 .write_string(format!("```json\n{json}\n```\n"))
320 .await;
321 }
322
323 self.services
324 .state_accessor
325 .room_state_full_pdus(&room_id)
326 .map_ok(Event::into_format)
327 .ready_and_then(|event: Raw<AnyStateEvent>| {
328 serde_json::to_value(&event).map_err(Error::from)
329 })
330 .ready_and_then(|event| serde_json::to_string_pretty(&event).map_err(Error::from))
331 .try_for_each(|json| self.write_string(format!("```json\n{json}\n```\n")))
332 .await
333}
334
335#[admin_command]
336pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
337 let timer = tokio::time::Instant::now();
338
339 let response = self
340 .services
341 .federation
342 .execute(&server, get_server_version::v1::Request {})
343 .await
344 .map_err(|e| err!("Failed sending federation request to specified server:\n\n{e}"))?;
345
346 let ping_time = timer.elapsed();
347
348 let out = if let Ok(json) = serde_json::to_string_pretty(&response.server) {
349 format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
350 } else {
351 format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
352 };
353
354 self.write_str(&out).await
355}
356
357#[admin_command]
358pub(super) async fn force_device_list_updates(&self) -> Result {
359 self.services
361 .users
362 .stream()
363 .for_each(|user_id| {
364 self.services
365 .users
366 .mark_device_key_update(user_id)
367 })
368 .await;
369
370 write!(self, "Marked all devices for all users as having new keys to update").await
371}
372
373#[admin_command]
374pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result {
375 let handles = &["console"];
376
377 let filter = reset
378 .then_some(&self.services.config.log)
379 .or(filter.as_ref())
380 .ok_or_else(|| err!("No log level was specified."))?;
381
382 let filter_layer = EnvFilter::try_new(filter).map_err(|e| {
383 let source = if !reset { "specified" } else { "found in config" };
384 err!("Invalid log level filter {source}: {e}")
385 })?;
386
387 self.services
388 .server
389 .log
390 .reload
391 .reload(&filter_layer, Some(handles))
392 .map_err(|e| err!("Failed to modify and reload the global tracing log level: {e}"))?;
393
394 self.write_str(&format!("Successfully changed log level to {filter}"))
395 .await
396}
397
398#[admin_command]
399pub(super) async fn sign_json(&self) -> Result {
400 if self.body.len() < 2
401 || !self.body[0].trim().starts_with("```")
402 || self.body.last().unwrap_or(&"").trim() != "```"
403 {
404 return Err!("Expected code block in command body. Add --help for details.");
405 }
406
407 let string = self.body[1..self.body.len().expected_sub(1)].join("\n");
408 let mut value = serde_json::from_str(&string).map_err(|e| err!("Invalid json: {e}"))?;
409
410 self.services.server_keys.sign_json(&mut value)?;
411
412 let json_text = serde_json::to_string_pretty(&value)?;
413 self.write_str(&json_text).await
414}
415
416#[admin_command]
417pub(super) async fn verify_json(&self) -> Result {
418 if self.body.len() < 2
419 || !self.body[0].trim().starts_with("```")
420 || self.body.last().unwrap_or(&"").trim() != "```"
421 {
422 return Err!("Expected code block in command body. Add --help for details.");
423 }
424
425 let string = self.body[1..self.body.len().expected_sub(1)].join("\n");
426
427 let value = serde_json::from_str::<CanonicalJsonObject>(&string)
428 .map_err(|e| err!("Invalid json: {e}"))?;
429
430 self.services
431 .server_keys
432 .verify_json(&value, None)
433 .await
434 .map_err(|e| err!("Signature verification failed: {e}"))?;
435
436 self.write_str("Signature correct").await
437}
438
439#[admin_command]
440pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
441 use ruma::signatures::Verified;
442
443 let mut event = self
444 .services
445 .timeline
446 .get_pdu_json(&event_id)
447 .await?;
448
449 event.remove("event_id");
450 let msg = match self
451 .services
452 .server_keys
453 .verify_event(&event, None)
454 .await?
455 {
456 | Verified::Signatures => "signatures OK, but content hash failed (redaction).",
457 | Verified::All => "signatures and hashes OK.",
458 };
459
460 self.write_str(msg).await
461}
462
463#[admin_command]
464#[tracing::instrument(skip(self))]
465pub(super) async fn first_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
466 if !self
467 .services
468 .state_cache
469 .server_in_room(&self.services.server.name, &room_id)
470 .await
471 {
472 return Err!("We are not participating in the room / we don't know about the room ID.",);
473 }
474
475 let first_pdu = self
476 .services
477 .timeline
478 .first_pdu_in_room(&room_id)
479 .await
480 .map_err(|_| err!(Database("Failed to find the first PDU in database")))?;
481
482 let out = format!("{first_pdu:?}");
483 self.write_str(&out).await
484}
485
486#[admin_command]
487#[tracing::instrument(skip(self))]
488pub(super) async fn latest_pdu_in_room(&self, room_id: OwnedRoomId) -> Result {
489 if !self
490 .services
491 .state_cache
492 .server_in_room(&self.services.server.name, &room_id)
493 .await
494 {
495 return Err!("We are not participating in the room / we don't know about the room ID.");
496 }
497
498 let latest_pdu = self
499 .services
500 .timeline
501 .latest_pdu_in_room(&room_id)
502 .await
503 .map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
504
505 let out = format!("{latest_pdu:?}");
506 self.write_str(&out).await
507}
508
509#[admin_command]
510#[tracing::instrument(skip(self))]
511pub(super) async fn force_set_room_state_from_server(
512 &self,
513 room_id: OwnedRoomId,
514 server_name: OwnedServerName,
515) -> Result {
516 if !self
519 .services
520 .state_cache
521 .server_in_room(&self.services.server.name, &room_id)
522 .await
523 {
524 return Err!("We are not participating in the room / we don't know about the room ID.");
525 }
526
527 let first_pdu = self
528 .services
529 .timeline
530 .latest_pdu_in_room(&room_id)
531 .await
532 .map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
533
534 let room_version = self
535 .services
536 .state
537 .get_room_version(&room_id)
538 .await?;
539
540 let mut state: HashMap<u64, OwnedEventId> = HashMap::new();
541
542 let remote_state_response = self
543 .services
544 .federation
545 .execute(&server_name, get_room_state::v1::Request {
546 room_id: room_id.clone(),
547 event_id: first_pdu.event_id().to_owned(),
548 })
549 .await?;
550
551 for pdu in remote_state_response.pdus.clone() {
552 match self
553 .services
554 .event_handler
555 .parse_incoming_pdu(&pdu)
556 .await
557 {
558 | Ok(t) => t,
559 | Err(e) => {
560 warn!("Could not parse PDU, ignoring: {e}");
561 continue;
562 },
563 };
564 }
565
566 info!("Going through room_state response PDUs");
567 for result in remote_state_response.pdus.iter().map(|pdu| {
568 self.services
569 .server_keys
570 .validate_and_add_event_id(pdu, &room_version)
571 }) {
572 let Ok((event_id, mut value)) = result.await else {
573 continue;
574 };
575
576 let invalid_pdu_err = |e| {
577 debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
578 err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
579 };
580
581 let pdu = if value["type"] == "m.room.create" {
582 PduEvent::from_object_and_roomid_and_eventid(&room_id, &event_id, value.clone())
583 .map_err(invalid_pdu_err)?
584 } else {
585 PduEvent::from_object_and_eventid(&event_id, value.clone())
586 .map_err(invalid_pdu_err)?
587 };
588
589 if !value.contains_key("room_id") {
590 let room_id = CanonicalJsonValue::String(room_id.as_str().into());
591 value.insert("room_id".into(), room_id);
592 }
593
594 self.services
595 .timeline
596 .add_pdu_outlier(&event_id, &value);
597
598 if let Some(state_key) = &pdu.state_key {
599 let shortstatekey = self
600 .services
601 .short
602 .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
603 .await;
604
605 state.insert(shortstatekey, pdu.event_id.clone());
606 }
607 }
608
609 info!("Going through auth_chain response");
610 for result in remote_state_response
611 .auth_chain
612 .iter()
613 .map(|pdu| {
614 self.services
615 .server_keys
616 .validate_and_add_event_id(pdu, &room_version)
617 }) {
618 let Ok((event_id, value)) = result.await else {
619 continue;
620 };
621
622 self.services
623 .timeline
624 .add_pdu_outlier(&event_id, &value);
625 }
626
627 let new_room_state = self
628 .services
629 .event_handler
630 .resolve_state(&room_id, &room_version, state)
631 .await?;
632
633 info!("Forcing new room state");
634 let HashSetCompressStateEvent {
635 shortstatehash: short_state_hash,
636 added,
637 removed,
638 } = self
639 .services
640 .state_compressor
641 .save_state(room_id.clone().as_ref(), new_room_state)
642 .await?;
643
644 let state_lock = self.services.state.mutex.lock(&*room_id).await;
645
646 self.services
647 .state
648 .force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
649 .await?;
650
651 info!(
652 "Updating joined counts for room just in case (e.g. we may have found a difference in \
653 the room's m.room.member state"
654 );
655 self.services
656 .state_cache
657 .update_joined_count(&room_id)
658 .await;
659
660 self.write_str("Successfully forced the room state from the requested remote server.")
661 .await
662}
663
664#[admin_command]
665pub(super) async fn get_signing_keys(
666 &self,
667 server_name: Option<OwnedServerName>,
668 notary: Option<OwnedServerName>,
669 query: bool,
670) -> Result {
671 let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
672
673 if let Some(notary) = notary {
674 let signing_keys = self
675 .services
676 .server_keys
677 .notary_request(¬ary, &server_name)
678 .await?;
679
680 let out = format!("```rs\n{signing_keys:#?}\n```");
681 return self.write_str(&out).await;
682 }
683
684 let signing_keys = if query {
685 self.services
686 .server_keys
687 .server_request(&server_name)
688 .await?
689 } else {
690 self.services
691 .server_keys
692 .signing_keys_for(&server_name)
693 .await?
694 };
695
696 let out = format!("```rs\n{signing_keys:#?}\n```");
697 self.write_str(&out).await
698}
699
700#[admin_command]
701pub(super) async fn get_verify_keys(&self, server_name: Option<OwnedServerName>) -> Result {
702 let server_name = server_name.unwrap_or_else(|| self.services.server.name.clone());
703
704 let keys = self
705 .services
706 .server_keys
707 .verify_keys_for(&server_name)
708 .await;
709
710 let mut out = String::new();
711 writeln!(out, "| Key ID | Public Key |")?;
712 writeln!(out, "| --- | --- |")?;
713 for (key_id, key) in keys {
714 writeln!(out, "| {key_id} | {key:?} |")?;
715 }
716
717 self.write_str(&out).await
718}
719
720#[admin_command]
721pub(super) async fn resolve_true_destination(
722 &self,
723 server_name: OwnedServerName,
724 no_cache: bool,
725) -> Result {
726 if !self.services.server.config.allow_federation {
727 return Err!("Federation is disabled on this homeserver.",);
728 }
729
730 if server_name == self.services.server.name {
731 return Err!(
732 "Not allowed to send federation requests to ourselves. Please use `get-pdu` for \
733 fetching local PDUs.",
734 );
735 }
736
737 let actual = self
738 .services
739 .resolver
740 .resolve_actual_dest(&server_name, !no_cache)
741 .await?;
742
743 let msg = format!("Destination: {}\nHostname URI: {}", actual.dest, actual.host);
744 self.write_str(&msg).await
745}
746
747#[admin_command]
748pub(super) async fn memory_stats(&self, opts: Option<String>) -> Result {
749 const OPTS: &str = "abcdefghijklmnopqrstuvwxyz";
750
751 let opts: String = OPTS
752 .chars()
753 .filter(|&c| {
754 let allow_any = opts.as_ref().is_some_and(|opts| opts == "*");
755
756 let allow = allow_any || opts.as_ref().is_some_and(|opts| opts.contains(c));
757
758 !allow
759 })
760 .collect();
761
762 let stats = tuwunel_core::alloc::memory_stats(&opts).unwrap_or_default();
763
764 self.write_str("```\n").await?;
765 self.write_str(&stats).await?;
766 self.write_str("\n```").await?;
767 Ok(())
768}
769
770#[cfg(tokio_unstable)]
771#[admin_command]
772pub(super) async fn runtime_metrics(&self) -> Result {
773 let out = self
774 .services
775 .server
776 .metrics
777 .runtime_metrics()
778 .map_or_else(
779 || "Runtime metrics are not available.".to_owned(),
780 |metrics| {
781 format!(
782 "```rs\nnum_workers: {}\nnum_alive_tasks: {}\nglobal_queue_depth: {}\n```",
783 metrics.num_workers(),
784 metrics.num_alive_tasks(),
785 metrics.global_queue_depth()
786 )
787 },
788 );
789
790 self.write_str(&out).await
791}
792
793#[cfg(not(tokio_unstable))]
794#[admin_command]
795pub(super) async fn runtime_metrics(&self) -> Result {
796 self.write_str("Runtime metrics require building with `tokio_unstable`.")
797 .await
798}
799
800#[cfg(tokio_unstable)]
801#[admin_command]
802pub(super) async fn runtime_interval(&self) -> Result {
803 let out = self
804 .services
805 .server
806 .metrics
807 .runtime_interval()
808 .map_or_else(
809 || "Runtime metrics are not available.".to_owned(),
810 |metrics| format!("```rs\n{metrics:#?}\n```"),
811 );
812
813 self.write_str(&out).await
814}
815
816#[cfg(not(tokio_unstable))]
817#[admin_command]
818pub(super) async fn runtime_interval(&self) -> Result {
819 self.write_str("Runtime metrics require building with `tokio_unstable`.")
820 .await
821}
822
823#[admin_command]
824pub(super) async fn task_metrics(&self) -> Result {
825 let out = self
826 .services
827 .server
828 .metrics
829 .task_metrics()
830 .map(TaskMonitor::cumulative)
831 .map_or_else(
832 || "Task metrics are not available.".to_owned(),
833 |metrics| format!("```rs\n{metrics:#?}\n```"),
834 );
835
836 self.write_str(&out).await
837}
838
839#[admin_command]
840pub(super) async fn task_interval(&self) -> Result {
841 let out = self
842 .services
843 .server
844 .metrics
845 .task_interval()
846 .map_or_else(
847 || "Task metrics are not available.".to_owned(),
848 |metrics| format!("```rs\n{metrics:#?}\n```"),
849 );
850
851 self.write_str(&out).await
852}
853
854#[admin_command]
855pub(super) async fn time(&self) -> Result {
856 let now = SystemTime::now();
857 let now = utils::time::format(now, "%+");
858
859 self.write_str(&now).await
860}
861
862#[admin_command]
863pub(super) async fn list_dependencies(&self, names: bool) -> Result {
864 if names {
865 let out = info::cargo::dependencies_names().join(" ");
866 return self.write_str(&out).await;
867 }
868
869 let mut out = String::new();
870 let deps = info::cargo::dependencies();
871 writeln!(out, "| name | version | features |")?;
872 writeln!(out, "| ---- | ------- | -------- |")?;
873 for (name, dep) in deps {
874 let version = dep.try_req().unwrap_or("*");
875 let feats = dep.req_features();
876 let feats = if !feats.is_empty() {
877 feats.join(" ")
878 } else {
879 String::new()
880 };
881
882 writeln!(out, "| {name} | {version} | {feats} |")?;
883 }
884
885 self.write_str(&out).await
886}
887
888#[admin_command]
889pub(super) async fn database_stats(
890 &self,
891 property: Option<String>,
892 map: Option<String>,
893) -> Result {
894 let map_name = map.as_ref().map_or(EMPTY, String::as_str);
895 let property = property.unwrap_or_else(|| "rocksdb.stats".to_owned());
896 self.services
897 .db
898 .iter()
899 .filter(|&(&name, _)| map_name.is_empty() || map_name == name)
900 .try_stream()
901 .try_for_each(|(&name, map)| {
902 let res = map.property(&property).expect("invalid property");
903 writeln!(self, "##### {name}:\n```\n{}\n```", res.trim())
904 })
905 .await
906}
907
908#[admin_command]
909pub(super) async fn database_files(&self, map: Option<String>, level: Option<i32>) -> Result {
910 let mut files: Vec<_> = self
911 .services
912 .db
913 .engine
914 .file_list()
915 .collect::<Result<_>>()?;
916
917 files.sort_by_key(|f| f.name.clone());
918
919 writeln!(self, "| lev | sst | keys | dels | size | column |").await?;
920 writeln!(self, "| ---: | :--- | ---: | ---: | ---: | :--- |").await?;
921 files
922 .into_iter()
923 .filter(|file| {
924 map.as_deref()
925 .is_none_or(|map| map == file.column_family_name)
926 })
927 .filter(|file| {
928 level
929 .as_ref()
930 .is_none_or(|&level| level == file.level)
931 })
932 .try_stream()
933 .try_for_each(|file| {
934 writeln!(
935 self,
936 "| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
937 file.level,
938 file.name,
939 file.num_entries,
940 file.num_deletions,
941 file.size,
942 file.column_family_name,
943 )
944 })
945 .await
946}
947
948#[admin_command]
949pub(super) async fn trim_memory(&self) -> Result {
950 tuwunel_core::alloc::trim(None)?;
951
952 writeln!(self, "done").await
953}
954
955#[admin_command]
956pub(super) async fn create_jwt(
957 &self,
958 user: String,
959 exp_from_now: Option<u64>,
960 nbf_from_now: Option<u64>,
961 issuer: Option<String>,
962 audience: Option<String>,
963) -> Result {
964 use jwt::{Algorithm, EncodingKey, Header, encode};
965
966 #[derive(Serialize)]
967 struct Claim {
968 sub: String,
969 iss: Option<String>,
970 aud: Option<String>,
971 exp: Option<usize>,
972 nbf: Option<usize>,
973 }
974
975 let config = &self.services.config.jwt;
976 if config.format.as_str() != "HMAC" {
977 return Err!("This command only supports HMAC key format, not {}.", config.format);
978 }
979
980 let key = EncodingKey::from_secret(config.key.as_ref());
981 let alg = Algorithm::from_str(config.algorithm.as_str()).map_err(|e| {
982 err!(Config("jwt.algorithm", "JWT algorithm is not recognized or configured {e}"))
983 })?;
984
985 let header = Header { alg, ..Default::default() };
986 let claim = Claim {
987 sub: user,
988
989 iss: issuer,
990
991 aud: audience,
992
993 exp: exp_from_now
994 .and_then(|val| now_secs().checked_add(val))
995 .map(TryInto::try_into)
996 .and_then(Result::ok),
997
998 nbf: nbf_from_now
999 .and_then(|val| now_secs().checked_add(val))
1000 .map(TryInto::try_into)
1001 .and_then(Result::ok),
1002 };
1003
1004 encode(&header, &claim, &key)
1005 .map_err(|e| err!("Failed to encode JWT: {e}"))
1006 .map(async |token| self.write_str(&token).await)?
1007 .await
1008}
1009
1010#[admin_command]
1011pub(super) async fn resync_database(&self) -> Result {
1012 if !self.services.db.is_secondary() {
1013 return Err!("Not a secondary instance.");
1014 }
1015
1016 self.services
1017 .db
1018 .engine
1019 .update()
1020 .map_err(|e| err!("Failed to update from primary: {e:?}"))
1021}
1022
1023#[admin_command]
1024pub(super) async fn get_retained_pdu(&self, event_id: OwnedEventId) -> Result {
1025 let pdu = self
1026 .services
1027 .retention
1028 .get_original_pdu_json(&event_id)
1029 .await?;
1030
1031 let text = serde_json::to_string_pretty(&pdu)?;
1032
1033 self.write_str(&format!("Original PDU:\n```json\n{text}\n```"))
1034 .await?;
1035
1036 Ok(())
1037}
1038
1039#[admin_command]
1040pub(super) async fn dump_pdus(&self, dir: String) -> Result {
1041 use tokio::fs;
1042
1043 let dir_path = Path::new(&dir);
1044 fs::create_dir_all(dir_path).await?;
1045
1046 let normal = dumper(dir_path, "normal", self.services.timeline.pdus_raw());
1047 let outlier = dumper(dir_path, "outliers", self.services.timeline.outlier_pdus_raw());
1048 let retained = dumper(dir_path, "retaineds", self.services.retention.retained_pdus_raw());
1049 try_join3(normal, outlier, retained).await?;
1050
1051 Ok(())
1052}
1053
1054async fn dumper<'a, S>(dir: &Path, name: &str, stream: S) -> Result
1055where
1056 S: Stream<Item = Result<&'a [u8]>> + Send,
1057{
1058 use tokio::fs::OpenOptions;
1059
1060 let mut fopts = OpenOptions::new();
1061 fopts.write(true);
1062 fopts.create(true);
1063 fopts.truncate(true);
1064
1065 let path = dir.join(name);
1066 let file = fopts.open(path).await?;
1067 stream
1068 .try_fold(file, async |mut file, data| {
1069 file.write_all(data).await?;
1070
1071 Ok(file)
1072 })
1073 .and_then(async |mut file| Ok(file.shutdown().await?))
1074 .await
1075}