Skip to main content

tuwunel_service/rooms/event_handler/
fetch_state.rs

1use std::collections::{HashMap, hash_map};
2
3use futures::FutureExt;
4use ruma::{EventId, OwnedEventId, RoomId, RoomVersionId, ServerName, events::StateEventType};
5use serde::Deserialize;
6use tuwunel_core::{Err, Result, debug, debug_warn, err, implement, matrix::Event};
7
8use crate::{
9	fetcher::{Op, Opts},
10	rooms::short::ShortStateKey,
11};
12
13#[derive(Deserialize)]
14struct StateIdsResponse {
15	pdu_ids: Vec<OwnedEventId>,
16}
17
18/// Call /state_ids to find out what the state at this pdu is. We trust the
19/// server's response to some extend (sic), but we still do a lot of checks
20/// on the events
21#[implement(super::Service)]
22#[tracing::instrument(
23	level = "debug",
24	skip_all,
25	fields(%origin),
26)]
27pub(super) async fn fetch_state(
28	&self,
29	origin: &ServerName,
30	room_id: &RoomId,
31	event_id: &EventId,
32	room_version: &RoomVersionId,
33	recursion_level: usize,
34	create_event_id: &EventId,
35) -> Result<Option<HashMap<u64, OwnedEventId>>> {
36	let opts = Opts::new(Op::StateIds, room_id.to_owned())
37		.event_id(event_id.to_owned())
38		.hint(origin.to_owned())
39		.attempt_limit(super::EVENT_FETCH_ATTEMPT_LIMIT)
40		.fanout_for_op();
41
42	let outcome = self
43		.services
44		.fetcher
45		.fetch(opts)
46		.await
47		.inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
48
49	let StateIdsResponse { pdu_ids } = serde_json::from_slice(&outcome.bytes)
50		.map_err(|e| err!(BadServerResponse("malformed state_ids response: {e}")))?;
51
52	debug!("Fetching state events");
53	let state_ids = pdu_ids.iter().map(AsRef::as_ref);
54	let state_vec = self
55		.fetch_auth(origin, room_id, state_ids, room_version, recursion_level)
56		.boxed()
57		.await;
58
59	let mut state: HashMap<ShortStateKey, OwnedEventId> = HashMap::with_capacity(state_vec.len());
60	for (pdu, _) in state_vec {
61		let state_key = pdu
62			.state_key()
63			.ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?;
64
65		let shortstatekey = self
66			.services
67			.short
68			.get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key)
69			.await;
70
71		match state.entry(shortstatekey) {
72			| hash_map::Entry::Vacant(v) => {
73				v.insert(pdu.event_id().to_owned());
74			},
75			| hash_map::Entry::Occupied(_) => {
76				return Err!(Request(InvalidParam(
77					"State event's type and state_key ({:?},{:?}) exists multiple times.",
78					pdu.event_type(),
79					pdu.state_key()
80						.expect("all state events have state_key"),
81				)));
82			},
83		}
84	}
85
86	// The original create event must still be in the state
87	let create_shortstatekey = self
88		.services
89		.short
90		.get_shortstatekey(&StateEventType::RoomCreate, "")
91		.await?;
92
93	if state
94		.get(&create_shortstatekey)
95		.map(AsRef::as_ref)
96		!= Some(create_event_id)
97	{
98		return Err!(Database("Incoming event refers to wrong create event."));
99	}
100
101	Ok(Some(state))
102}