Skip to main content

tuwunel_api/client/
search.rs

1use std::collections::BTreeMap;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use ruma::{
6	OwnedRoomId, RoomId, UInt, UserId,
7	api::client::search::search_events::{
8		self,
9		v3::{Criteria, EventContextResult, ResultCategories, ResultRoomEvents, SearchResult},
10	},
11	events::AnyStateEvent,
12	serde::Raw,
13};
14use search_events::v3::{Request, Response};
15use tuwunel_core::{
16	Err, Result, at, is_true,
17	matrix::Event,
18	result::FlatOk,
19	utils::{IterStream, option::OptionExt, stream::ReadyExt},
20};
21use tuwunel_service::{Services, rooms::search::RoomQuery};
22
23use crate::Ruma;
24
25type RoomStates = BTreeMap<OwnedRoomId, RoomState>;
26type RoomState = Vec<Raw<AnyStateEvent>>;
27
/// Page size used when the request filter carries no usable `limit`.
const LIMIT_DEFAULT: usize = 10;

/// Upper bound applied to the page size, whatever the client asked for.
const LIMIT_MAX: usize = 100;

/// Multiplier bounding how deep pagination may go: the `next_batch` offset is
/// clamped to `limit * BATCH_MAX`.
const BATCH_MAX: usize = 20;
31
32/// # `POST /_matrix/client/r0/search`
33///
34/// Searches rooms for messages.
35///
36/// - Only works if the user is currently joined to the room (TODO: Respect
37///   history visibility)
38pub(crate) async fn search_events_route(
39	State(services): State<crate::State>,
40	body: Ruma<Request>,
41) -> Result<Response> {
42	let sender_user = body.sender_user();
43	let next_batch = body.next_batch.as_deref();
44	let room_events = body
45		.search_categories
46		.room_events
47		.as_ref()
48		.map_async(|criteria| category_room_events(&services, sender_user, next_batch, criteria))
49		.await
50		.transpose()?;
51
52	Ok(Response {
53		search_categories: ResultCategories {
54			room_events: room_events.unwrap_or_default(),
55		},
56	})
57}
58
59#[expect(clippy::map_unwrap_or)]
60async fn category_room_events(
61	services: &Services,
62	sender_user: &UserId,
63	next_batch: Option<&str>,
64	criteria: &Criteria,
65) -> Result<ResultRoomEvents> {
66	let filter = &criteria.filter;
67
68	let limit: usize = filter
69		.limit
70		.map(TryInto::try_into)
71		.flat_ok()
72		.unwrap_or(LIMIT_DEFAULT)
73		.min(LIMIT_MAX);
74
75	let next_batch: usize = next_batch
76		.map(str::parse)
77		.transpose()?
78		.unwrap_or(0)
79		.min(limit.saturating_mul(BATCH_MAX));
80
81	let rooms = filter
82		.rooms
83		.clone()
84		.map(IntoIterator::into_iter)
85		.map(IterStream::stream)
86		.map(StreamExt::boxed)
87		.unwrap_or_else(|| {
88			services
89				.state_cache
90				.rooms_joined(sender_user)
91				.map(ToOwned::to_owned)
92				.boxed()
93		});
94
95	let results: Vec<_> = rooms
96		.filter_map(async |room_id| {
97			check_room_visible(services, sender_user, &room_id, criteria)
98				.await
99				.is_ok()
100				.then_some(room_id)
101		})
102		.filter_map(async |room_id| {
103			let query = RoomQuery {
104				room_id: &room_id,
105				user_id: Some(sender_user),
106				criteria,
107				skip: next_batch,
108				limit,
109			};
110
111			let (count, results) = services.search.search_pdus(&query).await.ok()?;
112
113			results
114				.collect::<Vec<_>>()
115				.map(|results| (room_id.clone(), count, results))
116				.map(Some)
117				.await
118		})
119		.collect()
120		.await;
121
122	let total: UInt = results
123		.iter()
124		.fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
125		.try_into()?;
126
127	let state: RoomStates = results
128		.iter()
129		.stream()
130		.ready_filter(|_| criteria.include_state.is_some_and(is_true!()))
131		.filter_map(async |(room_id, ..)| {
132			procure_room_state(services, room_id)
133				.map_ok(|state| (room_id.clone(), state))
134				.await
135				.ok()
136		})
137		.collect()
138		.await;
139
140	let results: Vec<SearchResult> = results
141		.into_iter()
142		.map(at!(2))
143		.flatten()
144		.stream()
145		.map(Event::into_format)
146		.map(|result| SearchResult {
147			rank: None,
148			result: Some(result),
149			context: EventContextResult {
150				profile_info: BTreeMap::new(), //TODO
151				events_after: Vec::new(),      //TODO
152				events_before: Vec::new(),     //TODO
153				start: None,                   //TODO
154				end: None,                     //TODO
155			},
156		})
157		.collect()
158		.await;
159
160	let highlights = criteria
161		.search_term
162		.split_terminator(|c: char| !c.is_alphanumeric())
163		.map(str::to_lowercase)
164		.collect();
165
166	let next_batch = (results.len() >= limit)
167		.then_some(next_batch.saturating_add(results.len()))
168		.as_ref()
169		.map(ToString::to_string);
170
171	Ok(ResultRoomEvents {
172		count: Some(total),
173		next_batch,
174		results,
175		state,
176		highlights,
177		groups: Default::default(), // TODO
178	})
179}
180
181async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result<RoomState> {
182	let state = services
183		.state_accessor
184		.room_state_full_pdus(room_id)
185		.map_ok(Event::into_format)
186		.try_collect()
187		.await?;
188
189	Ok(state)
190}
191
192async fn check_room_visible(
193	services: &Services,
194	user_id: &UserId,
195	room_id: &RoomId,
196	search: &Criteria,
197) -> Result {
198	let check_visible = search.filter.rooms.is_some();
199	let check_state = check_visible && search.include_state.is_some_and(is_true!());
200
201	let is_joined = !check_visible
202		|| services
203			.state_cache
204			.is_joined(user_id, room_id)
205			.await;
206
207	let state_visible = !check_state
208		|| services
209			.state_accessor
210			.user_can_see_state_events(user_id, room_id)
211			.await;
212
213	if !is_joined || !state_visible {
214		return Err!(Request(Forbidden("You don't have permission to view {room_id:?}")));
215	}
216
217	Ok(())
218}