tuwunel_service/appservice/
mod.rs1mod append;
2mod namespace_regex;
3mod registration_info;
4pub(crate) mod request;
5
6use std::{collections::BTreeMap, ffi::OsStr, fs, iter::IntoIterator, sync::Arc};
7
8use async_trait::async_trait;
9use futures::{Future, FutureExt, Stream, TryStreamExt};
10use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
11use tokio::sync::{RwLock, RwLockReadGuard};
12use tuwunel_core::{Err, Result, err, utils::stream::IterStream};
13use tuwunel_database::Map;
14
15pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
16
17pub struct Service {
18 registration_info: RwLock<Registrations>,
19 services: Arc<crate::services::OnceServices>,
20 db: Data,
21}
22
23struct Data {
24 id_appserviceregistrations: Arc<Map>,
25}
26
27type Registrations = BTreeMap<String, RegistrationInfo>;
28
29#[async_trait]
30impl crate::Service for Service {
31 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
32 Ok(Arc::new(Self {
33 registration_info: RwLock::new(BTreeMap::new()),
34 services: args.services.clone(),
35 db: Data {
36 id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
37 },
38 }))
39 }
40
41 async fn worker(self: Arc<Self>) -> Result {
42 for (id, mut appservice) in self.services.config.appservice.clone() {
43 if appservice.id.is_empty() {
44 appservice.id = id.clone();
45 }
46
47 if *id != appservice.id {
48 return Err!(Config(
49 "id",
50 "Registration ID {:?} does not match the configured {id:?}",
51 appservice.id
52 ));
53 }
54
55 self.load_appservice(appservice.into()).await?;
56 }
57
58 if let Some(appservice_dir) = &self.services.config.appservice_dir {
59 for dir_entry in fs::read_dir(appservice_dir)? {
60 let path = dir_entry?.path();
61
62 if !path.is_file()
63 || !path
64 .extension()
65 .and_then(OsStr::to_str)
66 .is_some_and(|ext| matches!(ext, "yaml" | "yml"))
67 {
68 continue;
69 }
70
71 let bytes = fs::read(path)?;
72 let registration: Registration = serde_yaml::from_slice(&bytes)?;
73
74 self.load_appservice(registration).await?;
75 }
76 }
77
78 self.iter_db_ids()
79 .try_for_each(|registration| self.load_appservice(registration))
80 .await?;
81
82 Ok(())
83 }
84
85 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
86}
87
88impl Service {
89 pub async fn load_appservice(&self, registration: Registration) -> Result {
90 let registration_info =
93 RegistrationInfo::new(registration, self.services.globals.server_name())?;
94
95 let id = ®istration_info.registration.id;
96
97 let mut registrations = self.registration_info.write().await;
98
99 for loaded_registration_info in registrations.values() {
100 let loaded_id = &loaded_registration_info.registration.id;
101
102 if loaded_id == id {
103 return Err!("Duplicate id: {id}");
104 }
105
106 if loaded_registration_info.registration.as_token
107 == registration_info.registration.as_token
108 {
109 return Err!("Duplicate as_token: {loaded_id} {id}");
110 }
111 }
112
113 let appservice_user = ®istration_info.sender;
114
115 if !self.services.users.exists(appservice_user).await {
116 self.services
117 .users
118 .create(appservice_user, None, None)
119 .await?;
120 }
121
122 registrations.insert(id.clone(), registration_info);
123
124 Ok(())
125 }
126
127 pub async fn register_appservice(&self, registration: Registration) -> Result {
128 let id = registration.id.clone();
129
130 let appservice_yaml = serde_yaml::to_string(®istration)?;
131
132 self.load_appservice(registration).await?;
133
134 self.db
135 .id_appserviceregistrations
136 .insert(&id, appservice_yaml);
137
138 Ok(())
139 }
140
141 pub async fn unregister_appservice(&self, appservice_id: &str) -> Result {
142 let mut registrations = self.registration_info.write().await;
143
144 if !registrations.contains_key(appservice_id) {
145 return Err!("Appservice not found");
146 }
147
148 if self
149 .db
150 .id_appserviceregistrations
151 .exists(appservice_id)
152 .await
153 .is_err()
154 {
155 return Err!("Cannot unregister config appservice");
156 }
157
158 registrations
160 .remove(appservice_id)
161 .ok_or_else(|| err!("Appservice not found"))?;
162
163 self.db
165 .id_appserviceregistrations
166 .remove(appservice_id);
167
168 self.services
171 .sending
172 .cleanup_events(Some(appservice_id), None, None)
173 .await
174 }
175
176 pub async fn get_registration(&self, id: &str) -> Option<Registration> {
177 self.registration_info
178 .read()
179 .await
180 .get(id)
181 .cloned()
182 .map(|info| info.registration)
183 }
184
185 pub async fn find_from_access_token(&self, token: &str) -> Result<RegistrationInfo> {
186 self.read()
187 .await
188 .values()
189 .find(|info| info.registration.as_token == token)
190 .cloned()
191 .ok_or_else(|| err!(Request(NotFound("Missing or invalid appservice token"))))
192 }
193
194 pub async fn is_exclusive_user_id(&self, user_id: &UserId) -> bool {
196 self.read()
197 .await
198 .values()
199 .any(|info| info.is_exclusive_user_match(user_id))
200 }
201
202 pub async fn is_exclusive_alias(&self, alias: &RoomAliasId) -> bool {
204 self.read()
205 .await
206 .values()
207 .any(|info| info.aliases.is_exclusive_match(alias.as_str()))
208 }
209
210 pub async fn is_exclusive_room_id(&self, room_id: &RoomId) -> bool {
214 self.read()
215 .await
216 .values()
217 .any(|info| info.rooms.is_exclusive_match(room_id.as_str()))
218 }
219
220 pub fn iter_ids(&self) -> impl Stream<Item = String> + Send {
221 self.read()
222 .map(|info| info.keys().cloned().collect::<Vec<_>>())
223 .map(IntoIterator::into_iter)
224 .map(IterStream::stream)
225 .flatten_stream()
226 }
227
228 pub fn iter_db_ids(&self) -> impl Stream<Item = Result<Registration>> + Send {
229 self.db
230 .id_appserviceregistrations
231 .keys()
232 .and_then(async move |id: &str| Ok(self.get_db_registration(id).await?))
233 }
234
235 pub async fn get_db_registration(&self, id: &str) -> Result<Registration> {
236 self.db
237 .id_appserviceregistrations
238 .get(id)
239 .await
240 .and_then(|ref bytes| serde_yaml::from_slice(bytes).map_err(Into::into))
241 .map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
242 }
243
244 pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, Registrations>> + Send {
245 self.registration_info.read()
246 }
247}