tuwunel_service/rooms/event_handler/
handle_incoming_pdu.rs1use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5};
2use ruma::{
3 CanonicalJsonObject, EventId, OwnedEventId, RoomId, ServerName, UserId,
4 events::StateEventType,
5};
6use tuwunel_core::{
7 Err, Result, debug,
8 debug::INFO_SPAN_LEVEL,
9 debug_warn, err, implement,
10 matrix::{Event, pdu::MAX_PREV_EVENTS, room_version},
11 smallvec::SmallVec,
12 trace,
13 utils::{
14 BoolExt,
15 stream::{IterStream, TryWidebandExt},
16 },
17 warn,
18};
19
20use crate::rooms::timeline::RawPduId;
21
22type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>;
23type PrevHandled = (OwnedEventId, Handled);
24
25type PrevResults = SmallVec<[PrevResult; MAX_PREV_EVENTS]>;
26type PrevResult = (OwnedEventId, Result<Handled>);
27
28type Handled = Option<(RawPduId, bool)>;
29
30#[implement(super::Service)]
58#[tracing::instrument(
59 name = "pdu",
60 level = INFO_SPAN_LEVEL,
61 skip_all,
62 fields(%room_id, %event_id),
63 ret(level = "debug"),
64)]
65pub async fn handle_incoming_pdu<'a>(
66 &'a self,
67 origin: &'a ServerName,
68 room_id: &'a RoomId,
69 event_id: &'a EventId,
70 pdu: CanonicalJsonObject,
71 is_timeline_event: bool,
72) -> Result<Handled> {
73 if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
75 debug!(?pdu_id, "Exists.");
76 return Ok(Some((pdu_id, false)));
77 }
78
79 let meta_exists = self.services.metadata.exists(room_id).map(Ok);
81
82 let is_disabled = self
84 .services
85 .metadata
86 .is_disabled(room_id)
87 .map(Ok);
88
89 let origin_acl_check = self.acl_check(origin, room_id);
91
92 let sender: &UserId = pdu
94 .get("sender")
95 .try_into()
96 .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
97
98 let sender_acl_check = sender
99 .server_name()
100 .ne(origin)
101 .then_async(|| self.acl_check(sender.server_name(), room_id));
102
103 let create_event =
105 self.services
106 .state_accessor
107 .room_state_get(room_id, &StateEventType::RoomCreate, "");
108
109 let (meta_exists, is_disabled, (), (), ref create_event) = try_join5(
110 meta_exists,
111 is_disabled,
112 origin_acl_check,
113 sender_acl_check.map(|o| o.unwrap_or(Ok(()))),
114 create_event,
115 )
116 .await?;
117
118 if !meta_exists {
119 return Err!(Request(NotFound("Room is unknown to this server")));
120 }
121
122 if is_disabled {
123 return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
124 }
125
126 let room_version = room_version::from_create_event(create_event)?;
127 let recursion_level = 0;
128
129 let (incoming_pdu, pdu) = self
130 .handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, recursion_level, false)
131 .await?;
132
133 if !is_timeline_event {
135 debug!(
136 kind = ?incoming_pdu.event_type(),
137 "Not a timeline event.",
138 );
139 return Ok(None);
140 }
141
142 let first_ts_in_room = self
144 .services
145 .timeline
146 .first_pdu_in_room(room_id)
147 .await?
148 .origin_server_ts();
149
150 if incoming_pdu.origin_server_ts() < first_ts_in_room {
151 debug!(
152 origin_server_ts = ?incoming_pdu.origin_server_ts(),
153 ?first_ts_in_room,
154 "Skipping old event."
155 );
156 return Ok(None);
157 }
158
159 let (sorted_prev_events, mut eventid_info) = self
162 .fetch_prev(
163 origin,
164 room_id,
165 incoming_pdu.prev_events(),
166 &room_version,
167 recursion_level,
168 first_ts_in_room,
169 )
170 .await?;
171
172 trace!(
173 events = sorted_prev_events.len(),
174 event_ids = ?sorted_prev_events,
175 "Handling previous events"
176 );
177 let _prev_handles: PrevResultsHandled = sorted_prev_events
178 .into_iter()
179 .enumerate()
180 .try_stream()
181 .map_ok(|(i, prev_id)| (i, eventid_info.remove(&prev_id), prev_id))
182 .widen_and_then(MAX_PREV_EVENTS, async |(i, eventid_info, prev_id)| {
183 self.services.server.check_running()?;
184 match self
185 .handle_prev_pdu(
186 origin,
187 room_id,
188 event_id,
189 eventid_info,
190 &room_version,
191 recursion_level,
192 first_ts_in_room,
193 &prev_id,
194 create_event.event_id(),
195 )
196 .await
197 {
198 | Ok(Some(handled)) => {
199 self.cancel_back_off(&prev_id);
200 debug!(?i, ?prev_id, ?handled, "Prev event processed.");
201
202 Ok((prev_id, Ok(Some(handled))))
203 },
204 | Ok(None) => {
205 debug_warn!(?i, ?prev_id, "Prev event not processed.");
206
207 Ok((prev_id, Ok(None)))
208 },
209 | Err(e) => {
210 self.back_off(&prev_id);
211 warn!(?i, ?prev_id, ?event_id, ?room_id, "Prev event processing failed: {e}");
212
213 Ok((prev_id, Err(e)))
214 },
215 }
216 })
217 .try_collect::<PrevResults>()
218 .map_ok(PrevResults::into_iter)
219 .map_ok(IterStream::stream)
220 .map_ok(|s| s.map(|(id, res)| res.map(|res| (id, res))))
221 .try_flatten_stream()
222 .try_collect()
223 .boxed()
224 .await?;
225
226 self.upgrade_outlier_to_timeline_pdu(
228 origin,
229 room_id,
230 incoming_pdu,
231 pdu,
232 &room_version,
233 recursion_level,
234 create_event.event_id(),
235 )
236 .boxed()
237 .await
238}