Skip to main content

tuwunel_service/users/
mod.rs

1mod dehydrated_device;
2pub mod device;
3mod keys;
4mod ldap;
5mod profile;
6mod register;
7
8use std::sync::Arc;
9
10use futures::{Stream, StreamExt, TryFutureExt};
11use ruma::{
12	MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, UserId,
13	api::client::filter::FilterDefinition,
14	events::{
15		GlobalAccountDataEventType,
16		ignored_user_list::IgnoredUserListEvent,
17		invite_permission_config::{InvitePermissionAction, InvitePermissionConfigEvent},
18	},
19};
20use serde::{Deserialize, Serialize};
21use tuwunel_core::{
22	Err, Result, debug_warn, err, is_equal_to,
23	pdu::PduBuilder,
24	trace,
25	utils::{
26		self, ReadyExt,
27		stream::{TryIgnore, automatic_width},
28	},
29	warn,
30};
31use tuwunel_database::{Deserialized, Json, Map};
32
33pub use self::{
34	keys::parse_master_key,
35	profile::{Propagation, propagation_default},
36	register::Register,
37};
38
/// Placeholder stored in place of a password hash for an account that is
/// active but has no usable password (for example the default for an active
/// LDAP user). `set_password` stores it verbatim rather than a hash.
39pub const PASSWORD_SENTINEL: &str = "*";
/// Empty value stored in place of a password hash when `set_password` is
/// called with `None`; `is_deactivated` treats an empty value as deactivated.
40pub const PASSWORD_DISABLED: &str = "";
41
42/// Forensic record for a moderation action (MSC3823 suspend, MSC3939 lock).
43/// Presence of the row is the load-bearing fact; this body is written but
44/// never read on the hot path.
45#[derive(Clone, Debug, Serialize, Deserialize)]
46pub struct Moderation {
47	pub when: MilliSecondsSinceUnixEpoch,
48	pub by: OwnedUserId,
49}
50
/// User account service: account lifecycle, passwords, moderation flags,
/// sync filters and OpenID/login tokens.
51pub struct Service {
	// Handle to the other services (account data, oauth, globals, state,
	// timeline, server config) used by the methods below.
52	services: Arc<crate::services::OnceServices>,
	// Database maps owned by this service.
53	db: Data,
54}
55
/// Database maps used by the users service.
///
/// Each field is opened in `build` from the database map of the same name.
/// NOTE(review): names appear to follow a `<key>_<value>` convention — confirm
/// against the database schema.
56struct Data {
57	keychangeid_userid: Arc<Map>,
58	keyid_key: Arc<Map>,
59	onetimekeyid4225_otk: Arc<Map>,
	// OpenID token -> 8-byte big-endian expiry followed by the user ID bytes.
60	openidtoken_expiresatuserid: Arc<Map>,
	// Login token -> (expiry, user ID).
61	logintoken_expiresatuserid: Arc<Map>,
62	todeviceid_events: Arc<Map>,
63	token_userdeviceid: Arc<Map>,
64	userdeviceid_metadata: Arc<Map>,
65	userdeviceid_token: Arc<Map>,
66	userdeviceid_refresh: Arc<Map>,
67	userdeviceidalgorithm_fallback: Arc<Map>,
68	oidcdevice_userdeviceid: Arc<Map>,
69	oidccskeybypass_userid: Arc<Map>,
	// (user ID, filter ID) -> JSON filter definition.
70	userfilterid_filter: Arc<Map>,
71	userid_avatarurl: Arc<Map>,
72	userid_blurhash: Arc<Map>,
73	userid_dehydrateddevice: Arc<Map>,
74	userid_devicelistversion: Arc<Map>,
75	userid_displayname: Arc<Map>,
76	userid_lastonetimekeyupdate: Arc<Map>,
	// Row present => account locked; value is a JSON `Moderation` record.
77	userid_locked: Arc<Map>,
78	userid_masterkeyid: Arc<Map>,
	// Row present => account exists; value is the password hash, the
	// sentinel, or empty for a deactivated account.
79	userid_password: Arc<Map>,
80	userid_origin: Arc<Map>,
81	userid_selfsigningkeyid: Arc<Map>,
	// Row present => account suspended; value is a JSON `Moderation` record.
82	userid_suspended: Arc<Map>,
83	userid_usersigningkeyid: Arc<Map>,
84	useridprofilekey_value: Arc<Map>,
85}
86
87impl crate::Service for Service {
88	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
89		Ok(Arc::new(Self {
90			services: args.services.clone(),
91			db: Data {
92				keychangeid_userid: args.db["keychangeid_userid"].clone(),
93				keyid_key: args.db["keyid_key"].clone(),
94				onetimekeyid4225_otk: args.db["onetimekeyid4225_otk"].clone(),
95				openidtoken_expiresatuserid: args.db["openidtoken_expiresatuserid"].clone(),
96				logintoken_expiresatuserid: args.db["logintoken_expiresatuserid"].clone(),
97				oidcdevice_userdeviceid: args.db["oidcdevice_userdeviceid"].clone(),
98				oidccskeybypass_userid: args.db["oidccskeybypass_userid"].clone(),
99				todeviceid_events: args.db["todeviceid_events"].clone(),
100				token_userdeviceid: args.db["token_userdeviceid"].clone(),
101				userdeviceid_metadata: args.db["userdeviceid_metadata"].clone(),
102				userdeviceid_token: args.db["userdeviceid_token"].clone(),
103				userdeviceid_refresh: args.db["userdeviceid_refresh"].clone(),
104				userdeviceidalgorithm_fallback: args.db["userdeviceidalgorithm_fallback"].clone(),
105				userfilterid_filter: args.db["userfilterid_filter"].clone(),
106				userid_avatarurl: args.db["userid_avatarurl"].clone(),
107				userid_blurhash: args.db["userid_blurhash"].clone(),
108				userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(),
109				userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
110				userid_displayname: args.db["userid_displayname"].clone(),
111				userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
112				userid_locked: args.db["userid_locked"].clone(),
113				userid_masterkeyid: args.db["userid_masterkeyid"].clone(),
114				userid_password: args.db["userid_password"].clone(),
115				userid_origin: args.db["userid_origin"].clone(),
116				userid_selfsigningkeyid: args.db["userid_selfsigningkeyid"].clone(),
117				userid_suspended: args.db["userid_suspended"].clone(),
118				userid_usersigningkeyid: args.db["userid_usersigningkeyid"].clone(),
119				useridprofilekey_value: args.db["useridprofilekey_value"].clone(),
120			},
121		}))
122	}
123
124	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
125}
126
127impl Service {
128	/// Returns true/false based on whether the recipient/receiving user has
129	/// blocked the sender
130	pub async fn user_is_ignored(&self, sender_user: &UserId, recipient_user: &UserId) -> bool {
131		self.services
132			.account_data
133			.get_global(recipient_user, GlobalAccountDataEventType::IgnoredUserList)
134			.await
135			.is_ok_and(|ignored: IgnoredUserListEvent| {
136				ignored
137					.content
138					.ignored_users
139					.keys()
140					.any(|blocked_user| blocked_user == sender_user)
141			})
142	}
143
144	/// MSC4380: `m.invite_permission_config.default_action == "block"`.
145	pub async fn invites_blocked(&self, user_id: &UserId) -> bool {
146		self.services
147			.account_data
148			.get_global(user_id, GlobalAccountDataEventType::InvitePermissionConfig)
149			.await
150			.is_ok_and(|event: InvitePermissionConfigEvent| {
151				matches!(event.content.default_action, Some(InvitePermissionAction::Block))
152			})
153	}
154
155	/// Create a new user account on this homeserver.
156	///
157	/// User origin is by default "password" (meaning that it will login using
158	/// its user_id/password). Users with other origins (currently only "ldap"
159	/// is available) have special login processes.
160	#[inline]
161	pub async fn create(
162		&self,
163		user_id: &UserId,
164		password: Option<&str>,
165		origin: Option<&str>,
166	) -> Result {
167		let origin = origin.unwrap_or("password");
168		self.db.userid_origin.insert(user_id, origin);
169		self.set_password(user_id, password).await
170	}
171
172	/// Deactivate account
173	pub async fn deactivate_account(&self, user_id: &UserId) -> Result {
174		// Revoke any SSO authorizations
175		self.services
176			.oauth
177			.revoke_user_tokens(user_id)
178			.await;
179
180		// Remove all associated devices
181		self.all_device_ids(user_id)
182			.for_each(|device_id| self.remove_device(user_id, device_id))
183			.await;
184
185		// Set the password to "" to indicate a deactivated account. Hashes will never
186		// result in an empty string, so the user will not be able to log in again.
187		// Systems like changing the password without logging in should check if the
188		// account is deactivated.
189		self.set_password(user_id, None).await?;
190
191		// TODO: Unhook 3PID
192		Ok(())
193	}
194
195	/// Check if a user has an account on this homeserver.
196	#[inline]
197	pub async fn exists(&self, user_id: &UserId) -> bool {
198		self.db.userid_password.get(user_id).await.is_ok()
199	}
200
201	/// Check if account is deactivated
202	pub async fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
203		self.db
204			.userid_password
205			.get(user_id)
206			.map_ok(|val| val.is_empty())
207			.map_err(|_| err!(Request(NotFound("User does not exist."))))
208			.await
209	}
210
211	/// Check if account is active, infallible
212	pub async fn is_active(&self, user_id: &UserId) -> bool {
213		!self.is_deactivated(user_id).await.unwrap_or(true)
214	}
215
216	/// Check if account is active, infallible
217	pub async fn is_active_local(&self, user_id: &UserId) -> bool {
218		self.services.globals.user_is_local(user_id) && self.is_active(user_id).await
219	}
220
221	/// MSC3823: account is suspended (read-mostly mode, sessions retained).
222	pub async fn is_suspended(&self, user_id: &UserId) -> bool {
223		self.db
224			.userid_suspended
225			.get(user_id)
226			.await
227			.is_ok()
228	}
229
230	/// MSC3939: account is locked (401 + soft_logout, sessions retained).
231	pub async fn is_locked(&self, user_id: &UserId) -> bool {
232		self.db.userid_locked.get(user_id).await.is_ok()
233	}
234
235	/// MSC3823: forensic record for the active suspension, if any.
236	pub async fn get_suspension(&self, user_id: &UserId) -> Option<Moderation> {
237		self.db
238			.userid_suspended
239			.get(user_id)
240			.await
241			.deserialized::<Json<_>>()
242			.map(|Json(m)| m)
243			.ok()
244	}
245
246	/// MSC3939: forensic record for the active lock, if any.
247	pub async fn get_lock(&self, user_id: &UserId) -> Option<Moderation> {
248		self.db
249			.userid_locked
250			.get(user_id)
251			.await
252			.deserialized::<Json<_>>()
253			.map(|Json(m)| m)
254			.ok()
255	}
256
257	pub fn set_suspended(&self, user_id: &UserId, by: &UserId) {
258		let entry = Moderation {
259			when: MilliSecondsSinceUnixEpoch::now(),
260			by: by.to_owned(),
261		};
262
263		self.db
264			.userid_suspended
265			.raw_put(user_id, Json(entry));
266	}
267
268	pub fn clear_suspended(&self, user_id: &UserId) { self.db.userid_suspended.remove(user_id); }
269
270	pub fn set_locked(&self, user_id: &UserId, by: &UserId) {
271		let entry = Moderation {
272			when: MilliSecondsSinceUnixEpoch::now(),
273			by: by.to_owned(),
274		};
275
276		self.db
277			.userid_locked
278			.raw_put(user_id, Json(entry));
279	}
280
281	pub fn clear_locked(&self, user_id: &UserId) { self.db.userid_locked.remove(user_id); }
282
283	/// Returns the number of users registered on this server.
284	#[inline]
285	pub async fn count(&self) -> usize { self.db.userid_password.count().await }
286
287	/// Returns an iterator over all users on this homeserver.
288	pub fn stream(&self) -> impl Stream<Item = &UserId> + Send {
289		self.db.userid_password.keys().ignore_err()
290	}
291
292	/// Returns a list of local users as list of usernames.
293	///
294	/// A user account is considered `local` if the length of it's password is
295	/// greater then zero.
296	pub fn list_local_users(&self) -> impl Stream<Item = &UserId> + Send + '_ {
297		self.db
298			.userid_password
299			.stream()
300			.ignore_err()
301			.ready_filter_map(|(u, p): (&UserId, &[u8])| (!p.is_empty()).then_some(u))
302	}
303
304	/// Returns the origin of the user (password/LDAP/...).
305	pub async fn origin(&self, user_id: &UserId) -> Result<String> {
306		self.db
307			.userid_origin
308			.get(user_id)
309			.await
310			.deserialized()
311	}
312
313	/// Returns whether the user has a password. Disabled accounts and
314	/// registrations setting a sentinel password will return false here.
315	pub async fn has_password(&self, user_id: &UserId) -> Result<bool> {
316		self.password_hash(user_id)
317			.map_ok(|value| value != PASSWORD_DISABLED && value != PASSWORD_SENTINEL)
318			.await
319	}
320
321	/// Returns the password hash for the given user.
322	pub async fn password_hash(&self, user_id: &UserId) -> Result<String> {
323		self.db
324			.userid_password
325			.get(user_id)
326			.await
327			.deserialized()
328	}
329
330	/// Hash and set the user's password to the Argon2 hash
331	pub async fn set_password(&self, user_id: &UserId, password: Option<&str>) -> Result {
332		// Cannot change the password of a LDAP user. There are two special cases :
333		// - a `None` password can be used to deactivate a LDAP user
334		// - a "*" password is used as the default password of an active LDAP user
335		//
336		// The above now applies to all non-password origin users by default unless an
337		// exception is made for that origin in the condition below. Note that users
338		// with no origin are also password-origin users.
339		let allowed_origins = ["password", "sso"];
340		if password.is_some() && password != Some(PASSWORD_SENTINEL) {
341			let origin = self.origin(user_id).await;
342			let origin = origin.as_deref().unwrap_or("password");
343
344			if !allowed_origins.iter().any(is_equal_to!(&origin)) {
345				return Err!(Request(InvalidParam(
346					"Cannot change password of an {origin:?} user."
347				)));
348			}
349		}
350
351		match password.map(utils::hash::password) {
352			| None => {
353				self.db
354					.userid_password
355					.insert(user_id, PASSWORD_DISABLED);
356			},
357			| Some(Ok(_)) if password == Some(PASSWORD_SENTINEL) => {
358				self.db
359					.userid_password
360					.insert(user_id, PASSWORD_SENTINEL);
361			},
362			| Some(Ok(hash)) => {
363				self.db.userid_password.insert(user_id, hash);
364				self.db.userid_origin.insert(user_id, "password");
365			},
366			| Some(Err(e)) => {
367				return Err!(Request(InvalidParam(
368					"Password does not meet the requirements: {e}"
369				)));
370			},
371		}
372
373		Ok(())
374	}
375
376	/// Creates a new sync filter. Returns the filter id.
377	#[must_use]
378	pub fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> String {
379		let filter_id = utils::random_string(4);
380
381		let key = (user_id, &filter_id);
382		self.db.userfilterid_filter.put(key, Json(filter));
383
384		filter_id
385	}
386
387	pub async fn get_filter(
388		&self,
389		user_id: &UserId,
390		filter_id: &str,
391	) -> Result<FilterDefinition> {
392		let key = (user_id, filter_id);
393		self.db
394			.userfilterid_filter
395			.qry(&key)
396			.await
397			.deserialized()
398	}
399
400	/// Creates an OpenID token, which can be used to prove that a user has
401	/// access to an account (primarily for integrations)
402	pub fn create_openid_token(&self, user_id: &UserId, token: &str) -> Result<u64> {
403		use std::num::Saturating as Sat;
404
405		let expires_in = self.services.server.config.openid_token_ttl;
406		let expires_at = Sat(utils::millis_since_unix_epoch()) + Sat(expires_in) * Sat(1000);
407
408		let mut value = expires_at.0.to_be_bytes().to_vec();
409		value.extend_from_slice(user_id.as_bytes());
410
411		self.db
412			.openidtoken_expiresatuserid
413			.insert(token.as_bytes(), value.as_slice());
414
415		Ok(expires_in)
416	}
417
418	/// Find out which user an OpenID access token belongs to.
419	pub async fn find_from_openid_token(&self, token: &str) -> Result<OwnedUserId> {
420		let Ok(value) = self
421			.db
422			.openidtoken_expiresatuserid
423			.get(token)
424			.await
425		else {
426			return Err!(Request(Unauthorized("OpenID token is unrecognised")));
427		};
428
429		let (expires_at_bytes, user_bytes) = value.split_at(0_u64.to_be_bytes().len());
430		let expires_at =
431			u64::from_be_bytes(expires_at_bytes.try_into().map_err(|e| {
432				err!(Database("expires_at in openid_userid is invalid u64. {e}"))
433			})?);
434
435		if expires_at < utils::millis_since_unix_epoch() {
436			debug_warn!("OpenID token is expired, removing");
437			self.db
438				.openidtoken_expiresatuserid
439				.remove(token.as_bytes());
440
441			return Err!(Request(Unauthorized("OpenID token is expired")));
442		}
443
444		let user_string = utils::string_from_bytes(user_bytes)
445			.map_err(|e| err!(Database("User ID in openid_userid is invalid unicode. {e}")))?;
446
447		OwnedUserId::try_from(user_string)
448			.map_err(|e| err!(Database("User ID in openid_userid is invalid. {e}")))
449	}
450
451	/// Creates a short-lived login token, which can be used to log in using the
452	/// `m.login.token` mechanism.
453	#[must_use]
454	pub fn create_login_token(&self, user_id: &UserId, token: &str) -> u64 {
455		use std::num::Saturating as Sat;
456
457		let expires_in = self.services.server.config.login_token_ttl;
458		let expires_at = Sat(utils::millis_since_unix_epoch()) + Sat(expires_in);
459
460		let value = (expires_at.0, user_id);
461		self.db
462			.logintoken_expiresatuserid
463			.raw_put(token, value);
464
465		expires_in
466	}
467
468	/// Verify a login token is valid and return its owner without consuming it.
469	/// Unlike `find_from_login_token`, the token remains in the database
470	/// after this call and can still be consumed later.
471	pub async fn peek_login_token(&self, token: &str) -> Result<OwnedUserId> {
472		let Ok(value) = self
473			.db
474			.logintoken_expiresatuserid
475			.get(token)
476			.await
477		else {
478			return Err!(Request(Forbidden("Login token is unrecognised")));
479		};
480		let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?;
481
482		if expires_at < utils::millis_since_unix_epoch() {
483			trace!(?user_id, ?token, "Removing expired login token");
484			self.db.logintoken_expiresatuserid.remove(token);
485			return Err!(Request(Forbidden("Login token is expired")));
486		}
487
488		Ok(user_id)
489	}
490
491	/// Find out which user a login token belongs to.
492	/// Removes the token to prevent double-use attacks.
493	pub async fn find_from_login_token(&self, token: &str) -> Result<OwnedUserId> {
494		let Ok(value) = self
495			.db
496			.logintoken_expiresatuserid
497			.get(token)
498			.await
499		else {
500			return Err!(Request(Forbidden("Login token is unrecognised")));
501		};
502		let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?;
503
504		if expires_at < utils::millis_since_unix_epoch() {
505			trace!(?user_id, ?token, "Removing expired login token");
506
507			self.db.logintoken_expiresatuserid.remove(token);
508
509			return Err!(Request(Forbidden("Login token is expired")));
510		}
511
512		self.db.logintoken_expiresatuserid.remove(token);
513
514		Ok(user_id)
515	}
516
517	#[cfg(not(feature = "ldap"))]
518	#[expect(clippy::unused_async)]
519	pub async fn search_ldap(&self, _user_id: &UserId) -> Result<Vec<(String, bool)>> {
520		Err!(FeatureDisabled("ldap"))
521	}
522
523	#[cfg(not(feature = "ldap"))]
524	#[expect(clippy::unused_async)]
525	pub async fn auth_ldap(&self, _user_dn: &str, _password: &str) -> Result {
526		Err!(FeatureDisabled("ldap"))
527	}
528
529	async fn update_all_rooms<'a, S>(&self, user_id: &UserId, rooms: S)
530	where
531		S: Stream<Item = (PduBuilder, &'a OwnedRoomId)> + Send,
532	{
533		rooms
534			.for_each_concurrent(automatic_width(), async |(pdu_builder, room_id)| {
535				let state_lock = self.services.state.mutex.lock(room_id).await;
536				if let Err(e) = self
537					.services
538					.timeline
539					.build_and_append_pdu(pdu_builder, user_id, room_id, &state_lock)
540					.await
541				{
542					warn!(
543						%user_id,
544						%room_id,
545						%e,
546						"Failed to update/send new profile join membership update in room",
547					);
548				}
549			})
550			.await;
551	}
552}