Skip to main content

tuwunel_admin/
processor.rs

1use std::{
2	fmt::Write,
3	mem::take,
4	panic::AssertUnwindSafe,
5	sync::{Arc, Mutex},
6	time::SystemTime,
7};
8
9use clap::{CommandFactory, Parser};
10use futures::{AsyncWriteExt, future::FutureExt, io::BufWriter};
11use ruma::{
12	EventId,
13	events::{
14		relation::{InReplyTo, Reply as ReplyRelation},
15		room::message::{Relation::Reply, RoomMessageEventContent},
16	},
17};
18use tracing::Level;
19use tracing_subscriber::{EnvFilter, filter::LevelFilter};
20use tuwunel_core::{
21	Error, Result, debug, error,
22	log::{
23		capture,
24		capture::Capture,
25		fmt::{markdown_table, markdown_table_head},
26	},
27	trace,
28	utils::string::{collect_stream, common_prefix},
29	warn,
30};
31use tuwunel_service::{
32	Services,
33	admin::{CommandInput, CommandOutput, ProcessorFuture, ProcessorResult},
34};
35
36use crate::{admin, admin::AdminCommand, context::Context};
37
38#[must_use]
39pub(super) fn complete(line: &str) -> String { complete_command(AdminCommand::command(), line) }
40
41#[must_use]
42pub(super) fn dispatch(services: Arc<Services>, command: CommandInput) -> ProcessorFuture {
43	Box::pin(handle_command(services, command))
44}
45
46#[tracing::instrument(skip_all, name = "admin")]
47async fn handle_command(services: Arc<Services>, command: CommandInput) -> ProcessorResult {
48	AssertUnwindSafe(Box::pin(process_command(services, &command)))
49		.catch_unwind()
50		.await
51		.map_err(Error::from_panic)
52		.unwrap_or_else(|error| handle_panic(&error, &command))
53}
54
55async fn process_command(services: Arc<Services>, input: &CommandInput) -> ProcessorResult {
56	let (command, args, body) = match parse(&services, input) {
57		| Err(error) => return Err(Box::new(error)),
58		| Ok(parsed) => parsed,
59	};
60
61	let context = Context {
62		services: &services,
63		body: &body,
64		timer: SystemTime::now(),
65		reply_id: input.reply_id.as_deref(),
66		output: BufWriter::new(Vec::new()).into(),
67	};
68
69	let (result, mut logs) = process(&context, command, &args).await;
70
71	let output = &mut context.output.lock().await;
72	output
73		.flush()
74		.await
75		.expect("final flush of output stream");
76
77	let output =
78		String::from_utf8(take(output.get_mut())).expect("invalid utf8 in command output stream");
79
80	match result {
81		| Ok(()) if logs.is_empty() =>
82			Ok(Some(reply(RoomMessageEventContent::notice_markdown(output), context.reply_id))),
83
84		| Ok(()) => {
85			logs.write_str(output.as_str())
86				.expect("output buffer");
87			Ok(Some(reply(RoomMessageEventContent::notice_markdown(logs), context.reply_id)))
88		},
89		| Err(error) => {
90			write!(&mut logs, "Command failed with error:\n```\n{error:#?}\n```")
91				.expect("output buffer");
92
93			Err(Box::new(reply(
94				RoomMessageEventContent::notice_markdown(logs),
95				context.reply_id,
96			)))
97		},
98	}
99}
100
101fn handle_panic(error: &Error, command: &CommandInput) -> ProcessorResult {
102	let link =
103		"Please submit a [bug report](https://github.com/matrix-construct/tuwunel/issues/new). \
104		 🥺";
105	let msg = format!("Panic occurred while processing command:\n```\n{error:#?}\n```\n{link}");
106	let content = RoomMessageEventContent::notice_markdown(msg);
107	error!("Panic while processing command: {error:?}");
108	Err(Box::new(reply(content, command.reply_id.as_deref())))
109}
110
111/// Parse and process a message from the admin room
112async fn process(
113	context: &Context<'_>,
114	command: AdminCommand,
115	args: &[String],
116) -> (Result, String) {
117	let (capture, logs) = capture_create(context);
118
119	let capture_scope = capture.start();
120	let result = Box::pin(admin::process(command, context)).await;
121	drop(capture_scope);
122
123	debug!(
124		ok = result.is_ok(),
125		elapsed = ?context.timer.elapsed(),
126		command = ?args,
127		"command processed"
128	);
129
130	let mut output = String::new();
131
132	// Prepend the logs only if any were captured
133	let logs = logs.lock().expect("locked");
134	if logs.lines().count() > 2 {
135		writeln!(&mut output, "{logs}").expect("failed to format logs to command output");
136	}
137	drop(logs);
138
139	(result, output)
140}
141
142fn capture_create(context: &Context<'_>) -> (Arc<Capture>, Arc<Mutex<String>>) {
143	let env_config = &context.services.server.config.admin_log_capture;
144	let env_filter = EnvFilter::try_new(env_config).unwrap_or_else(|e| {
145		warn!("admin_log_capture filter invalid: {e:?}");
146		cfg!(debug_assertions)
147			.then_some("debug")
148			.or(Some("info"))
149			.map(Into::into)
150			.expect("default capture EnvFilter")
151	});
152
153	let log_level = env_filter
154		.max_level_hint()
155		.and_then(LevelFilter::into_level)
156		.unwrap_or(Level::DEBUG);
157
158	let filter = move |data: capture::Data<'_>| {
159		data.level() <= log_level && data.our_modules() && data.scope.contains(&"admin")
160	};
161
162	let logs = Arc::new(Mutex::new(
163		collect_stream(|s| markdown_table_head(s)).expect("markdown table header"),
164	));
165
166	let capture = Capture::new(
167		&context.services.server.log.capture,
168		Some(filter),
169		capture::fmt(markdown_table, logs.clone()),
170	);
171
172	(capture, logs)
173}
174
175/// Parse chat messages from the admin room into an AdminCommand object
176#[expect(clippy::result_large_err)]
177fn parse<'a>(
178	services: &Arc<Services>,
179	input: &'a CommandInput,
180) -> Result<(AdminCommand, Vec<String>, Vec<&'a str>), CommandOutput> {
181	let lines = input
182		.command
183		.lines()
184		.filter(|line| !line.trim().is_empty());
185	let command_line = lines
186		.clone()
187		.next()
188		.expect("command missing first line");
189	let body = lines.skip(1).collect();
190	match parse_command(command_line) {
191		| Ok((command, args)) => Ok((command, args, body)),
192		| Err(error) => {
193			let message = error
194				.to_string()
195				.replace("server.name", services.globals.server_name().as_str());
196			Err(reply(RoomMessageEventContent::notice_plain(message), input.reply_id.as_deref()))
197		},
198	}
199}
200
201fn parse_command(line: &str) -> Result<(AdminCommand, Vec<String>)> {
202	let argv = parse_line(line);
203	let command = AdminCommand::try_parse_from(&argv)?;
204	Ok((command, argv))
205}
206
207fn complete_command(mut cmd: clap::Command, line: &str) -> String {
208	let argv = parse_line(line);
209	let mut ret = Vec::<String>::with_capacity(argv.len().saturating_add(1));
210
211	'token: for token in argv.into_iter().skip(1) {
212		let cmd_ = cmd.clone();
213		let mut choice = Vec::new();
214
215		for sub in cmd_.get_subcommands() {
216			let name = sub.get_name();
217			if *name == token {
218				// token already complete; recurse to subcommand
219				ret.push(token);
220				cmd.clone_from(sub);
221				continue 'token;
222			} else if name.starts_with(&token) {
223				// partial match; add to choices
224				choice.push(name);
225			}
226		}
227
228		if choice.len() == 1 {
229			// One choice. Add extra space because it's complete
230			let choice = *choice.first().expect("only choice");
231			ret.push(choice.to_owned());
232			ret.push(String::new());
233		} else if choice.is_empty() {
234			// Nothing found, return original string
235			ret.push(token);
236		} else {
237			// Find the common prefix
238			ret.push(common_prefix(&choice).into());
239		}
240
241		// Return from completion
242		return ret.join(" ");
243	}
244
245	// Return from no completion. Needs a space though.
246	ret.push(String::new());
247	ret.join(" ")
248}
249
250/// Parse chat messages from the admin room into an AdminCommand object
251fn parse_line(command_line: &str) -> Vec<String> {
252	let mut argv = command_line
253		.split_whitespace()
254		.map(str::to_owned)
255		.collect::<Vec<String>>();
256
257	// Remove any escapes that came with a server-side escape command
258	if !argv.is_empty() && argv[0].ends_with("admin") {
259		argv[0] = argv[0].trim_start_matches('\\').into();
260	}
261
262	// First indice has to be "admin" but for console convenience we add it here
263	if !argv.is_empty() && !argv[0].ends_with("admin") && !argv[0].starts_with('@') {
264		argv.insert(0, "admin".to_owned());
265	}
266
267	// Replace `help command` with `command --help`
268	// Clap has a help subcommand, but it omits the long help description.
269	if argv.len() > 1 && argv[1] == "help" {
270		argv.remove(1);
271		argv.push("--help".to_owned());
272	}
273
274	// Backwards compatibility with `register_appservice`-style commands
275	if argv.len() > 1 && argv[1].contains('_') {
276		argv[1] = argv[1].replace('_', "-");
277	}
278
279	// Backwards compatibility with `register_appservice`-style commands
280	if argv.len() > 2 && argv[2].contains('_') {
281		argv[2] = argv[2].replace('_', "-");
282	}
283
284	// if the user is using the `query` command (argv[1]), replace the database
285	// function/table calls with underscores to match the codebase
286	if argv.len() > 3 && argv[1].eq("query") {
287		argv[3] = argv[3].replace('_', "-");
288	}
289
290	trace!(?command_line, ?argv, "parse");
291	argv
292}
293
294fn reply(
295	mut content: RoomMessageEventContent,
296	reply_id: Option<&EventId>,
297) -> RoomMessageEventContent {
298	content.relates_to = reply_id.map(|event_id| {
299		Reply(ReplyRelation {
300			in_reply_to: InReplyTo { event_id: event_id.to_owned() },
301		})
302	});
303
304	content
305}