Skip to main content

tuwunel_service/admin/
mod.rs

1pub mod console;
2pub mod create;
3mod execute;
4mod grant;
5
6use std::{
7	pin::Pin,
8	sync::{Arc, RwLock as StdRwLock},
9};
10
11use async_trait::async_trait;
12pub use create::create_admin_room;
13use futures::{Future, FutureExt, TryFutureExt};
14use ruma::{
15	OwnedEventId, OwnedRoomAliasId, OwnedRoomId, RoomId, UserId,
16	events::room::message::{Relation, RoomMessageEventContent},
17};
18use tokio::sync::{RwLock, mpsc};
19use tuwunel_core::{
20	Err, Error, Event, Result, debug, err, error, error::default_log, pdu::PduBuilder,
21};
22
23use crate::rooms::state::RoomMutexGuard;
24
/// State of the admin service: the command queue, the hooks used to process
/// and complete commands, and the alias of the admin room.
pub struct Service {
	/// Handle to the other services (globals, alias, state_cache, timeline,
	/// ...) used by the methods of this service.
	services: Arc<crate::services::OnceServices>,
	/// Sending half of the command queue. Set by `worker()` when it starts and
	/// cleared by `interrupt()`; while `None`, `command()` reports the queue
	/// as unavailable.
	channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
	/// Command processor callback. `process_command()` panics while this is
	/// `None`.
	pub handle: RwLock<Option<Processor>>,
	/// Tab-completer callback. `complete_command()` returns `None` while this
	/// is unset.
	pub complete: StdRwLock<Option<Completer>>,
	/// Alias of the admin room, built as `#admins:<server name>` in `build()`.
	pub admin_alias: OwnedRoomAliasId,
	/// Interactive console; only present with the `console` feature.
	#[cfg(feature = "console")]
	pub console: Arc<console::Console>,
}
34
/// Inputs to a command are a multi-line string and optional reply_id.
#[derive(Clone, Debug, Default)]
pub struct CommandInput {
	/// Raw command text as submitted; may span multiple lines.
	pub command: String,
	/// Optional event associated with the command.
	// NOTE(review): presumably the event the response should reply to; the
	// consumer is the processor callback, which is not visible in this file.
	pub reply_id: Option<OwnedEventId>,
}
41
/// Prototype of the tab-completer. The input is the text buffered at the
/// moment tab is pressed; the output fully replaces the input buffer.
pub type Completer = fn(&str) -> String;

/// Prototype of the command processor. This is a callback supplied by the
/// reloadable admin module.
pub type Processor = fn(Arc<crate::Services>, CommandInput) -> ProcessorFuture;

/// Return type of the processor: a boxed, sendable future resolving to a
/// [`ProcessorResult`].
pub type ProcessorFuture = Pin<Box<dyn Future<Output = ProcessorResult> + Send>>;

/// Result wrapping of a command's handling. Both variants are complete message
/// events which have digested any prior errors. The wrapping preserves whether
/// the command failed without interpreting the text. Ok(None) outputs are
/// dropped to produce no response.
pub type ProcessorResult = Result<Option<CommandOutput>, Box<CommandOutput>>;

/// Alias for the output structure.
pub type CommandOutput = RoomMessageEventContent;

