1pub mod console;
2pub mod create;
3mod execute;
4mod grant;
5
6use std::{
7 pin::Pin,
8 sync::{Arc, RwLock as StdRwLock},
9};
10
11use async_trait::async_trait;
12pub use create::create_admin_room;
13use futures::{Future, FutureExt, TryFutureExt};
14use ruma::{
15 OwnedEventId, OwnedRoomAliasId, OwnedRoomId, RoomId, UserId,
16 events::room::message::{Relation, RoomMessageEventContent},
17};
18use tokio::sync::{RwLock, mpsc};
19use tuwunel_core::{
20 Err, Error, Event, Result, debug, err, error, error::default_log, pdu::PduBuilder,
21};
22
23use crate::rooms::state::RoomMutexGuard;
24
/// State of the admin service: the command queue, the pluggable command
/// processor and completer callbacks, and the alias of the admin room.
pub struct Service {
	/// Handle to the other services (globals, alias, state cache, timeline,
	/// ...) used throughout this module.
	services: Arc<crate::services::OnceServices>,
	/// Sending half of the command queue. Set by `worker()` when it starts and
	/// cleared by `interrupt()`; while `None`, `command()` fails with an error.
	channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
	/// Callback which processes every command. `process_command()` panics
	/// when this is `None`.
	pub handle: RwLock<Option<Processor>>,
	/// Optional completion callback consulted by `complete_command()`.
	pub complete: StdRwLock<Option<Completer>>,
	/// Alias of the admin room, built as `#admins:<server name>`.
	pub admin_alias: OwnedRoomAliasId,
	/// Console front-end; only compiled in with the `console` feature.
	#[cfg(feature = "console")]
	pub console: Arc<console::Console>,
}
34
/// A single admin command as handed to the [`Processor`].
#[derive(Clone, Debug, Default)]
pub struct CommandInput {
	/// The command text.
	pub command: String,
	/// Optional event associated with the command; presumably the event the
	/// response is sent in reply to (interpreted by the processor, which is
	/// not visible here).
	pub reply_id: Option<OwnedEventId>,
}
41
/// Prototype of the completion callback stored in `Service::complete`: it
/// receives the command text and returns the completed text.
pub type Completer = fn(&str) -> String;
45
/// Prototype of the command processor callback stored in `Service::handle`.
/// It is called with the services and the command, and returns a boxed future.
pub type Processor = fn(Arc<crate::Services>, CommandInput) -> ProcessorFuture;
49
/// Future returned by a [`Processor`]; boxed, pinned and `Send`, resolving to a
/// [`ProcessorResult`].
pub type ProcessorFuture = Pin<Box<dyn Future<Output = ProcessorResult> + Send>>;
52
/// Outcome of processing a command. Both `Ok(Some(_))` and `Err(_)` carry a
/// complete message which is forwarded as the response; `Ok(None)` produces no
/// response. The error payload is boxed.
pub type ProcessorResult = Result<Option<CommandOutput>, Box<CommandOutput>>;
58
/// Alias for the message content type produced by commands.
pub type CommandOutput = RoomMessageEventContent;
61
/// Capacity of the bounded command queue created by the worker.
const COMMAND_QUEUE_LIMIT: usize = 512;
64
65#[async_trait]
66impl crate::Service for Service {
67 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
68 Ok(Arc::new(Self {
69 services: args.services.clone(),
70 channel: StdRwLock::new(None),
71 handle: RwLock::new(None),
72 complete: StdRwLock::new(None),
73 admin_alias: OwnedRoomAliasId::try_from(format!("#admins:{}", args.server.name))
74 .expect("#admins:server_name is valid alias name"),
75 #[cfg(feature = "console")]
76 console: console::Console::new(args),
77 }))
78 }
79
80 async fn worker(self: Arc<Self>) -> Result {
81 let mut signals = self.services.server.signal.subscribe();
82 let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
83 _ = self
84 .channel
85 .write()
86 .expect("locked for writing")
87 .insert(sender);
88
89 self.console_auto_start().await;
90
91 loop {
92 tokio::select! {
93 command = receiver.recv() => match command {
94 Some(command) => self.handle_command(command).await,
95 None => break,
96 },
97 sig = signals.recv() => match sig {
98 Ok(sig) => self.handle_signal(sig).await,
99 Err(_) => continue,
100 },
101 }
102 }
103
104 self.interrupt().await;
106 self.console_auto_stop().await;
107
108 Ok(())
109 }
110
111 async fn interrupt(&self) {
112 #[cfg(feature = "console")]
113 self.console.interrupt();
114
115 _ = self
116 .channel
117 .write()
118 .expect("locked for writing")
119 .take();
120 }
121
122 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
123}
124
125impl Service {
126 pub async fn notice(&self, body: &str) {
128 self.send_message(RoomMessageEventContent::notice_markdown(body))
129 .await
130 .ok();
131 }
132
133 pub async fn send_text(&self, body: &str) {
136 self.send_message(RoomMessageEventContent::text_markdown(body))
137 .await
138 .ok();
139 }
140
141 pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
144 let user_id = &self.services.globals.server_user;
145 let room_id = self.get_admin_room().await?;
146 self.respond_to_room(message_content, &room_id, user_id)
147 .boxed()
148 .await
149 }
150
151 pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
155 let Some(sender) = self
156 .channel
157 .read()
158 .expect("locked for reading")
159 .clone()
160 else {
161 return Err!("Admin command queue unavailable.");
162 };
163
164 sender
165 .send(CommandInput { command, reply_id })
166 .await
167 .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
168 }
169
170 pub async fn command_in_place(
173 &self,
174 command: String,
175 reply_id: Option<OwnedEventId>,
176 ) -> ProcessorResult {
177 self.process_command(CommandInput { command, reply_id })
178 .await
179 }
180
181 pub fn complete_command(&self, command: &str) -> Option<String> {
184 self.complete
185 .read()
186 .expect("locked for reading")
187 .map(|complete| complete(command))
188 }
189
190 async fn handle_signal(&self, sig: &'static str) {
191 if sig == execute::SIGNAL {
192 self.signal_execute().await.ok();
193 }
194
195 #[cfg(feature = "console")]
196 self.console.handle_signal(sig);
197 }
198
199 async fn handle_command(&self, command: CommandInput) {
200 match self.process_command(command).await {
201 | Err(output) => self.handle_command_output(*output).await,
202 | Ok(Some(output)) => self.handle_command_output(output).await,
203 | Ok(None) => debug!("Command successful with no response"),
204 }
205 }
206
207 async fn handle_command_output(&self, content: RoomMessageEventContent) {
208 self.handle_response(content)
209 .await
210 .unwrap_or_else(default_log);
211 }
212
213 async fn process_command(&self, command: CommandInput) -> ProcessorResult {
214 let handle = &self
215 .handle
216 .read()
217 .await
218 .expect("Admin module is not loaded");
219
220 handle(Arc::clone(self.services.get()), command).await
221 }
222
223 pub async fn user_is_admin(&self, user_id: &UserId) -> bool {
225 if user_id == self.services.globals.server_user {
226 return true;
227 }
228
229 let Ok(admin_room) = self.get_admin_room().await else {
230 return false;
231 };
232
233 self.services
234 .state_cache
235 .is_joined(user_id, &admin_room)
236 .await
237 }
238
239 pub async fn get_admin_room(&self) -> Result<OwnedRoomId> {
244 let room_id = self
245 .services
246 .alias
247 .resolve_local_alias(&self.admin_alias)
248 .await?;
249
250 self.services
251 .state_cache
252 .is_joined(&self.services.globals.server_user, &room_id)
253 .await
254 .then_some(room_id)
255 .ok_or_else(|| err!(Request(NotFound("Admin user not joined to admin room"))))
256 }
257
258 async fn handle_response(&self, content: RoomMessageEventContent) -> Result {
259 let Some(Relation::Reply(ruma::events::relation::Reply { in_reply_to })) =
260 content.relates_to.as_ref()
261 else {
262 return Ok(());
263 };
264
265 let Ok(pdu) = self
266 .services
267 .timeline
268 .get_pdu(&in_reply_to.event_id)
269 .await
270 else {
271 error!(
272 event_id = ?in_reply_to.event_id,
273 "Missing admin command in_reply_to event"
274 );
275 return Ok(());
276 };
277
278 let response_sender = if self.is_admin_room(pdu.room_id()).await {
279 &self.services.globals.server_user
280 } else {
281 pdu.sender()
282 };
283
284 self.respond_to_room(content, pdu.room_id(), response_sender)
285 .boxed()
286 .await
287 }
288
289 async fn respond_to_room(
290 &self,
291 content: RoomMessageEventContent,
292 room_id: &RoomId,
293 user_id: &UserId,
294 ) -> Result {
295 assert!(self.user_is_admin(user_id).await, "sender is not admin");
296
297 let state_lock = self.services.state.mutex.lock(room_id).await;
298
299 if let Err(e) = self
300 .services
301 .timeline
302 .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, &state_lock)
303 .await
304 {
305 self.handle_response_error(e, room_id, user_id, &state_lock)
306 .boxed()
307 .await
308 .unwrap_or_else(default_log);
309 }
310
311 Ok(())
312 }
313
314 async fn handle_response_error(
315 &self,
316 e: Error,
317 room_id: &RoomId,
318 user_id: &UserId,
319 state_lock: &RoomMutexGuard,
320 ) -> Result {
321 error!("Failed to build and append admin room response PDU: \"{e}\"");
322 let content = RoomMessageEventContent::text_plain(format!(
323 "Failed to build and append admin room PDU: \"{e}\"\n\nThe original admin command \
324 may have finished successfully, but we could not return the output."
325 ));
326
327 self.services
328 .timeline
329 .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock)
330 .boxed()
331 .await?;
332
333 Ok(())
334 }
335
336 pub async fn is_admin_command<Pdu>(&self, event: &Pdu, body: &str) -> bool
337 where
338 Pdu: Event,
339 {
340 let is_escape = body.starts_with('\\');
342 let is_public_escape = is_escape
343 && body
344 .trim_start_matches('\\')
345 .starts_with("!admin");
346
347 let server_user = &self.services.globals.server_user;
349 let is_public_prefix =
350 body.starts_with("!admin") || body.starts_with(server_user.as_str());
351
352 if !is_public_escape && !is_public_prefix {
354 return false;
355 }
356
357 let user_is_local = self
358 .services
359 .globals
360 .user_is_local(event.sender());
361
362 if is_public_escape && !user_is_local {
364 return false;
365 }
366
367 if is_public_escape && !self.services.server.config.admin_escape_commands {
369 return false;
370 }
371
372 if is_public_prefix && !self.is_admin_room(event.room_id()).await {
374 return false;
375 }
376
377 if !self.user_is_admin(event.sender()).await {
379 return false;
380 }
381
382 let emergency_password_set = self
385 .services
386 .server
387 .config
388 .emergency_password
389 .is_some();
390 let from_server = event.sender() == server_user && !emergency_password_set;
391 if from_server && self.is_admin_room(event.room_id()).await {
392 return false;
393 }
394
395 true
397 }
398
399 #[must_use]
400 pub async fn is_admin_room(&self, room_id_: &RoomId) -> bool {
401 self.get_admin_room()
402 .map_ok(|room_id| room_id == room_id_)
403 .await
404 .unwrap_or(false)
405 }
406}