Skip to main content

tuwunel_api/client/
context.rs

1use axum::extract::State;
2use futures::{
3	FutureExt, StreamExt, TryFutureExt, TryStreamExt,
4	future::{join, join3, try_join3},
5};
6use ruma::{OwnedEventId, UserId, api::client::context::get_context, events::StateEventType};
7use tuwunel_core::{
8	Err, Event, Result, at, debug_warn, err, ref_at,
9	utils::{
10		BoolExt, IterStream,
11		future::TryExtExt,
12		stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
13	},
14};
15use tuwunel_service::rooms::{lazy_loading, lazy_loading::Options, short::ShortStateKey};
16
17use crate::{
18	Ruma,
19	client::{
20		is_ignored_pdu,
21		message::{
22			add_membership_unsigned, event_filter, ignored_filter, lazy_loading_witness,
23			visibility_filter, with_membership,
24		},
25	},
26};
27
28const LIMIT_MAX: usize = 100;
29const LIMIT_DEFAULT: usize = 10;
30
31/// # `GET /_matrix/client/r0/rooms/{roomId}/context/{eventId}`
32///
33/// Allows loading room history around an event.
34///
35/// - Only works if the user is joined (TODO: always allow, but only show events
36///   if the user was joined, depending on history_visibility)
37pub(crate) async fn get_context_route(
38	State(services): State<crate::State>,
39	body: Ruma<get_context::v3::Request>,
40) -> Result<get_context::v3::Response> {
41	let sender_user = body.sender_user();
42	let sender_device = body.sender_device.as_deref();
43	let room_id = &body.room_id;
44	let event_id = &body.event_id;
45	let filter = &body.filter;
46
47	if !services.metadata.exists(room_id).await {
48		return Err!(Request(Forbidden("Room does not exist to this server")));
49	}
50
51	// Use limit or else 10, with maximum 100
52	let limit: usize = body
53		.limit
54		.try_into()
55		.unwrap_or(LIMIT_DEFAULT)
56		.min(LIMIT_MAX);
57
58	let base_id = services
59		.timeline
60		.get_pdu_id(event_id)
61		.map_err(|_| err!(Request(NotFound("Event not found."))));
62
63	let base_pdu = services
64		.timeline
65		.get_pdu(event_id)
66		.map_err(|_| err!(Request(NotFound("Base event not found."))));
67
68	let visible = services
69		.state_accessor
70		.user_can_see_event(sender_user, room_id, event_id)
71		.map(Ok);
72
73	let (base_id, base_pdu, visible) = try_join3(base_id, base_pdu, visible).await?;
74
75	if base_pdu.room_id != *room_id || base_pdu.event_id != *event_id {
76		return Err!(Request(NotFound("Base event not found.")));
77	}
78
79	if !visible {
80		debug_warn!(
81			req_evt = ?event_id, ?base_id, ?room_id,
82			"Event requested by {sender_user} but is not allowed to see it."
83		);
84
85		return Err!(Request(NotFound("Event not found.")));
86	}
87
88	if is_ignored_pdu(&services, &base_pdu, sender_user).await {
89		return Err!(HttpJson(NOT_FOUND, {
90			"errcode": "M_SENDER_IGNORED",
91			"error": "You have ignored the user that sent this event",
92			"sender": base_pdu.sender().as_str(),
93		}));
94	}
95
96	let base_count = base_id.pdu_count();
97
98	let encrypted = services
99		.state_accessor
100		.is_encrypted_room(room_id)
101		.await;
102
103	let base_event = async {
104		let item = ignored_filter(&services, (base_count, base_pdu), sender_user).await?;
105		Some(add_membership_unsigned(&services, item, sender_user, encrypted).await)
106	};
107
108	let events_before = services
109		.timeline
110		.pdus_rev(Some(sender_user), room_id, Some(base_count))
111		.ignore_err()
112		.ready_filter_map(|item| event_filter(item, filter))
113		.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
114		.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
115		.take(limit / 2)
116		.wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
117		.collect();
118
119	let events_after = services
120		.timeline
121		.pdus(Some(sender_user), room_id, Some(base_count))
122		.ignore_err()
123		.ready_filter_map(|item| event_filter(item, filter))
124		.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
125		.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
126		.take(limit.div_ceil(2))
127		.wide_then(|item| add_membership_unsigned(&services, item, sender_user, encrypted))
128		.collect();
129
130	let (base_event, events_before, events_after): (_, Vec<_>, Vec<_>) =
131		join3(base_event, events_before, events_after)
132			.boxed()
133			.await;
134
135	let lazy_loading_context = lazy_loading::Context {
136		user_id: sender_user,
137		device_id: sender_device,
138		room_id,
139		token: Some(base_count.into_unsigned()),
140		options: Some(&filter.lazy_load_options),
141		mode: lazy_loading::Mode::Update,
142	};
143
144	let lazy_loading_witnessed = filter
145		.lazy_load_options
146		.is_enabled()
147		.then_async(|| {
148			let witnessed = base_event
149				.iter()
150				.chain(events_before.iter())
151				.chain(events_after.iter());
152
153			lazy_loading_witness(&services, &lazy_loading_context, witnessed)
154		});
155
156	let state_at = events_after
157		.last()
158		.map(ref_at!(1))
159		.map_or_else(|| body.event_id.as_ref(), |pdu| pdu.event_id.as_ref());
160
161	let state_ids = services
162		.state
163		.pdu_shortstatehash(state_at)
164		.or_else(|_| services.state.get_room_shortstatehash(room_id))
165		.map_ok(|shortstatehash| {
166			services
167				.state_accessor
168				.state_full_ids(shortstatehash)
169				.map(Ok)
170		})
171		.map_err(|e| err!(Database("State not found: {e}")))
172		.try_flatten_stream()
173		.try_collect()
174		.boxed();
175
176	let (lazy_loading_witnessed, state_ids) = join(lazy_loading_witnessed, state_ids).await;
177
178	let state_ids: Vec<(ShortStateKey, OwnedEventId)> = state_ids?;
179	let shortstatekeys = state_ids.iter().map(at!(0)).stream();
180	let shorteventids = state_ids.iter().map(ref_at!(1)).stream();
181	let lazy_loading_witnessed = lazy_loading_witnessed.unwrap_or_default();
182	let state: Vec<_> = services
183		.short
184		.multi_get_statekey_from_short(shortstatekeys)
185		.zip(shorteventids)
186		.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
187		.ready_filter_map(|((event_type, state_key), event_id)| {
188			if filter.lazy_load_options.is_enabled()
189				&& event_type == StateEventType::RoomMember
190				&& state_key
191					.as_str()
192					.try_into()
193					.is_ok_and(|user_id: &UserId| !lazy_loading_witnessed.contains(user_id))
194			{
195				return None;
196			}
197
198			Some(event_id)
199		})
200		.broad_filter_map(|event_id: &OwnedEventId| {
201			services.timeline.get_pdu(event_id.as_ref()).ok()
202		})
203		.broad_then(|pdu| with_membership(&services, pdu, sender_user, encrypted))
204		.map(Event::into_format)
205		.collect()
206		.await;
207
208	Ok(get_context::v3::Response {
209		event: base_event.map(at!(1)).map(Event::into_format),
210
211		start: events_before
212			.last()
213			.map(at!(0))
214			.or(Some(base_count))
215			.as_ref()
216			.map(ToString::to_string),
217
218		end: events_after
219			.last()
220			.map(at!(0))
221			.or(Some(base_count))
222			.as_ref()
223			.map(ToString::to_string),
224
225		events_before: events_before
226			.into_iter()
227			.map(at!(1))
228			.map(Event::into_format)
229			.collect(),
230
231		events_after: events_after
232			.into_iter()
233			.map(at!(1))
234			.map(Event::into_format)
235			.collect(),
236
237		state,
238	})
239}