1pub mod console;
2pub mod context;
3pub mod create;
4mod execute;
5mod grant;
6mod processor;
7mod register;
8
9use std::{
10 collections::BTreeMap,
11 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock},
12 time::Instant,
13};
14
15use async_trait::async_trait;
16pub use context::Context;
17pub use create::create_admin_room;
18use futures::{FutureExt, TryFutureExt};
19use ruma::{
20 OwnedEventId, OwnedRoomAliasId, OwnedRoomId, RoomId, RoomOrAliasId, UserId,
21 events::room::message::{Relation, RoomMessageEventContent},
22};
23use tokio::sync::mpsc;
24use tuwunel_core::{
25 Err, Error, Event, Result, debug, err, error, error::default_log, pdu::PduBuilder, warn,
26};
27
28use crate::rooms::state::RoomMutexGuard;
29
30pub struct Service {
31 services: Arc<crate::services::OnceServices>,
32 channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
33 pub command: StdRwLock<Option<Arc<dyn Command>>>,
34 pub admin_alias: OwnedRoomAliasId,
35 register_shared_secret: Option<String>,
38 register_nonces: StdMutex<BTreeMap<String, Instant>>,
39 #[cfg(feature = "console")]
40 pub console: Arc<console::Console>,
41}
42
43#[derive(Clone, Debug, Default)]
45pub struct CommandInput {
46 pub command: String,
47 pub reply_id: Option<OwnedEventId>,
48}
49
50#[async_trait]
52pub trait Command: Send + Sync + 'static {
53 fn clap(&self) -> clap::Command;
56
57 async fn dispatch(&self, matches: clap::ArgMatches, context: &Context<'_>) -> Result;
59}
60
61pub type ProcessorResult = Result<Option<CommandOutput>, Box<CommandOutput>>;
66
67pub type CommandOutput = RoomMessageEventContent;
69
70const COMMAND_QUEUE_LIMIT: usize = 512;
72
73#[async_trait]
74impl crate::Service for Service {
75 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
76 Ok(Arc::new(Self {
77 services: args.services.clone(),
78 channel: StdRwLock::new(None),
79 command: StdRwLock::new(None),
80 admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
81 .expect("#admins:server_name is valid alias name"),
82 register_shared_secret: register::resolve_shared_secret(&args.server.config),
83 register_nonces: StdMutex::new(BTreeMap::new()),
84 #[cfg(feature = "console")]
85 console: console::Console::new(args),
86 }))
87 }
88
89 async fn worker(self: Arc<Self>) -> Result {
90 let mut signals = self.services.server.signal.subscribe();
91 let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
92 _ = self
93 .channel
94 .write()
95 .expect("locked for writing")
96 .insert(sender);
97
98 self.console_auto_start().await;
99
100 loop {
101 tokio::select! {
102 command = receiver.recv() => match command {
103 Some(command) => self.handle_command(command).await,
104 None => break,
105 },
106 sig = signals.recv() => match sig {
107 Ok(sig) => self.handle_signal(sig).await,
108 Err(_) => continue,
109 },
110 }
111 }
112
113 self.interrupt().await;
115 self.console_auto_stop().await;
116
117 Ok(())
118 }
119
120 async fn interrupt(&self) {
121 #[cfg(feature = "console")]
122 self.console.interrupt();
123
124 _ = self
125 .channel
126 .write()
127 .expect("locked for writing")
128 .take();
129 }
130
131 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
132}
133
134impl Service {
135 pub async fn notice(&self, body: &str) {
137 self.send_message(RoomMessageEventContent::notice_markdown(body))
138 .await
139 .ok();
140 }
141
142 pub async fn send_text(&self, body: &str) {
145 self.send_message(RoomMessageEventContent::text_markdown(body))
146 .await
147 .ok();
148 }
149
150 pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
153 let room_id = self.get_admin_room().await?;
154
155 self.send_to_room(message_content, &room_id).await
156 }
157
158 pub async fn send_report(&self, body: &str) {
161 let Ok(room_id) = self.get_report_room().await else {
162 return;
163 };
164
165 self.send_to_room(RoomMessageEventContent::text_markdown(body), &room_id)
166 .await
167 .ok();
168 }
169
170 async fn send_to_room(&self, content: RoomMessageEventContent, room_id: &RoomId) -> Result {
171 let user_id = &self.services.globals.server_user;
172
173 self.respond_to_room(content, room_id, user_id)
174 .boxed()
175 .await
176 }
177
178 pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
182 let Some(sender) = self
183 .channel
184 .read()
185 .expect("locked for reading")
186 .clone()
187 else {
188 return Err!("Admin command queue unavailable.");
189 };
190
191 sender
192 .send(CommandInput { command, reply_id })
193 .await
194 .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
195 }
196
197 pub async fn command_in_place(
200 &self,
201 command: String,
202 reply_id: Option<OwnedEventId>,
203 ) -> ProcessorResult {
204 self.process_command(CommandInput { command, reply_id })
205 .await
206 }
207
208 pub fn complete_command(&self, command: &str) -> Option<String> {
211 self.command
212 .read()
213 .expect("locked for reading")
214 .as_ref()
215 .map(|root| processor::complete(root.clap(), command))
216 }
217
218 async fn handle_signal(&self, sig: &'static str) {
219 if sig == execute::SIGNAL {
220 self.signal_execute().await.ok();
221 }
222
223 #[cfg(feature = "console")]
224 self.console.handle_signal(sig);
225 }
226
227 async fn handle_command(&self, command: CommandInput) {
228 match self.process_command(command).await {
229 | Err(output) => self.handle_command_output(*output).await,
230 | Ok(Some(output)) => self.handle_command_output(output).await,
231 | Ok(None) => debug!("Command successful with no response"),
232 }
233 }
234
235 async fn handle_command_output(&self, content: RoomMessageEventContent) {
236 self.handle_response(content)
237 .await
238 .unwrap_or_else(default_log);
239 }
240
241 async fn process_command(&self, command: CommandInput) -> ProcessorResult {
242 let root = self
243 .command
244 .read()
245 .expect("locked for reading")
246 .clone()
247 .expect("Admin module is not loaded");
248
249 processor::handle_command(root, Arc::clone(self.services.get()), command).await
250 }
251
252 pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
254 if user_id == self.services.globals.server_user {
255 return true;
256 }
257
258 let Ok(admin_room) = self.get_admin_room().await else {
259 return false;
260 };
261
262 self.services
263 .state_cache
264 .is_joined(user_id, &admin_room)
265 .await
266 }
267
268 pub async fn get_admin_room(&self) -> Result<OwnedRoomId> {
273 let room_id = self
274 .services
275 .alias
276 .resolve_local_alias(&self.admin_alias)
277 .await?;
278
279 self.services
280 .state_cache
281 .is_joined(&self.services.globals.server_user, &room_id)
282 .await
283 .then_some(room_id)
284 .ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
285 }
286
287 pub async fn get_report_room(&self) -> Result<OwnedRoomId> {
290 let Some(report_room) = self.services.server.config.report_room.as_ref() else {
291 return self.get_admin_room().await;
292 };
293
294 match self.resolve_report_room(report_room).await {
295 | Ok(room_id) => Ok(room_id),
296 | Err(e) => {
297 warn!(%report_room, error = %e, "Falling back to the admin room for reports");
298 self.get_admin_room().await
299 },
300 }
301 }
302
303 async fn resolve_report_room(&self, report_room: &RoomOrAliasId) -> Result<OwnedRoomId> {
304 let room_id = self
305 .services
306 .alias
307 .maybe_resolve(report_room)
308 .await?;
309
310 self.services
311 .state_cache
312 .is_joined(&self.services.globals.server_user, &room_id)
313 .await
314 .then_some(room_id)
315 .ok_or_else(|| err!("server user is not joined to the configured report room"))
316 }
317
318 async fn handle_response(&self, content: RoomMessageEventContent) -> Result {
319 let Some(Relation::Reply(ruma::events::relation::Reply { in_reply_to })) =
320 content.relates_to.as_ref()
321 else {
322 return Ok(());
323 };
324
325 let Ok(pdu) = self
326 .services
327 .timeline
328 .get_pdu(&in_reply_to.event_id)
329 .await
330 else {
331 error!(
332 event_id = ?in_reply_to.event_id,
333 "Missing admin command in_reply_to event"
334 );
335 return Ok(());
336 };
337
338 let response_sender = if self.is_admin_room(pdu.room_id()).await {
339 &self.services.globals.server_user
340 } else {
341 pdu.sender()
342 };
343
344 self.respond_to_room(content, pdu.room_id(), response_sender)
345 .boxed()
346 .await
347 }
348
349 async fn respond_to_room(
350 &self,
351 content: RoomMessageEventContent,
352 room_id: &RoomId,
353 user_id: &UserId,
354 ) -> Result {
355 assert!(self.user_is_admin(user_id).await, "sender is not admin");
356
357 let state_lock = self.services.state.mutex.lock(room_id).await;
358
359 if let Err(e) = self
360 .services
361 .timeline
362 .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, &state_lock)
363 .await
364 {
365 self.handle_response_error(e, room_id, user_id, &state_lock)
366 .boxed()
367 .await
368 .unwrap_or_else(default_log);
369 }
370
371 Ok(())
372 }
373
374 async fn handle_response_error(
375 &self,
376 e: Error,
377 room_id: &RoomId,
378 user_id: &UserId,
379 state_lock: &RoomMutexGuard,
380 ) -> Result {
381 error!("Failed to build and append admin room response PDU: \"{e}\"");
382 let content = RoomMessageEventContent::text_plain(format!(
383 "Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
384 may have finished successfully, but we could not return the output."
385 ));
386
387 self.services
388 .timeline
389 .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock)
390 .boxed()
391 .await?;
392
393 Ok(())
394 }
395
396 pub async fn is_admin_command<Pdu>(&self, event: &Pdu, body: &str) -> bool
397 where
398 Pdu: Event,
399 {
400 let is_escape = body.starts_with('\\');
402 let is_public_escape = is_escape
403 && body
404 .trim_start_matches('\\')
405 .starts_with("!admin");
406
407 let server_user = &self.services.globals.server_user;
409 let is_public_prefix =
410 body.starts_with("!admin") || body.starts_with(server_user.as_str());
411
412 if !is_public_escape && !is_public_prefix {
414 return false;
415 }
416
417 let user_is_local = self
418 .services
419 .globals
420 .user_is_local(event.sender());
421
422 if is_public_escape && !user_is_local {
424 return false;
425 }
426
427 if is_public_escape && !self.services.server.config.admin_escape_commands {
429 return false;
430 }
431
432 if is_public_prefix && !self.is_admin_room(event.room_id()).await {
434 return false;
435 }
436
437 if !self.user_is_admin(event.sender()).await {
439 return false;
440 }
441
442 let emergency_password_set = self
445 .services
446 .server
447 .config
448 .emergency_password
449 .is_some();
450 let from_server = event.sender() == server_user && !emergency_password_set;
451 if from_server && self.is_admin_room(event.room_id()).await {
452 return false;
453 }
454
455 true
457 }
458
459 #[must_use]
460 pub async fn is_admin_room(&self, room_id_: &RoomId) -> bool {
461 self.get_admin_room()
462 .map_ok(|room_id| room_id == room_id_)
463 .await
464 .unwrap_or(false)
465 }
466}