Skip to main content

tuwunel_api/client/
context.rs

1use axum::extract::State;
2use futures::{
3	FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
4	future::{OptionFuture, join, join3, try_join3},
5};
6use ruma::{
7	EventId, OwnedEventId, RoomId, UserId,
8	api::client::{context::get_context, filter::RoomEventFilter},
9	events::{AnyStateEvent, StateEventType},
10	serde::Raw,
11};
12use tuwunel_core::{
13	Err, Event, Result, at, debug_warn, err,
14	matrix::pdu::{PduEvent, RawPduId},
15	ref_at,
16	utils::{
17		BoolExt, IterStream,
18		future::TryExtExt,
19		stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
20	},
21};
22use tuwunel_service::{
23	Services,
24	rooms::{
25		lazy_loading,
26		lazy_loading::{Options, Witness},
27		short::ShortStateKey,
28		timeline::PdusIterItem,
29	},
30};
31
32use crate::{
33	Ruma,
34	client::{
35		is_ignored_pdu,
36		message::{
37			add_membership_unsigned, event_filter, ignored_filter, lazy_loading_witness,
38			visibility_filter, with_membership,
39		},
40	},
41};
42
43const LIMIT_MAX: usize = 100;
44const LIMIT_DEFAULT: usize = 10;
45
46/// # `GET /_matrix/client/r0/rooms/{roomId}/context/{eventId}`
47///
48/// Allows loading room history around an event.
49///
50/// - Only works if the user is joined (TODO: always allow, but only show events
51///   if the user was joined, depending on history_visibility)
52pub(crate) async fn get_context_route(
53	State(services): State<crate::State>,
54	body: Ruma<get_context::v3::Request>,
55) -> Result<get_context::v3::Response> {
56	let sender_user = body.sender_user();
57	let sender_device = body.sender_device.as_deref();
58	let room_id = &body.room_id;
59	let event_id = &body.event_id;
60	let filter = &body.filter;
61
62	if !services.metadata.exists(room_id).await {
63		return Err!(Request(Forbidden("Room does not exist to this server")));
64	}
65
66	let limit: usize = body
67		.limit
68		.try_into()
69		.unwrap_or(LIMIT_DEFAULT)
70		.min(LIMIT_MAX);
71
72	let (base_id, base_pdu) =
73		resolve_base_event(&services, room_id, event_id, sender_user).await?;
74	let base_count = base_id.pdu_count();
75
76	let encrypted = services
77		.state_accessor
78		.is_encrypted_room(room_id)
79		.await;
80
81	let base_event = async {
82		let item = ignored_filter(&services, (base_count, base_pdu), sender_user).await?;
83		Some(add_membership_unsigned(&services, item, sender_user, encrypted).await)
84	};
85
86	let events_before = collect_timeline_half(
87		&services,
88		services
89			.timeline
90			.pdus_rev(Some(sender_user), room_id, Some(base_count)),
91		filter,
92		sender_user,
93		encrypted,
94		limit / 2,
95	);
96
97	let events_after = collect_timeline_half(
98		&services,
99		services
100			.timeline
101			.pdus(Some(sender_user), room_id, Some(base_count)),
102		filter,
103		sender_user,
104		encrypted,
105		limit.div_ceil(2),
106	);
107
108	let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) =
109		join3(base_event, events_before, events_after)
110			.boxed()
111			.await;
112
113	let lazy_loading_context = lazy_loading::Context {
114		user_id: sender_user,
115		device_id: sender_device,
116		room_id,
117		token: Some(base_count.into_unsigned()),
118		options: Some(&filter.lazy_load_options),
119		mode: lazy_loading::Mode::Update,
120	};
121
122	let lazy_loading_witnessed = filter
123		.lazy_load_options
124		.is_enabled()
125		.then_async(|| {
126			let witnessed = base_event
127				.iter()
128				.chain(events_before.iter())
129				.chain(events_after.iter());
130
131			lazy_loading_witness(&services, &lazy_loading_context, witnessed)
132		});
133
134	let state_at = events_after
135		.last()
136		.map(ref_at!(1))
137		.map_or_else(|| body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());
138
139	let (lazy_loading_witnessed, state_ids) =
140		join(lazy_loading_witnessed, load_state_ids(&services, room_id, state_at)).await;
141
142	let state = build_state_response(
143		&services,
144		state_ids?,
145		lazy_loading_witnessed.unwrap_or_default(),
146		filter,
147		sender_user,
148		encrypted,
149	)
150	.await;
151
152	let event = OptionFuture::from(base_event.map(at!(1)).map(|pdu| {
153		services
154			.pdu_metadata
155			.bundle_aggregations(sender_user, pdu)
156	}))
157	.await
158	.map(Event::into_format);
159
160	Ok(get_context::v3::Response {
161		event,
162
163		start: events_before
164			.last()
165			.map(at!(0))
166			.or(Some(base_count))
167			.as_ref()
168			.map(ToString::to_string),
169
170		// `end` is one past the base so a backward page from it still yields the base;
171		// `start` stays at `base_count` (a bare count can't suit both directions).
172		end: events_after
173			.last()
174			.map(at!(0))
175			.or_else(|| Some(base_count.saturating_add(1)))
176			.as_ref()
177			.map(ToString::to_string),
178
179		events_before: events_before
180			.into_iter()
181			.map(at!(1))
182			.map(Event::into_format)
183			.collect(),
184
185		events_after: events_after
186			.into_iter()
187			.map(at!(1))
188			.map(Event::into_format)
189			.collect(),
190
191		state,
192	})
193}
194
195async fn resolve_base_event(
196	services: &Services,
197	room_id: &RoomId,
198	event_id: &EventId,
199	sender_user: &UserId,
200) -> Result<(RawPduId, PduEvent)> {
201	let lookup = || {
202		let base_id = services
203			.timeline
204			.get_pdu_id(event_id)
205			.map_err(|_| err!(Request(NotFound("Event not found."))));
206
207		let base_pdu = services
208			.timeline
209			.get_pdu(event_id)
210			.map_err(|_| err!(Request(NotFound("Base event not found."))));
211
212		let visible = services
213			.state_accessor
214			.user_can_see_event(sender_user, room_id, event_id)
215			.map(Ok);
216
217		try_join3(base_id, base_pdu, visible)
218	};
219
220	let resolve_remote = services
221		.config
222		.fetch_unreceived_contexts_over_federation
223		&& services.config.allow_federation;
224
225	let (base_id, base_pdu, visible) = match lookup().await {
226		| Ok(found) => found,
227		| Err(e) if !resolve_remote => return Err(e),
228		| Err(_) => {
229			services
230				.timeline
231				.fetch_remote_event(room_id, event_id)
232				.await
233				.ok();
234
235			lookup().await?
236		},
237	};
238
239	if base_pdu.room_id != *room_id || base_pdu.event_id != *event_id {
240		return Err!(Request(NotFound("Base event not found.")));
241	}
242
243	if !visible {
244		debug_warn!(
245			req_evt = ?event_id, ?base_id, ?room_id,
246			"Event requested by {sender_user} but is not allowed to see it."
247		);
248
249		return Err!(Request(NotFound("Event not found.")));
250	}
251
252	if is_ignored_pdu(services, &base_pdu, sender_user).await {
253		return Err!(HttpJson(NOT_FOUND, {
254			"errcode": "M_SENDER_IGNORED",
255			"error": "You have ignored the user that sent this event",
256			"sender": base_pdu.sender().as_str(),
257		}));
258	}
259
260	Ok((base_id, base_pdu))
261}
262
263async fn collect_timeline_half<'a, S>(
264	services: &'a Services,
265	pdus: S,
266	filter: &'a RoomEventFilter,
267	sender_user: &'a UserId,
268	encrypted: bool,
269	take: usize,
270) -> Vec<PdusIterItem>
271where
272	S: Stream<Item = Result<PdusIterItem>> + Send + 'a,
273{
274	pdus.ignore_err()
275		.ready_filter_map(|item| event_filter(item, filter))
276		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
277		.wide_filter_map(|item| visibility_filter(services, item, sender_user))
278		.take(take)
279		.wide_then(|item| add_membership_unsigned(services, item, sender_user, encrypted))
280		.wide_then(async |(count, pdu)| {
281			let pdu = services
282				.pdu_metadata
283				.bundle_aggregations(sender_user, pdu)
284				.await;
285
286			(count, pdu)
287		})
288		.collect()
289		.await
290}
291
292async fn load_state_ids(
293	services: &Services,
294	room_id: &RoomId,
295	state_at: &EventId,
296) -> Result<Vec<(ShortStateKey, OwnedEventId)>> {
297	services
298		.state
299		.pdu_shortstatehash(state_at)
300		.or_else(|_| services.state.get_room_shortstatehash(room_id))
301		.map_ok(|shortstatehash| {
302			services
303				.state_accessor
304				.state_full_ids(shortstatehash)
305				.map(Ok)
306		})
307		.map_err(|e| err!(Database("State not found: {e}")))
308		.try_flatten_stream()
309		.try_collect()
310		.boxed()
311		.await
312}
313
314async fn build_state_response(
315	services: &Services,
316	state_ids: Vec<(ShortStateKey, OwnedEventId)>,
317	lazy_loading_witnessed: Witness,
318	filter: &RoomEventFilter,
319	sender_user: &UserId,
320	encrypted: bool,
321) -> Vec<Raw<AnyStateEvent>> {
322	let shortstatekeys = state_ids.iter().map(at!(0)).stream();
323	let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
324
325	services
326		.short
327		.multi_get_statekey_from_short(shortstatekeys)
328		.zip(shorteventids)
329		.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
330		.ready_filter_map(|((event_type, state_key), event_id)| {
331			if filter.lazy_load_options.is_enabled()
332				&& event_type == StateEventType::RoomMember
333				&& state_key
334					.as_str()
335					.try_into()
336					.is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id))
337			{
338				return None;
339			}
340
341			Some(event_id)
342		})
343		.broad_filter_map(|event_id: &OwnedEventId| {
344			services.timeline.get_pdu(event_id.as_ref()).ok()
345		})
346		.broad_then(|pdu| with_membership(services, pdu, sender_user, encrypted))
347		.map(Event::into_format)
348		.collect()
349		.await
350}