Skip to main content

tuwunel_service/admin/
mod.rs

1pub mod console;
2pub mod context;
3pub mod create;
4mod execute;
5mod grant;
6mod processor;
7mod register;
8
9use std::{
10	collections::BTreeMap,
11	sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock},
12	time::Instant,
13};
14
15use async_trait::async_trait;
16pub use context::Context;
17pub use create::create_admin_room;
18use futures::{FutureExt, TryFutureExt};
19use ruma::{
20	OwnedEventId, OwnedRoomAliasId, OwnedRoomId, RoomId, RoomOrAliasId, UserId,
21	events::room::message::{Relation, RoomMessageEventContent},
22};
23use tokio::sync::mpsc;
24use tuwunel_core::{
25	Err, Error, Event, Result, debug, err, error, error::default_log, pdu::PduBuilder, warn,
26};
27
28use crate::rooms::state::RoomMutexGuard;
29
30pub struct Service {
31	services: Arc<crate::services::OnceServices>,
32	channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
33	pub command: StdRwLock<Option<Arc<dyn Command>>>,
34	pub admin_alias: OwnedRoomAliasId,
35	/// Resolved Synapse-compatible registration shared secret. Live for the
36	/// lifetime of the service; the matching nonce store sits beside it.
37	register_shared_secret: Option<String>,
38	register_nonces: StdMutex<BTreeMap<String, Instant>>,
39	#[cfg(feature = "console")]
40	pub console: Arc<console::Console>,
41}
42
43/// Inputs to a command are a multi-line string and optional reply_id.
44#[derive(Clone, Debug, Default)]
45pub struct CommandInput {
46	pub command: String,
47	pub reply_id: Option<OwnedEventId>,
48}
49
50/// Root of a clap command tree installed by a downstream crate.
51#[async_trait]
52pub trait Command: Send + Sync + 'static {
53	/// The clap command tree; equivalent to
54	/// `<C as clap::CommandFactory>::command()`.
55	fn clap(&self) -> clap::Command;
56
57	/// Dispatch already-parsed argument matches to the matching handler.
58	async fn dispatch(&self, matches: clap::ArgMatches, context: &Context<'_>) -> Result;
59}
60
61/// Result wrapping of a command's handling. Both variants are complete message
62/// events which have digested any prior errors. The wrapping preserves whether
63/// the command failed without interpreting the text. Ok(None) outputs are
64/// dropped to produce no response.
65pub type ProcessorResult = Result<Option<CommandOutput>, Box<CommandOutput>>;
66
67/// Alias for the output structure.
68pub type CommandOutput = RoomMessageEventContent;
69
70/// Maximum number of commands which can be queued for dispatch.
71const COMMAND_QUEUE_LIMIT: usize = 512;
72
73#[async_trait]
74impl crate::Service for Service {
75	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
76		Ok(Arc::new(Self {
77			services: args.services.clone(),
78			channel: StdRwLock::new(None),
79			command: StdRwLock::new(None),
80			admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
81				.expect("#admins:server_name is valid alias name"),
82			register_shared_secret: register::resolve_shared_secret(&args.server.config),
83			register_nonces: StdMutex::new(BTreeMap::new()),
84			#[cfg(feature = "console")]
85			console: console::Console::new(args),
86		}))
87	}
88
89	async fn worker(self: Arc<Self>) -> Result {
90		let mut signals = self.services.server.signal.subscribe();
91		let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
92		_ = self
93			.channel
94			.write()
95			.expect("locked for writing")
96			.insert(sender);
97
98		self.console_auto_start().await;
99
100		loop {
101			tokio::select! {
102				command = receiver.recv() => match command {
103					Some(command) => self.handle_command(command).await,
104					None => break,
105				},
106				sig = signals.recv() => match sig {
107					Ok(sig) => self.handle_signal(sig).await,
108					Err(_) => continue,
109				},
110			}
111		}
112
113		//TODO: not unwind safe
114		self.interrupt().await;
115		self.console_auto_stop().await;
116
117		Ok(())
118	}
119
120	async fn interrupt(&self) {
121		#[cfg(feature = "console")]
122		self.console.interrupt();
123
124		_ = self
125			.channel
126			.write()
127			.expect("locked for writing")
128			.take();
129	}
130
131	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
132}
133
134impl Service {
135	/// Sends markdown notice to the admin room as the admin user.
136	pub async fn notice(&self, body: &str) {
137		self.send_message(RoomMessageEventContent::notice_markdown(body))
138			.await
139			.ok();
140	}
141
142	/// Sends markdown message (not an m.notice for notification reasons) to the
143	/// admin room as the admin user.
144	pub async fn send_text(&self, body: &str) {
145		self.send_message(RoomMessageEventContent::text_markdown(body))
146			.await
147			.ok();
148	}
149
150	/// Sends a message to the admin room as the admin user (see send_text() for
151	/// convenience).
152	pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
153		let room_id = self.get_admin_room().await?;
154
155		self.send_to_room(message_content, &room_id).await
156	}
157
158	/// Sends a markdown report to the configured report room, falling back to
159	/// the admin room, as the server user.
160	pub async fn send_report(&self, body: &str) {
161		let Ok(room_id) = self.get_report_room().await else {
162			return;
163		};
164
165		self.send_to_room(RoomMessageEventContent::text_markdown(body), &room_id)
166			.await
167			.ok();
168	}
169
170	async fn send_to_room(&self, content: RoomMessageEventContent, room_id: &RoomId) -> Result {
171		let user_id = &self.services.globals.server_user;
172
173		self.respond_to_room(content, room_id, user_id)
174			.boxed()
175			.await
176	}
177
178	/// Posts a command to the command processor queue and returns. Processing
179	/// will take place on the service worker's task asynchronously. Errors if
180	/// the queue is full.
181	pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
182		let Some(sender) = self
183			.channel
184			.read()
185			.expect("locked for reading")
186			.clone()
187		else {
188			return Err!("Admin command queue unavailable.");
189		};
190
191		sender
192			.send(CommandInput { command, reply_id })
193			.await
194			.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
195	}
196
197	/// Dispatches a command to the processor on the current task and waits for
198	/// completion.
199	pub async fn command_in_place(
200		&self,
201		command: String,
202		reply_id: Option<OwnedEventId>,
203	) -> ProcessorResult {
204		self.process_command(CommandInput { command, reply_id })
205			.await
206	}
207
208	/// Invokes the tab-completer to complete the command. When unavailable,
209	/// None is returned.
210	pub fn complete_command(&self, command: &str) -> Option<String> {
211		self.command
212			.read()
213			.expect("locked for reading")
214			.as_ref()
215			.map(|root| processor::complete(root.clap(), command))
216	}
217
218	async fn handle_signal(&self, sig: &'static str) {
219		if sig == execute::SIGNAL {
220			self.signal_execute().await.ok();
221		}
222
223		#[cfg(feature = "console")]
224		self.console.handle_signal(sig);
225	}
226
227	async fn handle_command(&self, command: CommandInput) {
228		match self.process_command(command).await {
229			| Err(output) => self.handle_command_output(*output).await,
230			| Ok(Some(output)) => self.handle_command_output(output).await,
231			| Ok(None) => debug!("Command successful with no response"),
232		}
233	}
234
235	async fn handle_command_output(&self, content: RoomMessageEventContent) {
236		self.handle_response(content)
237			.await
238			.unwrap_or_else(default_log);
239	}
240
241	async fn process_command(&self, command: CommandInput) -> ProcessorResult {
242		let root = self
243			.command
244			.read()
245			.expect("locked for reading")
246			.clone()
247			.expect("Admin module is not loaded");
248
249		processor::handle_command(root, Arc::clone(self.services.get()), command).await
250	}
251
252	/// Checks whether a given user is an admin of this server
253	pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
254		if user_id == self.services.globals.server_user {
255			return true;
256		}
257
258		let Ok(admin_room) = self.get_admin_room().await else {
259			return false;
260		};
261
262		self.services
263			.state_cache
264			.is_joined(user_id, &admin_room)
265			.await
266	}
267
268	/// Gets the room ID of the admin room
269	///
270	/// Errors are propagated from the database, and will have None if there is
271	/// no admin room
272	pub async fn get_admin_room(&self) -> Result<OwnedRoomId> {
273		let room_id = self
274			.services
275			.alias
276			.resolve_local_alias(&self.admin_alias)
277			.await?;
278
279		self.services
280			.state_cache
281			.is_joined(&self.services.globals.server_user, &room_id)
282			.await
283			.then_some(room_id)
284			.ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
285	}
286
287	/// Gets the room reports are posted to: the configured report room when set
288	/// and usable, otherwise the admin room.
289	pub async fn get_report_room(&self) -> Result<OwnedRoomId> {
290		let Some(report_room) = self.services.server.config.report_room.as_ref() else {
291			return self.get_admin_room().await;
292		};
293
294		match self.resolve_report_room(report_room).await {
295			| Ok(room_id) => Ok(room_id),
296			| Err(e) => {
297				warn!(%report_room, error = %e, "Falling back to the admin room for reports");
298				self.get_admin_room().await
299			},
300		}
301	}
302
303	async fn resolve_report_room(&self, report_room: &RoomOrAliasId) -> Result<OwnedRoomId> {
304		let room_id = self
305			.services
306			.alias
307			.maybe_resolve(report_room)
308			.await?;
309
310		self.services
311			.state_cache
312			.is_joined(&self.services.globals.server_user, &room_id)
313			.await
314			.then_some(room_id)
315			.ok_or_else(|| err!("server user is not joined to the configured report room"))
316	}
317
318	async fn handle_response(&self, content: RoomMessageEventContent) -> Result {
319		let Some(Relation::Reply(ruma::events::relation::Reply { in_reply_to })) =
320			content.relates_to.as_ref()
321		else {
322			return Ok(());
323		};
324
325		let Ok(pdu) = self
326			.services
327			.timeline
328			.get_pdu(&in_reply_to.event_id)
329			.await
330		else {
331			error!(
332				event_id = ?in_reply_to.event_id,
333				"Missing admin command in_reply_to event"
334			);
335			return Ok(());
336		};
337
338		let response_sender = if self.is_admin_room(pdu.room_id()).await {
339			&self.services.globals.server_user
340		} else {
341			pdu.sender()
342		};
343
344		self.respond_to_room(content, pdu.room_id(), response_sender)
345			.boxed()
346			.await
347	}
348
349	async fn respond_to_room(
350		&self,
351		content: RoomMessageEventContent,
352		room_id: &RoomId,
353		user_id: &UserId,
354	) -> Result {
355		assert!(self.user_is_admin(user_id).await, "sender is not admin");
356
357		let state_lock = self.services.state.mutex.lock(room_id).await;
358
359		if let Err(e) = self
360			.services
361			.timeline
362			.build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, &state_lock)
363			.await
364		{
365			self.handle_response_error(e, room_id, user_id, &state_lock)
366				.boxed()
367				.await
368				.unwrap_or_else(default_log);
369		}
370
371		Ok(())
372	}
373
374	async fn handle_response_error(
375		&self,
376		e: Error,
377		room_id: &RoomId,
378		user_id: &UserId,
379		state_lock: &RoomMutexGuard,
380	) -> Result {
381		error!("Failed to build and append admin room response PDU: \"{e}\"");
382		let content = RoomMessageEventContent::text_plain(format!(
383			"Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
384			 may have finished successfully, but we could not return the output."
385		));
386
387		self.services
388			.timeline
389			.build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock)
390			.boxed()
391			.await?;
392
393		Ok(())
394	}
395
396	pub async fn is_admin_command<Pdu>(&self, event: &Pdu, body: &str) -> bool
397	where
398		Pdu: Event,
399	{
400		// Server-side command-escape with public echo
401		let is_escape = body.starts_with('\\');
402		let is_public_escape = is_escape
403			&& body
404				.trim_start_matches('\\')
405				.starts_with("!admin");
406
407		// Admin command with public echo (in admin room)
408		let server_user = &self.services.globals.server_user;
409		let is_public_prefix =
410			body.starts_with("!admin") || body.starts_with(server_user.as_str());
411
412		// Expected backward branch
413		if !is_public_escape && !is_public_prefix {
414			return false;
415		}
416
417		let user_is_local = self
418			.services
419			.globals
420			.user_is_local(event.sender());
421
422		// only allow public escaped commands by local admins
423		if is_public_escape && !user_is_local {
424			return false;
425		}
426
427		// Check if server-side command-escape is disabled by configuration
428		if is_public_escape && !self.services.server.config.admin_escape_commands {
429			return false;
430		}
431
432		// Prevent unescaped !admin from being used outside of the admin room
433		if is_public_prefix && !self.is_admin_room(event.room_id()).await {
434			return false;
435		}
436
437		// Only senders who are admin can proceed
438		if !self.user_is_admin(event.sender()).await {
439			return false;
440		}
441
442		// This will evaluate to false if the emergency password is set up so that
443		// the administrator can execute commands as the server user
444		let emergency_password_set = self
445			.services
446			.server
447			.config
448			.emergency_password
449			.is_some();
450		let from_server = event.sender() == server_user && !emergency_password_set;
451		if from_server && self.is_admin_room(event.room_id()).await {
452			return false;
453		}
454
455		// Authentic admin command
456		true
457	}
458
459	#[must_use]
460	pub async fn is_admin_room(&self, room_id_: &RoomId) -> bool {
461		self.get_admin_room()
462			.map_ok(|room_id| room_id == room_id_)
463			.await
464			.unwrap_or(false)
465	}
466}