Skip to main content

tuwunel_service/rooms/event_handler/
parse_incoming_pdu.rs

1use futures::{StreamExt, pin_mut};
2use ruma::{
3	CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
4};
5use serde_json::value::RawValue as RawJsonValue;
6use tuwunel_core::{Result, err, implement, matrix::event::gen_event_id, result::FlatOk};
7
8type Parsed = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
9
10#[implement(super::Service)]
11#[tracing::instrument(
12    name = "parse_incoming",
13    level = "trace",
14    skip_all,
15    fields(
16        len = pdu.get().len(),
17    )
18)]
19pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
20	let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
21		err!(BadServerResponse(debug_error!("Error parsing incoming event: {e} {pdu:#?}")))
22	})?;
23
24	let room_id: OwnedRoomId = value
25		.get("room_id")
26		.and_then(CanonicalJsonValue::as_str)
27		.map(OwnedRoomId::parse)
28		.flat_ok_or(err!(Request(InvalidParam("Invalid room_id in pdu"))))?;
29
30	let room_version_id = match self
31		.services
32		.state
33		.get_room_version(&room_id)
34		.await
35	{
36		| Ok(room_version_id) => room_version_id,
37		// We may not be resident (e.g. a rescinded out-of-band invite); recover the
38		// version from a locally-invited member's stored stripped state.
39		| Err(_) => self
40			.invited_room_version(&room_id)
41			.await
42			.ok_or_else(|| err!("Server is not in room {room_id}"))?,
43	};
44
45	gen_event_id(&value, &room_version_id)
46		.map(move |event_id| (room_id, event_id, value))
47		.map_err(|e| {
48			err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
49		})
50}
51
52/// Recover a room's version from a locally-invited member's stored stripped
53/// state, for a room we are not resident in (e.g. a rescinded out-of-band
54/// invite). The create event in the stripped state carries the version.
55#[implement(super::Service)]
56async fn invited_room_version(&self, room_id: &RoomId) -> Option<RoomVersionId> {
57	let invited = self
58		.services
59		.state_cache
60		.room_members_invited(room_id)
61		.map(ToOwned::to_owned);
62
63	pin_mut!(invited);
64	while let Some(user_id) = invited.next().await {
65		if self.services.globals.user_is_local(&user_id)
66			&& let Ok(stripped) = self
67				.services
68				.state_cache
69				.invite_state(&user_id, room_id)
70				.await && let Some(room_version) = super::room_version_of(&stripped)
71		{
72			return Some(room_version);
73		}
74	}
75
76	None
77}