tuwunel_api/client/keys/
get_key_changes.rs1use axum::extract::State;
2use futures::{StreamExt, stream};
3use itertools::Itertools;
4use ruma::{OwnedRoomId, api::client::keys::get_key_changes};
5use tuwunel_core::{Result, at, err, utils::stream::BroadbandExt};
6
7use crate::Ruma;
8
9pub(crate) async fn get_key_changes_route(
16 State(services): State<crate::State>,
17 body: Ruma<get_key_changes::v3::Request>,
18) -> Result<get_key_changes::v3::Response> {
19 let sender_user = body.sender_user();
20
21 let from = body
22 .from
23 .parse()
24 .map_err(|_| err!(Request(InvalidParam("Invalid `from`."))))?;
25
26 let to = body
27 .to
28 .parse()
29 .map_err(|_| err!(Request(InvalidParam("Invalid `to`."))))?;
30
31 let user_changes = services
32 .users
33 .keys_changed(sender_user, from, Some(to))
34 .map(ToOwned::to_owned);
35
36 let room_changes = services
37 .state_cache
38 .rooms_joined(sender_user)
39 .map(ToOwned::to_owned)
40 .broad_then(async |room_id: OwnedRoomId| {
41 services
42 .users
43 .room_keys_changed(&room_id, from, Some(to))
44 .map(at!(0))
45 .map(ToOwned::to_owned)
46 .collect::<Vec<_>>()
47 .await
48 })
49 .flat_map(stream::iter);
50
51 let changed = user_changes
52 .chain(room_changes)
53 .collect::<Vec<_>>()
54 .await
55 .into_iter()
56 .sorted_unstable()
57 .dedup()
58 .collect();
59
60 Ok(get_key_changes::v3::Response {
61 left: Vec::new(), changed,
63 })
64}