/// Maximum number of commands which can be queued for dispatch. Used as the
/// capacity of the bounded channel created in `worker()`.
const COMMAND_QUEUE_LIMIT: usize = 512;
64
65#[async_trait]
66impl crate::Service for Service {
67	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
68		Ok(Arc::new(Self {
69			services: args.services.clone(),
70			channel: StdRwLock::new(None),
71			handle: RwLock::new(None),
72			complete: StdRwLock::new(None),
73			admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
74				.expect("#admins:server_name is valid alias name"),
75			#[cfg(feature = "console")]
76			console: console::Console::new(args),
77		}))
78	}
79
80	async fn worker(self: Arc<Self>) -> Result {
81		let mut signals = self.services.server.signal.subscribe();
82		let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
83		_ = self
84			.channel
85			.write()
86			.expect("locked for writing")
87			.insert(sender);
88
89		self.console_auto_start().await;
90
91		loop {
92			tokio::select! {
93				command = receiver.recv() => match command {
94					Some(command) => self.handle_command(command).await,
95					None => break,
96				},
97				sig = signals.recv() => match sig {
98					Ok(sig) => self.handle_signal(sig).await,
99					Err(_) => continue,
100				},
101			}
102		}
103
104		//TODO: not unwind safe
105		self.interrupt().await;
106		self.console_auto_stop().await;
107
108		Ok(())
109	}
110
111	async fn interrupt(&self) {
112		#[cfg(feature = "console")]
113		self.console.interrupt();
114
115		_ = self
116			.channel
117			.write()
118			.expect("locked for writing")
119			.take();
120	}
121
122	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
123}
124
125impl Service {
126	/// Sends markdown notice to the admin room as the admin user.
127	pub async fn notice(&self, body: &str) {
128		self.send_message(RoomMessageEventContent::notice_markdown(body))
129			.await
130			.ok();
131	}
132
133	/// Sends markdown message (not an m.notice for notification reasons) to the
134	/// admin room as the admin user.
135	pub async fn send_text(&self, body: &str) {
136		self.send_message(RoomMessageEventContent::text_markdown(body))
137			.await
138			.ok();
139	}
140
141	/// Sends a message to the admin room as the admin user (see send_text() for
142	/// convenience).
143	pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
144		let user_id = &self.services.globals.server_user;
145		let room_id = self.get_admin_room().await?;
146		self.respond_to_room(message_content, &room_id, user_id)
147			.boxed()
148			.await
149	}
150
151	/// Posts a command to the command processor queue and returns. Processing
152	/// will take place on the service worker's task asynchronously. Errors if
153	/// the queue is full.
154	pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
155		let Some(sender) = self
156			.channel
157			.read()
158			.expect("locked for reading")
159			.clone()
160		else {
161			return Err!("Admin command queue unavailable.");
162		};
163
164		sender
165			.send(CommandInput { command, reply_id })
166			.await
167			.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
168	}
169
170	/// Dispatches a command to the processor on the current task and waits for
171	/// completion.
172	pub async fn command_in_place(
173		&self,
174		command: String,
175		reply_id: Option<OwnedEventId>,
176	) -> ProcessorResult {
177		self.process_command(CommandInput { command, reply_id })
178			.await
179	}
180
181	/// Invokes the tab-completer to complete the command. When unavailable,
182	/// None is returned.
183	pub fn complete_command(&self, command: &str) -> Option<String> {
184		self.complete
185			.read()
186			.expect("locked for reading")
187			.map(|complete| complete(command))
188	}
189
190	async fn handle_signal(&self, sig: &'static str) {
191		if sig == execute::SIGNAL {
192			self.signal_execute().await.ok();
193		}
194
195		#[cfg(feature = "console")]
196		self.console.handle_signal(sig);
197	}
198
199	async fn handle_command(&self, command: CommandInput) {
200		match self.process_command(command).await {
201			| Err(output) => self.handle_command_output(*output).await,
202			| Ok(Some(output)) => self.handle_command_output(output).await,
203			| Ok(None) => debug!("Command successful with no response"),
204		}
205	}
206
207	async fn handle_command_output(&self, content: RoomMessageEventContent) {
208		self.handle_response(content)
209			.await
210			.unwrap_or_else(default_log);
211	}
212
213	async fn process_command(&self, command: CommandInput) -> ProcessorResult {
214		let handle = &self
215			.handle
216			.read()
217			.await
218			.expect("Admin module is not loaded");
219
220		handle(Arc::clone(self.services.get()), command).await
221	}
222
223	/// Checks whether a given user is an admin of this server
224	pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
225		if user_id == self.services.globals.server_user {
226			return true;
227		}
228
229		let Ok(admin_room) = self.get_admin_room().await else {
230			return false;
231		};
232
233		self.services
234			.state_cache
235			.is_joined(user_id, &admin_room)
236			.await
237	}
238
239	/// Gets the room ID of the admin room
240	///
241	/// Errors are propagated from the database, and will have None if there is
242	/// no admin room
243	pub async fn get_admin_room(&self) -> Result<OwnedRoomId> {
244		let room_id = self
245			.services
246			.alias
247			.resolve_local_alias(&self.admin_alias)
248			.await?;
249
250		self.services
251			.state_cache
252			.is_joined(&self.services.globals.server_user, &room_id)
253			.await
254			.then_some(room_id)
255			.ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
256	}
257
258	async fn handle_response(&self, content: RoomMessageEventContent) -> Result {
259		let Some(Relation::Reply(ruma::events::relation::Reply { in_reply_to })) =
260			content.relates_to.as_ref()
261		else {
262			return Ok(());
263		};
264
265		let Ok(pdu) = self
266			.services
267			.timeline
268			.get_pdu(&in_reply_to.event_id)
269			.await
270		else {
271			error!(
272				event_id = ?in_reply_to.event_id,
273				"Missing admin command in_reply_to event"
274			);
275			return Ok(());
276		};
277
278		let response_sender = if self.is_admin_room(pdu.room_id()).await {
279			&self.services.globals.server_user
280		} else {
281			pdu.sender()
282		};
283
284		self.respond_to_room(content, pdu.room_id(), response_sender)
285			.boxed()
286			.await
287	}
288
289	async fn respond_to_room(
290		&self,
291		content: RoomMessageEventContent,
292		room_id: &RoomId,
293		user_id: &UserId,
294	) -> Result {
295		assert!(self.user_is_admin(user_id).await, "sender is not admin");
296
297		let state_lock = self.services.state.mutex.lock(room_id).await;
298
299		if let Err(e) = self
300			.services
301			.timeline
302			.build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, &state_lock)
303			.await
304		{
305			self.handle_response_error(e, room_id, user_id, &state_lock)
306				.boxed()
307				.await
308				.unwrap_or_else(default_log);
309		}
310
311		Ok(())
312	}
313
314	async fn handle_response_error(
315		&self,
316		e: Error,
317		room_id: &RoomId,
318		user_id: &UserId,
319		state_lock: &RoomMutexGuard,
320	) -> Result {
321		error!("Failed to build and append admin room response PDU: \"{e}\"");
322		let content = RoomMessageEventContent::text_plain(format!(
323			"Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
324			 may have finished successfully, but we could not return the output."
325		));
326
327		self.services
328			.timeline
329			.build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock)
330			.boxed()
331			.await?;
332
333		Ok(())
334	}
335
336	pub async fn is_admin_command<Pdu>(&self, event: &Pdu, body: &str) -> bool
337	where
338		Pdu: Event,
339	{
340		// Server-side command-escape with public echo
341		let is_escape = body.starts_with('\\');
342		let is_public_escape = is_escape
343			&& body
344				.trim_start_matches('\\')
345				.starts_with("!admin");
346
347		// Admin command with public echo (in admin room)
348		let server_user = &self.services.globals.server_user;
349		let is_public_prefix =
350			body.starts_with("!admin") || body.starts_with(server_user.as_str());
351
352		// Expected backward branch
353		if !is_public_escape && !is_public_prefix {
354			return false;
355		}
356
357		let user_is_local = self
358			.services
359			.globals
360			.user_is_local(event.sender());
361
362		// only allow public escaped commands by local admins
363		if is_public_escape && !user_is_local {
364			return false;
365		}
366
367		// Check if server-side command-escape is disabled by configuration
368		if is_public_escape && !self.services.server.config.admin_escape_commands {
369			return false;
370		}
371
372		// Prevent unescaped !admin from being used outside of the admin room
373		if is_public_prefix && !self.is_admin_room(event.room_id()).await {
374			return false;
375		}
376
377		// Only senders who are admin can proceed
378		if !self.user_is_admin(event.sender()).await {
379			return false;
380		}
381
382		// This will evaluate to false if the emergency password is set up so that
383		// the administrator can execute commands as the server user
384		let emergency_password_set = self
385			.services
386			.server
387			.config
388			.emergency_password
389			.is_some();
390		let from_server = event.sender() == server_user && !emergency_password_set;
391		if from_server && self.is_admin_room(event.room_id()).await {
392			return false;
393		}
394
395		// Authentic admin command
396		true
397	}
398
399	#[must_use]
400	pub async fn is_admin_room(&self, room_id_: &RoomId) -> bool {
401		self.get_admin_room()
402			.map_ok(|room_id| room_id == room_id_)
403			.await
404			.unwrap_or(false)
405	}
406}