1mod update;
2mod via;
3
4use std::{
5 collections::HashMap,
6 sync::{Arc, RwLock},
7};
8
9use futures::{Stream, StreamExt, future::join5, pin_mut};
10use ruma::{
11 OwnedRoomId, RoomId, ServerName, UserId,
12 events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState},
13 serde::Raw,
14};
15use tuwunel_core::{
16 Result, implement, trace,
17 utils::{
18 self, BoolExt,
19 future::OptionStream,
20 stream::{BroadbandExt, ReadyExt, TryIgnore},
21 },
22 warn,
23};
24use tuwunel_database::{Deserialized, Ignore, Interfix, Map};
25
26use crate::appservice::RegistrationInfo;
27
/// Service state backing the lookups and deletions implemented in this
/// module.
pub struct Service {
    /// Memoized answers of `appservice_in_room`: room ID, then appservice
    /// registration ID, mapped to the computed boolean.
    appservice_in_room_cache: AppServiceInRoomCache,
    /// Handle to the other services; this file uses `users` and `globals`.
    services: Arc<crate::services::OnceServices>,
    /// Database maps opened in `build`.
    db: Data,
}
33
/// Handles to the database maps used by this service.
///
/// The key layouts noted below are the ones the functions in this file use
/// when reading or deleting entries.
struct Data {
    /// Keyed by room ID; read as a `u64` by `room_knocked_count`.
    roomid_knockedcount: Arc<Map>,
    /// Keyed by room ID; read as a `u64` by `room_invited_count`.
    roomid_invitedcount: Arc<Map>,
    /// Keyed by room ID; in this file it is only cleared, by
    /// `delete_room_join_counts`.
    roomid_inviteviaservers: Arc<Map>,
    /// Keyed by room ID; read as a `u64` by `room_joined_count`.
    roomid_joinedcount: Arc<Map>,
    /// Keys are `(room_id, server)`; scanned by `room_servers`.
    roomserverids: Arc<Map>,
    /// Keys are `(room_id, user_id)`; values read as `u64` by
    /// `get_invite_count`.
    roomuserid_invitecount: Arc<Map>,
    /// Keys are `(room_id, user_id)`; values read as `u64` by
    /// `get_joined_count`. Opened from the "roomuserid_joined" map.
    roomuserid_joinedcount: Arc<Map>,
    /// Keys are `(room_id, user_id)`; values read as `u64` by
    /// `get_left_count`.
    roomuserid_leftcount: Arc<Map>,
    /// Keys are `(room_id, user_id)`; values read as `u64` by
    /// `get_knock_count`.
    roomuserid_knockedcount: Arc<Map>,
    /// Scanned by room prefix in `room_useroncejoined` and probed with
    /// `(user_id, room_id)` in `once_joined`.
    roomuseroncejoinedids: Arc<Map>,
    /// Keys are `(server, room_id)`; the reverse of `roomserverids`.
    serverroomids: Arc<Map>,
    /// Keys are `(user_id, room_id)`; values decode to stripped state events
    /// (see `invite_state`).
    userroomid_invitestate: Arc<Map>,
    /// Keys are `(user_id, room_id)`; the reverse of
    /// `roomuserid_joinedcount`. Opened from the "userroomid_joined" map.
    userroomid_joinedcount: Arc<Map>,
    /// Keys are `(user_id, room_id)`; values decoded by `rooms_left_state`
    /// and `left_state`.
    userroomid_leftstate: Arc<Map>,
    /// Keys are `(user_id, room_id)`; values decode to stripped state events
    /// (see `knock_state`).
    userroomid_knockedstate: Arc<Map>,
}
51
/// Lock-protected cache: room ID, then appservice registration ID, mapped to
/// whether that appservice was found to be in the room.
type AppServiceInRoomCache = RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>;
/// Item yielded by `rooms_invited_state` and `rooms_knocked_state`: a room ID
/// with its raw stripped state events.
type StrippedStateEventItem = (OwnedRoomId, Vec<Raw<AnyStrippedStateEvent>>);
/// Item yielded by `rooms_left_state`: a room ID with its raw sync state
/// events.
type SyncStateEventItem = (OwnedRoomId, Vec<Raw<AnySyncStateEvent>>);
55
56impl crate::Service for Service {
57 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
58 Ok(Arc::new(Self {
59 appservice_in_room_cache: RwLock::new(HashMap::new()),
60 services: args.services.clone(),
61 db: Data {
62 roomid_knockedcount: args.db["roomid_knockedcount"].clone(),
63 roomid_invitedcount: args.db["roomid_invitedcount"].clone(),
64 roomid_inviteviaservers: args.db["roomid_inviteviaservers"].clone(),
65 roomid_joinedcount: args.db["roomid_joinedcount"].clone(),
66 roomserverids: args.db["roomserverids"].clone(),
67 roomuserid_invitecount: args.db["roomuserid_invitecount"].clone(),
68 roomuserid_joinedcount: args.db["roomuserid_joined"].clone(),
69 roomuserid_leftcount: args.db["roomuserid_leftcount"].clone(),
70 roomuserid_knockedcount: args.db["roomuserid_knockedcount"].clone(),
71 roomuseroncejoinedids: args.db["roomuseroncejoinedids"].clone(),
72 serverroomids: args.db["serverroomids"].clone(),
73 userroomid_invitestate: args.db["userroomid_invitestate"].clone(),
74 userroomid_joinedcount: args.db["userroomid_joined"].clone(),
75 userroomid_leftstate: args.db["userroomid_leftstate"].clone(),
76 userroomid_knockedstate: args.db["userroomid_knockedstate"].clone(),
77 },
78 }))
79 }
80
81 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
82}
83
84#[implement(Service)]
85#[tracing::instrument(level = "trace", skip_all)]
86pub async fn appservice_in_room(&self, room_id: &RoomId, appservice: &RegistrationInfo) -> bool {
87 if let Some(cached) = self
88 .appservice_in_room_cache
89 .read()
90 .expect("locked")
91 .get(room_id)
92 .and_then(|map| map.get(&appservice.registration.id))
93 .copied()
94 {
95 return cached;
96 }
97
98 let in_room = self.is_joined(&appservice.sender, room_id).await
99 || self
100 .room_members(room_id)
101 .ready_any(|user_id| appservice.is_user_match(user_id))
102 .await;
103
104 self.appservice_in_room_cache
105 .write()
106 .expect("locked")
107 .entry(room_id.into())
108 .or_default()
109 .insert(appservice.registration.id.clone(), in_room);
110
111 in_room
112}
113
114#[implement(Service)]
115pub fn get_appservice_in_room_cache_usage(&self) -> (usize, usize) {
116 let cache = self
117 .appservice_in_room_cache
118 .read()
119 .expect("locked");
120
121 (cache.len(), cache.capacity())
122}
123
124#[implement(Service)]
125#[tracing::instrument(level = "debug", skip_all)]
126pub fn clear_appservice_in_room_cache(&self) {
127 self.appservice_in_room_cache
128 .write()
129 .expect("locked")
130 .clear();
131}
132
133#[implement(Service)]
135#[tracing::instrument(skip(self), level = "debug")]
136pub fn room_servers<'a>(
137 &'a self,
138 room_id: &'a RoomId,
139) -> impl Stream<Item = &ServerName> + Send + 'a {
140 let prefix = (room_id, Interfix);
141 self.db
142 .roomserverids
143 .keys_prefix(&prefix)
144 .ignore_err()
145 .map(|(_, server): (Ignore, &ServerName)| server)
146}
147
148#[implement(Service)]
149#[tracing::instrument(skip(self), level = "trace")]
150pub async fn server_in_room<'a>(&'a self, server: &'a ServerName, room_id: &'a RoomId) -> bool {
151 let key = (server, room_id);
152 self.db.serverroomids.qry(&key).await.is_ok()
153}
154
155#[implement(Service)]
158#[tracing::instrument(skip(self), level = "debug")]
159pub fn server_rooms<'a>(
160 &'a self,
161 server: &'a ServerName,
162) -> impl Stream<Item = &RoomId> + Send + 'a {
163 let prefix = (server, Interfix);
164 self.db
165 .serverroomids
166 .keys_prefix(&prefix)
167 .ignore_err()
168 .map(|(_, room_id): (Ignore, &RoomId)| room_id)
169}
170
171#[implement(Service)]
173#[tracing::instrument(skip(self), level = "trace")]
174pub async fn server_sees_user(&self, server: &ServerName, user_id: &UserId) -> bool {
175 self.server_rooms(server)
176 .map(ToOwned::to_owned)
177 .broad_any(async |room_id| self.is_joined(user_id, &room_id).await)
178 .await
179}
180
181#[implement(Service)]
183#[tracing::instrument(skip(self), level = "trace")]
184pub async fn user_sees_user(&self, user_a: &UserId, user_b: &UserId) -> bool {
185 let get_shared_rooms = self.get_shared_rooms(user_a, user_b);
186
187 pin_mut!(get_shared_rooms);
188 get_shared_rooms.next().await.is_some()
189}
190
191#[implement(Service)]
193#[tracing::instrument(skip(self), level = "debug")]
194pub fn get_shared_rooms<'a>(
195 &'a self,
196 user_a: &'a UserId,
197 user_b: &'a UserId,
198) -> impl Stream<Item = &RoomId> + Send + 'a {
199 let a = self.rooms_joined(user_a).boxed();
200 let b = self.rooms_joined(user_b).boxed();
201
202 utils::set::intersection_sorted_stream2(a, b)
203}
204
205#[implement(Service)]
207#[tracing::instrument(skip(self), level = "debug")]
208pub fn room_members<'a>(
209 &'a self,
210 room_id: &'a RoomId,
211) -> impl Stream<Item = &UserId> + Send + 'a {
212 let prefix = (room_id, Interfix);
213 self.db
214 .roomuserid_joinedcount
215 .keys_prefix(&prefix)
216 .ignore_err()
217 .map(|(_, user_id): (Ignore, &UserId)| user_id)
218}
219
220#[implement(Service)]
222#[tracing::instrument(skip(self), level = "trace")]
223pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> {
224 self.db
225 .roomid_joinedcount
226 .get(room_id)
227 .await
228 .deserialized()
229}
230
231#[implement(Service)]
233#[tracing::instrument(skip(self), level = "trace")]
234pub async fn room_invited_count(&self, room_id: &RoomId) -> Result<u64> {
235 self.db
236 .roomid_invitedcount
237 .get(room_id)
238 .await
239 .deserialized()
240}
241
242#[implement(Service)]
244#[tracing::instrument(skip(self), level = "trace")]
245pub async fn room_knocked_count(&self, room_id: &RoomId) -> Result<u64> {
246 self.db
247 .roomid_knockedcount
248 .get(room_id)
249 .await
250 .deserialized()
251}
252
253#[implement(Service)]
256#[tracing::instrument(skip(self), level = "debug")]
257pub fn active_local_users_in_room<'a>(
258 &'a self,
259 room_id: &'a RoomId,
260) -> impl Stream<Item = &UserId> + Send + 'a {
261 self.local_users_in_room(room_id)
262 .filter(|user| self.services.users.is_active(user))
263}
264
265#[implement(Service)]
268#[tracing::instrument(skip(self), level = "debug")]
269pub fn local_users_in_room<'a>(
270 &'a self,
271 room_id: &'a RoomId,
272) -> impl Stream<Item = &UserId> + Send + 'a {
273 self.room_members(room_id)
274 .ready_filter(|user| self.services.globals.user_is_local(user))
275}
276
277#[implement(Service)]
279#[tracing::instrument(skip(self), level = "debug")]
280pub fn local_users_invited_to_room<'a>(
281 &'a self,
282 room_id: &'a RoomId,
283) -> impl Stream<Item = &UserId> + Send + 'a {
284 self.room_members_invited(room_id)
285 .ready_filter(|user| self.services.globals.user_is_local(user))
286}
287
288#[implement(Service)]
290#[tracing::instrument(skip(self), level = "debug")]
291pub fn room_useroncejoined<'a>(
292 &'a self,
293 room_id: &'a RoomId,
294) -> impl Stream<Item = &UserId> + Send + 'a {
295 let prefix = (room_id, Interfix);
296 self.db
297 .roomuseroncejoinedids
298 .keys_prefix(&prefix)
299 .ignore_err()
300 .map(|(_, user_id): (Ignore, &UserId)| user_id)
301}
302
303#[implement(Service)]
305#[tracing::instrument(skip(self), level = "debug")]
306pub fn room_members_invited<'a>(
307 &'a self,
308 room_id: &'a RoomId,
309) -> impl Stream<Item = &UserId> + Send + 'a {
310 let prefix = (room_id, Interfix);
311 self.db
312 .roomuserid_invitecount
313 .keys_prefix(&prefix)
314 .ignore_err()
315 .map(|(_, user_id): (Ignore, &UserId)| user_id)
316}
317
318#[implement(Service)]
320#[tracing::instrument(skip(self), level = "debug")]
321pub fn room_members_knocked<'a>(
322 &'a self,
323 room_id: &'a RoomId,
324) -> impl Stream<Item = &UserId> + Send + 'a {
325 let prefix = (room_id, Interfix);
326 self.db
327 .roomuserid_knockedcount
328 .keys_prefix(&prefix)
329 .ignore_err()
330 .map(|(_, user_id): (Ignore, &UserId)| user_id)
331}
332
333#[implement(Service)]
334#[tracing::instrument(skip(self), level = "trace")]
335pub async fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
336 let key = (room_id, user_id);
337 self.db
338 .roomuserid_invitecount
339 .qry(&key)
340 .await
341 .deserialized()
342}
343
344#[implement(Service)]
345#[tracing::instrument(skip(self), level = "trace")]
346pub async fn get_knock_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
347 let key = (room_id, user_id);
348 self.db
349 .roomuserid_knockedcount
350 .qry(&key)
351 .await
352 .deserialized()
353}
354
355#[implement(Service)]
356#[tracing::instrument(skip(self), level = "trace")]
357pub async fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
358 let key = (room_id, user_id);
359 self.db
360 .roomuserid_leftcount
361 .qry(&key)
362 .await
363 .deserialized()
364}
365
366#[implement(Service)]
367#[tracing::instrument(skip(self), level = "trace")]
368pub async fn get_joined_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
369 let key = (room_id, user_id);
370 self.db
371 .roomuserid_joinedcount
372 .qry(&key)
373 .await
374 .deserialized()
375}
376
377#[implement(Service)]
379#[inline]
380pub fn all_user_memberships<'a>(
381 &'a self,
382 user_id: &'a UserId,
383) -> impl Stream<Item = (MembershipState, &RoomId)> + Send + 'a {
384 self.user_memberships(user_id, None)
385}
386
387#[implement(Service)]
389#[tracing::instrument(skip(self), level = "debug")]
390pub fn user_memberships<'a>(
391 &'a self,
392 user_id: &'a UserId,
393 mask: Option<&[MembershipState]>,
394) -> impl Stream<Item = (MembershipState, &RoomId)> + Send + 'a {
395 use MembershipState::*;
396 use futures::stream::select;
397
398 let joined = mask
399 .is_none_or(|mask| mask.contains(&Join))
400 .then_async(|| {
401 self.rooms_joined(user_id)
402 .map(|room_id| (Join, room_id))
403 .boxed()
404 .into_future()
405 });
406
407 let invited = mask
408 .is_none_or(|mask| mask.contains(&Invite))
409 .then_async(|| {
410 self.rooms_invited(user_id)
411 .map(|room_id| (Invite, room_id))
412 .boxed()
413 .into_future()
414 });
415
416 let knocked = mask
417 .is_none_or(|mask| mask.contains(&Knock))
418 .then_async(|| {
419 self.rooms_knocked(user_id)
420 .map(|room_id| (Knock, room_id))
421 .boxed()
422 .into_future()
423 });
424
425 let left = mask
426 .is_none_or(|mask| mask.contains(&Leave))
427 .then_async(|| {
428 self.rooms_left(user_id)
429 .map(|room_id| (Leave, room_id))
430 .boxed()
431 .into_future()
432 });
433
434 select(
435 select(joined.stream(), left.stream()),
436 select(invited.stream(), knocked.stream()),
437 )
438}
439
440#[implement(Service)]
442#[tracing::instrument(skip(self), level = "debug")]
443pub fn rooms_joined<'a>(
444 &'a self,
445 user_id: &'a UserId,
446) -> impl Stream<Item = &RoomId> + Send + 'a {
447 self.db
448 .userroomid_joinedcount
449 .keys_raw_prefix(user_id)
450 .ignore_err()
451 .map(|(_, room_id): (Ignore, &RoomId)| room_id)
452}
453
454#[implement(Service)]
456#[tracing::instrument(skip(self), level = "debug")]
457pub fn rooms_invited<'a>(
458 &'a self,
459 user_id: &'a UserId,
460) -> impl Stream<Item = &RoomId> + Send + 'a {
461 self.db
462 .userroomid_invitestate
463 .keys_raw_prefix(user_id)
464 .ignore_err()
465 .map(|(_, room_id): (Ignore, &RoomId)| room_id)
466}
467
468#[implement(Service)]
470#[tracing::instrument(skip(self), level = "debug")]
471pub fn rooms_knocked<'a>(
472 &'a self,
473 user_id: &'a UserId,
474) -> impl Stream<Item = &RoomId> + Send + 'a {
475 self.db
476 .userroomid_knockedstate
477 .keys_raw_prefix(user_id)
478 .ignore_err()
479 .map(|(_, room_id): (Ignore, &RoomId)| room_id)
480}
481
482#[implement(Service)]
484#[tracing::instrument(skip(self), level = "debug")]
485pub fn rooms_left<'a>(&'a self, user_id: &'a UserId) -> impl Stream<Item = &RoomId> + Send + 'a {
486 self.db
487 .userroomid_leftstate
488 .keys_raw_prefix(user_id)
489 .ignore_err()
490 .map(|(_, room_id): (Ignore, &RoomId)| room_id)
491}
492
493#[implement(Service)]
495#[tracing::instrument(skip(self), level = "debug")]
496pub fn rooms_invited_state<'a>(
497 &'a self,
498 user_id: &'a UserId,
499) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
500 type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
501 type Key<'a> = (&'a UserId, &'a RoomId);
502
503 let prefix = (user_id, Interfix);
504 self.db
505 .userroomid_invitestate
506 .stream_prefix(&prefix)
507 .ignore_err()
508 .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
509 .map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
510 .ignore_err()
511}
512
513#[implement(Service)]
515#[tracing::instrument(skip(self), level = "trace")]
516pub fn rooms_knocked_state<'a>(
517 &'a self,
518 user_id: &'a UserId,
519) -> impl Stream<Item = StrippedStateEventItem> + Send + 'a {
520 type KeyVal<'a> = (Key<'a>, Raw<Vec<AnyStrippedStateEvent>>);
521 type Key<'a> = (&'a UserId, &'a RoomId);
522
523 let prefix = (user_id, Interfix);
524 self.db
525 .userroomid_knockedstate
526 .stream_prefix(&prefix)
527 .ignore_err()
528 .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
529 .map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
530 .ignore_err()
531}
532
533#[implement(Service)]
535#[tracing::instrument(skip(self), level = "debug")]
536pub fn rooms_left_state<'a>(
537 &'a self,
538 user_id: &'a UserId,
539) -> impl Stream<Item = SyncStateEventItem> + Send + 'a {
540 type KeyVal<'a> = (Key<'a>, Raw<Vec<Raw<AnySyncStateEvent>>>);
541 type Key<'a> = (&'a UserId, &'a RoomId);
542
543 let prefix = (user_id, Interfix);
544 self.db
545 .userroomid_leftstate
546 .stream_prefix(&prefix)
547 .ignore_err()
548 .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state))
549 .map(|(room_id, state)| Ok((room_id, state.deserialize_as_unchecked()?)))
550 .ignore_err()
551}
552
553#[implement(Service)]
554#[tracing::instrument(skip(self), level = "trace")]
555pub async fn invite_state(
556 &self,
557 user_id: &UserId,
558 room_id: &RoomId,
559) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
560 let key = (user_id, room_id);
561 self.db
562 .userroomid_invitestate
563 .qry(&key)
564 .await
565 .deserialized()
566 .and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
567 val.deserialize_as_unchecked().map_err(Into::into)
568 })
569}
570
571#[implement(Service)]
572#[tracing::instrument(skip(self), level = "trace")]
573pub async fn knock_state(
574 &self,
575 user_id: &UserId,
576 room_id: &RoomId,
577) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
578 let key = (user_id, room_id);
579 self.db
580 .userroomid_knockedstate
581 .qry(&key)
582 .await
583 .deserialized()
584 .and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
585 val.deserialize_as_unchecked().map_err(Into::into)
586 })
587}
588
589#[implement(Service)]
590#[tracing::instrument(skip(self), level = "trace")]
591pub async fn left_state(
592 &self,
593 user_id: &UserId,
594 room_id: &RoomId,
595) -> Result<Vec<Raw<AnyStrippedStateEvent>>> {
596 let key = (user_id, room_id);
597 self.db
598 .userroomid_leftstate
599 .qry(&key)
600 .await
601 .deserialized()
602 .and_then(|val: Raw<Vec<AnyStrippedStateEvent>>| {
603 val.deserialize_as_unchecked().map_err(Into::into)
604 })
605}
606
607#[implement(Service)]
608#[tracing::instrument(skip(self), level = "trace")]
609pub async fn user_membership(
610 &self,
611 user_id: &UserId,
612 room_id: &RoomId,
613) -> Option<MembershipState> {
614 let states = join5(
615 self.is_joined(user_id, room_id),
616 self.is_left(user_id, room_id),
617 self.is_knocked(user_id, room_id),
618 self.is_invited(user_id, room_id),
619 self.once_joined(user_id, room_id),
620 )
621 .await;
622
623 match states {
624 | (true, ..) => Some(MembershipState::Join),
625 | (_, true, ..) => Some(MembershipState::Leave),
626 | (_, _, true, ..) => Some(MembershipState::Knock),
627 | (_, _, _, true, ..) => Some(MembershipState::Invite),
628 | (false, false, false, false, true) => Some(MembershipState::Ban),
629 | _ => None,
630 }
631}
632
633#[implement(Service)]
634#[tracing::instrument(skip(self), level = "debug")]
635pub async fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> bool {
636 let key = (user_id, room_id);
637 self.db.roomuseroncejoinedids.contains(&key).await
638}
639
640#[implement(Service)]
641#[tracing::instrument(skip(self), level = "trace")]
642pub async fn is_joined<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
643 let key = (user_id, room_id);
644 self.db
645 .userroomid_joinedcount
646 .contains(&key)
647 .await
648}
649
650#[implement(Service)]
651#[tracing::instrument(skip(self), level = "trace")]
652pub async fn is_knocked<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
653 let key = (user_id, room_id);
654 self.db
655 .userroomid_knockedstate
656 .contains(&key)
657 .await
658}
659
660#[implement(Service)]
661#[tracing::instrument(skip(self), level = "trace")]
662pub async fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> bool {
663 let key = (user_id, room_id);
664 self.db
665 .userroomid_invitestate
666 .contains(&key)
667 .await
668}
669
670#[implement(Service)]
671#[tracing::instrument(skip(self), level = "trace")]
672pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
673 let key = (user_id, room_id);
674 self.db.userroomid_leftstate.contains(&key).await
675}
676
677#[implement(Service)]
678#[tracing::instrument(skip(self), level = "trace")]
679pub async fn delete_room_join_counts(&self, room_id: &RoomId, force: bool) -> Result {
680 let prefix = (room_id, Interfix);
681
682 self.db.roomid_knockedcount.remove(room_id);
683
684 self.db.roomid_invitedcount.remove(room_id);
685
686 self.db.roomid_inviteviaservers.remove(room_id);
687
688 self.db.roomid_joinedcount.remove(room_id);
689
690 self.db
691 .roomserverids
692 .keys_prefix(&prefix)
693 .ignore_err()
694 .ready_for_each(|key: (&RoomId, &ServerName)| {
695 trace!("Removing key: {key:?}");
696 self.db.roomserverids.del(key);
697
698 let reverse_key = (key.1, key.0);
699 trace!("Removing reverse key: {reverse_key:?}");
700 self.db.serverroomids.del(reverse_key);
701 })
702 .await;
703
704 self.db
705 .roomuserid_invitecount
706 .keys_prefix(&prefix)
707 .ignore_err()
708 .ready_for_each(|key: (&RoomId, &UserId)| {
709 trace!("Removing key: {key:?}");
710 self.db.roomuserid_invitecount.del(key);
711
712 let reverse_key = (key.1, key.0);
713 trace!("Removing reverse key: {reverse_key:?}");
714 self.db.userroomid_invitestate.del(reverse_key);
715 })
716 .await;
717
718 self.db
719 .roomuserid_joinedcount
720 .keys_prefix(&prefix)
721 .ignore_err()
722 .ready_for_each(|key: (&RoomId, &UserId)| {
723 trace!("Removing key: {key:?}");
724 self.db.roomuserid_joinedcount.del(key);
725
726 let reverse_key = (key.1, key.0);
727 trace!("Removing reverse key: {reverse_key:?}");
728 self.db.userroomid_joinedcount.del(reverse_key);
729 })
730 .await;
731
732 self.db
733 .roomuserid_knockedcount
734 .keys_prefix(&prefix)
735 .ignore_err()
736 .ready_for_each(|key: (&RoomId, &UserId)| {
737 trace!("Removing key: {key:?}");
738 self.db.roomuserid_knockedcount.del(key);
739
740 let reverse_key = (key.1, key.0);
741 trace!("Removing reverse key: {reverse_key:?}");
742 self.db.userroomid_knockedstate.del(reverse_key);
743 })
744 .await;
745
746 self.db
747 .roomuserid_leftcount
748 .keys_prefix(&prefix)
749 .ignore_err()
750 .ready_filter(|(_, user_id): &(&RoomId, &UserId)| {
751 force || !self.services.globals.user_is_local(user_id)
752 })
753 .ready_for_each(|key: (&RoomId, &UserId)| {
754 trace!("Removing key: {key:?}");
755 self.db.roomuserid_leftcount.del(key);
756
757 let reverse_key = (key.1, key.0);
758 trace!("Removing reverse key: {reverse_key:?}");
759 self.db.userroomid_leftstate.del(reverse_key);
760 })
761 .await;
762
763 Ok(())
764}