1use axum::extract::State;
2use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut};
3use ruma::{
4 RoomId, UserId,
5 api::{
6 Direction,
7 client::{filter::RoomEventFilter, message::get_message_events},
8 },
9 events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
10 serde::Raw,
11};
12use tuwunel_core::{
13 Err, Result, at,
14 matrix::{
15 event::{Event, Matches},
16 pdu::{PduCount, PduEvent},
17 },
18 ref_at,
19 utils::{
20 BoolExt, IterStream, ReadyExt,
21 result::{FlatOk, LogErr},
22 stream::{BroadbandExt, TryIgnore, WidebandExt},
23 },
24};
25use tuwunel_service::{
26 Services,
27 rooms::{
28 lazy_loading,
29 lazy_loading::{Options, Witness},
30 timeline::PdusIterItem,
31 },
32};
33
34use crate::Ruma;
35
36const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[
39 CallInvite, KeyVerificationStart, Location, PollStart, Reaction, RoomEncrypted, RoomMessage, Sticker, Audio, Emote, File, Image, Video, Voice, UnstablePollStart, Beacon, CallNotify, ];
57
58const LIMIT_MAX: usize = 1000;
59const LIMIT_DEFAULT: usize = 10;
60
61pub(crate) async fn get_message_events_route(
68 State(services): State<crate::State>,
69 body: Ruma<get_message_events::v3::Request>,
70) -> Result<get_message_events::v3::Response> {
71 let sender_user = body.sender_user();
72 let sender_device = body.sender_device.as_deref();
73 let room_id = &body.room_id;
74 let filter = &body.filter;
75
76 if !services.metadata.exists(room_id).await {
77 return Err!(Request(Forbidden("Room does not exist to this server")));
78 }
79
80 if !services
81 .state_accessor
82 .user_can_see_room(sender_user, room_id)
83 .await
84 {
85 return Err!(Request(Forbidden("You don't have permission to view this room.")));
86 }
87
88 let from: PduCount = body
89 .from
90 .as_deref()
91 .map(str::parse)
92 .transpose()?
93 .unwrap_or_else(|| match body.dir {
94 | Direction::Forward => PduCount::min(),
95 | Direction::Backward => PduCount::max(),
96 });
97
98 let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
99
100 let limit: usize = body
101 .limit
102 .try_into()
103 .unwrap_or(LIMIT_DEFAULT)
104 .min(LIMIT_MAX);
105
106 if matches!(body.dir, Direction::Backward) {
107 services
108 .timeline
109 .backfill_if_required(room_id, from)
110 .await
111 .log_err()
112 .ok();
113 }
114
115 let it = match body.dir {
116 | Direction::Forward => Either::Left(
117 services
118 .timeline
119 .pdus(Some(sender_user), room_id, Some(from))
120 .ignore_err(),
121 ),
122 | Direction::Backward => Either::Right(
123 services
124 .timeline
125 .pdus_rev(Some(sender_user), room_id, Some(from))
126 .ignore_err(),
127 ),
128 };
129
130 let encrypted = services
131 .state_accessor
132 .is_encrypted_room(room_id)
133 .await;
134
135 let events: Vec<_> = it
136 .ready_take_while(|(count, _)| Some(*count) != to)
137 .ready_filter_map(|item| event_filter(item, filter))
138 .wide_filter_map(|item| event_filters(&services, sender_user, item))
139 .take(limit)
140 .wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
141 .wide_then(async |(count, pdu)| {
142 let pdu = services
143 .pdu_metadata
144 .bundle_aggregations(sender_user, pdu)
145 .await;
146
147 (count, pdu)
148 })
149 .collect()
150 .await;
151
152 let lazy_loading_context = lazy_loading::Context {
153 user_id: sender_user,
154 device_id: sender_device,
155 room_id,
156 token: Some(from.into_unsigned()),
157 options: Some(&filter.lazy_load_options),
158 mode: lazy_loading::Mode::Update,
159 };
160
161 let witness = filter
162 .lazy_load_options
163 .is_enabled()
164 .then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()));
165
166 let state = witness
167 .map(Option::into_iter)
168 .map(|option| option.flat_map(Witness::into_iter))
169 .map(IterStream::stream)
170 .into_stream()
171 .flatten()
172 .broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await)
173 .collect()
174 .await;
175
176 let next_token = events.last().map(at!(0));
177
178 let chunk = events
179 .into_iter()
180 .map(at!(1))
181 .map(Event::into_format)
182 .collect();
183
184 Ok(get_message_events::v3::Response {
185 start: from.to_string(),
186 end: next_token.as_ref().map(ToString::to_string),
187 chunk,
188 state,
189 })
190}
191
192pub(crate) async fn lazy_loading_witness<'a, I>(
193 services: &Services,
194 lazy_loading_context: &lazy_loading::Context<'_>,
195 events: I,
196) -> Witness
197where
198 I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
199{
200 let oldest = events
201 .clone()
202 .map(|(count, _)| count)
203 .copied()
204 .min()
205 .unwrap_or_else(PduCount::max);
206
207 let newest = events
208 .clone()
209 .map(|(count, _)| count)
210 .copied()
211 .max()
212 .unwrap_or_else(PduCount::max);
213
214 let receipts = services.read_receipt.readreceipts_since(
215 lazy_loading_context.room_id,
216 oldest.into_unsigned(),
217 Some(newest.into_unsigned()),
218 );
219
220 pin_mut!(receipts);
221 let witness: Witness = events
222 .stream()
223 .map(ref_at!(1))
224 .map(Event::sender)
225 .map(ToOwned::to_owned)
226 .chain(
227 receipts
228 .ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
229 .map(|(user_id, ..)| user_id.to_owned()),
230 )
231 .collect()
232 .await;
233
234 services
235 .lazy_loading
236 .witness_retain(witness, lazy_loading_context)
237 .await
238}
239
240async fn get_member_event(
241 services: &Services,
242 room_id: &RoomId,
243 user_id: &UserId,
244) -> Option<Raw<AnyStateEvent>> {
245 services
246 .state_accessor
247 .room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
248 .map_ok(Event::into_format)
249 .await
250 .ok()
251}
252
253async fn event_filters(
254 services: &Services,
255 user_id: &UserId,
256 item: PdusIterItem,
257) -> Option<PdusIterItem> {
258 let item = ignored_filter(services, item, user_id).await?;
259 let item = visibility_filter(services, item, user_id).await?;
260
261 Some(item)
262}
263
264#[inline]
265pub(crate) async fn ignored_filter(
266 services: &Services,
267 item: PdusIterItem,
268 user_id: &UserId,
269) -> Option<PdusIterItem> {
270 let (_, ref pdu) = item;
271
272 is_ignored_pdu(services, pdu, user_id)
273 .await
274 .is_false()
275 .then_some(item)
276}
277
278#[inline]
279pub(crate) async fn is_ignored_pdu<Pdu>(
280 services: &Services,
281 event: &Pdu,
282 user_id: &UserId,
283) -> bool
284where
285 Pdu: Event,
286{
287 if event.kind().to_cow_str() == "org.matrix.dummy_event" {
290 return true;
291 }
292
293 if IGNORED_MESSAGE_TYPES
294 .binary_search(event.kind())
295 .is_err()
296 {
297 return false;
298 }
299
300 let ignored_server = services
301 .config
302 .is_forbidden_remote_server_name(event.sender().server_name());
303
304 ignored_server
305 || services
306 .users
307 .user_is_ignored(event.sender(), user_id)
308 .await
309}
310
311#[inline]
312pub(crate) async fn visibility_filter(
313 services: &Services,
314 item: PdusIterItem,
315 user_id: &UserId,
316) -> Option<PdusIterItem> {
317 let (_, pdu) = &item;
318
319 services
320 .state_accessor
321 .user_can_see_event(user_id, pdu.room_id(), pdu.event_id())
322 .await
323 .then_some(item)
324}
325
326#[inline]
327pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
328 let (_, pdu) = &item;
329 filter.matches(pdu).then_some(item)
330}
331
332#[inline]
337pub(crate) async fn annotate_membership(
338 services: &Services,
339 pdu: &mut PduEvent,
340 user_id: &UserId,
341 encrypted: bool,
342) {
343 if !encrypted {
344 return;
345 }
346
347 let membership = services
348 .state_accessor
349 .user_membership_at_pdu(user_id, pdu)
350 .await;
351
352 pdu.add_membership(&membership).log_err().ok();
353}
354
355#[inline]
357pub(crate) async fn with_membership(
358 services: &Services,
359 mut pdu: PduEvent,
360 user_id: &UserId,
361 encrypted: bool,
362) -> PduEvent {
363 annotate_membership(services, &mut pdu, user_id, encrypted).await;
364 pdu
365}
366
367#[inline]
369pub(crate) async fn add_membership_unsigned(
370 services: &Services,
371 (count, pdu): PdusIterItem,
372 user_id: &UserId,
373 encrypted: bool,
374) -> PdusIterItem {
375 (count, with_membership(services, pdu, user_id, encrypted).await)
376}
377
378#[cfg_attr(debug_assertions, tuwunel_core::ctor(unsafe))]
379fn _is_sorted() {
380 debug_assert!(
381 IGNORED_MESSAGE_TYPES.is_sorted(),
382 "IGNORED_MESSAGE_TYPES must be sorted by the developer"
383 );
384}