1use axum::extract::State;
2use futures::{
3 FutureExt, StreamExt, TryFutureExt, TryStreamExt,
4 future::{join, join3, try_join3},
5};
6use ruma::{OwnedEventId, UserId, api::client::context::get_context, events::StateEventType};
7use tuwunel_core::{
8 Err, Event, Result, at, debug_warn, err, ref_at,
9 utils::{
10 BoolExt, IterStream,
11 future::TryExtExt,
12 stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
13 },
14};
15use tuwunel_service::rooms::{lazy_loading, lazy_loading::Options, short::ShortStateKey};
16
17use crate::{
18 Ruma,
19 client::{
20 is_ignored_pdu,
21 message::{
22 add_membership_unsigned, event_filter, ignored_filter, lazy_loading_witness,
23 visibility_filter, with_membership,
24 },
25 },
26};
27
/// Fallback event limit, used by `get_context_route` when the request's
/// `limit` cannot be converted to `usize`.
const LIMIT_DEFAULT: usize = 10;

/// Upper bound on the number of surrounding events `get_context_route` will
/// return; any larger effective limit is clamped down to this value.
const LIMIT_MAX: usize = 100;
30
31pub(crate) async fn get_context_route(
38 State(services): State<crate::State>,
39 body: Ruma<get_context::v3::Request>,
40) -> Result<get_context::v3::Response> {
41 let sender_user = body.sender_user();
42 let sender_device = body.sender_device.as_deref();
43 let room_id = &body.room_id;
44 let event_id = &body.event_id;
45 let filter = &body.filter;
46
47 if !services.metadata.exists(room_id).await {
48 return Err!(Request(Forbidden("Room does not exist to this server")));
49 }
50
51 let limit: usize = body
53 .limit
54 .try_into()
55 .unwrap_or(LIMIT_DEFAULT)
56 .min(LIMIT_MAX);
57
58 let base_id = services
59 .timeline
60 .get_pdu_id(event_id)
61 .map_err(|_| err!(Request(NotFound("Event not found."))));
62
63 let base_pdu = services
64 .timeline
65 .get_pdu(event_id)
66 .map_err(|_| err!(Request(NotFound("Base event not found."))));
67
68 let visible = services
69 .state_accessor
70 .user_can_see_event(sender_user, room_id, event_id)
71 .map(Ok);
72
73 let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;
74
75 if base_pdu.room_id != *room_id || base_pdu.event_id != *event_id {
76 return Err!(Request(NotFound("Base event not found.")));
77 }
78
79 if !visible {
80 debug_warn!(
81 req_evt = ?event_id, ?base_id, ?room_id,
82 "Event requested by {sender_user} but is not allowed to see it."
83 );
84
85 return Err!(Request(NotFound("Event not found.")));
86 }
87
88 if is_ignored_pdu(&services, &base_pdu, sender_user).await {
89 return Err!(HttpJson(NOT_FOUND, {
90 "errcode": "M_SENDER_IGNORED",
91 "error": "You have ignored the user that sent this event",
92 "sender": base_pdu.sender().as_str(),
93 }));
94 }
95
96 let base_count = base_id.pdu_count();
97
98 let encrypted = services
99 .state_accessor
100 .is_encrypted_room(room_id)
101 .await;
102
103 let base_event = async {
104 let item = ignored_filter(&services, (base_count, base_pdu), sender_user).await?;
105 Some(add_membership_unsigned(&services, item, sender_user, encrypted).await)
106 };
107
108 let events_before = services
109 .timeline
110 .pdus_rev(Some(sender_user), room_id, Some(base_count))
111 .ignore_err()
112 .ready_filter_map(|item| event_filter(item, filter))
113 .wide_filter_map(|item| ignored_filter(&services, item, sender_user))
114 .wide_filter_map(|item| visibility_filter(&services, item, sender_user))
115 .take(limit / 2)
116 .wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
117 .collect();
118
119 let events_after = services
120 .timeline
121 .pdus(Some(sender_user), room_id, Some(base_count))
122 .ignore_err()
123 .ready_filter_map(|item| event_filter(item, filter))
124 .wide_filter_map(|item| ignored_filter(&services, item, sender_user))
125 .wide_filter_map(|item| visibility_filter(&services, item, sender_user))
126 .take(limit.div_ceil(2))
127 .wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
128 .collect();
129
130 let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) =
131 join3(base_event, events_before, events_after)
132 .boxed()
133 .await;
134
135 let lazy_loading_context = lazy_loading::Context {
136 user_id: sender_user,
137 device_id: sender_device,
138 room_id,
139 token: Some(base_count.into_unsigned()),
140 options: Some(&filter.lazy_load_options),
141 mode: lazy_loading::Mode::Update,
142 };
143
144 let lazy_loading_witnessed = filter
145 .lazy_load_options
146 .is_enabled()
147 .then_async(|| {
148 let witnessed = base_event
149 .iter()
150 .chain(events_before.iter())
151 .chain(events_after.iter());
152
153 lazy_loading_witness(&services, &lazy_loading_context, witnessed)
154 });
155
156 let state_at = events_after
157 .last()
158 .map(ref_at!(1))
159 .map_or_else(|| body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());
160
161 let state_ids = services
162 .state
163 .pdu_shortstatehash(state_at)
164 .or_else(|_| services.state.get_room_shortstatehash(room_id))
165 .map_ok(|shortstatehash| {
166 services
167 .state_accessor
168 .state_full_ids(shortstatehash)
169 .map(Ok)
170 })
171 .map_err(|e| err!(Database("State not found: {e}")))
172 .try_flatten_stream()
173 .try_collect()
174 .boxed();
175
176 let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await;
177
178 let state_ids: Vec<(ShortStateKey, OwnedEventId)> = state_ids?;
179 let shortstatekeys = state_ids.iter().map(at!(0)).stream();
180 let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
181 let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default();
182 let state: Vec<_> = services
183 .short
184 .multi_get_statekey_from_short(shortstatekeys)
185 .zip(shorteventids)
186 .ready_filter_map(|item| Some((item.0.ok()?, item.1)))
187 .ready_filter_map(|((event_type, state_key), event_id)| {
188 if filter.lazy_load_options.is_enabled()
189 && event_type == StateEventType::RoomMember
190 && state_key
191 .as_str()
192 .try_into()
193 .is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id))
194 {
195 return None;
196 }
197
198 Some(event_id)
199 })
200 .broad_filter_map(|event_id: &OwnedEventId| {
201 services.timeline.get_pdu(event_id.as_ref()).ok()
202 })
203 .broad_then(|pdu| with_membership(&services, pdu, sender_user, encrypted))
204 .map(Event::into_format)
205 .collect()
206 .await;
207
208 Ok(get_context::v3::Response {
209 event: base_event.map(at!(1)).map(Event::into_format),
210
211 start: events_before
212 .last()
213 .map(at!(0))
214 .or(Some(base_count))
215 .as_ref()
216 .map(ToString::to_string),
217
218 end: events_after
219 .last()
220 .map(at!(0))
221 .or(Some(base_count))
222 .as_ref()
223 .map(ToString::to_string),
224
225 events_before: events_before
226 .into_iter()
227 .map(at!(1))
228 .map(Event::into_format)
229 .collect(),
230
231 events_after: events_after
232 .into_iter()
233 .map(at!(1))
234 .map(Event::into_format)
235 .collect(),
236
237 state,
238 })
239}