tuwunel_service/rooms/event_handler/
parse_incoming_pdu.rs1use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId};
2use serde_json::value::RawValue as RawJsonValue;
3use tuwunel_core::{Result, err, implement, matrix::event::gen_event_id, result::FlatOk};
4
5type Parsed = (OwnedRoomId, OwnedEventId, CanonicalJsonObject);
6
7#[implement(super::Service)]
8#[tracing::instrument(
9 name = "parse_incoming",
10 level = "trace",
11 skip_all,
12 fields(
13 len = pdu.get().len(),
14 )
15)]
16pub async fn parse_incoming_pdu(&self, pdu: &RawJsonValue) -> Result<Parsed> {
17 let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
18 err!(BadServerResponse(debug_error!("Error parsing incoming event: {e} {pdu:#?}")))
19 })?;
20
21 let room_id: OwnedRoomId = value
22 .get("room_id")
23 .and_then(CanonicalJsonValue::as_str)
24 .map(OwnedRoomId::parse)
25 .flat_ok_or(err!(Request(InvalidParam("Invalid room_id in pdu"))))?;
26
27 let room_version_id = self
28 .services
29 .state
30 .get_room_version(&room_id)
31 .await
32 .map_err(|_| err!("Server is not in room {room_id}"))?;
33
34 gen_event_id(&value, &room_version_id)
35 .map(move |event_id| (room_id, event_id, value))
36 .map_err(|e| {
37 err!(Request(InvalidParam("Could not convert event to canonical json: {e}")))
38 })
39}