Skip to main content

tuwunel_service/admin/
processor.rs

1use std::{
2	fmt::Write,
3	mem::take,
4	panic::AssertUnwindSafe,
5	sync::{Arc, Mutex},
6	time::SystemTime,
7};
8
9use futures::{AsyncWriteExt, future::FutureExt, io::BufWriter};
10use ruma::{
11	EventId,
12	events::{
13		relation::{InReplyTo, Reply as ReplyRelation},
14		room::message::{Relation::Reply, RoomMessageEventContent},
15	},
16};
17use tracing::Level;
18use tracing_subscriber::{EnvFilter, filter::LevelFilter};
19use tuwunel_core::{
20	Error, Result, debug, error,
21	log::{
22		capture,
23		capture::Capture,
24		fmt::{markdown_table, markdown_table_head},
25	},
26	trace,
27	utils::string::{collect_stream, common_prefix},
28	warn,
29};
30
31use super::{Command, CommandInput, CommandOutput, Context, ProcessorResult};
32use crate::Services;
33
34#[tracing::instrument(level = "debug", skip_all, name = "admin")]
35pub(super) async fn handle_command(
36	command: Arc<dyn Command>,
37	services: Arc<Services>,
38	input: CommandInput,
39) -> ProcessorResult {
40	AssertUnwindSafe(Box::pin(process_command(&*command, services, &input)))
41		.catch_unwind()
42		.await
43		.map_err(Error::from_panic)
44		.unwrap_or_else(|error| handle_panic(&error, &input))
45}
46
47#[must_use]
48pub(super) fn complete(mut cmd: clap::Command, line: &str) -> String {
49	let argv = parse_line(line);
50	let mut ret = Vec::<String>::with_capacity(argv.len().saturating_add(1));
51
52	'token: for token in argv.into_iter().skip(1) {
53		let cmd_ = cmd.clone();
54		let mut choice = Vec::new();
55
56		for sub in cmd_.get_subcommands() {
57			let name = sub.get_name();
58			if *name == token {
59				// token already complete; recurse to subcommand
60				ret.push(token);
61				cmd.clone_from(sub);
62				continue 'token;
63			} else if name.starts_with(&token) {
64				// partial match; add to choices
65				choice.push(name);
66			}
67		}
68
69		if choice.len() == 1 {
70			// One choice. Add extra space because it's complete
71			let choice = *choice.first().expect("only choice");
72			ret.push(choice.to_owned());
73			ret.push(String::new());
74		} else if choice.is_empty() {
75			// Nothing found, return original string
76			ret.push(token);
77		} else {
78			// Find the common prefix
79			ret.push(common_prefix(&choice).into());
80		}
81
82		// Return from completion
83		return ret.join(" ");
84	}
85
86	// Return from no completion. Needs a space though.
87	ret.push(String::new());
88	ret.join(" ")
89}
90
91async fn process_command(
92	command: &dyn Command,
93	services: Arc<Services>,
94	input: &CommandInput,
95) -> ProcessorResult {
96	let (matches, args, body) = match parse(&services, command.clap(), input) {
97		| Err(error) => return Err(Box::new(error)),
98		| Ok(parsed) => parsed,
99	};
100
101	let context = Context {
102		services: &services,
103		body: &body,
104		timer: SystemTime::now(),
105		reply_id: input.reply_id.as_deref(),
106		output: BufWriter::new(Vec::new()).into(),
107	};
108
109	let (result, mut logs) = process(&context, command, matches, &args).await;
110
111	let output = &mut context.output.lock().await;
112	output
113		.flush()
114		.await
115		.expect("final flush of output stream");
116
117	let output =
118		String::from_utf8(take(output.get_mut())).expect("invalid utf8 in command output stream");
119
120	match result {
121		| Ok(()) if logs.is_empty() =>
122			Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))),
123
124		| Ok(()) => {
125			logs.write_str(output.as_str())
126				.expect("output buffer");
127			Ok(Some(reply(RoomMessageEventContent::notice_markdown(logs), context.reply_id)))
128		},
129		| Err(error) => {
130			write!(&mut logs, "Command failed with error:\n```\n{error:#?}\n```")
131				.expect("output buffer");
132
133			Err(Box::new(reply(
134				RoomMessageEventContent::notice_markdown(logs),
135				context.reply_id,
136			)))
137		},
138	}
139}
140
141fn handle_panic(error: &Error, input: &CommandInput) -> ProcessorResult {
142	let link =
143		"Please submit a [bug report](https://github.com/matrix-construct/tuwunel/issues/new). \
144		 🥺";
145
146	let msg = format!("Panic occurred while processing command:\n```\n{error:#?}\n```\n{link}");
147	let content = RoomMessageEventContent::notice_markdown(msg);
148
149	error!("Panic while processing command: {error:?}");
150	Err(Box::new(reply(content, input.reply_id.as_deref())))
151}
152
153async fn process(
154	context: &Context<'_>,
155	command: &dyn Command,
156	matches: clap::ArgMatches,
157	args: &[String],
158) -> (Result, String) {
159	let (capture, logs) = capture_create(context);
160
161	let capture_scope = capture.start();
162	let result = Box::pin(command.dispatch(matches, context)).await;
163	drop(capture_scope);
164
165	debug!(
166		ok = result.is_ok(),
167		elapsed = ?context.timer.elapsed(),
168		command = ?args,
169		"command processed"
170	);
171
172	let mut output = String::new();
173
174	let logs = logs.lock().expect("locked");
175	if logs.lines().count() > 2 {
176		writeln!(&mut output, "{logs}").expect("failed to format logs to command output");
177	}
178	drop(logs);
179
180	(result, output)
181}
182
183fn capture_create(context: &Context<'_>) -> (Arc<Capture>, Arc<Mutex<String>>) {
184	let env_config = &context.services.server.config.admin_log_capture;
185	let env_filter = EnvFilter::try_new(env_config).unwrap_or_else(|e| {
186		warn!("admin_log_capture filter invalid: {e:?}");
187		cfg!(debug_assertions)
188			.then_some("debug")
189			.or(Some("info"))
190			.map(Into::into)
191			.expect("default capture EnvFilter")
192	});
193
194	let log_level = env_filter
195		.max_level_hint()
196		.and_then(LevelFilter::into_level)
197		.unwrap_or(Level::DEBUG);
198
199	let filter = move |data: capture::Data<'_>| {
200		data.level() <= log_level && data.our_modules() && data.scope.contains(&"admin")
201	};
202
203	let logs = Arc::new(Mutex::new(
204		collect_stream(|s| markdown_table_head(s)).expect("markdown table header"),
205	));
206
207	let capture = Capture::new(
208		&context.services.server.log.capture,
209		Some(filter),
210		capture::fmt(markdown_table, logs.clone()),
211	);
212
213	(capture, logs)
214}
215
216#[expect(clippy::result_large_err)]
217fn parse<'a>(
218	services: &Arc<Services>,
219	cmd: clap::Command,
220	input: &'a CommandInput,
221) -> Result<(clap::ArgMatches, Vec<String>, Vec<&'a str>), CommandOutput> {
222	let lines = input
223		.command
224		.lines()
225		.filter(|line| !line.trim().is_empty());
226
227	let command_line = lines
228		.clone()
229		.next()
230		.expect("command missing first line");
231
232	let body = lines.skip(1).collect();
233
234	match parse_command(cmd, command_line) {
235		| Ok((matches, args)) => Ok((matches, args, body)),
236		| Err(error) => {
237			let message = error
238				.to_string()
239				.replace("server.name", services.globals.server_name().as_str());
240
241			Err(reply(RoomMessageEventContent::notice_plain(message), input.reply_id.as_deref()))
242		},
243	}
244}
245
246fn parse_command(
247	mut cmd: clap::Command,
248	line: &str,
249) -> Result<(clap::ArgMatches, Vec<String>), clap::Error> {
250	let argv = parse_line(line);
251	let matches = cmd.try_get_matches_from_mut(&argv)?;
252
253	Ok((matches, argv))
254}
255
256fn parse_line(command_line: &str) -> Vec<String> {
257	let mut argv = command_line
258		.split_whitespace()
259		.map(str::to_owned)
260		.collect::<Vec<String>>();
261
262	// Remove any escapes that came with a server-side escape command
263	if !argv.is_empty() && argv[0].ends_with("admin") {
264		argv[0] = argv[0].trim_start_matches('\\').into();
265	}
266
267	// First indice has to be "admin" but for console convenience we add it here
268	if !argv.is_empty() && !argv[0].ends_with("admin") && !argv[0].starts_with('@') {
269		argv.insert(0, "admin".to_owned());
270	}
271
272	// Replace `help command` with `command --help`
273	// Clap has a help subcommand, but it omits the long help description.
274	if argv.len() > 1 && argv[1] == "help" {
275		argv.remove(1);
276		argv.push("--help".to_owned());
277	}
278
279	// Backwards compatibility with `register_appservice`-style commands
280	if argv.len() > 1 && argv[1].contains('_') {
281		argv[1] = argv[1].replace('_', "-");
282	}
283
284	// Backwards compatibility with `register_appservice`-style commands
285	if argv.len() > 2 && argv[2].contains('_') {
286		argv[2] = argv[2].replace('_', "-");
287	}
288
289	// if the user is using the `query` command (argv[1]), replace the database
290	// function/table calls with underscores to match the codebase
291	if argv.len() > 3 && argv[1].eq("query") {
292		argv[3] = argv[3].replace('_', "-");
293	}
294
295	trace!(?command_line, ?argv, "parse");
296	argv
297}
298
299fn reply(
300	mut content: RoomMessageEventContent,
301	reply_id: Option<&EventId>,
302) -> RoomMessageEventContent {
303	content.relates_to = reply_id.map(|event_id| {
304		Reply(ReplyRelation {
305			in_reply_to: InReplyTo { event_id: event_id.to_owned() },
306		})
307	});
308
309	content
310}