Skip to main content

tuwunel_service/rooms/state_cache/
mod.rs

1mod update;
2mod via;
3
4use std::{
5	collections::HashMap,
6	sync::{Arc, RwLock},
7};
8
9use futures::{Stream, StreamExt, future::join5, pin_mut};
10use ruma::{
11	OwnedRoomId, RoomId, ServerName, UserId,
12	events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState},
13	serde::Raw,
14};
15use tuwunel_core::{
16	Result, implement, trace,
17	utils::{
18		self, BoolExt,
19		future::OptionStream,
20		stream::{BroadbandExt, ReadyExt, TryIgnore},
21	},
22	warn,
23};
24use tuwunel_database::{Deserialized, Ignore, Interfix, Map};
25
26use crate::appservice::RegistrationInfo;
27
28pub struct Service {
29	appservice_in_room_cache: AppServiceInRoomCache,
30	services: Arc<crate::services::OnceServices>,
31	db: Data,
32}
33
34struct Data {
35	roomid_knockedcount: Arc<Map>,
36	roomid_invitedcount: Arc<Map>,
37	roomid_inviteviaservers: Arc<Map>,
38	roomid_joinedcount: Arc<Map>,
39	roomserverids: Arc<Map>,
40	roomuserid_invitecount: Arc<Map>,
41	roomuserid_joinedcount: Arc<Map>,
42	roomuserid_leftcount: Arc<Map>,
43	roomuserid_knockedcount: Arc<Map>,
44	roomuseroncejoinedids: Arc<Map>,
45	serverroomids: Arc<Map>,
46	userroomid_invitestate: Arc<Map>,
47	userroomid_joinedcount: Arc<Map>,
48	userroomid_leftstate: Arc<Map>,
49	userroomid_knockedstate: Arc<Map>,
50}
51
52type AppServiceInRoomCache = RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
53type StrippedStateEventItem = (OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>);
54type SyncStateEventItem = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
55
56impl crate::Service for Service {
57	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
58		Ok(Arc::new(Self {
59			appservice_in_room_cache: RwLock::new(HashMap::new()),
60			services: args.services.clone(),
61			db: Data {
62				roomid_knockedcount: args.db["roomid_knockedcount"].clone(),
63				roomid_invitedcount: args.db["roomid_invitedcount"].clone(),
64				roomid_inviteviaservers: args.db["roomid_inviteviaservers"].clone(),
65				roomid_joinedcount: args.db["roomid_joinedcount"].clone(),
66				roomserverids: args.db["roomserverids"].clone(),
67				roomuserid_invitecount: args.db["roomuserid_invitecount"].clone(),
68				roomuserid_joinedcount: args.db["roomuserid_joined"].clone(),
69				roomuserid_leftcount: args.db["roomuserid_leftcount"].clone(),
70				roomuserid_knockedcount: args.db["roomuserid_knockedcount"].clone(),
71				roomuseroncejoinedids: args.db["roomuseroncejoinedids"].clone(),
72				serverroomids: args.db["serverroomids"].clone(),
73				userroomid_invitestate: args.db["userroomid_invitestate"].clone(),
74				userroomid_joinedcount: args.db["userroomid_joined"].clone(),
75				userroomid_leftstate: args.db["userroomid_leftstate"].clone(),
76				userroomid_knockedstate: args.db["userroomid_knockedstate"].clone(),
77			},
78		}))
79	}
80
81	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
82}
83
84#[implement(Service)]
85#[tracing::instrument(level = "trace", skip_all)]
86pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> bool {
87	if let Some(cached) = self
88		.appservice_in_room_cache
89		.read()
90		.expect("locked")
91		.get(room_id)
92		.and_then(|map| map.get(&appservice.registration.id))
93		.copied()
94	{
95		return cached;
96	}
97
98	let in_room = self.is_joined(&appservice.sender, room_id).await
99		|| self
100			.room_members(room_id)
101			.ready_any(|user_id| appservice.is_user_match(user_id))
102			.await;
103
104	self.appservice_in_room_cache
105		.write()
106		.expect("locked")
107		.entry(room_id.into())
108		.or_default()
109		.insert(appservice.registration.id.clone(), in_room);
110
111	in_room
112}
113
114#[implement(Service)]
115pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
116	let cache = self
117		.appservice_in_room_cache
118		.read()
119		.expect("locked");
120
121	(cache.len(), cache.capacity())
122}
123
124#[implement(Service)]
125#[tracing::instrument(level = "debug", skip_all)]
126pub fn clear_appservice_in_room_cache(&self) {
127	self.appservice_in_room_cache
128		.write()
129		.expect("locked")
130		.clear();
131}
132
133/// Returns an iterator of all servers participating in this room.
134#[implement(Service)]
135#[tracing::instrument(skip(self), level = "debug")]
136pub fn room_servers<'a>(
137	&'a self,
138	room_id: &'a RoomId,
139) -> impl Stream<Item = &ServerName> + Send + 'a {
140	let prefix = (room_id, Interfix);
141	self.db
142		.roomserverids
143		.keys_prefix(&prefix)
144		.ignore_err()
145		.map(|(_, server): (Ignore, &ServerName)| server)
146}
147
148#[implement(Service)]
149#[tracing::instrument(skip(self), level = "trace")]
150pub async fn server_in_room<'a>(&'a self, server: &'a ServerName, room_id: &'a RoomId) -> bool {
151	let key = (server, room_id);
152	self.db.serverroomids.qry(&key).await.is_ok()
153}
154
155/// Returns an iterator of all rooms a server participates in (as far as we
156/// know).
157#[implement(Service)]
158#[tracing::instrument(skip(self), level = "debug")]
159pub fn server_rooms<'a>(
160	&'a self,
161	server: &'a ServerName,
162) -> impl Stream<Item = &RoomId> + Send + 'a {
163	let prefix = (server, Interfix);
164	self.db
165		.serverroomids
166		.keys_prefix(&prefix)
167		.ignore_err()
168		.map(|(_, room_id): (Ignore, &RoomId)| room_id)
169}
170
171/// Returns true if server can see user by sharing at least one room.
172#[implement(Service)]
173#[tracing::instrument(skip(self), level = "trace")]
174pub async fn server_sees_user(&self, server: &ServerName, user_id: &UserId) -> bool {
175	self.server_rooms(server)
176		.map(ToOwned::to_owned)
177		.broad_any(async |room_id| self.is_joined(user_id, &room_id).await)
178		.await
179}
180
181/// Returns true if user_a and user_b share at least one room.
182#[implement(Service)]
183#[tracing::instrument(skip(self), level = "trace")]
184pub async fn user_sees_user(&self, user_a: &UserId, user_b: &UserId) -> bool {
185	let get_shared_rooms = self.get_shared_rooms(user_a, user_b);
186
187	pin_mut!(get_shared_rooms);
188	get_shared_rooms.next().await.is_some()
189}
190
191/// List the rooms common between two users
192#[implement(Service)]
193#[tracing::instrument(skip(self), level = "debug")]
194pub fn get_shared_rooms<'a>(
195	&'a self,
196	user_a: &'a UserId,
197	user_b: &'a UserId,
198) -> impl Stream<Item = &RoomId> + Send + 'a {
199	let a = self.rooms_joined(user_a).boxed();
200	let b = self.rooms_joined(user_b).boxed();
201
202	utils::set::intersection_sorted_stream2(a, b)
203}
204
205/// Returns an iterator of all joined members of a room.
206#[implement(Service)]
207#[tracing::instrument(skip(self), level = "debug")]
208pub fn room_members<'a>(
209	&'a self,
210	room_id: &'a RoomId,
211) -> impl Stream<Item = &UserId> + Send + 'a {
212	let prefix = (room_id, Interfix);
213	self.db
214		.roomuserid_joinedcount
215		.keys_prefix(&prefix)
216		.ignore_err()
217		.map(|(_, user_id): (Ignore, &UserId)| user_id)
218}
219
220/// Returns the number of users which are currently in a room
221#[implement(Service)]
222#[tracing::instrument(skip(self), level = "trace")]
223pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> {
224	self.db
225		.roomid_joinedcount
226		.get(room_id)
227		.await
228		.deserialized()
229}
230
231/// Returns the number of users which are currently invited to a room
232#[implement(Service)]
233#[tracing::instrument(skip(self), level = "trace")]
234pub async fn room_invited_count(&self, room_id: &RoomId) -> Result<u64> {
235	self.db
236		.roomid_invitedcount
237		.get(room_id)
238		.await
239		.deserialized()
240}
241
242/// Returns the number of users which are currently knocking upon a room
243#[implement(Service)]
244#[tracing::instrument(skip(self), level = "trace")]
245pub async fn room_knocked_count(&self, room_id: &RoomId) -> Result<u64> {
246	self.db
247		.roomid_knockedcount
248		.get(room_id)
249		.await
250		.deserialized()
251}
252
253/// Returns an iterator of all our local joined users in a room who are
254/// active (not deactivated, not guest)
255#[implement(Service)]
256#[tracing::instrument(skip(self), level = "debug")]
257pub fn active_local_users_in_room<'a>(
258	&'a self,
259	room_id: &'a RoomId,
260) -> impl Stream<Item = &UserId> + Send + 'a {
261	self.local_users_in_room(room_id)
262		.filter(|user| self.services.users.is_active(user))
263}
264
265/// Returns an iterator of all our local users in the room, even if they're
266/// deactivated/guests
267#[implement(Service)]
268#[tracing::instrument(skip(self), level = "debug")]
269pub fn local_users_in_room<'a>(
270	&'a self,
271	room_id: &'a RoomId,
272) -> impl Stream<Item = &UserId> + Send + 'a {
273	self.room_members(room_id)
274		.ready_filter(|user| self.services.globals.user_is_local(user))
275}
276
277/// Returns an iterator of only our users invited to this room.
278#[implement(Service)]
279#[tracing::instrument(skip(self), level = "debug")]
280pub fn local_users_invited_to_room<'a>(
281	&'a self,
282	room_id: &'a RoomId,
283) -> impl Stream<Item = &UserId> + Send + 'a {
284	self.room_members_invited(room_id)
285		.ready_filter(|user| self.services.globals.user_is_local(user))
286}
287
288/// Returns an iterator over all User IDs who ever joined a room.
289#[implement(Service)]
290#[tracing::instrument(skip(self), level = "debug")]
291pub fn room_useroncejoined<'a>(
292	&'a self,
293	room_id: &'a RoomId,
294) -> impl Stream<Item = &UserId> + Send + 'a {
295	let prefix = (room_id, Interfix);
296	self.db
297		.roomuseroncejoinedids
298		.keys_prefix(&prefix)
299		.ignore_err()
300		.map(|(_, user_id): (Ignore, &UserId)| user_id)
301}
302
303/// Returns an iterator over all invited members of a room.
304#[implement(Service)]
305#[tracing::instrument(skip(self), level = "debug")]
306pub fn room_members_invited<'a>(
307	&'a self,
308	room_id: &'a RoomId,
309) -> impl Stream<Item = &UserId> + Send + 'a {
310	let prefix = (room_id, Interfix);
311	self.db
312		.roomuserid_invitecount
313		.keys_prefix(&prefix)
314		.ignore_err()
315		.map(|(_, user_id): (Ignore, &UserId)| user_id)
316}
317
318/// Returns an iterator over all knocked members of a room.
319#[implement(Service)]
320#[tracing::instrument(skip(self), level = "debug")]
321pub fn room_members_knocked<'a>(
322	&'a self,
323	room_id: &'a RoomId,
324) -> impl Stream<Item = &UserId> + Send + 'a {
325	let prefix = (room_id, Interfix);
326	self.db
327		.roomuserid_knockedcount
328		.keys_prefix(&prefix)
329		.ignore_err()
330		.map(|(_, user_id): (Ignore, &UserId)| user_id)
331}
332
333#[implement(Service)]
334#[tracing::instrument(skip(self), level = "trace")]
335pub async fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
336	let key = (room_id, user_id);
337	self.db
338		.roomuserid_invitecount
339		.qry(&key)
340		.await
341		.deserialized()
342}
343
344#[implement(Service)]
345#[tracing::instrument(skip(self), level = "trace")]
346pub async fn get_knock_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
347	let key = (room_id, user_id);
348	self.db
349		.roomuserid_knockedcount
350		.qry(&key)
351		.await
352		.deserialized()
353}
354
355#[implement(Service)]
356#[tracing::instrument(skip(self), level = "trace")]
357pub async fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
358	let key = (room_id, user_id);
359	self.db
360		.roomuserid_leftcount
361		.qry(&key)
362		.await
363		.deserialized()
364}
365
366#[implement(Service)]
367#[tracing::instrument(skip(self), level = "trace")]
368pub async fn get_joined_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
369	let key = (room_id, user_id);
370	self.db
371		.roomuserid_joinedcount
372		.qry(&key)
373		.await
374		.deserialized()
375}
376
377/// Returns an iterator over all memberships for a user.
378#[implement(Service)]
379#[inline]
380pub fn all_user_memberships<'a>(
381	&'a self,
382	user_id: &'a UserId,
383) -> impl Stream<Item = (MembershipState, &RoomId)> + Send + 'a {
384	self.user_memberships(user_id, None)
385}
386
387/// Returns an iterator over all specified memberships for a user.
388#[implement(Service)]
389#[tracing::instrument(skip(self), level = "debug")]
390pub fn user_memberships<'a>(
391	&'a self,
392	user_id: &'a UserId,
393	mask: Option<&[MembershipState]>,
394) -> impl Stream<Item = (MembershipState, &RoomId)> + Send + 'a {
395	use MembershipState::*;
396	use futures::stream::select;
397
398	let joined = mask
399		.is_none_or(|mask| mask.contains(&Join))
400		.then_async(|| {
401			self.rooms_joined(user_id)
402				.map(|room_id| (Join, room_id))
403				.boxed()
404				.into_future()
405		});
406
407	let invited = mask
408		.is_none_or(|mask| mask.contains(&Invite))
409		.then_async(|| {
410			self.rooms_invited(user_id)
411				.map(|room_id| (Invite, room_id))
412				.boxed()
413				.into_future()
414		});
415
416	let knocked = mask
417		.is_none_or(|mask| mask.contains(&Knock))
418		.then_async(|| {
419			self.rooms_knocked(user_id)
420				.map(|room_id| (Knock, room_id))
421				.boxed()
422				.into_future()
423		});
424
425	let left = mask
426		.is_none_or(|mask| mask.contains(&Leave))
427		.then_async(|| {
428			self.rooms_left(user_id)
429				.map(|room_id| (Leave, room_id))
430				.boxed()
431				.into_future()
432		});
433
434	select(
435		select(joined.stream(), left.stream()),
436		select(invited.stream(), knocked.stream()),
437	)
438}
439
440/// Returns an iterator over all rooms this user joined.
441#[implement(Service)]
442#[tracing::instrument(skip(self), level = "debug")]
443pub fn rooms_joined<'a>(
444	&'a self,
445	user_id: &'a UserId,
446) -> impl Stream<Item = &RoomId> + Send + 'a {
447	self.db
448		.userroomid_joinedcount
449		.keys_raw_prefix(user_id)
450		.ignore_err()
451		.map(|(_, room_id): (Ignore, &RoomId)| room_id)
452}
453
454/// Returns an iterator over all rooms a user was invited to.
455#[implement(Service)]
456#[tracing::instrument(skip(self), level = "debug")]
457pub fn rooms_invited<'a>(
458	&'a self,
459	user_id: &'a UserId,
460) -> impl Stream<Item = &RoomId> + Send + 'a {
461	self.db
462		.userroomid_invitestate
463		.keys_raw_prefix(user_id)
464		.ignore_err()
465		.map(|(_, room_id): (Ignore, &RoomId)| room_id)
466}
467
468/// Returns an iterator over all rooms a user is currently knocking.
469#[implement(Service)]
470#[tracing::instrument(skip(self), level = "debug")]
471pub fn rooms_knocked<'a>(
472	&'a self,
473	user_id: &'a UserId,
474) -> impl Stream<Item = &RoomId> + Send + 'a {
475	self.db
476		.userroomid_knockedstate
477		.keys_raw_prefix(user_id)
478		.ignore_err()
479		.map(|(_, room_id): (Ignore, &RoomId)| room_id)
480}
481
482/// Returns an iterator over all rooms a user left.
483#[implement(Service)]
484#[tracing::instrument(skip(self), level = "debug")]
485pub fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = &RoomId> + Send + 'a {
486	self.db
487		.userroomid_leftstate
488		.keys_raw_prefix(user_id)
489		.ignore_err()
490		.map(|(_, room_id): (Ignore, &RoomId)| room_id)
491}
492
493/// Returns an iterator over all rooms a user was invited to.
494#[implement(Service)]
495#[tracing::instrument(skip(self), level = "debug")]
496pub fn rooms_invited_state<'a>(
497	&'a self,
498	user_id: &'a UserId,
499) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
500	type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
501	type Key<'a> = (&'a UserId, &'a RoomId);
502
503	let prefix = (user_id, Interfix);
504	self.db
505		.userroomid_invitestate
506		.stream_prefix(&prefix)
507		.ignore_err()
508		.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
509		.map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
510		.ignore_err()
511}
512
513/// Returns an iterator over all rooms a user is currently knocking.
514#[implement(Service)]
515#[tracing::instrument(skip(self), level = "trace")]
516pub fn rooms_knocked_state<'a>(
517	&'a self,
518	user_id: &'a UserId,
519) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
520	type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
521	type Key<'a> = (&'a UserId, &'a RoomId);
522
523	let prefix = (user_id, Interfix);
524	self.db
525		.userroomid_knockedstate
526		.stream_prefix(&prefix)
527		.ignore_err()
528		.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
529		.map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
530		.ignore_err()
531}
532
533/// Returns an iterator over all rooms a user left.
534#[implement(Service)]
535#[tracing::instrument(skip(self), level = "debug")]
536pub fn rooms_left_state<'a>(
537	&'a self,
538	user_id: &'a UserId,
539) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
540	type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
541	type Key<'a> = (&'a UserId, &'a RoomId);
542
543	let prefix = (user_id, Interfix);
544	self.db
545		.userroomid_leftstate
546		.stream_prefix(&prefix)
547		.ignore_err()
548		.map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
549		.map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
550		.ignore_err()
551}
552
553#[implement(Service)]
554#[tracing::instrument(skip(self), level = "trace")]
555pub async fn invite_state(
556	&self,
557	user_id: &UserId,
558	room_id: &RoomId,
559) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
560	let key = (user_id, room_id);
561	self.db
562		.userroomid_invitestate
563		.qry(&key)
564		.await
565		.deserialized()
566		.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
567			val.deserialize_as_unchecked().map_err(Into::into)
568		})
569}
570
571#[implement(Service)]
572#[tracing::instrument(skip(self), level = "trace")]
573pub async fn knock_state(
574	&self,
575	user_id: &UserId,
576	room_id: &RoomId,
577) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
578	let key = (user_id, room_id);
579	self.db
580		.userroomid_knockedstate
581		.qry(&key)
582		.await
583		.deserialized()
584		.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
585			val.deserialize_as_unchecked().map_err(Into::into)
586		})
587}
588
589#[implement(Service)]
590#[tracing::instrument(skip(self), level = "trace")]
591pub async fn left_state(
592	&self,
593	user_id: &UserId,
594	room_id: &RoomId,
595) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
596	let key = (user_id, room_id);
597	self.db
598		.userroomid_leftstate
599		.qry(&key)
600		.await
601		.deserialized()
602		.and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
603			val.deserialize_as_unchecked().map_err(Into::into)
604		})
605}
606
607#[implement(Service)]
608#[tracing::instrument(skip(self), level = "trace")]
609pub async fn user_membership(
610	&self,
611	user_id: &UserId,
612	room_id: &RoomId,
613) -> Option<MembershipState> {
614	let states = join5(
615		self.is_joined(user_id, room_id),
616		self.is_left(user_id, room_id),
617		self.is_knocked(user_id, room_id),
618		self.is_invited(user_id, room_id),
619		self.once_joined(user_id, room_id),
620	)
621	.await;
622
623	match states {
624		| (true, ..) => Some(MembershipState::Join),
625		| (_, true, ..) => Some(MembershipState::Leave),
626		| (_, _, true, ..) => Some(MembershipState::Knock),
627		| (_, _, _, true, ..) => Some(MembershipState::Invite),
628		| (false, false, false, false, true) => Some(MembershipState::Ban),
629		| _ => None,
630	}
631}
632
633#[implement(Service)]
634#[tracing::instrument(skip(self), level = "debug")]
635pub async fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> bool {
636	let key = (user_id, room_id);
637	self.db.roomuseroncejoinedids.contains(&key).await
638}
639
640#[implement(Service)]
641#[tracing::instrument(skip(self), level = "trace")]
642pub async fn is_joined<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
643	let key = (user_id, room_id);
644	self.db
645		.userroomid_joinedcount
646		.contains(&key)
647		.await
648}
649
650#[implement(Service)]
651#[tracing::instrument(skip(self), level = "trace")]
652pub async fn is_knocked<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
653	let key = (user_id, room_id);
654	self.db
655		.userroomid_knockedstate
656		.contains(&key)
657		.await
658}
659
660#[implement(Service)]
661#[tracing::instrument(skip(self), level = "trace")]
662pub async fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> bool {
663	let key = (user_id, room_id);
664	self.db
665		.userroomid_invitestate
666		.contains(&key)
667		.await
668}
669
670#[implement(Service)]
671#[tracing::instrument(skip(self), level = "trace")]
672pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
673	let key = (user_id, room_id);
674	self.db.userroomid_leftstate.contains(&key).await
675}
676
677#[implement(Service)]
678#[tracing::instrument(skip(self), level = "trace")]
679pub async fn delete_room_join_counts(&self, room_id: &RoomId, force: bool) -> Result {
680	let prefix = (room_id, Interfix);
681
682	self.db.roomid_knockedcount.remove(room_id);
683
684	self.db.roomid_invitedcount.remove(room_id);
685
686	self.db.roomid_inviteviaservers.remove(room_id);
687
688	self.db.roomid_joinedcount.remove(room_id);
689
690	self.db
691		.roomserverids
692		.keys_prefix(&prefix)
693		.ignore_err()
694		.ready_for_each(|key: (&RoomId, &ServerName)| {
695			trace!("Removing key: {key:?}");
696			self.db.roomserverids.del(key);
697
698			let reverse_key = (key.1, key.0);
699			trace!("Removing reverse key: {reverse_key:?}");
700			self.db.serverroomids.del(reverse_key);
701		})
702		.await;
703
704	self.db
705		.roomuserid_invitecount
706		.keys_prefix(&prefix)
707		.ignore_err()
708		.ready_for_each(|key: (&RoomId, &UserId)| {
709			trace!("Removing key: {key:?}");
710			self.db.roomuserid_invitecount.del(key);
711
712			let reverse_key = (key.1, key.0);
713			trace!("Removing reverse key: {reverse_key:?}");
714			self.db.userroomid_invitestate.del(reverse_key);
715		})
716		.await;
717
718	self.db
719		.roomuserid_joinedcount
720		.keys_prefix(&prefix)
721		.ignore_err()
722		.ready_for_each(|key: (&RoomId, &UserId)| {
723			trace!("Removing key: {key:?}");
724			self.db.roomuserid_joinedcount.del(key);
725
726			let reverse_key = (key.1, key.0);
727			trace!("Removing reverse key: {reverse_key:?}");
728			self.db.userroomid_joinedcount.del(reverse_key);
729		})
730		.await;
731
732	self.db
733		.roomuserid_knockedcount
734		.keys_prefix(&prefix)
735		.ignore_err()
736		.ready_for_each(|key: (&RoomId, &UserId)| {
737			trace!("Removing key: {key:?}");
738			self.db.roomuserid_knockedcount.del(key);
739
740			let reverse_key = (key.1, key.0);
741			trace!("Removing reverse key: {reverse_key:?}");
742			self.db.userroomid_knockedstate.del(reverse_key);
743		})
744		.await;
745
746	self.db
747		.roomuserid_leftcount
748		.keys_prefix(&prefix)
749		.ignore_err()
750		.ready_filter(|(_, user_id): &(&RoomId, &UserId)| {
751			force || !self.services.globals.user_is_local(user_id)
752		})
753		.ready_for_each(|key: (&RoomId, &UserId)| {
754			trace!("Removing key: {key:?}");
755			self.db.roomuserid_leftcount.del(key);
756
757			let reverse_key = (key.1, key.0);
758			trace!("Removing reverse key: {reverse_key:?}");
759			self.db.userroomid_leftstate.del(reverse_key);
760		})
761		.await;
762
763	Ok(())
764}