tuwunel_api/server/
state.rs1use std::{borrow::Borrow, iter::once};
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
5use ruma::{OwnedEventId, api::federation::event::get_room_state};
6use tuwunel_core::{
7 Result, at, err,
8 utils::stream::{IterStream, TryBroadbandExt},
9};
10
11use super::AccessCheck;
12use crate::Ruma;
13
14pub(crate) async fn get_room_state_route(
18 State(services): State<crate::State>,
19 body: Ruma<get_room_state::v1::Request>,
20) -> Result<get_room_state::v1::Response> {
21 AccessCheck {
22 services: &services,
23 origin: body.origin(),
24 room_id: &body.room_id,
25 event_id: None,
26 }
27 .check()
28 .await?;
29
30 let shortstatehash = services
31 .state
32 .pdu_shortstatehash(&body.event_id)
33 .await
34 .map_err(|_| err!(Request(NotFound("PDU state not found."))))?;
35
36 let state_ids = services
37 .state_accessor
38 .state_full_ids(shortstatehash)
39 .map(at!(1))
40 .collect::<Vec<OwnedEventId>>()
41 .map(Ok);
42
43 let room_version = services.state.get_room_version(&body.room_id);
44
45 let (room_version, state_ids) = try_join(room_version, state_ids).await?;
46
47 let into_federation_format = |pdu| {
48 services
49 .federation
50 .format_pdu_into(pdu, Some(&room_version))
51 .map(Ok)
52 };
53
54 let auth_chain = services
55 .auth_chain
56 .event_ids_iter(&body.room_id, &room_version, once(body.event_id.borrow()))
57 .broad_and_then(async |id| {
58 services
59 .timeline
60 .get_pdu_json(&id)
61 .and_then(into_federation_format)
62 .await
63 })
64 .try_collect();
65
66 let pdus = state_ids
67 .iter()
68 .try_stream()
69 .broad_and_then(|id| {
70 services
71 .timeline
72 .get_pdu_json(id)
73 .and_then(into_federation_format)
74 })
75 .try_collect();
76
77 let (auth_chain, pdus) = try_join(auth_chain, pdus).await?;
78
79 Ok(get_room_state::v1::Response { auth_chain, pdus })
80}