Skip to main content

tuwunel_service/rooms/state_cache/
via.rs

1use std::cmp::Reverse;
2
3use futures::{Stream, StreamExt, stream::iter};
4use ruma::{
5	OwnedServerName, RoomId, ServerName,
6	events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent},
7	int,
8};
9use tuwunel_core::{
10	Result, implement,
11	itertools::Itertools,
12	utils::{StreamTools, stream::TryIgnore},
13	warn,
14};
15use tuwunel_database::Ignore;
16
17#[implement(super::Service)]
18#[tracing::instrument(level = "debug", skip(self, servers))]
19pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) {
20	let mut servers: Vec<_> = self
21		.servers_invite_via(room_id)
22		.map(ToOwned::to_owned)
23		.chain(iter(servers.into_iter()))
24		.collect()
25		.await;
26
27	servers.sort_unstable();
28	servers.dedup();
29
30	let servers = servers
31		.iter()
32		.map(|server| server.as_bytes())
33		.collect_vec()
34		.join(&[0xFF][..]);
35
36	self.db
37		.roomid_inviteviaservers
38		.insert(room_id.as_bytes(), &servers);
39}
40
41/// Gets up to five servers that are likely to be in the room in the
42/// distant future.
43///
44/// See <https://spec.matrix.org/latest/appendices/#routing>
45#[implement(super::Service)]
46#[tracing::instrument(skip(self), level = "trace")]
47pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> {
48	let most_powerful_user_server = self
49		.services
50		.state_accessor
51		.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
52		.await
53		.ok()
54		.and_then(|content: RoomPowerLevelsEventContent| {
55			content
56				.users
57				.into_iter()
58				.max_by_key(|(_, power)| *power)
59				.filter(|(_, power)| *power >= int!(50))
60				.map(|(user, _)| user.server_name().to_owned())
61		});
62
63	let popular_servers = self
64		.room_members(room_id)
65		.counts_by(|user| user.server_name().to_owned())
66		.await
67		.into_iter()
68		.sorted_by_key(|(_, users)| Reverse(*users))
69		.map(|(server, _)| server);
70
71	Ok(most_powerful_user_server
72		.into_iter()
73		.chain(popular_servers)
74		.take(5)
75		.collect())
76}
77
78#[implement(super::Service)]
79#[tracing::instrument(skip(self), level = "debug")]
80pub fn servers_invite_via<'a>(
81	&'a self,
82	room_id: &'a RoomId,
83) -> impl Stream<Item = &ServerName> + Send + 'a {
84	type KeyVal<'a> = (Ignore, Vec<&'a ServerName>);
85
86	self.db
87		.roomid_inviteviaservers
88		.stream_raw_prefix(room_id)
89		.ignore_err()
90		.map(|(_, servers): KeyVal<'_>| *servers.last().expect("at least one server"))
91}