Skip to main content

tuwunel_service/appservice/
mod.rs

1mod append;
2mod namespace_regex;
3mod registration_info;
4pub(crate) mod request;
5
6use std::{collections::BTreeMap, ffi::OsStr, fs, iter::IntoIterator, sync::Arc};
7
8use async_trait::async_trait;
9use futures::{Future, FutureExt, Stream, TryStreamExt};
10use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
11use tokio::sync::{RwLock, RwLockReadGuard};
12use tuwunel_core::{Err, Result, err, utils::stream::IterStream};
13use tuwunel_database::Map;
14
15pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
16
/// Appservice registration service.
///
/// Bundles the in-memory table of loaded registrations with the handles needed
/// to reach the other services and the backing database map.
17pub struct Service {
	/// Loaded registrations, guarded by an async read/write lock.
18	registration_info: RwLock<Registrations>,
	/// Shared handle to the other services; this file reaches `config`,
	/// `globals`, `users` and `sending` through it.
19	services: Arc<crate::services::OnceServices>,
	/// Database handles owned by this service.
20	db: Data,
21}
22
/// Database maps used by the appservice service.
23struct Data {
	/// Map opened under the name `id_appserviceregistrations`.
	/// `register_appservice` stores the YAML-serialized registration in it,
	/// keyed by the registration id.
24	id_appserviceregistrations: Arc<Map>,
25}
26
/// In-memory registration table; `load_appservice` inserts each entry under its
/// registration id.
27type Registrations = BTreeMap<String, RegistrationInfo>;
28
29#[async_trait]
30impl crate::Service for Service {
31	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
32		Ok(Arc::new(Self {
33			registration_info: RwLock::new(BTreeMap::new()),
34			services: args.services.clone(),
35			db: Data {
36				id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
37			},
38		}))
39	}
40
41	async fn worker(self: Arc<Self>) -> Result {
42		for (id, mut appservice) in self.services.config.appservice.clone() {
43			if appservice.id.is_empty() {
44				appservice.id = id.clone();
45			}
46
47			if *id != appservice.id {
48				return Err!(Config(
49					"id",
50					"Registration ID {:?} does not match the configured {id:?}",
51					appservice.id
52				));
53			}
54
55			self.load_appservice(appservice.into()).await?;
56		}
57
58		if let Some(appservice_dir) = &self.services.config.appservice_dir {
59			for dir_entry in fs::read_dir(appservice_dir)? {
60				let path = dir_entry?.path();
61
62				if !path.is_file()
63					|| !path
64						.extension()
65						.and_then(OsStr::to_str)
66						.is_some_and(|ext| matches!(ext, "yaml" | "yml"))
67				{
68					continue;
69				}
70
71				let bytes = fs::read(path)?;
72				let registration: Registration = serde_yaml::from_slice(&bytes)?;
73
74				self.load_appservice(registration).await?;
75			}
76		}
77
78		self.iter_db_ids()
79			.try_for_each(|registration| self.load_appservice(registration))
80			.await?;
81
82		Ok(())
83	}
84
85	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
86}
87
88impl Service {
89	pub async fn load_appservice(&self, registration: Registration) -> Result {
90		//TODO: Check for collisions between exclusive appservice namespaces
91
92		let registration_info =
93			RegistrationInfo::new(registration, self.services.globals.server_name())?;
94
95		let id = &registration_info.registration.id;
96
97		let mut registrations = self.registration_info.write().await;
98
99		for loaded_registration_info in registrations.values() {
100			let loaded_id = &loaded_registration_info.registration.id;
101
102			if loaded_id == id {
103				return Err!("Duplicate id: {id}");
104			}
105
106			if loaded_registration_info.registration.as_token
107				== registration_info.registration.as_token
108			{
109				return Err!("Duplicate as_token: {loaded_id} {id}");
110			}
111		}
112
113		let appservice_user = &registration_info.sender;
114
115		if !self.services.users.exists(appservice_user).await {
116			self.services
117				.users
118				.create(appservice_user, None, None)
119				.await?;
120		}
121
122		registrations.insert(id.clone(), registration_info);
123
124		Ok(())
125	}
126
127	pub async fn register_appservice(&self, registration: Registration) -> Result {
128		let id = registration.id.clone();
129
130		let appservice_yaml = serde_yaml::to_string(&registration)?;
131
132		self.load_appservice(registration).await?;
133
134		self.db
135			.id_appserviceregistrations
136			.insert(&id, appservice_yaml);
137
138		Ok(())
139	}
140
141	pub async fn unregister_appservice(&self, appservice_id: &str) -> Result {
142		let mut registrations = self.registration_info.write().await;
143
144		if !registrations.contains_key(appservice_id) {
145			return Err!("Appservice not found");
146		}
147
148		if self
149			.db
150			.id_appserviceregistrations
151			.exists(appservice_id)
152			.await
153			.is_err()
154		{
155			return Err!("Cannot unregister config appservice");
156		}
157
158		// removes the appservice registration info
159		registrations
160			.remove(appservice_id)
161			.ok_or_else(|| err!("Appservice not found"))?;
162
163		// remove the appservice from the database
164		self.db
165			.id_appserviceregistrations
166			.remove(appservice_id);
167
168		// deletes all active requests for the appservice if there are any so we stop
169		// sending to the URL
170		self.services
171			.sending
172			.cleanup_events(Some(appservice_id), None, None)
173			.await
174	}
175
176	pub async fn get_registration(&self, id: &str) -> Option<Registration> {
177		self.registration_info
178			.read()
179			.await
180			.get(id)
181			.cloned()
182			.map(|info| info.registration)
183	}
184
185	pub async fn find_from_access_token(&self, token: &str) -> Result<RegistrationInfo> {
186		self.read()
187			.await
188			.values()
189			.find(|info| info.registration.as_token == token)
190			.cloned()
191			.ok_or_else(|| err!(Request(NotFound("Missing or invalid appservice token"))))
192	}
193
194	/// Checks if a given user id matches any exclusive appservice regex
195	pub async fn is_exclusive_user_id(&self, user_id: &UserId) -> bool {
196		self.read()
197			.await
198			.values()
199			.any(|info| info.is_exclusive_user_match(user_id))
200	}
201
202	/// Checks if a given room alias matches any exclusive appservice regex
203	pub async fn is_exclusive_alias(&self, alias: &RoomAliasId) -> bool {
204		self.read()
205			.await
206			.values()
207			.any(|info| info.aliases.is_exclusive_match(alias.as_str()))
208	}
209
210	/// Checks if a given room id matches any exclusive appservice regex
211	///
212	/// TODO: use this?
213	pub async fn is_exclusive_room_id(&self, room_id: &RoomId) -> bool {
214		self.read()
215			.await
216			.values()
217			.any(|info| info.rooms.is_exclusive_match(room_id.as_str()))
218	}
219
220	pub fn iter_ids(&self) -> impl Stream<Item = String> + Send {
221		self.read()
222			.map(|info| info.keys().cloned().collect::<Vec<_>>())
223			.map(IntoIterator::into_iter)
224			.map(IterStream::stream)
225			.flatten_stream()
226	}
227
228	pub fn iter_db_ids(&self) -> impl Stream<Item = Result<Registration>> + Send {
229		self.db
230			.id_appserviceregistrations
231			.keys()
232			.and_then(async move |id: &str| Ok(self.get_db_registration(id).await?))
233	}
234
235	pub async fn get_db_registration(&self, id: &str) -> Result<Registration> {
236		self.db
237			.id_appserviceregistrations
238			.get(id)
239			.await
240			.and_then(|ref bytes| serde_yaml::from_slice(bytes).map_err(Into::into))
241			.map_err(|e| err!(Database("Invalid appservice {id:?} registration: {e:?}")))
242	}
243
244	pub fn read(&self) -> impl Future<Output = RwLockReadGuard<'_, Registrations>> + Send {
245		self.registration_info.read()
246	}
247}