tuwunel_service/rooms/state_cache/
via.rs1use std::cmp::Reverse;
2
3use futures::{Stream, StreamExt, stream::iter};
4use ruma::{
5 OwnedServerName, RoomId, ServerName,
6 events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent},
7 int,
8};
9use tuwunel_core::{
10 Result, implement,
11 itertools::Itertools,
12 utils::{StreamTools, stream::TryIgnore},
13 warn,
14};
15use tuwunel_database::Ignore;
16
17#[implement(super::Service)]
18#[tracing::instrument(level = "debug", skip(self, servers))]
19pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) {
20 let mut servers: Vec<_> = self
21 .servers_invite_via(room_id)
22 .map(ToOwned::to_owned)
23 .chain(iter(servers.into_iter()))
24 .collect()
25 .await;
26
27 servers.sort_unstable();
28 servers.dedup();
29
30 let servers = servers
31 .iter()
32 .map(|server| server.as_bytes())
33 .collect_vec()
34 .join(&[0xFF][..]);
35
36 self.db
37 .roomid_inviteviaservers
38 .insert(room_id.as_bytes(), &servers);
39}
40
41#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "trace")]
47pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> {
48 let most_powerful = self.most_powerful_user_server(room_id).await;
49
50 Ok(most_powerful
51 .into_iter()
52 .chain(self.popular_servers(room_id).await)
53 .take(5)
54 .collect())
55}
56
57#[implement(super::Service)]
60#[tracing::instrument(skip(self), level = "trace")]
61pub async fn most_powerful_user_server(&self, room_id: &RoomId) -> Option<OwnedServerName> {
62 self.services
63 .state_accessor
64 .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
65 .await
66 .ok()
67 .and_then(|content: RoomPowerLevelsEventContent| {
68 content
69 .users
70 .into_iter()
71 .max_by_key(|(_, power)| *power)
72 .filter(|(_, power)| *power >= int!(50))
73 .map(|(user, _)| user.server_name().to_owned())
74 })
75}
76
77#[implement(super::Service)]
81#[tracing::instrument(skip(self), level = "trace")]
82pub async fn popular_servers(&self, room_id: &RoomId) -> Vec<OwnedServerName> {
83 self.room_members(room_id)
84 .counts_by(|user| user.server_name().to_owned())
85 .await
86 .into_iter()
87 .sorted_by_key(|(_, users)| Reverse(*users))
88 .map(|(server, _)| server)
89 .collect()
90}
91
92#[implement(super::Service)]
93#[tracing::instrument(skip(self), level = "debug")]
94pub fn servers_invite_via<'a>(
95 &'a self,
96 room_id: &'a RoomId,
97) -> impl Stream<Item = &ServerName> + Send + 'a {
98 type KeyVal<'a> = (Ignore, Vec<&'a ServerName>);
99
100 self.db
101 .roomid_inviteviaservers
102 .stream_raw_prefix(room_id)
103 .ignore_err()
104 .map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server"))
105}