tuwunel_api/client/
search.rs1use std::collections::BTreeMap;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use ruma::{
6 OwnedRoomId, RoomId, UInt, UserId,
7 api::client::search::search_events::{
8 self,
9 v3::{Criteria, EventContextResult, ResultCategories, ResultRoomEvents, SearchResult},
10 },
11 events::AnyStateEvent,
12 serde::Raw,
13};
14use search_events::v3::{Request, Response};
15use tuwunel_core::{
16 Err, Result, at, is_true,
17 matrix::Event,
18 result::FlatOk,
19 utils::{IterStream, option::OptionExt, stream::ReadyExt},
20};
21use tuwunel_service::{Services, rooms::search::RoomQuery};
22
23use crate::Ruma;
24
25type RoomStates = BTreeMap<OwnedRoomId, RoomState>;
26type RoomState = Vec<Raw<AnyStateEvent>>;
27
28const LIMIT_DEFAULT: usize = 10;
29const LIMIT_MAX: usize = 100;
30const BATCH_MAX: usize = 20;
31
32pub(crate) async fn search_events_route(
39 State(services): State<crate::State>,
40 body: Ruma<Request>,
41) -> Result<Response> {
42 let sender_user = body.sender_user();
43 let next_batch = body.next_batch.as_deref();
44 let room_events = body
45 .search_categories
46 .room_events
47 .as_ref()
48 .map_async(|criteria| category_room_events(&services, sender_user, next_batch, criteria))
49 .await
50 .transpose()?;
51
52 Ok(Response {
53 search_categories: ResultCategories {
54 room_events: room_events.unwrap_or_default(),
55 },
56 })
57}
58
59#[expect(clippy::map_unwrap_or)]
60async fn category_room_events(
61 services: &Services,
62 sender_user: &UserId,
63 next_batch: Option<&str>,
64 criteria: &Criteria,
65) -> Result<ResultRoomEvents> {
66 let filter = &criteria.filter;
67
68 let limit: usize = filter
69 .limit
70 .map(TryInto::try_into)
71 .flat_ok()
72 .unwrap_or(LIMIT_DEFAULT)
73 .min(LIMIT_MAX);
74
75 let next_batch: usize = next_batch
76 .map(str::parse)
77 .transpose()?
78 .unwrap_or(0)
79 .min(limit.saturating_mul(BATCH_MAX));
80
81 let rooms = filter
82 .rooms
83 .clone()
84 .map(IntoIterator::into_iter)
85 .map(IterStream::stream)
86 .map(StreamExt::boxed)
87 .unwrap_or_else(|| {
88 services
89 .state_cache
90 .rooms_joined(sender_user)
91 .map(ToOwned::to_owned)
92 .boxed()
93 });
94
95 let results: Vec<_> = rooms
96 .filter_map(async |room_id| {
97 check_room_visible(services, sender_user, &room_id, criteria)
98 .await
99 .is_ok()
100 .then_some(room_id)
101 })
102 .filter_map(async |room_id| {
103 let query = RoomQuery {
104 room_id: &room_id,
105 user_id: Some(sender_user),
106 criteria,
107 skip: next_batch,
108 limit,
109 };
110
111 let (count, results) = services.search.search_pdus(&query).await.ok()?;
112
113 results
114 .collect::<Vec<_>>()
115 .map(|results| (room_id.clone(), count, results))
116 .map(Some)
117 .await
118 })
119 .collect()
120 .await;
121
122 let total: UInt = results
123 .iter()
124 .fold(0, |a: usize, (_, count, _)| a.saturating_add(*count))
125 .try_into()?;
126
127 let state: RoomStates = results
128 .iter()
129 .stream()
130 .ready_filter(|_| criteria.include_state.is_some_and(is_true!()))
131 .filter_map(async |(room_id, ..)| {
132 procure_room_state(services, room_id)
133 .map_ok(|state| (room_id.clone(), state))
134 .await
135 .ok()
136 })
137 .collect()
138 .await;
139
140 let results: Vec<SearchResult> = results
141 .into_iter()
142 .map(at!(2))
143 .flatten()
144 .stream()
145 .map(Event::into_format)
146 .map(|result| SearchResult {
147 rank: None,
148 result: Some(result),
149 context: EventContextResult {
150 profile_info: BTreeMap::new(), events_after: Vec::new(), events_before: Vec::new(), start: None, end: None, },
156 })
157 .collect()
158 .await;
159
160 let highlights = criteria
161 .search_term
162 .split_terminator(|c: char| !c.is_alphanumeric())
163 .map(str::to_lowercase)
164 .collect();
165
166 let next_batch = (results.len() >= limit)
167 .then_some(next_batch.saturating_add(results.len()))
168 .as_ref()
169 .map(ToString::to_string);
170
171 Ok(ResultRoomEvents {
172 count: Some(total),
173 next_batch,
174 results,
175 state,
176 highlights,
177 groups: Default::default(), })
179}
180
181async fn procure_room_state(services: &Services, room_id: &RoomId) -> Result<RoomState> {
182 let state = services
183 .state_accessor
184 .room_state_full_pdus(room_id)
185 .map_ok(Event::into_format)
186 .try_collect()
187 .await?;
188
189 Ok(state)
190}
191
192async fn check_room_visible(
193 services: &Services,
194 user_id: &UserId,
195 room_id: &RoomId,
196 search: &Criteria,
197) -> Result {
198 let check_visible = search.filter.rooms.is_some();
199 let check_state = check_visible && search.include_state.is_some_and(is_true!());
200
201 let is_joined = !check_visible
202 || services
203 .state_cache
204 .is_joined(user_id, room_id)
205 .await;
206
207 let state_visible = !check_state
208 || services
209 .state_accessor
210 .user_can_see_state_events(user_id, room_id)
211 .await;
212
213 if !is_joined || !state_visible {
214 return Err!(Request(Forbidden("You don't have permission to view {room_id:?}")));
215 }
216
217 Ok(())
218}