1mod dehydrated_device;
2pub mod device;
3mod keys;
4mod ldap;
5mod profile;
6mod register;
7
8use std::sync::Arc;
9
10use futures::{Stream, StreamExt, TryFutureExt};
11use ruma::{
12 MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, UserId,
13 api::client::filter::FilterDefinition,
14 events::{
15 GlobalAccountDataEventType,
16 ignored_user_list::IgnoredUserListEvent,
17 invite_permission_config::{InvitePermissionAction, InvitePermissionConfigEvent},
18 },
19};
20use serde::{Deserialize, Serialize};
21use tuwunel_core::{
22 Err, Result, debug_warn, err, is_equal_to,
23 pdu::PduBuilder,
24 trace,
25 utils::{
26 self, ReadyExt,
27 stream::{TryIgnore, automatic_width},
28 },
29 warn,
30};
31use tuwunel_database::{Deserialized, Json, Map};
32
33pub use self::{
34 keys::parse_master_key,
35 profile::{Propagation, propagation_default},
36 register::Register,
37};
38
/// Placeholder stored in place of a password hash. `set_password` writes it
/// verbatim when called with `Some("*")`, and `has_password` reports `false`
/// for accounts holding it.
pub const PASSWORD_SENTINEL: &str = "*";
/// Empty value stored by `set_password(.., None)`. `is_deactivated` treats an
/// empty stored password as a deactivated account.
pub const PASSWORD_DISABLED: &str = "";
41
/// Record of a moderation action: when it was applied and by whom.
///
/// This is the value that `Service::set_suspended` and `Service::set_locked`
/// store (JSON-wrapped) and that `get_suspension` / `get_lock` read back.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Moderation {
	/// Timestamp captured when the entry was created.
	pub when: MilliSecondsSinceUnixEpoch,
	/// User recorded as having applied the action.
	pub by: OwnedUserId,
}
50
/// Users service: account state, credentials, filters and short-lived tokens.
pub struct Service {
	/// Shared handle used to reach the other services (account data, oauth,
	/// globals, state, timeline, server config).
	services: Arc<crate::services::OnceServices>,
	/// Database maps owned by this service.
	db: Data,
}
55
/// Handles to the database maps used by the users service.
///
/// Each field is opened in `build` from the column of the same name. Only the
/// maps accessed in this file are described below; the rest are presumably
/// used by the submodules (device, keys, profile, ...) — confirm there.
struct Data {
	keychangeid_userid: Arc<Map>,
	keyid_key: Arc<Map>,
	onetimekeyid4225_otk: Arc<Map>,
	/// OpenID token -> 8-byte big-endian expiry (ms) followed by the user ID bytes.
	openidtoken_expiresatuserid: Arc<Map>,
	/// Login token -> serialized `(expires_at, user_id)` tuple.
	logintoken_expiresatuserid: Arc<Map>,
	todeviceid_events: Arc<Map>,
	token_userdeviceid: Arc<Map>,
	userdeviceid_metadata: Arc<Map>,
	userdeviceid_token: Arc<Map>,
	userdeviceid_refresh: Arc<Map>,
	userdeviceidalgorithm_fallback: Arc<Map>,
	oidcdevice_userdeviceid: Arc<Map>,
	oidccskeybypass_userid: Arc<Map>,
	/// `(user_id, filter_id)` -> JSON-encoded filter definition.
	userfilterid_filter: Arc<Map>,
	userid_avatarurl: Arc<Map>,
	userid_blurhash: Arc<Map>,
	userid_dehydrateddevice: Arc<Map>,
	userid_devicelistversion: Arc<Map>,
	userid_displayname: Arc<Map>,
	userid_lastonetimekeyupdate: Arc<Map>,
	/// User ID -> JSON-encoded `Moderation` entry; presence means locked.
	userid_locked: Arc<Map>,
	userid_masterkeyid: Arc<Map>,
	/// User ID -> password hash, `PASSWORD_DISABLED` or `PASSWORD_SENTINEL`.
	/// Presence of a key is what `exists` checks.
	userid_password: Arc<Map>,
	/// User ID -> origin string (e.g. "password").
	userid_origin: Arc<Map>,
	userid_selfsigningkeyid: Arc<Map>,
	/// User ID -> JSON-encoded `Moderation` entry; presence means suspended.
	userid_suspended: Arc<Map>,
	userid_usersigningkeyid: Arc<Map>,
	useridprofilekey_value: Arc<Map>,
}
86
87impl crate::Service for Service {
88 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
89 Ok(Arc::new(Self {
90 services: args.services.clone(),
91 db: Data {
92 keychangeid_userid: args.db["keychangeid_userid"].clone(),
93 keyid_key: args.db["keyid_key"].clone(),
94 onetimekeyid4225_otk: args.db["onetimekeyid4225_otk"].clone(),
95 openidtoken_expiresatuserid: args.db["openidtoken_expiresatuserid"].clone(),
96 logintoken_expiresatuserid: args.db["logintoken_expiresatuserid"].clone(),
97 oidcdevice_userdeviceid: args.db["oidcdevice_userdeviceid"].clone(),
98 oidccskeybypass_userid: args.db["oidccskeybypass_userid"].clone(),
99 todeviceid_events: args.db["todeviceid_events"].clone(),
100 token_userdeviceid: args.db["token_userdeviceid"].clone(),
101 userdeviceid_metadata: args.db["userdeviceid_metadata"].clone(),
102 userdeviceid_token: args.db["userdeviceid_token"].clone(),
103 userdeviceid_refresh: args.db["userdeviceid_refresh"].clone(),
104 userdeviceidalgorithm_fallback: args.db["userdeviceidalgorithm_fallback"].clone(),
105 userfilterid_filter: args.db["userfilterid_filter"].clone(),
106 userid_avatarurl: args.db["userid_avatarurl"].clone(),
107 userid_blurhash: args.db["userid_blurhash"].clone(),
108 userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(),
109 userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
110 userid_displayname: args.db["userid_displayname"].clone(),
111 userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
112 userid_locked: args.db["userid_locked"].clone(),
113 userid_masterkeyid: args.db["userid_masterkeyid"].clone(),
114 userid_password: args.db["userid_password"].clone(),
115 userid_origin: args.db["userid_origin"].clone(),
116 userid_selfsigningkeyid: args.db["userid_selfsigningkeyid"].clone(),
117 userid_suspended: args.db["userid_suspended"].clone(),
118 userid_usersigningkeyid: args.db["userid_usersigningkeyid"].clone(),
119 useridprofilekey_value: args.db["useridprofilekey_value"].clone(),
120 },
121 }))
122 }
123
124 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
125}
126
127impl Service {
128 pub async fn user_is_ignored(&self, sender_user: &UserId, recipient_user: &UserId) -> bool {
131 self.services
132 .account_data
133 .get_global(recipient_user, GlobalAccountDataEventType::IgnoredUserList)
134 .await
135 .is_ok_and(|ignored: IgnoredUserListEvent| {
136 ignored
137 .content
138 .ignored_users
139 .keys()
140 .any(|blocked_user| blocked_user == sender_user)
141 })
142 }
143
144 pub async fn invites_blocked(&self, user_id: &UserId) -> bool {
146 self.services
147 .account_data
148 .get_global(user_id, GlobalAccountDataEventType::InvitePermissionConfig)
149 .await
150 .is_ok_and(|event: InvitePermissionConfigEvent| {
151 matches!(event.content.default_action, Some(InvitePermissionAction::Block))
152 })
153 }
154
155 #[inline]
161 pub async fn create(
162 &self,
163 user_id: &UserId,
164 password: Option<&str>,
165 origin: Option<&str>,
166 ) -> Result {
167 let origin = origin.unwrap_or("password");
168 self.db.userid_origin.insert(user_id, origin);
169 self.set_password(user_id, password).await
170 }
171
172 pub async fn deactivate_account(&self, user_id: &UserId) -> Result {
174 self.services
176 .oauth
177 .revoke_user_tokens(user_id)
178 .await;
179
180 self.all_device_ids(user_id)
182 .for_each(|device_id| self.remove_device(user_id, device_id))
183 .await;
184
185 self.set_password(user_id, None).await?;
190
191 Ok(())
193 }
194
195 #[inline]
197 pub async fn exists(&self, user_id: &UserId) -> bool {
198 self.db.userid_password.get(user_id).await.is_ok()
199 }
200
201 pub async fn is_deactivated(&self, user_id: &UserId) -> Result<bool> {
203 self.db
204 .userid_password
205 .get(user_id)
206 .map_ok(|val| val.is_empty())
207 .map_err(|_| err!(Request(NotFound("User does not exist."))))
208 .await
209 }
210
211 pub async fn is_active(&self, user_id: &UserId) -> bool {
213 !self.is_deactivated(user_id).await.unwrap_or(true)
214 }
215
216 pub async fn is_active_local(&self, user_id: &UserId) -> bool {
218 self.services.globals.user_is_local(user_id) && self.is_active(user_id).await
219 }
220
221 pub async fn is_suspended(&self, user_id: &UserId) -> bool {
223 self.db
224 .userid_suspended
225 .get(user_id)
226 .await
227 .is_ok()
228 }
229
230 pub async fn is_locked(&self, user_id: &UserId) -> bool {
232 self.db.userid_locked.get(user_id).await.is_ok()
233 }
234
235 pub async fn get_suspension(&self, user_id: &UserId) -> Option<Moderation> {
237 self.db
238 .userid_suspended
239 .get(user_id)
240 .await
241 .deserialized::<Json<_>>()
242 .map(|Json(m)| m)
243 .ok()
244 }
245
246 pub async fn get_lock(&self, user_id: &UserId) -> Option<Moderation> {
248 self.db
249 .userid_locked
250 .get(user_id)
251 .await
252 .deserialized::<Json<_>>()
253 .map(|Json(m)| m)
254 .ok()
255 }
256
257 pub fn set_suspended(&self, user_id: &UserId, by: &UserId) {
258 let entry = Moderation {
259 when: MilliSecondsSinceUnixEpoch::now(),
260 by: by.to_owned(),
261 };
262
263 self.db
264 .userid_suspended
265 .raw_put(user_id, Json(entry));
266 }
267
268 pub fn clear_suspended(&self, user_id: &UserId) { self.db.userid_suspended.remove(user_id); }
269
270 pub fn set_locked(&self, user_id: &UserId, by: &UserId) {
271 let entry = Moderation {
272 when: MilliSecondsSinceUnixEpoch::now(),
273 by: by.to_owned(),
274 };
275
276 self.db
277 .userid_locked
278 .raw_put(user_id, Json(entry));
279 }
280
281 pub fn clear_locked(&self, user_id: &UserId) { self.db.userid_locked.remove(user_id); }
282
283 #[inline]
285 pub async fn count(&self) -> usize { self.db.userid_password.count().await }
286
287 pub fn stream(&self) -> impl Stream<Item = &UserId> + Send {
289 self.db.userid_password.keys().ignore_err()
290 }
291
292 pub fn list_local_users(&self) -> impl Stream<Item = &UserId> + Send + '_ {
297 self.db
298 .userid_password
299 .stream()
300 .ignore_err()
301 .ready_filter_map(|(u, p): (&UserId, &[u8])| (!p.is_empty()).then_some(u))
302 }
303
304 pub async fn origin(&self, user_id: &UserId) -> Result<String> {
306 self.db
307 .userid_origin
308 .get(user_id)
309 .await
310 .deserialized()
311 }
312
313 pub async fn has_password(&self, user_id: &UserId) -> Result<bool> {
316 self.password_hash(user_id)
317 .map_ok(|value| value != PASSWORD_DISABLED && value != PASSWORD_SENTINEL)
318 .await
319 }
320
321 pub async fn password_hash(&self, user_id: &UserId) -> Result<String> {
323 self.db
324 .userid_password
325 .get(user_id)
326 .await
327 .deserialized()
328 }
329
330 pub async fn set_password(&self, user_id: &UserId, password: Option<&str>) -> Result {
332 let allowed_origins = ["password", "sso"];
340 if password.is_some() && password != Some(PASSWORD_SENTINEL) {
341 let origin = self.origin(user_id).await;
342 let origin = origin.as_deref().unwrap_or("password");
343
344 if !allowed_origins.iter().any(is_equal_to!(&origin)) {
345 return Err!(Request(InvalidParam(
346 "Cannot change password of an {origin:?} user."
347 )));
348 }
349 }
350
351 match password.map(utils::hash::password) {
352 | None => {
353 self.db
354 .userid_password
355 .insert(user_id, PASSWORD_DISABLED);
356 },
357 | Some(Ok(_)) if password == Some(PASSWORD_SENTINEL) => {
358 self.db
359 .userid_password
360 .insert(user_id, PASSWORD_SENTINEL);
361 },
362 | Some(Ok(hash)) => {
363 self.db.userid_password.insert(user_id, hash);
364 self.db.userid_origin.insert(user_id, "password");
365 },
366 | Some(Err(e)) => {
367 return Err!(Request(InvalidParam(
368 "Password does not meet the requirements: {e}"
369 )));
370 },
371 }
372
373 Ok(())
374 }
375
376 #[must_use]
378 pub fn create_filter(&self, user_id: &UserId, filter: &FilterDefinition) -> String {
379 let filter_id = utils::random_string(4);
380
381 let key = (user_id, &filter_id);
382 self.db.userfilterid_filter.put(key, Json(filter));
383
384 filter_id
385 }
386
387 pub async fn get_filter(
388 &self,
389 user_id: &UserId,
390 filter_id: &str,
391 ) -> Result<FilterDefinition> {
392 let key = (user_id, filter_id);
393 self.db
394 .userfilterid_filter
395 .qry(&key)
396 .await
397 .deserialized()
398 }
399
400 pub fn create_openid_token(&self, user_id: &UserId, token: &str) -> Result<u64> {
403 use std::num::Saturating as Sat;
404
405 let expires_in = self.services.server.config.openid_token_ttl;
406 let expires_at = Sat(utils::millis_since_unix_epoch()) + Sat(expires_in) * Sat(1000);
407
408 let mut value = expires_at.0.to_be_bytes().to_vec();
409 value.extend_from_slice(user_id.as_bytes());
410
411 self.db
412 .openidtoken_expiresatuserid
413 .insert(token.as_bytes(), value.as_slice());
414
415 Ok(expires_in)
416 }
417
418 pub async fn find_from_openid_token(&self, token: &str) -> Result<OwnedUserId> {
420 let Ok(value) = self
421 .db
422 .openidtoken_expiresatuserid
423 .get(token)
424 .await
425 else {
426 return Err!(Request(Unauthorized("OpenID token is unrecognised")));
427 };
428
429 let (expires_at_bytes, user_bytes) = value.split_at(0_u64.to_be_bytes().len());
430 let expires_at =
431 u64::from_be_bytes(expires_at_bytes.try_into().map_err(|e| {
432 err!(Database("expires_at in openid_userid is invalid u64. {e}"))
433 })?);
434
435 if expires_at < utils::millis_since_unix_epoch() {
436 debug_warn!("OpenID token is expired, removing");
437 self.db
438 .openidtoken_expiresatuserid
439 .remove(token.as_bytes());
440
441 return Err!(Request(Unauthorized("OpenID token is expired")));
442 }
443
444 let user_string = utils::string_from_bytes(user_bytes)
445 .map_err(|e| err!(Database("User ID in openid_userid is invalid unicode. {e}")))?;
446
447 OwnedUserId::try_from(user_string)
448 .map_err(|e| err!(Database("User ID in openid_userid is invalid. {e}")))
449 }
450
451 #[must_use]
454 pub fn create_login_token(&self, user_id: &UserId, token: &str) -> u64 {
455 use std::num::Saturating as Sat;
456
457 let expires_in = self.services.server.config.login_token_ttl;
458 let expires_at = Sat(utils::millis_since_unix_epoch()) + Sat(expires_in);
459
460 let value = (expires_at.0, user_id);
461 self.db
462 .logintoken_expiresatuserid
463 .raw_put(token, value);
464
465 expires_in
466 }
467
468 pub async fn peek_login_token(&self, token: &str) -> Result<OwnedUserId> {
472 let Ok(value) = self
473 .db
474 .logintoken_expiresatuserid
475 .get(token)
476 .await
477 else {
478 return Err!(Request(Forbidden("Login token is unrecognised")));
479 };
480 let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?;
481
482 if expires_at < utils::millis_since_unix_epoch() {
483 trace!(?user_id, ?token, "Removing expired login token");
484 self.db.logintoken_expiresatuserid.remove(token);
485 return Err!(Request(Forbidden("Login token is expired")));
486 }
487
488 Ok(user_id)
489 }
490
491 pub async fn find_from_login_token(&self, token: &str) -> Result<OwnedUserId> {
494 let Ok(value) = self
495 .db
496 .logintoken_expiresatuserid
497 .get(token)
498 .await
499 else {
500 return Err!(Request(Forbidden("Login token is unrecognised")));
501 };
502 let (expires_at, user_id): (u64, OwnedUserId) = value.deserialized()?;
503
504 if expires_at < utils::millis_since_unix_epoch() {
505 trace!(?user_id, ?token, "Removing expired login token");
506
507 self.db.logintoken_expiresatuserid.remove(token);
508
509 return Err!(Request(Forbidden("Login token is expired")));
510 }
511
512 self.db.logintoken_expiresatuserid.remove(token);
513
514 Ok(user_id)
515 }
516
/// Stub compiled when the `ldap` feature is disabled; always returns a
/// `FeatureDisabled("ldap")` error.
#[cfg(not(feature = "ldap"))]
#[expect(clippy::unused_async)]
pub async fn search_ldap(&self, _user_id: &UserId) -> Result<Vec<(String, bool)>> {
	Err!(FeatureDisabled("ldap"))
}
522
/// Stub compiled when the `ldap` feature is disabled; always returns a
/// `FeatureDisabled("ldap")` error.
#[cfg(not(feature = "ldap"))]
#[expect(clippy::unused_async)]
pub async fn auth_ldap(&self, _user_dn: &str, _password: &str) -> Result {
	Err!(FeatureDisabled("ldap"))
}
528
529 async fn update_all_rooms<'a, S>(&self, user_id: &UserId, rooms: S)
530 where
531 S: Stream<Item = (PduBuilder, &'a OwnedRoomId)> + Send,
532 {
533 rooms
534 .for_each_concurrent(automatic_width(), async |(pdu_builder, room_id)| {
535 let state_lock = self.services.state.mutex.lock(room_id).await;
536 if let Err(e) = self
537 .services
538 .timeline
539 .build_and_append_pdu(pdu_builder, user_id, room_id, &state_lock)
540 .await
541 {
542 warn!(
543 %user_id,
544 %room_id,
545 %e,
546 "Failed to update/send new profile join membership update in room",
547 );
548 }
549 })
550 .await;
551 }
552}