tuwunel_service/rooms/search/
mod.rs1use std::sync::Arc;
2
3use futures::{Stream, StreamExt};
4use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
5use tuwunel_core::{
6 PduCount, Result,
7 arrayvec::ArrayVec,
8 implement,
9 matrix::event::{Event, Matches},
10 trace,
11 utils::{
12 ArrayVecExt, IterStream, ReadyExt, set,
13 stream::{TryIgnore, WidebandExt},
14 },
15};
16use tuwunel_database::{Interfix, Map, keyval::Val};
17
18use crate::rooms::{
19 short::ShortRoomId,
20 timeline::{PduId, RawPduId},
21};
22
23pub struct Service {
24 db: Data,
25 services: Arc<crate::services::OnceServices>,
26}
27
28struct Data {
29 tokenids: Arc<Map>,
30}
31
32#[derive(Clone, Debug)]
33pub struct RoomQuery<'a> {
34 pub room_id: &'a RoomId,
35 pub user_id: Option<&'a UserId>,
36 pub criteria: &'a Criteria,
37 pub limit: usize,
38 pub skip: usize,
39}
40
41type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
42
43const TOKEN_ID_MAX_LEN: usize =
44 size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
45const WORD_MAX_LEN: usize = 50;
46
47impl crate::Service for Service {
48 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
49 Ok(Arc::new(Self {
50 db: Data { tokenids: args.db["tokenids"].clone() },
51 services: args.services.clone(),
52 }))
53 }
54
55 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
56}
57
58#[implement(Service)]
59pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
60 let batch = tokenize(message_body)
61 .map(|word| {
62 let mut key = shortroomid.to_be_bytes().to_vec();
63 key.extend_from_slice(word.as_bytes());
64 key.push(0xFF);
65 key.extend_from_slice(pdu_id.as_ref()); key
67 })
68 .collect::<Vec<_>>();
69
70 self.db
71 .tokenids
72 .insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
73}
74
75#[implement(Service)]
76pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
77 let batch = tokenize(message_body).map(|word| {
78 let mut key = shortroomid.to_be_bytes().to_vec();
79 key.extend_from_slice(word.as_bytes());
80 key.push(0xFF);
81 key.extend_from_slice(pdu_id.as_ref()); key
83 });
84
85 for token in batch {
86 self.db.tokenids.remove(&token);
87 }
88}
89
90#[implement(Service)]
91pub async fn search_pdus<'a>(
92 &'a self,
93 query: &'a RoomQuery<'a>,
94) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
95 let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;
96
97 let filter = &query.criteria.filter;
98 let count = pdu_ids.len();
99 let pdus = pdu_ids
100 .into_iter()
101 .stream()
102 .wide_filter_map(async |result_pdu_id: RawPduId| {
103 self.services
104 .timeline
105 .get_pdu_from_id(&result_pdu_id)
106 .await
107 .ok()
108 })
109 .ready_filter(|pdu| !pdu.is_redacted())
110 .ready_filter(move |pdu| filter.matches(pdu))
111 .wide_filter_map(async |pdu| {
112 self.services
113 .state_accessor
114 .user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
115 .await
116 .then_some(pdu)
117 })
118 .skip(query.skip)
119 .take(query.limit);
120
121 Ok((count, pdus))
122}
123
124#[implement(Service)]
127pub async fn search_pdu_ids(
128 &self,
129 query: &RoomQuery<'_>,
130) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
131 let shortroomid = self
132 .services
133 .short
134 .get_shortroomid(query.room_id)
135 .await?;
136
137 let pdu_ids = self
138 .search_pdu_ids_query_room(query, shortroomid)
139 .await;
140
141 let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
142
143 Ok(set::intersection(iters).stream())
144}
145
146#[implement(Service)]
147async fn search_pdu_ids_query_room(
148 &self,
149 query: &RoomQuery<'_>,
150 shortroomid: ShortRoomId,
151) -> Vec<Vec<RawPduId>> {
152 tokenize(&query.criteria.search_term)
153 .stream()
154 .wide_then(async |word| {
155 self.search_pdu_ids_query_words(shortroomid, &word)
156 .collect::<Vec<_>>()
157 .await
158 })
159 .collect::<Vec<_>>()
160 .await
161}
162
163#[implement(Service)]
165fn search_pdu_ids_query_words<'a>(
166 &'a self,
167 shortroomid: ShortRoomId,
168 word: &'a str,
169) -> impl Stream<Item = RawPduId> + Send + '_ {
170 self.search_pdu_ids_query_word(shortroomid, word)
171 .map(move |key| -> RawPduId {
172 let key = &key[prefix_len(word)..];
173 key.into()
174 })
175}
176
177#[implement(Service)]
179fn search_pdu_ids_query_word(
180 &self,
181 shortroomid: ShortRoomId,
182 word: &str,
183) -> impl Stream<Item = Val<'_>> + Send + '_ + use<'_> {
184 let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
186
187 let end = make_tokenid(shortroomid, word, &end_id);
189 let prefix = make_prefix(shortroomid, word);
190 self.db
191 .tokenids
192 .rev_raw_keys_from(&end)
193 .ignore_err()
194 .ready_take_while(move |key| key.starts_with(&prefix))
195}
196
197#[implement(Service)]
198pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
199 let prefix = (room_id, Interfix);
200
201 self.db
202 .tokenids
203 .keys_prefix_raw(&prefix)
204 .ignore_err()
205 .ready_for_each(|key| {
206 trace!("Removing key: {key:?}");
207 self.db.tokenids.remove(key);
208 })
209 .await;
210
211 Ok(())
212}
213
214fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
219 body.split_terminator(|c: char| !c.is_alphanumeric())
220 .filter(|s| !s.is_empty())
221 .filter(|word| word.len() <= WORD_MAX_LEN)
222 .map(str::to_lowercase)
223}
224
225fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
226 let mut key = make_prefix(shortroomid, word);
227 key.extend_from_slice(pdu_id.as_ref());
228 key
229}
230
231fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
232 let mut key = TokenId::new();
233 key.extend_from_slice(&shortroomid.to_be_bytes());
234 key.extend_from_slice(word.as_bytes());
235 key.push(tuwunel_database::SEP);
236 key
237}
238
239fn prefix_len(word: &str) -> usize {
240 size_of::<ShortRoomId>()
241 .saturating_add(word.len())
242 .saturating_add(1)
243}