1use axum::extract::State;
2use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut};
3use ruma::{
4 RoomId, UserId,
5 api::{
6 Direction,
7 client::{filter::RoomEventFilter, message::get_message_events},
8 },
9 events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
10 serde::Raw,
11};
12use tuwunel_core::{
13 Err, Result, at,
14 matrix::{
15 event::{Event, Matches},
16 pdu::{PduCount, PduEvent},
17 },
18 ref_at,
19 utils::{
20 BoolExt, IterStream, ReadyExt,
21 result::{FlatOk, LogErr},
22 stream::{BroadbandExt, TryIgnore, WidebandExt},
23 },
24};
25use tuwunel_service::{
26 Services,
27 rooms::{
28 lazy_loading,
29 lazy_loading::{Options, Witness},
30 timeline::PdusIterItem,
31 },
32};
33
34use crate::Ruma;
35
36const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[
39 CallInvite, KeyVerificationStart, Location, PollStart, Reaction, RoomEncrypted, RoomMessage, Sticker, Audio, Emote, File, Image, Video, Voice, UnstablePollStart, Beacon, CallNotify, ];
57
58const LIMIT_MAX: usize = 1000;
59const LIMIT_DEFAULT: usize = 10;
60
61pub(crate) async fn get_message_events_route(
68 State(services): State<crate::State>,
69 body: Ruma<get_message_events::v3::Request>,
70) -> Result<get_message_events::v3::Response> {
71 let sender_user = body.sender_user();
72 let sender_device = body.sender_device.as_deref();
73 let room_id = &body.room_id;
74 let filter = &body.filter;
75
76 if !services.metadata.exists(room_id).await {
77 return Err!(Request(Forbidden("Room does not exist to this server")));
78 }
79
80 let from: PduCount = body
81 .from
82 .as_deref()
83 .map(str::parse)
84 .transpose()?
85 .unwrap_or_else(|| match body.dir {
86 | Direction::Forward => PduCount::min(),
87 | Direction::Backward => PduCount::max(),
88 });
89
90 let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
91
92 let limit: usize = body
93 .limit
94 .try_into()
95 .unwrap_or(LIMIT_DEFAULT)
96 .min(LIMIT_MAX);
97
98 if matches!(body.dir, Direction::Backward) {
99 services
100 .timeline
101 .backfill_if_required(room_id, from)
102 .await
103 .log_err()
104 .ok();
105 }
106
107 let it = match body.dir {
108 | Direction::Forward => Either::Left(
109 services
110 .timeline
111 .pdus(Some(sender_user), room_id, Some(from))
112 .ignore_err(),
113 ),
114 | Direction::Backward => Either::Right(
115 services
116 .timeline
117 .pdus_rev(Some(sender_user), room_id, Some(from))
118 .ignore_err(),
119 ),
120 };
121
122 let encrypted = services
123 .state_accessor
124 .is_encrypted_room(room_id)
125 .await;
126
127 let events: Vec<_> = it
128 .ready_take_while(|(count, _)| Some(*count) != to)
129 .ready_filter_map(|item| event_filter(item, filter))
130 .wide_filter_map(|item| event_filters(&services, sender_user, item))
131 .take(limit)
132 .wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
133 .collect()
134 .await;
135
136 let lazy_loading_context = lazy_loading::Context {
137 user_id: sender_user,
138 device_id: sender_device,
139 room_id,
140 token: Some(from.into_unsigned()),
141 options: Some(&filter.lazy_load_options),
142 mode: lazy_loading::Mode::Update,
143 };
144
145 let witness = filter
146 .lazy_load_options
147 .is_enabled()
148 .then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()));
149
150 let state = witness
151 .map(Option::into_iter)
152 .map(|option| option.flat_map(Witness::into_iter))
153 .map(IterStream::stream)
154 .into_stream()
155 .flatten()
156 .broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await)
157 .collect()
158 .await;
159
160 let next_token = events.last().map(at!(0));
161
162 let chunk = events
163 .into_iter()
164 .map(at!(1))
165 .map(Event::into_format)
166 .collect();
167
168 Ok(get_message_events::v3::Response {
169 start: from.to_string(),
170 end: next_token.as_ref().map(ToString::to_string),
171 chunk,
172 state,
173 })
174}
175
176pub(crate) async fn lazy_loading_witness<'a, I>(
177 services: &Services,
178 lazy_loading_context: &lazy_loading::Context<'_>,
179 events: I,
180) -> Witness
181where
182 I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
183{
184 let oldest = events
185 .clone()
186 .map(|(count, _)| count)
187 .copied()
188 .min()
189 .unwrap_or_else(PduCount::max);
190
191 let newest = events
192 .clone()
193 .map(|(count, _)| count)
194 .copied()
195 .max()
196 .unwrap_or_else(PduCount::max);
197
198 let receipts = services.read_receipt.readreceipts_since(
199 lazy_loading_context.room_id,
200 oldest.into_unsigned(),
201 Some(newest.into_unsigned()),
202 );
203
204 pin_mut!(receipts);
205 let witness: Witness = events
206 .stream()
207 .map(ref_at!(1))
208 .map(Event::sender)
209 .map(ToOwned::to_owned)
210 .chain(
211 receipts
212 .ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
213 .map(|(user_id, ..)| user_id.to_owned()),
214 )
215 .collect()
216 .await;
217
218 services
219 .lazy_loading
220 .witness_retain(witness, lazy_loading_context)
221 .await
222}
223
224async fn get_member_event(
225 services: &Services,
226 room_id: &RoomId,
227 user_id: &UserId,
228) -> Option<Raw<AnyStateEvent>> {
229 services
230 .state_accessor
231 .room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
232 .map_ok(Event::into_format)
233 .await
234 .ok()
235}
236
237async fn event_filters(
238 services: &Services,
239 user_id: &UserId,
240 item: PdusIterItem,
241) -> Option<PdusIterItem> {
242 let item = ignored_filter(services, item, user_id).await?;
243 let item = visibility_filter(services, item, user_id).await?;
244
245 Some(item)
246}
247
248#[inline]
249pub(crate) async fn ignored_filter(
250 services: &Services,
251 item: PdusIterItem,
252 user_id: &UserId,
253) -> Option<PdusIterItem> {
254 let (_, ref pdu) = item;
255
256 is_ignored_pdu(services, pdu, user_id)
257 .await
258 .is_false()
259 .then_some(item)
260}
261
262#[inline]
263pub(crate) async fn is_ignored_pdu<Pdu>(
264 services: &Services,
265 event: &Pdu,
266 user_id: &UserId,
267) -> bool
268where
269 Pdu: Event,
270{
271 if event.kind().to_cow_str() == "org.matrix.dummy_event" {
274 return true;
275 }
276
277 if IGNORED_MESSAGE_TYPES
278 .binary_search(event.kind())
279 .is_err()
280 {
281 return false;
282 }
283
284 let ignored_server = services
285 .config
286 .is_forbidden_remote_server_name(event.sender().server_name());
287
288 ignored_server
289 || services
290 .users
291 .user_is_ignored(event.sender(), user_id)
292 .await
293}
294
295#[inline]
296pub(crate) async fn visibility_filter(
297 services: &Services,
298 item: PdusIterItem,
299 user_id: &UserId,
300) -> Option<PdusIterItem> {
301 let (_, pdu) = &item;
302
303 services
304 .state_accessor
305 .user_can_see_event(user_id, pdu.room_id(), pdu.event_id())
306 .await
307 .then_some(item)
308}
309
310#[inline]
311pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
312 let (_, pdu) = &item;
313 filter.matches(pdu).then_some(item)
314}
315
316#[inline]
321pub(crate) async fn annotate_membership(
322 services: &Services,
323 pdu: &mut PduEvent,
324 user_id: &UserId,
325 encrypted: bool,
326) {
327 if !encrypted {
328 return;
329 }
330
331 let membership = services
332 .state_accessor
333 .user_membership_at_pdu(user_id, pdu)
334 .await;
335
336 pdu.add_membership(&membership).log_err().ok();
337}
338
339#[inline]
341pub(crate) async fn with_membership(
342 services: &Services,
343 mut pdu: PduEvent,
344 user_id: &UserId,
345 encrypted: bool,
346) -> PduEvent {
347 annotate_membership(services, &mut pdu, user_id, encrypted).await;
348 pdu
349}
350
351#[inline]
353pub(crate) async fn add_membership_unsigned(
354 services: &Services,
355 (count, pdu): PdusIterItem,
356 user_id: &UserId,
357 encrypted: bool,
358) -> PdusIterItem {
359 (count, with_membership(services, pdu, user_id, encrypted).await)
360}
361
362#[cfg_attr(debug_assertions, tuwunel_core::ctor)]
363fn _is_sorted() {
364 debug_assert!(
365 IGNORED_MESSAGE_TYPES.is_sorted(),
366 "IGNORED_MESSAGE_TYPES must be sorted by the developer"
367 );
368}