tuwunel_service/rooms/state_cache/
via.rs1use std::cmp::Reverse;
2
3use futures::{Stream, StreamExt, stream::iter};
4use ruma::{
5 OwnedServerName, RoomId, ServerName,
6 events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent},
7 int,
8};
9use tuwunel_core::{
10 Result, implement,
11 itertools::Itertools,
12 utils::{StreamTools, stream::TryIgnore},
13 warn,
14};
15use tuwunel_database::Ignore;
16
17#[implement(super::Service)]
18#[tracing::instrument(level = "debug", skip(self, servers))]
19pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) {
20 let mut servers: Vec<_> = self
21 .servers_invite_via(room_id)
22 .map(ToOwned::to_owned)
23 .chain(iter(servers.into_iter()))
24 .collect()
25 .await;
26
27 servers.sort_unstable();
28 servers.dedup();
29
30 let servers = servers
31 .iter()
32 .map(|server| server.as_bytes())
33 .collect_vec()
34 .join(&[0xFF][..]);
35
36 self.db
37 .roomid_inviteviaservers
38 .insert(room_id.as_bytes(), &servers);
39}
40
41#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "trace")]
47pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> {
48 let most_powerful_user_server = self
49 .services
50 .state_accessor
51 .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
52 .await
53 .ok()
54 .and_then(|content: RoomPowerLevelsEventContent| {
55 content
56 .users
57 .into_iter()
58 .max_by_key(|(_, power)| *power)
59 .filter(|(_, power)| *power >= int!(50))
60 .map(|(user, _)| user.server_name().to_owned())
61 });
62
63 let popular_servers = self
64 .room_members(room_id)
65 .counts_by(|user| user.server_name().to_owned())
66 .await
67 .into_iter()
68 .sorted_by_key(|(_, users)| Reverse(*users))
69 .map(|(server, _)| server);
70
71 Ok(most_powerful_user_server
72 .into_iter()
73 .chain(popular_servers)
74 .take(5)
75 .collect())
76}
77
78#[implement(super::Service)]
79#[tracing::instrument(skip(self), level = "debug")]
80pub fn servers_invite_via<'a>(
81 &'a self,
82 room_id: &'a RoomId,
83) -> impl Stream<Item = &ServerName> + Send + 'a {
84 type KeyVal<'a> = (Ignore, Vec<&'a ServerName>);
85
86 self.db
87 .roomid_inviteviaservers
88 .stream_raw_prefix(room_id)
89 .ignore_err()
90 .map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server"))
91}