Skip to main content

tuwunel_service/admin/
console.rs

1#![cfg(feature = "console")]
2
3use std::{
4	collections::VecDeque,
5	sync::{Arc, Mutex},
6};
7
8use futures::future::{AbortHandle, Abortable};
9use ruma::events::room::message::RoomMessageEventContent;
10use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
11use termimad::MadSkin;
12use tokio::task::JoinHandle;
13use tuwunel_core::{Server, debug, defer, error, log, log::is_systemd_mode};
14
15pub struct Console {
16	server: Arc<Server>,
17	services: Arc<crate::services::OnceServices>,
18	worker_join: Mutex<Option<JoinHandle<()>>>,
19	input_abort: Mutex<Option<AbortHandle>>,
20	command_abort: Mutex<Option<AbortHandle>>,
21	history: Mutex<VecDeque<String>>,
22	output: MadSkin,
23}
24
/// Prompt string handed to the line editor for every input line.
const PROMPT: &str = "uwu> ";

/// Maximum number of entered lines retained in the console history.
const HISTORY_LIMIT: usize = 48;
27
28impl Console {
29	pub(super) fn new(args: &crate::Args<'_>) -> Arc<Self> {
30		Arc::new(Self {
31			server: args.server.clone(),
32			services: args.services.clone(),
33			worker_join: None.into(),
34			input_abort: None.into(),
35			command_abort: None.into(),
36			history: VecDeque::with_capacity(HISTORY_LIMIT).into(),
37			output: configure_output(MadSkin::default_dark()),
38		})
39	}
40
41	pub(super) fn handle_signal(self: &Arc<Self>, sig: &'static str) {
42		if !self.server.is_running() {
43			self.interrupt();
44		} else if sig == "SIGINT" {
45			self.interrupt_command();
46			self.start();
47		}
48	}
49
50	pub fn start(self: &Arc<Self>) {
51		let mut worker_join = self.worker_join.lock().expect("locked");
52		if worker_join.is_none() {
53			let self_ = Arc::clone(self);
54			_ = worker_join.insert(self.server.runtime().spawn(self_.worker()));
55		}
56	}
57
58	pub async fn close(self: &Arc<Self>) {
59		self.interrupt();
60
61		let Some(worker_join) = self.worker_join.lock().expect("locked").take() else {
62			return;
63		};
64
65		_ = worker_join.await;
66	}
67
68	pub fn interrupt(self: &Arc<Self>) {
69		self.interrupt_command();
70		self.interrupt_readline();
71		self.worker_join
72			.lock()
73			.expect("locked")
74			.as_ref()
75			.map(JoinHandle::abort);
76	}
77
78	pub fn interrupt_readline(self: &Arc<Self>) {
79		if let Some(input_abort) = self.input_abort.lock().expect("locked").take() {
80			debug!("Interrupting console readline...");
81			input_abort.abort();
82		}
83	}
84
85	pub fn interrupt_command(self: &Arc<Self>) {
86		if let Some(command_abort) = self.command_abort.lock().expect("locked").take() {
87			debug!("Interrupting console command...");
88			command_abort.abort();
89		}
90	}
91
92	#[tracing::instrument(skip_all, name = "console", level = "trace")]
93	async fn worker(self: Arc<Self>) {
94		debug!("session starting");
95
96		self.output
97			.print_inline(&format!("**tuwunel {}** admin console\n", tuwunel_core::version()));
98		self.output
99			.print_text("\"help\" for help, ^D to exit the console, ^\\ to stop the server\n");
100
101		while self.server.is_running() {
102			match self.readline().await {
103				| Ok(event) => match event {
104					| ReadlineEvent::Line(string) => self.clone().handle(string).await,
105					| ReadlineEvent::Interrupted => continue,
106					| ReadlineEvent::Eof => break,
107					| ReadlineEvent::Quit => self
108						.server
109						.shutdown()
110						.unwrap_or_else(error::default_log),
111				},
112				| Err(error) => match error {
113					| ReadlineError::Closed => break,
114					| ReadlineError::IO(error) => {
115						error!("console I/O: {error:?}");
116						break;
117					},
118				},
119			}
120		}
121
122		debug!("session ending");
123		self.worker_join.lock().expect("locked").take();
124	}
125
126	async fn readline(self: &Arc<Self>) -> Result<ReadlineEvent, ReadlineError> {
127		let _suppression = (!is_systemd_mode()).then(|| log::Suppress::new(&self.server));
128
129		let (mut readline, _writer) = Readline::new(PROMPT.to_owned())?;
130		let self_ = Arc::clone(self);
131		readline.set_tab_completer(move |line| self_.tab_complete(line));
132		self.set_history(&mut readline);
133
134		let future = readline.readline();
135
136		let (abort, abort_reg) = AbortHandle::new_pair();
137		let future = Abortable::new(future, abort_reg);
138		_ = self
139			.input_abort
140			.lock()
141			.expect("locked")
142			.insert(abort);
143		defer! {{
144			_ = self.input_abort.lock().expect("locked").take();
145		}}
146
147		let Ok(result) = future.await else {
148			return Ok(ReadlineEvent::Eof);
149		};
150
151		readline.flush()?;
152		result
153	}
154
155	async fn handle(self: Arc<Self>, line: String) {
156		if line.trim().is_empty() {
157			return;
158		}
159
160		self.add_history(line.clone());
161		let future = self.clone().process(line);
162
163		let (abort, abort_reg) = AbortHandle::new_pair();
164		let future = Abortable::new(future, abort_reg);
165		_ = self
166			.command_abort
167			.lock()
168			.expect("locked")
169			.insert(abort);
170		defer! {{
171			_ = self.command_abort.lock().expect("locked").take();
172		}}
173
174		_ = future.await;
175	}
176
177	async fn process(self: Arc<Self>, line: String) {
178		match self
179			.services
180			.admin
181			.command_in_place(line, None)
182			.await
183		{
184			| Ok(Some(ref content)) => self.output(content),
185			| Err(ref content) => self.output_err(content),
186			| _ => unreachable!(),
187		}
188	}
189
190	fn output_err(self: Arc<Self>, output_content: &RoomMessageEventContent) {
191		let output = configure_output_err(self.output.clone());
192		output.print_text(output_content.body());
193	}
194
195	fn output(self: Arc<Self>, output_content: &RoomMessageEventContent) {
196		self.output.print_text(output_content.body());
197	}
198
199	fn set_history(&self, readline: &mut Readline) {
200		self.history
201			.lock()
202			.expect("locked")
203			.iter()
204			.rev()
205			.for_each(|entry| {
206				readline
207					.add_history_entry(entry.clone())
208					.expect("added history entry");
209			});
210	}
211
212	fn add_history(&self, line: String) {
213		let mut history = self.history.lock().expect("locked");
214		history.push_front(line);
215		history.truncate(HISTORY_LIMIT);
216	}
217
218	fn tab_complete(&self, line: &str) -> String {
219		self.services
220			.admin
221			.complete_command(line)
222			.unwrap_or_else(|| line.to_owned())
223	}
224}
225
226/// Standalone/static markdown printer for errors.
227pub fn print_err(markdown: &str) {
228	let output = configure_output_err(MadSkin::default_dark());
229	output.print_text(markdown);
230}
231/// Standalone/static markdown printer.
232pub fn print(markdown: &str) {
233	let output = configure_output(MadSkin::default_dark());
234	output.print_text(markdown);
235}
236
237fn configure_output_err(mut output: MadSkin) -> MadSkin {
238	use termimad::{Alignment, CompoundStyle, LineStyle, crossterm::style::Color};
239
240	let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(196), Color::AnsiValue(234));
241	output.inline_code = code_style.clone();
242	output.code_block = LineStyle {
243		left_margin: 0,
244		right_margin: 0,
245		align: Alignment::Left,
246		compound_style: code_style,
247	};
248
249	output
250}
251
252fn configure_output(mut output: MadSkin) -> MadSkin {
253	use termimad::{Alignment, CompoundStyle, LineStyle, crossterm::style::Color};
254
255	let code_style = CompoundStyle::with_fgbg(Color::AnsiValue(40), Color::AnsiValue(234));
256	output.inline_code = code_style.clone();
257	output.code_block = LineStyle {
258		left_margin: 0,
259		right_margin: 0,
260		align: Alignment::Left,
261		compound_style: code_style,
262	};
263
264	let table_style = CompoundStyle::default();
265	output.table = LineStyle {
266		left_margin: 1,
267		right_margin: 1,
268		align: Alignment::Left,
269		compound_style: table_style,
270	};
271
272	output
273}