Skip to main content

tuwunel_api/client/
message.rs

1use axum::extract::State;
2use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut};
3use ruma::{
4	RoomId, UserId,
5	api::{
6		Direction,
7		client::{filter::RoomEventFilter, message::get_message_events},
8	},
9	events::{AnyStateEvent, StateEventType, TimelineEventType, TimelineEventType::*},
10	serde::Raw,
11};
12use tuwunel_core::{
13	Err, Result, at,
14	matrix::{
15		event::{Event, Matches},
16		pdu::{PduCount, PduEvent},
17	},
18	ref_at,
19	utils::{
20		BoolExt, IterStream, ReadyExt,
21		result::{FlatOk, LogErr},
22		stream::{BroadbandExt, TryIgnore, WidebandExt},
23	},
24};
25use tuwunel_service::{
26	Services,
27	rooms::{
28		lazy_loading,
29		lazy_loading::{Options, Witness},
30		timeline::PdusIterItem,
31	},
32};
33
34use crate::Ruma;
35
36/// list of safe and common non-state events to ignore if the user is ignored.
37/// MUST be sorted by `TimelineEventType::event_type_str()` for `binary_search`.
38const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[
39	CallInvite,           // m.call.invite
40	KeyVerificationStart, // m.key.verification.start
41	Location,             // m.location
42	PollStart,            // m.poll.start
43	Reaction,             // m.reaction
44	RoomEncrypted,        // m.room.encrypted
45	RoomMessage,          // m.room.message
46	Sticker,              // m.sticker
47	Audio,                // org.matrix.msc1767.audio
48	Emote,                // org.matrix.msc1767.emote
49	File,                 // org.matrix.msc1767.file
50	Image,                // org.matrix.msc1767.image
51	Video,                // org.matrix.msc1767.video
52	Voice,                // org.matrix.msc3245.voice.v2
53	UnstablePollStart,    // org.matrix.msc3381.poll.start
54	Beacon,               // org.matrix.msc3672.beacon
55	CallNotify,           // org.matrix.msc4075.call.notify
56];
57
58const LIMIT_MAX: usize = 1000;
59const LIMIT_DEFAULT: usize = 10;
60
61/// # `GET /_matrix/client/r0/rooms/{roomId}/messages`
62///
63/// Allows paginating through room history.
64///
65/// - Only works if the user is joined (TODO: always allow, but only show events
66///   where the user was joined, depending on `history_visibility`)
67pub(crate) async fn get_message_events_route(
68	State(services): State<crate::State>,
69	body: Ruma<get_message_events::v3::Request>,
70) -> Result<get_message_events::v3::Response> {
71	let sender_user = body.sender_user();
72	let sender_device = body.sender_device.as_deref();
73	let room_id = &body.room_id;
74	let filter = &body.filter;
75
76	if !services.metadata.exists(room_id).await {
77		return Err!(Request(Forbidden("Room does not exist to this server")));
78	}
79
80	let from: PduCount = body
81		.from
82		.as_deref()
83		.map(str::parse)
84		.transpose()?
85		.unwrap_or_else(|| match body.dir {
86			| Direction::Forward => PduCount::min(),
87			| Direction::Backward => PduCount::max(),
88		});
89
90	let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
91
92	let limit: usize = body
93		.limit
94		.try_into()
95		.unwrap_or(LIMIT_DEFAULT)
96		.min(LIMIT_MAX);
97
98	if matches!(body.dir, Direction::Backward) {
99		services
100			.timeline
101			.backfill_if_required(room_id, from)
102			.await
103			.log_err()
104			.ok();
105	}
106
107	let it = match body.dir {
108		| Direction::Forward => Either::Left(
109			services
110				.timeline
111				.pdus(Some(sender_user), room_id, Some(from))
112				.ignore_err(),
113		),
114		| Direction::Backward => Either::Right(
115			services
116				.timeline
117				.pdus_rev(Some(sender_user), room_id, Some(from))
118				.ignore_err(),
119		),
120	};
121
122	let encrypted = services
123		.state_accessor
124		.is_encrypted_room(room_id)
125		.await;
126
127	let events: Vec<_> = it
128		.ready_take_while(|(count, _)| Some(*count) != to)
129		.ready_filter_map(|item| event_filter(item, filter))
130		.wide_filter_map(|item| event_filters(&services, sender_user, item))
131		.take(limit)
132		.wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
133		.collect()
134		.await;
135
136	let lazy_loading_context = lazy_loading::Context {
137		user_id: sender_user,
138		device_id: sender_device,
139		room_id,
140		token: Some(from.into_unsigned()),
141		options: Some(&filter.lazy_load_options),
142		mode: lazy_loading::Mode::Update,
143	};
144
145	let witness = filter
146		.lazy_load_options
147		.is_enabled()
148		.then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()));
149
150	let state = witness
151		.map(Option::into_iter)
152		.map(|option| option.flat_map(Witness::into_iter))
153		.map(IterStream::stream)
154		.into_stream()
155		.flatten()
156		.broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await)
157		.collect()
158		.await;
159
160	let next_token = events.last().map(at!(0));
161
162	let chunk = events
163		.into_iter()
164		.map(at!(1))
165		.map(Event::into_format)
166		.collect();
167
168	Ok(get_message_events::v3::Response {
169		start: from.to_string(),
170		end: next_token.as_ref().map(ToString::to_string),
171		chunk,
172		state,
173	})
174}
175
176pub(crate) async fn lazy_loading_witness<'a, I>(
177	services: &Services,
178	lazy_loading_context: &lazy_loading::Context<'_>,
179	events: I,
180) -> Witness
181where
182	I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
183{
184	let oldest = events
185		.clone()
186		.map(|(count, _)| count)
187		.copied()
188		.min()
189		.unwrap_or_else(PduCount::max);
190
191	let newest = events
192		.clone()
193		.map(|(count, _)| count)
194		.copied()
195		.max()
196		.unwrap_or_else(PduCount::max);
197
198	let receipts = services.read_receipt.readreceipts_since(
199		lazy_loading_context.room_id,
200		oldest.into_unsigned(),
201		Some(newest.into_unsigned()),
202	);
203
204	pin_mut!(receipts);
205	let witness: Witness = events
206		.stream()
207		.map(ref_at!(1))
208		.map(Event::sender)
209		.map(ToOwned::to_owned)
210		.chain(
211			receipts
212				.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
213				.map(|(user_id, ..)| user_id.to_owned()),
214		)
215		.collect()
216		.await;
217
218	services
219		.lazy_loading
220		.witness_retain(witness, lazy_loading_context)
221		.await
222}
223
224async fn get_member_event(
225	services: &Services,
226	room_id: &RoomId,
227	user_id: &UserId,
228) -> Option<Raw<AnyStateEvent>> {
229	services
230		.state_accessor
231		.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
232		.map_ok(Event::into_format)
233		.await
234		.ok()
235}
236
237async fn event_filters(
238	services: &Services,
239	user_id: &UserId,
240	item: PdusIterItem,
241) -> Option<PdusIterItem> {
242	let item = ignored_filter(services, item, user_id).await?;
243	let item = visibility_filter(services, item, user_id).await?;
244
245	Some(item)
246}
247
248#[inline]
249pub(crate) async fn ignored_filter(
250	services: &Services,
251	item: PdusIterItem,
252	user_id: &UserId,
253) -> Option<PdusIterItem> {
254	let (_, ref pdu) = item;
255
256	is_ignored_pdu(services, pdu, user_id)
257		.await
258		.is_false()
259		.then_some(item)
260}
261
262#[inline]
263pub(crate) async fn is_ignored_pdu<Pdu>(
264	services: &Services,
265	event: &Pdu,
266	user_id: &UserId,
267) -> bool
268where
269	Pdu: Event,
270{
271	// exclude Synapse's dummy events from bloating up response bodies. clients
272	// don't need to see this.
273	if event.kind().to_cow_str() == "org.matrix.dummy_event" {
274		return true;
275	}
276
277	if IGNORED_MESSAGE_TYPES
278		.binary_search(event.kind())
279		.is_err()
280	{
281		return false;
282	}
283
284	let ignored_server = services
285		.config
286		.is_forbidden_remote_server_name(event.sender().server_name());
287
288	ignored_server
289		|| services
290			.users
291			.user_is_ignored(event.sender(), user_id)
292			.await
293}
294
295#[inline]
296pub(crate) async fn visibility_filter(
297	services: &Services,
298	item: PdusIterItem,
299	user_id: &UserId,
300) -> Option<PdusIterItem> {
301	let (_, pdu) = &item;
302
303	services
304		.state_accessor
305		.user_can_see_event(user_id, pdu.room_id(), pdu.event_id())
306		.await
307		.then_some(item)
308}
309
310#[inline]
311pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
312	let (_, pdu) = &item;
313	filter.matches(pdu).then_some(item)
314}
315
316/// MSC4115: stamp `unsigned.membership` on a served PDU with the requesting
317/// user's membership at the time of the event. The MSC permits omitting the
318/// property when calculating it is expensive, so the project restricts it to
319/// encrypted rooms where membership-vs-event ordering matters for key share.
320#[inline]
321pub(crate) async fn annotate_membership(
322	services: &Services,
323	pdu: &mut PduEvent,
324	user_id: &UserId,
325	encrypted: bool,
326) {
327	if !encrypted {
328		return;
329	}
330
331	let membership = services
332		.state_accessor
333		.user_membership_at_pdu(user_id, pdu)
334		.await;
335
336	pdu.add_membership(&membership).log_err().ok();
337}
338
339/// `annotate_membership` consume-and-return adapter for stream chains.
340#[inline]
341pub(crate) async fn with_membership(
342	services: &Services,
343	mut pdu: PduEvent,
344	user_id: &UserId,
345	encrypted: bool,
346) -> PduEvent {
347	annotate_membership(services, &mut pdu, user_id, encrypted).await;
348	pdu
349}
350
351/// `with_membership` adapter for timeline-iterator items.
352#[inline]
353pub(crate) async fn add_membership_unsigned(
354	services: &Services,
355	(count, pdu): PdusIterItem,
356	user_id: &UserId,
357	encrypted: bool,
358) -> PdusIterItem {
359	(count, with_membership(services, pdu, user_id, encrypted).await)
360}
361
362#[cfg_attr(debug_assertions, tuwunel_core::ctor)]
363fn _is_sorted() {
364	debug_assert!(
365		IGNORED_MESSAGE_TYPES.is_sorted(),
366		"IGNORED_MESSAGE_TYPES must be sorted by the developer"
367	);
368}