tuwunel_service/rooms/event_handler/
fetch_state.rs1use std::collections::{HashMap, hash_map};
2
3use futures::FutureExt;
4use ruma::{EventId, OwnedEventId, RoomId, RoomVersionId, ServerName, events::StateEventType};
5use serde::Deserialize;
6use tuwunel_core::{Err, Result, debug, debug_warn, err, implement, matrix::Event};
7
8use crate::{
9 fetcher::{Op, Opts},
10 rooms::short::ShortStateKey,
11};
12
13#[derive(Deserialize)]
14struct StateIdsResponse {
15 pdu_ids: Vec<OwnedEventId>,
16}
17
18#[implement(super::Service)]
22#[tracing::instrument(
23 level = "debug",
24 skip_all,
25 fields(%origin),
26)]
27pub(super) async fn fetch_state(
28 &self,
29 origin: &ServerName,
30 room_id: &RoomId,
31 event_id: &EventId,
32 room_version: &RoomVersionId,
33 recursion_level: usize,
34 create_event_id: &EventId,
35) -> Result<Option<HashMap<u64, OwnedEventId>>> {
36 let opts = Opts::new(Op::StateIds, room_id.to_owned())
37 .event_id(event_id.to_owned())
38 .hint(origin.to_owned())
39 .attempt_limit(super::EVENT_FETCH_ATTEMPT_LIMIT)
40 .fanout_for_op();
41
42 let outcome = self
43 .services
44 .fetcher
45 .fetch(opts)
46 .await
47 .inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
48
49 let StateIdsResponse { pdu_ids } = serde_json::from_slice(&outcome.bytes)
50 .map_err(|e| err!(BadServerResponse("malformed state_ids response: {e}")))?;
51
52 debug!("Fetching state events");
53 let state_ids = pdu_ids.iter().map(AsRef::as_ref);
54 let state_vec = self
55 .fetch_auth(origin, room_id, state_ids, room_version, recursion_level)
56 .boxed()
57 .await;
58
59 let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
60 for (pdu, _) in state_vec {
61 let state_key = pdu
62 .state_key()
63 .ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
64
65 let shortstatekey = self
66 .services
67 .short
68 .get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key)
69 .await;
70
71 match state.entry(shortstatekey) {
72 | hash_map::Entry::Vacant(v) => {
73 v.insert(pdu.event_id().to_owned());
74 },
75 | hash_map::Entry::Occupied(_) => {
76 return Err!(Request(InvalidParam(
77 "State event's type and state_key ({:?},{:?}) exists multiple times.",
78 pdu.event_type(),
79 pdu.state_key()
80 .expect("all state events have state_key"),
81 )));
82 },
83 }
84 }
85
86 let create_shortstatekey = self
88 .services
89 .short
90 .get_shortstatekey(&StateEventType::RoomCreate, "")
91 .await?;
92
93 if state
94 .get(&create_shortstatekey)
95 .map(AsRef::as_ref)
96 != Some(create_event_id)
97 {
98 return Err!(Database("Incoming event refers to wrong create event."));
99 }
100
101 Ok(Some(state))
102}