1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt, pin_mut};
4use ruma::{Mxc, OwnedMxcUri, OwnedUserId, UserId, http_headers::ContentDisposition};
5use tuwunel_core::{
6 Err, Result, debug, debug_info, err,
7 utils::{
8 ReadyExt, str_from_bytes,
9 stream::{TryExpect, TryIgnore},
10 string_from_bytes,
11 },
12};
13use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Map, serialize_key};
14
15use super::{preview::UrlPreviewData, thumbnail::Dim};
16
17pub(crate) struct Data {
18 mediaid_file: Arc<Map>,
19 mediaid_pending: Arc<Map>,
20 mediaid_user: Arc<Map>,
21 url_previews: Arc<Map>,
22}
23
24#[derive(Debug)]
25pub struct Metadata {
26 pub content_disposition: Option<ContentDisposition>,
27 pub content_type: Option<String>,
28 pub(super) key: Vec<u8>,
29}
30
31impl Data {
32 pub(super) fn new(db: &Arc<Database>) -> Self {
33 Self {
34 mediaid_file: db["mediaid_file"].clone(),
35 mediaid_pending: db["mediaid_pending"].clone(),
36 mediaid_user: db["mediaid_user"].clone(),
37 url_previews: db["url_previews"].clone(),
38 }
39 }
40
41 pub(super) fn create_file_metadata(
42 &self,
43 mxc: &Mxc<'_>,
44 user: Option<&UserId>,
45 dim: &Dim,
46 content_disposition: Option<&ContentDisposition>,
47 content_type: Option<&str>,
48 ) -> Result<Vec<u8>> {
49 let dim: &[u32] = &[dim.width, dim.height];
50 let key = (mxc, dim, content_disposition, content_type);
51 let key = serialize_key(key)?;
52 self.mediaid_file.insert(&key, []);
53 if let Some(user) = user {
54 let key = (mxc, user);
55 self.mediaid_user.put_raw(key, user);
56 }
57
58 Ok(key.to_vec())
59 }
60
61 pub(super) fn insert_pending_mxc(
63 &self,
64 mxc: &Mxc<'_>,
65 user: &UserId,
66 unused_expires_at: u64,
67 ) {
68 let value = (unused_expires_at, user);
69 debug!(?mxc, ?user, ?unused_expires_at, "Inserting pending");
70
71 self.mediaid_pending
72 .raw_put(mxc.to_string(), value);
73 }
74
75 pub(super) fn remove_pending_mxc(&self, mxc: &Mxc<'_>) {
77 self.mediaid_pending.remove(&mxc.to_string());
78 }
79
80 pub(super) async fn count_pending_mxc_for_user(&self, user_id: &UserId) -> (usize, u64) {
82 type KeyVal<'a> = (Ignore, (u64, &'a UserId));
83
84 self.mediaid_pending
85 .stream()
86 .expect_ok()
87 .ready_filter(|(_, (_, pending_user_id)): &KeyVal<'_>| user_id == *pending_user_id)
88 .ready_fold(
89 (0_usize, u64::MAX),
90 |(count, earliest_expiration), (_, (expires_at, _))| {
91 (count.saturating_add(1), earliest_expiration.min(expires_at))
92 },
93 )
94 .await
95 }
96
97 pub(super) async fn search_pending_mxc(&self, mxc: &Mxc<'_>) -> Result<(OwnedUserId, u64)> {
99 type Value<'a> = (u64, OwnedUserId);
100
101 self.mediaid_pending
102 .get(&mxc.to_string())
103 .await
104 .deserialized()
105 .map(|(expires_at, user_id): Value<'_>| (user_id, expires_at))
106 .inspect(|(user_id, expires_at)| debug!(?mxc, ?user_id, ?expires_at, "Found pending"))
107 .map_err(|e| err!(Request(NotFound("Pending not found or error: {e}"))))
108 }
109
110 pub(super) async fn delete_file_mxc(&self, mxc: &Mxc<'_>) {
111 debug!("MXC URI: {mxc}");
112
113 let prefix = (mxc, Interfix);
114 self.mediaid_file
115 .keys_prefix_raw(&prefix)
116 .ignore_err()
117 .ready_for_each(|key| self.mediaid_file.remove(key))
118 .await;
119
120 self.mediaid_user
121 .stream_prefix_raw(&prefix)
122 .ignore_err()
123 .ready_for_each(|(key, val)| {
124 debug_assert!(
125 key.starts_with(mxc.to_string().as_bytes()),
126 "key should start with the mxc"
127 );
128
129 let user = str_from_bytes(val).unwrap_or_default();
130 debug_info!("Deleting key {key:?} which was uploaded by user {user}");
131
132 self.mediaid_user.remove(key);
133 })
134 .await;
135 }
136
137 pub(super) async fn search_mxc_metadata_prefix(&self, mxc: &Mxc<'_>) -> Result<Vec<Vec<u8>>> {
139 debug!("MXC URI: {mxc}");
140
141 let prefix = (mxc, Interfix);
142 let keys: Vec<Vec<u8>> = self
143 .mediaid_file
144 .keys_prefix_raw(&prefix)
145 .ignore_err()
146 .map(<[u8]>::to_vec)
147 .collect()
148 .await;
149
150 if keys.is_empty() {
151 return Err!(Database("Failed to find any keys in database for `{mxc}`",));
152 }
153
154 debug!("Got the following keys: {keys:?}");
155
156 Ok(keys)
157 }
158
159 pub(super) async fn file_metadata_exists(&self, mxc: &Mxc<'_>, dim: &Dim) -> bool {
160 let dim: &[u32] = &[dim.width, dim.height];
161 let prefix = (mxc, dim, Interfix);
162 let keys = self
163 .mediaid_file
164 .keys_prefix_raw(&prefix)
165 .ignore_err();
166
167 pin_mut!(keys);
168 keys.next().await.is_some()
169 }
170
171 pub(super) async fn search_file_metadata(
172 &self,
173 mxc: &Mxc<'_>,
174 dim: &Dim,
175 ) -> Result<Metadata> {
176 let dim: &[u32] = &[dim.width, dim.height];
177 let prefix = (mxc, dim, Interfix);
178
179 let keys = self
180 .mediaid_file
181 .keys_prefix_raw(&prefix)
182 .ignore_err()
183 .map(ToOwned::to_owned);
184
185 pin_mut!(keys);
186 let key = keys
187 .next()
188 .await
189 .ok_or_else(|| err!(Request(NotFound("Media not found"))))?;
190
191 let mut parts = key.rsplit(|&b| b == 0xFF);
192
193 let content_type = parts
194 .next()
195 .map(string_from_bytes)
196 .transpose()
197 .map_err(|e| err!(Database(error!(?mxc, "Content-type is invalid: {e}"))))?;
198
199 let content_disposition = parts
200 .next()
201 .map(Some)
202 .ok_or_else(|| err!(Database(error!(?mxc, "Media ID in db is invalid."))))?
203 .filter(|bytes| !bytes.is_empty())
204 .map(string_from_bytes)
205 .transpose()
206 .map_err(|e| err!(Database(error!(?mxc, "Content-disposition is invalid: {e}"))))?
207 .as_deref()
208 .map(str::parse)
209 .transpose()
210 .map_err(|e| err!(Database(error!(?mxc, "Content-disposition is invalid: {e}"))))?;
211
212 Ok(Metadata { content_disposition, content_type, key })
213 }
214
215 pub(super) async fn get_all_user_mxcs(&self, user_id: &UserId) -> Vec<OwnedMxcUri> {
217 self.mediaid_user
218 .stream()
219 .ignore_err()
220 .ready_filter_map(|(key, user): (&str, &UserId)| {
221 (user == user_id).then(|| key.into())
222 })
223 .collect()
224 .await
225 }
226
227 pub(crate) async fn get_all_media_keys(&self) -> Vec<Vec<u8>> {
230 self.mediaid_file
231 .raw_keys()
232 .ignore_err()
233 .map(<[u8]>::to_vec)
234 .collect()
235 .await
236 }
237
238 #[inline]
239 pub(super) fn remove_url_preview(&self, url: &str) -> Result {
240 self.url_previews.remove(url.as_bytes());
241 Ok(())
242 }
243
244 pub(super) fn set_url_preview(
245 &self,
246 url: &str,
247 data: &UrlPreviewData,
248 timestamp: Duration,
249 ) -> Result {
250 let mut value = Vec::<u8>::new();
251 value.extend_from_slice(×tamp.as_secs().to_be_bytes());
252 value.push(0xFF);
253 value.extend_from_slice(
254 data.title
255 .as_ref()
256 .map(String::as_bytes)
257 .unwrap_or_default(),
258 );
259 value.push(0xFF);
260 value.extend_from_slice(
261 data.description
262 .as_ref()
263 .map(String::as_bytes)
264 .unwrap_or_default(),
265 );
266 value.push(0xFF);
267 value.extend_from_slice(
268 data.image
269 .as_ref()
270 .map(String::as_bytes)
271 .unwrap_or_default(),
272 );
273 value.push(0xFF);
274 value.extend_from_slice(&data.image_size.unwrap_or(0).to_be_bytes());
275 value.push(0xFF);
276 value.extend_from_slice(&data.image_width.unwrap_or(0).to_be_bytes());
277 value.push(0xFF);
278 value.extend_from_slice(&data.image_height.unwrap_or(0).to_be_bytes());
279 value.push(0xFF);
280 value.extend_from_slice(
281 data.video
282 .as_ref()
283 .map(String::as_bytes)
284 .unwrap_or_default(),
285 );
286 value.push(0xFF);
287 value.extend_from_slice(&data.video_size.unwrap_or(0).to_be_bytes());
288 value.push(0xFF);
289 value.extend_from_slice(&data.video_width.unwrap_or(0).to_be_bytes());
290 value.push(0xFF);
291 value.extend_from_slice(&data.video_height.unwrap_or(0).to_be_bytes());
292 value.push(0xFF);
293 value.extend_from_slice(
294 data.audio
295 .as_ref()
296 .map(String::as_bytes)
297 .unwrap_or_default(),
298 );
299 value.push(0xFF);
300 value.extend_from_slice(&data.audio_size.unwrap_or(0).to_be_bytes());
301 value.push(0xFF);
302 value.extend_from_slice(
303 data.og_type
304 .as_ref()
305 .map(String::as_bytes)
306 .unwrap_or_default(),
307 );
308 value.push(0xFF);
309 value.extend_from_slice(
310 data.og_url
311 .as_ref()
312 .map(String::as_bytes)
313 .unwrap_or_default(),
314 );
315
316 self.url_previews.insert(url.as_bytes(), &value);
317
318 Ok(())
319 }
320
321 pub(super) async fn get_url_preview(&self, url: &str) -> Result<UrlPreviewData> {
322 let values = self.url_previews.get(url).await?;
323
324 let mut values = values.split(|&b| b == 0xFF);
325
326 let _ts = values.next();
327 let title = match values
334 .next()
335 .and_then(|b| String::from_utf8(b.to_vec()).ok())
336 {
337 | Some(s) if s.is_empty() => None,
338 | x => x,
339 };
340 let description = match values
341 .next()
342 .and_then(|b| String::from_utf8(b.to_vec()).ok())
343 {
344 | Some(s) if s.is_empty() => None,
345 | x => x,
346 };
347 let image = match values
348 .next()
349 .and_then(|b| String::from_utf8(b.to_vec()).ok())
350 {
351 | Some(s) if s.is_empty() => None,
352 | x => x,
353 };
354 let image_size = match values
355 .next()
356 .map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
357 {
358 | Some(0) => None,
359 | x => x,
360 };
361 let image_width = match values
362 .next()
363 .map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
364 {
365 | Some(0) => None,
366 | x => x,
367 };
368 let image_height = match values
369 .next()
370 .map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
371 {
372 | Some(0) => None,
373 | x => x,
374 };
375 let video = match values
376 .next()
377 .and_then(|b| String::from_utf8(b.to_vec()).ok())
378 {
379 | Some(s) if s.is_empty() => None,
380 | x => x,
381 };
382 let video_size = match values
383 .next()
384 .map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
385 {
386 | Some(0) => None,
387 | x => x,
388 };
389 let video_width = match values
390 .next()
391 .map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
392 {
393 | Some(0) => None,
394 | x => x,
395 };
396 let video_height = match values
397 .next()
398 .map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
399 {
400 | Some(0) => None,
401 | x => x,
402 };
403 let audio = match values
404 .next()
405 .and_then(|b| String::from_utf8(b.to_vec()).ok())
406 {
407 | Some(s) if s.is_empty() => None,
408 | x => x,
409 };
410 let audio_size = match values
411 .next()
412 .map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
413 {
414 | Some(0) => None,
415 | x => x,
416 };
417 let og_type = match values
418 .next()
419 .and_then(|b| String::from_utf8(b.to_vec()).ok())
420 {
421 | Some(s) if s.is_empty() => None,
422 | x => x,
423 };
424 let og_url = match values
425 .next()
426 .and_then(|b| String::from_utf8(b.to_vec()).ok())
427 {
428 | Some(s) if s.is_empty() => None,
429 | x => x,
430 };
431
432 Ok(UrlPreviewData {
433 title,
434 description,
435 image,
436 image_size,
437 image_width,
438 image_height,
439 video,
440 video_size,
441 video_width,
442 video_height,
443 audio,
444 audio_size,
445 og_type,
446 og_url,
447 })
448 }
449}