Skip to main content

tuwunel_service/rooms/search/
mod.rs

1use std::sync::Arc;
2
3use futures::{Stream, StreamExt};
4use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
5use tuwunel_core::{
6	PduCount, Result,
7	arrayvec::ArrayVec,
8	implement,
9	matrix::event::{Event, Matches},
10	trace,
11	utils::{
12		ArrayVecExt, IterStream, ReadyExt, set,
13		stream::{TryIgnore, WidebandExt},
14	},
15};
16use tuwunel_database::{Interfix, Map, keyval::Val};
17
18use crate::rooms::{
19	short::ShortRoomId,
20	timeline::{PduId, RawPduId},
21};
22
/// Full-text search over room messages, backed by an inverted word index
/// stored in the `tokenids` map.
pub struct Service {
	db: Data,
	/// Sibling services used here: `short` (room short ids), `timeline`
	/// (loading PDUs) and `state_accessor` (visibility checks).
	services: Arc<crate::services::OnceServices>,
}
27
/// Database handles owned by the search service.
struct Data {
	/// Inverted index. `index_pdu` writes one empty-valued key per token of a
	/// message, laid out as `shortroomid (big-endian) ++ word ++ 0xFF ++ pdu_id`.
	tokenids: Arc<Map>,
}
31
/// Parameters for a search within a single room.
#[derive(Clone, Debug)]
pub struct RoomQuery<'a> {
	/// Room whose index is searched.
	pub room_id: &'a RoomId,
	/// User whose visibility is checked for each result in `search_pdus`;
	/// when `None`, `search_pdus` yields no events.
	pub user_id: Option<&'a UserId>,
	/// Search criteria: `search_term` is tokenized into index lookups and
	/// `filter` is applied to each loaded event.
	pub criteria: &'a Criteria,
	/// Maximum number of events yielded, applied after all filtering.
	pub limit: usize,
	/// Number of filtered events skipped before `limit` is applied.
	pub skip: usize,
}
40
41type TokenId = ArrayVec<u8, TOKEN_ID_MAX_LEN>;
42
43const TOKEN_ID_MAX_LEN: usize =
44	size_of::<ShortRoomId>() + WORD_MAX_LEN + 1 + size_of::<RawPduId>();
45const WORD_MAX_LEN: usize = 50;
46
47impl crate::Service for Service {
48	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
49		Ok(Arc::new(Self {
50			db: Data { tokenids: args.db["tokenids"].clone() },
51			services: args.services.clone(),
52		}))
53	}
54
55	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
56}
57
58#[implement(Service)]
59pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
60	let batch = tokenize(message_body)
61		.map(|word| {
62			let mut key = shortroomid.to_be_bytes().to_vec();
63			key.extend_from_slice(word.as_bytes());
64			key.push(0xFF);
65			key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
66			key
67		})
68		.collect::<Vec<_>>();
69
70	self.db
71		.tokenids
72		.insert_batch(batch.iter().map(|k| (k.as_slice(), &[])));
73}
74
75#[implement(Service)]
76pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_body: &str) {
77	let batch = tokenize(message_body).map(|word| {
78		let mut key = shortroomid.to_be_bytes().to_vec();
79		key.extend_from_slice(word.as_bytes());
80		key.push(0xFF);
81		key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here
82		key
83	});
84
85	for token in batch {
86		self.db.tokenids.remove(&token);
87	}
88}
89
/// Run a full-text search in `query.room_id` and load the matching events.
///
/// Returns `(count, events)`:
/// - `count` is the number of PDU ids containing every search word. It is
///   taken *before* redaction, `criteria.filter` and visibility filtering, and
///   before `skip`/`limit`. NOTE(review): it can therefore overstate how many
///   events the stream will yield.
/// - `events` loads each candidate PDU (ids that fail to load are skipped),
///   drops redacted events, applies `query.criteria.filter`, keeps only events
///   `query.user_id` can see, then applies `query.skip` and `query.limit`.
///
/// If `query.user_id` is `None`, the `?` in the visibility step makes every
/// event be dropped, so the stream is empty.
///
/// # Errors
/// Propagates errors from `search_pdu_ids`, e.g. a failed short room id lookup.
#[implement(Service)]
pub async fn search_pdus<'a>(
	&'a self,
	query: &'a RoomQuery<'a>,
) -> Result<(usize, impl Stream<Item = impl Event + use<>> + Send + '_)> {
	// Materialize all candidate ids up front so their total can be reported.
	let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await;

	let filter = &query.criteria.filter;
	let count = pdu_ids.len();
	let pdus = pdu_ids
		.into_iter()
		.stream()
		// Load each PDU; ids that fail to resolve are silently dropped.
		.wide_filter_map(async |result_pdu_id: RawPduId| {
			self.services
				.timeline
				.get_pdu_from_id(&result_pdu_id)
				.await
				.ok()
		})
		.ready_filter(|pdu| !pdu.is_redacted())
		.ready_filter(move |pdu| filter.matches(pdu))
		// Visibility check; a missing `user_id` short-circuits to `None`.
		.wide_filter_map(async |pdu| {
			self.services
				.state_accessor
				.user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
				.await
				.then_some(pdu)
		})
		// Pagination happens only after all filtering above.
		.skip(query.skip)
		.take(query.limit);

	Ok((count, pdus))
}
123
124// result is modeled as a stream such that callers don't have to be refactored
125// though an additional async/wrap still exists for now
126#[implement(Service)]
127pub async fn search_pdu_ids(
128	&self,
129	query: &RoomQuery<'_>,
130) -> Result<impl Stream<Item = RawPduId> + Send + '_ + use<'_>> {
131	let shortroomid = self
132		.services
133		.short
134		.get_shortroomid(query.room_id)
135		.await?;
136
137	let pdu_ids = self
138		.search_pdu_ids_query_room(query, shortroomid)
139		.await;
140
141	let iters = pdu_ids.into_iter().map(IntoIterator::into_iter);
142
143	Ok(set::intersection(iters).stream())
144}
145
146#[implement(Service)]
147async fn search_pdu_ids_query_room(
148	&self,
149	query: &RoomQuery<'_>,
150	shortroomid: ShortRoomId,
151) -> Vec<Vec<RawPduId>> {
152	tokenize(&query.criteria.search_term)
153		.stream()
154		.wide_then(async |word| {
155			self.search_pdu_ids_query_words(shortroomid, &word)
156				.collect::<Vec<_>>()
157				.await
158		})
159		.collect::<Vec<_>>()
160		.await
161}
162
163/// Iterate over PduId's containing a word
164#[implement(Service)]
165fn search_pdu_ids_query_words<'a>(
166	&'a self,
167	shortroomid: ShortRoomId,
168	word: &'a str,
169) -> impl Stream<Item = RawPduId> + Send + '_ {
170	self.search_pdu_ids_query_word(shortroomid, word)
171		.map(move |key| -> RawPduId {
172			let key = &key[prefix_len(word)..];
173			key.into()
174		})
175}
176
177/// Iterate over raw database results for a word
178#[implement(Service)]
179fn search_pdu_ids_query_word(
180	&self,
181	shortroomid: ShortRoomId,
182	word: &str,
183) -> impl Stream<Item = Val<'_>> + Send + '_ + use<'_> {
184	// rustc says const'ing this not yet stable
185	let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into();
186
187	// Newest pdus first
188	let end = make_tokenid(shortroomid, word, &end_id);
189	let prefix = make_prefix(shortroomid, word);
190	self.db
191		.tokenids
192		.rev_raw_keys_from(&end)
193		.ignore_err()
194		.ready_take_while(move |key| key.starts_with(&prefix))
195}
196
197#[implement(Service)]
198pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
199	let prefix = (room_id, Interfix);
200
201	self.db
202		.tokenids
203		.keys_prefix_raw(&prefix)
204		.ignore_err()
205		.ready_for_each(|key| {
206			trace!("Removing key: {key:?}");
207			self.db.tokenids.remove(key);
208		})
209		.await;
210
211	Ok(())
212}
213
214/// Splits a string into tokens used as keys in the search inverted index
215///
216/// This may be used to tokenize both message bodies (for indexing) or search
217/// queries (for querying).
218fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
219	body.split_terminator(|c: char| !c.is_alphanumeric())
220		.filter(|s| !s.is_empty())
221		.filter(|word| word.len() <= WORD_MAX_LEN)
222		.map(str::to_lowercase)
223}
224
225fn make_tokenid(shortroomid: ShortRoomId, word: &str, pdu_id: &RawPduId) -> TokenId {
226	let mut key = make_prefix(shortroomid, word);
227	key.extend_from_slice(pdu_id.as_ref());
228	key
229}
230
231fn make_prefix(shortroomid: ShortRoomId, word: &str) -> TokenId {
232	let mut key = TokenId::new();
233	key.extend_from_slice(&shortroomid.to_be_bytes());
234	key.extend_from_slice(word.as_bytes());
235	key.push(tuwunel_database::SEP);
236	key
237}
238
239fn prefix_len(word: &str) -> usize {
240	size_of::<ShortRoomId>()
241		.saturating_add(word.len())
242		.saturating_add(1)
243}