Skip to main content

tuwunel_service/rooms/event_handler/
parse_incoming_pdu.rs

1use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId};
2use serde_json::value::RawValue as RawJsonValue;
3use tuwunel_core::{Result, err, implement, matrix::event::gen_event_id, result::FlatOk};
4
/// Successful result of parsing an incoming PDU: the room ID read from the
/// event, the event ID generated for it, and the event itself as a canonical
/// JSON object — in that order.
type Parsed = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
6
7#[implement(super::Service)]
8#[tracing::instrument(
9    name = "parse_incoming",
10    level = "trace",
11    skip_all,
12    fields(
13        len = pdu.get().len(),
14    )
15)]
16pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
17	let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
18		err!(BadServerResponse(debug_error!("Error parsing incoming event: {e} {pdu:#?}")))
19	})?;
20
21	let room_id: OwnedRoomId = value
22		.get("room_id")
23		.and_then(CanonicalJsonValue::as_str)
24		.map(OwnedRoomId::parse)
25		.flat_ok_or(err!(Request(InvalidParam("Invalid room_id in pdu"))))?;
26
27	let room_version_id = self
28		.services
29		.state
30		.get_room_version(&room_id)
31		.await
32		.map_err(|_| err!("Server is not in room {room_id}"))?;
33
34	gen_event_id(&value, &room_version_id)
35		.map(move |event_id| (room_id, event_id, value))
36		.map_err(|e| {
37			err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
38		})
39}