tuwunel_service/rooms/event_handler/
handle_outlier_pdu.rs1use futures::{StreamExt, TryFutureExt};
2use ruma::{
3 CanonicalJsonObject, EventId, RoomId, RoomVersionId, ServerName, events::TimelineEventType,
4};
5use tuwunel_core::{
6 Err, Result, debug, debug_info, err, implement,
7 matrix::{Event, PduEvent, event::TypeExt, room_version},
8 ref_at, trace,
9 utils::{future::TryExtExt, stream::IterStream},
10 warn,
11};
12
13use crate::rooms::state_res;
14
15#[implement(super::Service)]
16#[cfg_attr(unabridged, tracing::instrument(
17 name = "outlier",
18 level = "debug",
19 skip_all,
20 fields(lev = %recursion_level)
21))]
22#[expect(clippy::too_many_arguments)]
23pub(super) async fn handle_outlier_pdu(
24 &self,
25 origin: &ServerName,
26 room_id: &RoomId,
27 event_id: &EventId,
28 mut pdu_json: CanonicalJsonObject,
29 room_version: &RoomVersionId,
30 recursion_level: usize,
31 auth_events_known: bool,
32) -> Result<(PduEvent, CanonicalJsonObject)> {
33 debug!(?event_id, ?auth_events_known, %recursion_level, "handle outlier");
34
35 pdu_json.remove("unsigned");
37
38 let pdu_json = match self
43 .services
44 .server_keys
45 .verify_event(&pdu_json, Some(room_version))
46 .await
47 {
48 | Ok(ruma::signatures::Verified::All) => pdu_json,
49 | Ok(ruma::signatures::Verified::Signatures) => {
50 debug_info!("Calculated hash does not match (redaction): {event_id}");
52 let Some(rules) = room_version.rules() else {
53 return Err!(Request(UnsupportedRoomVersion(
54 "Cannot redact event for unknown room version {room_version:?}."
55 )));
56 };
57
58 let Ok(pdu_json) = ruma::canonical_json::redact(pdu_json, &rules.redaction, None)
59 else {
60 return Err!(Request(InvalidParam("Redaction failed")));
61 };
62
63 if self.services.timeline.pdu_exists(event_id).await {
65 return Err!(Request(InvalidParam(
66 "Event was redacted and we already knew about it"
67 )));
68 }
69
70 pdu_json
71 },
72 | Err(e) => {
73 return Err!(Request(InvalidParam(debug_error!(
74 "Signature verification failed for {event_id}: {e}"
75 ))));
76 },
77 };
78
79 let room_rules = room_version::rules(room_version)?;
82 let (event, pdu_json) =
83 PduEvent::from_object_federation(room_id, event_id, pdu_json, &room_rules)?;
84
85 if !auth_events_known {
86 debug!("Fetching auth events");
92 Box::pin(self.fetch_auth(
93 origin,
94 room_id,
95 event.auth_events(),
96 room_version,
97 recursion_level,
98 ))
99 .await;
100 }
101
102 debug!("Checking based on auth events");
105
106 let is_hydra = !room_rules
107 .event_format
108 .allow_room_create_in_auth_events;
109
110 let not_create = *event.kind() != TimelineEventType::RoomCreate;
111 let hydra_create_id = (not_create && is_hydra)
112 .then(|| event.room_id().as_event_id().ok())
113 .flatten();
114
115 let auth_events: Vec<_> = event
116 .auth_events()
117 .chain(hydra_create_id.as_deref())
118 .stream()
119 .filter_map(|auth_event_id| {
120 self.event_fetch(auth_event_id)
121 .inspect_err(move |e| warn!("Missing auth_event {auth_event_id}: {e}"))
122 .ok()
123 })
124 .map(|auth_event| {
125 let event_type = auth_event.event_type();
126 let state_key = auth_event
127 .state_key()
128 .expect("all auth events have state_key");
129
130 (event_type.with_state_key(state_key), auth_event)
131 })
132 .collect()
133 .await;
134
135 state_res::auth_check(
136 &room_rules,
137 &event,
138 &async |event_id| self.event_fetch(&event_id).await,
139 &async |event_type, state_key| {
140 let target = event_type.with_state_key(state_key);
141 auth_events
142 .iter()
143 .find(|(type_state_key, _)| *type_state_key == target)
144 .map(ref_at!(1))
145 .cloned()
146 .ok_or_else(|| err!(Request(NotFound("state not found"))))
147 },
148 )
149 .inspect_ok(|()| trace!("Validation successful."))
150 .await?;
151
152 self.services
154 .timeline
155 .add_pdu_outlier(event.event_id(), &pdu_json);
156
157 trace!("Added pdu as outlier.");
158
159 Ok((event, pdu_json))
160}