Skip to main content

tuwunel_service/rooms/state_cache/
via.rs

1use std::cmp::Reverse;
2
3use futures::{Stream, StreamExt, stream::iter};
4use ruma::{
5	OwnedServerName, RoomId, ServerName,
6	events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent},
7	int,
8};
9use tuwunel_core::{
10	Result, implement,
11	itertools::Itertools,
12	utils::{StreamTools, stream::TryIgnore},
13	warn,
14};
15use tuwunel_database::Ignore;
16
17#[implement(super::Service)]
18#[tracing::instrument(level = "debug", skip(self, servers))]
19pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) {
20	let mut servers: Vec<_> = self
21		.servers_invite_via(room_id)
22		.map(ToOwned::to_owned)
23		.chain(iter(servers.into_iter()))
24		.collect()
25		.await;
26
27	servers.sort_unstable();
28	servers.dedup();
29
30	let servers = servers
31		.iter()
32		.map(|server| server.as_bytes())
33		.collect_vec()
34		.join(&[0xFF][..]);
35
36	self.db
37		.roomid_inviteviaservers
38		.insert(room_id.as_bytes(), &servers);
39}
40
41/// Gets up to five servers that are likely to be in the room in the
42/// distant future.
43///
44/// See <https://spec.matrix.org/latest/appendices/#routing>
45#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "trace")]
47pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> {
48	let most_powerful = self.most_powerful_user_server(room_id).await;
49
50	Ok(most_powerful
51		.into_iter()
52		.chain(self.popular_servers(room_id).await)
53		.take(5)
54		.collect())
55}
56
57/// The room's highest power-level user's server, provided that user holds at
58/// least power level 50.
59#[implement(super::Service)]
60#[tracing::instrument(skip(self), level = "trace")]
61pub async fn most_powerful_user_server(&self, room_id: &RoomId) -> Option<OwnedServerName> {
62	self.services
63		.state_accessor
64		.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
65		.await
66		.ok()
67		.and_then(|content: RoomPowerLevelsEventContent| {
68			content
69				.users
70				.into_iter()
71				.max_by_key(|(_, power)| *power)
72				.filter(|(_, power)| *power >= int!(50))
73				.map(|(user, _)| user.server_name().to_owned())
74		})
75}
76
77/// Servers participating in the room, ordered by descending resident user
78/// count. Counting members per server is an aggregation, so the result is
79/// materialized rather than streamed.
80#[implement(super::Service)]
81#[tracing::instrument(skip(self), level = "trace")]
82pub async fn popular_servers(&self, room_id: &RoomId) -> Vec<OwnedServerName> {
83	self.room_members(room_id)
84		.counts_by(|user| user.server_name().to_owned())
85		.await
86		.into_iter()
87		.sorted_by_key(|(_, users)| Reverse(*users))
88		.map(|(server, _)| server)
89		.collect()
90}
91
92#[implement(super::Service)]
93#[tracing::instrument(skip(self), level = "debug")]
94pub fn servers_invite_via<'a>(
95	&'a self,
96	room_id: &'a RoomId,
97) -> impl Stream<Item = &ServerName> + Send + 'a {
98	type KeyVal<'a> = (Ignore, Vec<&'a ServerName>);
99
100	self.db
101		.roomid_inviteviaservers
102		.stream_raw_prefix(room_id)
103		.ignore_err()
104		.map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server"))
105}