tuwunel_service/appservice/
mod.rs1mod append;
2mod namespace_regex;
3mod ping;
4mod registration_info;
5pub(crate) mod request;
6
7use std::{
8 collections::BTreeMap,
9 ffi::OsStr,
10 fs::{self, read_dir},
11 iter::IntoIterator,
12 sync::Arc,
13};
14
15use async_trait::async_trait;
16use futures::{Future, FutureExt, Stream, TryStreamExt};
17use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
18use tokio::sync::{RwLock, RwLockReadGuard};
19use tuwunel_core::{Err, Result, err, utils::stream::IterStream};
20use tuwunel_database::Map;
21
22pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
23
/// Appservice registry: keeps the loaded appservice registrations in memory
/// and persists the ones registered at runtime in the database.
pub struct Service {
    /// Currently loaded registrations, keyed by the registration's `id`.
    registration_info: RwLock<Registrations>,
    /// Handle to the other services (config, globals, users, sending).
    services: Arc<crate::services::OnceServices>,
    /// Database maps owned by this service.
    db: Data,
}
29
/// Database handles used by the appservice [`Service`].
struct Data {
    /// Maps an appservice id to its registration serialized as YAML.
    id_appserviceregistrations: Arc<Map>,
}
33
/// In-memory registration table: appservice id -> parsed registration info.
type Registrations = BTreeMap<String, RegistrationInfo>;
35
36#[async_trait]
37impl crate::Service for Service {
38 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
39 Ok(Arc::new(Self {
40 registration_info: RwLock::new(BTreeMap::new()),
41 services: args.services.clone(),
42 db: Data {
43 id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
44 },
45 }))
46 }
47
48 async fn worker(self: Arc<Self>) -> Result {
49 for (id, mut appservice) in self.services.config.appservice.clone() {
50 if appservice.id.is_empty() {
51 appservice.id = id.clone();
52 }
53
54 if *id != appservice.id {
55 return Err!(Config(
56 "id",
57 "Registration ID {:?} does not match the configured {id:?}",
58 appservice.id
59 ));
60 }
61
62 self.load_appservice(appservice.into()).await?;
63 }
64
65 if let Some(appservice_dir) = &self.services.config.appservice_dir {
66 let entries = read_dir(appservice_dir).map_err(|e| {
67 err!(Config("appservice_dir", "Failed to read {appservice_dir:?}: {e}"))
68 })?;
69
70 for dir_entry in entries {
71 let path = dir_entry?.path();
72
73 if !path.is_file()
74 || !path
75 .extension()
76 .and_then(OsStr::to_str)
77 .is_some_and(|ext| matches!(ext, "yaml" | "yml"))
78 {
79 continue;
80 }
81
82 let bytes = fs::read(path)?;
83 let registration: Registration = serde_yaml::from_slice(&bytes)?;
84
85 self.load_appservice(registration).await?;
86 }
87 }
88
89 self.iter_db_ids()
90 .try_for_each(|registration| self.load_appservice(registration))
91 .await?;
92
93 Ok(())
94 }
95
96 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
97}
98
99impl Service {
100 pub async fn load_appservice(&self, registration: Registration) -> Result {
101 let registration_info =
104 RegistrationInfo::new(registration, self.services.globals.server_name())?;
105
106 let id = ®istration_info.registration.id;
107
108 let mut registrations = self.registration_info.write().await;
109
110 for loaded_registration_info in registrations.values() {
111 let loaded_id = &loaded_registration_info.registration.id;
112
113 if loaded_id == id {
114 return Err!("Duplicate id: {id}");
115 }
116
117 if loaded_registration_info.registration.as_token
118 == registration_info.registration.as_token
119 {
120 return Err!("Duplicate as_token: {loaded_id} {id}");
121 }
122 }
123
124 let appservice_user = ®istration_info.sender;
125
126 if !self.services.users.exists(appservice_user).await {
127 self.services
128 .users
129 .create(appservice_user, None, None)
130 .await?;
131 }
132
133 registrations.insert(id.clone(), registration_info);
134
135 Ok(())
136 }
137
138 pub async fn register_appservice(&self, registration: Registration) -> Result {
139 let id = registration.id.clone();
140
141 let appservice_yaml = serde_yaml::to_string(®istration)?;
142
143 self.load_appservice(registration).await?;
144
145 self.db
146 .id_appserviceregistrations
147 .insert(&id, appservice_yaml);
148
149 Ok(())
150 }
151
152 pub async fn unregister_appservice(&self, appservice_id: &str) -> Result {
153 let mut registrations = self.registration_info.write().await;
154
155 if !registrations.contains_key(appservice_id) {
156 return Err!("Appservice not found");
157 }
158
159 if self
160 .db
161 .id_appserviceregistrations
162 .exists(appservice_id)
163 .await
164 .is_err()
165 {
166 return Err!("Cannot unregister config appservice");
167 }
168
169 registrations
171 .remove(appservice_id)
172 .ok_or_else(|| err!("Appservice not found"))?;
173
174 self.db
176 .id_appserviceregistrations
177 .remove(appservice_id);
178
179 self.services
182 .sending
183 .cleanup_events(Some(appservice_id), None, None)
184 .await
185 }
186
187 pub async fn get_registration(&self, id: &str) -> Option<Registration> {
188 self.registration_info
189 .read()
190 .await
191 .get(id)
192 .cloned()
193 .map(|info| info.registration)
194 }
195
196 pub async fn find_from_access_token(&self, token: &str) -> Result<RegistrationInfo> {
197 self.read()
198 .await
199 .values()
200 .find(|info| info.registration.as_token == token)
201 .cloned()
202 .ok_or_else(|| err!(Request(NotFound("Missing or invalid appservice token"))))
203 }
204
205 pub async fn is_exclusive_user_id(&self, user_id: &UserId) -> bool {
207 self.read()
208 .await
209 .values()
210 .any(|info| info.is_exclusive_user_match(user_id))
211 }
212
213 pub async fn is_exclusive_alias(&self, alias: &RoomAliasId) -> bool {
215 self.read()
216 .await
217 .values()
218 .any(|info| info.aliases.is_exclusive_match(alias.as_str()))
219 }
220
221 pub async fn is_exclusive_room_id(&self, room_id: &RoomId) -> bool {
225 self.read()
226 .await
227 .values()
228 .any(|info| info.rooms.is_exclusive_match(room_id.as_str()))
229 }
230
231 pub fn iter_ids(&self) -> impl Stream<Item = String> + Send {
232 self.read()
233 .map(|info| info.keys().cloned().collect::<Vec<_>>())
234 .map(IntoIterator::into_iter)
235 .map(IterStream::stream)
236 .flatten_stream()
237 }
238
239 pub fn iter_db_ids(&self) -> impl Stream<Item = Result<Registration>> + Send {
240 self.db
241 .id_appserviceregistrations
242 .keys()
243 .and_then(async move |id: &str| Ok(self.get_db_registration(id).await?))
244 }
245
246 pub async fn get_db_registration(&self, id: &str) -> Result<Registration> {
247 self.db
248 .id_appserviceregistrations
249 .get(id)
250 .await
251 .and_then(|ref bytes| serde_yaml::from_slice(bytes).map_err(Into::into))
252 .map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
253 }
254
255 pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, Registrations>> + Send {
256 self.registration_info.read()
257 }
258}