Skip to main content

tuwunel_api/client/
message.rs

1use axum::extract::State;
2use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut};
3use ruma::{
4	RoomId, UserId,
5	api::{
6		Direction,
7		client::{filter::RoomEventFilter, message::get_message_events},
8	},
9	events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
10	serde::Raw,
11};
12use tuwunel_core::{
13	Err, Result, at,
14	matrix::{
15		event::{Event, Matches},
16		pdu::{PduCount, PduEvent},
17	},
18	ref_at,
19	utils::{
20		BoolExt, IterStream, ReadyExt,
21		result::{FlatOk, LogErr},
22		stream::{BroadbandExt, TryIgnore, WidebandExt},
23	},
24};
25use tuwunel_service::{
26	Services,
27	rooms::{
28		lazy_loading,
29		lazy_loading::{Options, Witness},
30		timeline::PdusIterItem,
31	},
32};
33
34use crate::Ruma;
35
36/// list of safe and common non-state events to ignore if the user is ignored.
37/// MUST be sorted by `TimelineEventType::event_type_str()` for `binary_search`.
38const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[
39	CallInvite,           // m.call.invite
40	KeyVerificationStart, // m.key.verification.start
41	Location,             // m.location
42	PollStart,            // m.poll.start
43	Reaction,             // m.reaction
44	RoomEncrypted,        // m.room.encrypted
45	RoomMessage,          // m.room.message
46	Sticker,              // m.sticker
47	Audio,                // org.matrix.msc1767.audio
48	Emote,                // org.matrix.msc1767.emote
49	File,                 // org.matrix.msc1767.file
50	Image,                // org.matrix.msc1767.image
51	Video,                // org.matrix.msc1767.video
52	Voice,                // org.matrix.msc3245.voice.v2
53	UnstablePollStart,    // org.matrix.msc3381.poll.start
54	Beacon,               // org.matrix.msc3672.beacon
55	CallNotify,           // org.matrix.msc4075.call.notify
56];
57
58const LIMIT_MAX: usize = 1000;
59const LIMIT_DEFAULT: usize = 10;
60
61/// # `GET /_matrix/client/r0/rooms/{roomId}/messages`
62///
63/// Allows paginating through room history.
64///
65/// - Only works if the user is joined (TODO: always allow, but only show events
66///   where the user was joined, depending on `history_visibility`)
67pub(crate) async fn get_message_events_route(
68	State(services): State<crate::State>,
69	body: Ruma<get_message_events::v3::Request>,
70) -> Result<get_message_events::v3::Response> {
71	let sender_user = body.sender_user();
72	let sender_device = body.sender_device.as_deref();
73	let room_id = &body.room_id;
74	let filter = &body.filter;
75
76	if !services.metadata.exists(room_id).await {
77		return Err!(Request(Forbidden("Room does not exist to this server")));
78	}
79
80	if !services
81		.state_accessor
82		.user_can_see_room(sender_user, room_id)
83		.await
84	{
85		return Err!(Request(Forbidden("You don't have permission to view this room.")));
86	}
87
88	let from: PduCount = body
89		.from
90		.as_deref()
91		.map(str::parse)
92		.transpose()?
93		.unwrap_or_else(|| match body.dir {
94			| Direction::Forward => PduCount::min(),
95			| Direction::Backward => PduCount::max(),
96		});
97
98	let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
99
100	let limit: usize = body
101		.limit
102		.try_into()
103		.unwrap_or(LIMIT_DEFAULT)
104		.min(LIMIT_MAX);
105
106	if matches!(body.dir, Direction::Backward) {
107		services
108			.timeline
109			.backfill_if_required(room_id, from)
110			.await
111			.log_err()
112			.ok();
113	}
114
115	let it = match body.dir {
116		| Direction::Forward => Either::Left(
117			services
118				.timeline
119				.pdus(Some(sender_user), room_id, Some(from))
120				.ignore_err(),
121		),
122		| Direction::Backward => Either::Right(
123			services
124				.timeline
125				.pdus_rev(Some(sender_user), room_id, Some(from))
126				.ignore_err(),
127		),
128	};
129
130	let encrypted = services
131		.state_accessor
132		.is_encrypted_room(room_id)
133		.await;
134
135	let events: Vec<_> = it
136		.ready_take_while(|(count, _)| Some(*count) != to)
137		.ready_filter_map(|item| event_filter(item, filter))
138		.wide_filter_map(|item| event_filters(&services, sender_user, item))
139		.take(limit)
140		.wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
141		.wide_then(async |(count, pdu)| {
142			let pdu = services
143				.pdu_metadata
144				.bundle_aggregations(sender_user, pdu)
145				.await;
146
147			(count, pdu)
148		})
149		.collect()
150		.await;
151
152	let lazy_loading_context = lazy_loading::Context {
153		user_id: sender_user,
154		device_id: sender_device,
155		room_id,
156		token: Some(from.into_unsigned()),
157		options: Some(&filter.lazy_load_options),
158		mode: lazy_loading::Mode::Update,
159	};
160
161	let witness = filter
162		.lazy_load_options
163		.is_enabled()
164		.then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()));
165
166	let state = witness
167		.map(Option::into_iter)
168		.map(|option| option.flat_map(Witness::into_iter))
169		.map(IterStream::stream)
170		.into_stream()
171		.flatten()
172		.broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await)
173		.collect()
174		.await;
175
176	let next_token = events.last().map(at!(0));
177
178	let chunk = events
179		.into_iter()
180		.map(at!(1))
181		.map(Event::into_format)
182		.collect();
183
184	Ok(get_message_events::v3::Response {
185		start: from.to_string(),
186		end: next_token.as_ref().map(ToString::to_string),
187		chunk,
188		state,
189	})
190}
191
192pub(crate) async fn lazy_loading_witness<'a, I>(
193	services: &Services,
194	lazy_loading_context: &lazy_loading::Context<'_>,
195	events: I,
196) -> Witness
197where
198	I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
199{
200	let oldest = events
201		.clone()
202		.map(|(count, _)| count)
203		.copied()
204		.min()
205		.unwrap_or_else(PduCount::max);
206
207	let newest = events
208		.clone()
209		.map(|(count, _)| count)
210		.copied()
211		.max()
212		.unwrap_or_else(PduCount::max);
213
214	let receipts = services.read_receipt.readreceipts_since(
215		lazy_loading_context.room_id,
216		oldest.into_unsigned(),
217		Some(newest.into_unsigned()),
218	);
219
220	pin_mut!(receipts);
221	let witness: Witness = events
222		.stream()
223		.map(ref_at!(1))
224		.map(Event::sender)
225		.map(ToOwned::to_owned)
226		.chain(
227			receipts
228				.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
229				.map(|(user_id, ..)| user_id.to_owned()),
230		)
231		.collect()
232		.await;
233
234	services
235		.lazy_loading
236		.witness_retain(witness, lazy_loading_context)
237		.await
238}
239
240async fn get_member_event(
241	services: &Services,
242	room_id: &RoomId,
243	user_id: &UserId,
244) -> Option<Raw<AnyStateEvent>> {
245	services
246		.state_accessor
247		.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
248		.map_ok(Event::into_format)
249		.await
250		.ok()
251}
252
253async fn event_filters(
254	services: &Services,
255	user_id: &UserId,
256	item: PdusIterItem,
257) -> Option<PdusIterItem> {
258	let item = ignored_filter(services, item, user_id).await?;
259	let item = visibility_filter(services, item, user_id).await?;
260
261	Some(item)
262}
263
264#[inline]
265pub(crate) async fn ignored_filter(
266	services: &Services,
267	item: PdusIterItem,
268	user_id: &UserId,
269) -> Option<PdusIterItem> {
270	let (_, ref pdu) = item;
271
272	is_ignored_pdu(services, pdu, user_id)
273		.await
274		.is_false()
275		.then_some(item)
276}
277
278#[inline]
279pub(crate) async fn is_ignored_pdu<Pdu>(
280	services: &Services,
281	event: &Pdu,
282	user_id: &UserId,
283) -> bool
284where
285	Pdu: Event,
286{
287	// exclude Synapse's dummy events from bloating up response bodies. clients
288	// don't need to see this.
289	if event.kind().to_cow_str() == "org.matrix.dummy_event" {
290		return true;
291	}
292
293	if IGNORED_MESSAGE_TYPES
294		.binary_search(event.kind())
295		.is_err()
296	{
297		return false;
298	}
299
300	let ignored_server = services
301		.config
302		.is_forbidden_remote_server_name(event.sender().server_name());
303
304	ignored_server
305		|| services
306			.users
307			.user_is_ignored(event.sender(), user_id)
308			.await
309}
310
311#[inline]
312pub(crate) async fn visibility_filter(
313	services: &Services,
314	item: PdusIterItem,
315	user_id: &UserId,
316) -> Option<PdusIterItem> {
317	let (_, pdu) = &item;
318
319	services
320		.state_accessor
321		.user_can_see_event(user_id, pdu.room_id(), pdu.event_id())
322		.await
323		.then_some(item)
324}
325
326#[inline]
327pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
328	let (_, pdu) = &item;
329	filter.matches(pdu).then_some(item)
330}
331
332/// MSC4115: stamp `unsigned.membership` on a served PDU with the requesting
333/// user's membership at the time of the event. The MSC permits omitting the
334/// property when calculating it is expensive, so the project restricts it to
335/// encrypted rooms where membership-vs-event ordering matters for key share.
336#[inline]
337pub(crate) async fn annotate_membership(
338	services: &Services,
339	pdu: &mut PduEvent,
340	user_id: &UserId,
341	encrypted: bool,
342) {
343	if !encrypted {
344		return;
345	}
346
347	let membership = services
348		.state_accessor
349		.user_membership_at_pdu(user_id, pdu)
350		.await;
351
352	pdu.add_membership(&membership).log_err().ok();
353}
354
355/// `annotate_membership` consume-and-return adapter for stream chains.
356#[inline]
357pub(crate) async fn with_membership(
358	services: &Services,
359	mut pdu: PduEvent,
360	user_id: &UserId,
361	encrypted: bool,
362) -> PduEvent {
363	annotate_membership(services, &mut pdu, user_id, encrypted).await;
364	pdu
365}
366
367/// `with_membership` adapter for timeline-iterator items.
368#[inline]
369pub(crate) async fn add_membership_unsigned(
370	services: &Services,
371	(count, pdu): PdusIterItem,
372	user_id: &UserId,
373	encrypted: bool,
374) -> PdusIterItem {
375	(count, with_membership(services, pdu, user_id, encrypted).await)
376}
377
378#[cfg_attr(debug_assertions, tuwunel_core::ctor(unsafe))]
379fn _is_sorted() {
380	debug_assert!(
381		IGNORED_MESSAGE_TYPES.is_sorted(),
382		"IGNORED_MESSAGE_TYPES must be sorted by the developer"
383	);
384}