1use axum::extract::State;
2use futures::{
3 FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
4 future::{OptionFuture, join, join3, try_join3},
5};
6use ruma::{
7 EventId, OwnedEventId, RoomId, UserId,
8 api::client::{context::get_context, filter::RoomEventFilter},
9 events::{AnyStateEvent, StateEventType},
10 serde::Raw,
11};
12use tuwunel_core::{
13 Err, Event, Result, at, debug_warn, err,
14 matrix::pdu::{PduEvent, RawPduId},
15 ref_at,
16 utils::{
17 BoolExt, IterStream,
18 future::TryExtExt,
19 stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
20 },
21};
22use tuwunel_service::{
23 Services,
24 rooms::{
25 lazy_loading,
26 lazy_loading::{Options, Witness},
27 short::ShortStateKey,
28 timeline::PdusIterItem,
29 },
30};
31
32use crate::{
33 Ruma,
34 client::{
35 is_ignored_pdu,
36 message::{
37 add_membership_unsigned, event_filter, ignored_filter, lazy_loading_witness,
38 visibility_filter, with_membership,
39 },
40 },
41};
42
/// Fallback number of context events, used by `get_context_route` when the
/// requested limit cannot be converted to `usize`.
const LIMIT_DEFAULT: usize = 10;

/// Upper bound `get_context_route` applies to the number of context events,
/// whatever the request asked for.
const LIMIT_MAX: usize = 100;
45
46pub(crate) async fn get_context_route(
53 State(services): State<crate::State>,
54 body: Ruma<get_context::v3::Request>,
55) -> Result<get_context::v3::Response> {
56 let sender_user = body.sender_user();
57 let sender_device = body.sender_device.as_deref();
58 let room_id = &body.room_id;
59 let event_id = &body.event_id;
60 let filter = &body.filter;
61
62 if !services.metadata.exists(room_id).await {
63 return Err!(Request(Forbidden("Room does not exist to this server")));
64 }
65
66 let limit: usize = body
67 .limit
68 .try_into()
69 .unwrap_or(LIMIT_DEFAULT)
70 .min(LIMIT_MAX);
71
72 let (base_id, base_pdu) =
73 resolve_base_event(&services, room_id, event_id, sender_user).await?;
74 let base_count = base_id.pdu_count();
75
76 let encrypted = services
77 .state_accessor
78 .is_encrypted_room(room_id)
79 .await;
80
81 let base_event = async {
82 let item = ignored_filter(&services, (base_count, base_pdu), sender_user).await?;
83 Some(add_membership_unsigned(&services, item, sender_user, encrypted).await)
84 };
85
86 let events_before = collect_timeline_half(
87 &services,
88 services
89 .timeline
90 .pdus_rev(Some(sender_user), room_id, Some(base_count)),
91 filter,
92 sender_user,
93 encrypted,
94 limit / 2,
95 );
96
97 let events_after = collect_timeline_half(
98 &services,
99 services
100 .timeline
101 .pdus(Some(sender_user), room_id, Some(base_count)),
102 filter,
103 sender_user,
104 encrypted,
105 limit.div_ceil(2),
106 );
107
108 let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) =
109 join3(base_event, events_before, events_after)
110 .boxed()
111 .await;
112
113 let lazy_loading_context = lazy_loading::Context {
114 user_id: sender_user,
115 device_id: sender_device,
116 room_id,
117 token: Some(base_count.into_unsigned()),
118 options: Some(&filter.lazy_load_options),
119 mode: lazy_loading::Mode::Update,
120 };
121
122 let lazy_loading_witnessed = filter
123 .lazy_load_options
124 .is_enabled()
125 .then_async(|| {
126 let witnessed = base_event
127 .iter()
128 .chain(events_before.iter())
129 .chain(events_after.iter());
130
131 lazy_loading_witness(&services, &lazy_loading_context, witnessed)
132 });
133
134 let state_at = events_after
135 .last()
136 .map(ref_at!(1))
137 .map_or_else(|| body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());
138
139 let (lazy_loading_witnessed, state_ids) =
140 join(lazy_loading_witnessed, load_state_ids(&services, room_id, state_at)).await;
141
142 let state = build_state_response(
143 &services,
144 state_ids?,
145 lazy_loading_witnessed.unwrap_or_default(),
146 filter,
147 sender_user,
148 encrypted,
149 )
150 .await;
151
152 let event = OptionFuture::from(base_event.map(at!(1)).map(|pdu| {
153 services
154 .pdu_metadata
155 .bundle_aggregations(sender_user, pdu)
156 }))
157 .await
158 .map(Event::into_format);
159
160 Ok(get_context::v3::Response {
161 event,
162
163 start: events_before
164 .last()
165 .map(at!(0))
166 .or(Some(base_count))
167 .as_ref()
168 .map(ToString::to_string),
169
170 end: events_after
173 .last()
174 .map(at!(0))
175 .or_else(|| Some(base_count.saturating_add(1)))
176 .as_ref()
177 .map(ToString::to_string),
178
179 events_before: events_before
180 .into_iter()
181 .map(at!(1))
182 .map(Event::into_format)
183 .collect(),
184
185 events_after: events_after
186 .into_iter()
187 .map(at!(1))
188 .map(Event::into_format)
189 .collect(),
190
191 state,
192 })
193}
194
195async fn resolve_base_event(
196 services: &Services,
197 room_id: &RoomId,
198 event_id: &EventId,
199 sender_user: &UserId,
200) -> Result<(RawPduId, PduEvent)> {
201 let lookup = || {
202 let base_id = services
203 .timeline
204 .get_pdu_id(event_id)
205 .map_err(|_| err!(Request(NotFound("Event not found."))));
206
207 let base_pdu = services
208 .timeline
209 .get_pdu(event_id)
210 .map_err(|_| err!(Request(NotFound("Base event not found."))));
211
212 let visible = services
213 .state_accessor
214 .user_can_see_event(sender_user, room_id, event_id)
215 .map(Ok);
216
217 try_join3(base_id, base_pdu, visible)
218 };
219
220 let resolve_remote = services
221 .config
222 .fetch_unreceived_contexts_over_federation
223 && services.config.allow_federation;
224
225 let (base_id, base_pdu, visible) = match lookup().await {
226 | Ok(found) => found,
227 | Err(e) if !resolve_remote => return Err(e),
228 | Err(_) => {
229 services
230 .timeline
231 .fetch_remote_event(room_id, event_id)
232 .await
233 .ok();
234
235 lookup().await?
236 },
237 };
238
239 if base_pdu.room_id != *room_id || base_pdu.event_id != *event_id {
240 return Err!(Request(NotFound("Base event not found.")));
241 }
242
243 if !visible {
244 debug_warn!(
245 req_evt = ?event_id, ?base_id, ?room_id,
246 "Event requested by {sender_user} but is not allowed to see it."
247 );
248
249 return Err!(Request(NotFound("Event not found.")));
250 }
251
252 if is_ignored_pdu(services, &base_pdu, sender_user).await {
253 return Err!(HttpJson(NOT_FOUND, {
254 "errcode": "M_SENDER_IGNORED",
255 "error": "You have ignored the user that sent this event",
256 "sender": base_pdu.sender().as_str(),
257 }));
258 }
259
260 Ok((base_id, base_pdu))
261}
262
263async fn collect_timeline_half<'a, S>(
264 services: &'a Services,
265 pdus: S,
266 filter: &'a RoomEventFilter,
267 sender_user: &'a UserId,
268 encrypted: bool,
269 take: usize,
270) -> Vec<PdusIterItem>
271where
272 S: Stream<Item = Result<PdusIterItem>> + Send + 'a,
273{
274 pdus.ignore_err()
275 .ready_filter_map(|item| event_filter(item, filter))
276 .wide_filter_map(|item| ignored_filter(services, item, sender_user))
277 .wide_filter_map(|item| visibility_filter(services, item, sender_user))
278 .take(take)
279 .wide_then(|item| add_membership_unsigned(services, item, sender_user, encrypted))
280 .wide_then(async |(count, pdu)| {
281 let pdu = services
282 .pdu_metadata
283 .bundle_aggregations(sender_user, pdu)
284 .await;
285
286 (count, pdu)
287 })
288 .collect()
289 .await
290}
291
292async fn load_state_ids(
293 services: &Services,
294 room_id: &RoomId,
295 state_at: &EventId,
296) -> Result<Vec<(ShortStateKey, OwnedEventId)>> {
297 services
298 .state
299 .pdu_shortstatehash(state_at)
300 .or_else(|_| services.state.get_room_shortstatehash(room_id))
301 .map_ok(|shortstatehash| {
302 services
303 .state_accessor
304 .state_full_ids(shortstatehash)
305 .map(Ok)
306 })
307 .map_err(|e| err!(Database("State not found: {e}")))
308 .try_flatten_stream()
309 .try_collect()
310 .boxed()
311 .await
312}
313
314async fn build_state_response(
315 services: &Services,
316 state_ids: Vec<(ShortStateKey, OwnedEventId)>,
317 lazy_loading_witnessed: Witness,
318 filter: &RoomEventFilter,
319 sender_user: &UserId,
320 encrypted: bool,
321) -> Vec<Raw<AnyStateEvent>> {
322 let shortstatekeys = state_ids.iter().map(at!(0)).stream();
323 let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
324
325 services
326 .short
327 .multi_get_statekey_from_short(shortstatekeys)
328 .zip(shorteventids)
329 .ready_filter_map(|item| Some((item.0.ok()?, item.1)))
330 .ready_filter_map(|((event_type, state_key), event_id)| {
331 if filter.lazy_load_options.is_enabled()
332 && event_type == StateEventType::RoomMember
333 && state_key
334 .as_str()
335 .try_into()
336 .is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id))
337 {
338 return None;
339 }
340
341 Some(event_id)
342 })
343 .broad_filter_map(|event_id: &OwnedEventId| {
344 services.timeline.get_pdu(event_id.as_ref()).ok()
345 })
346 .broad_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
347 .map(Event::into_format)
348 .collect()
349 .await
350}