Skip to main content

tuwunel_service/media/
data.rs

1use std::{sync::Arc, time::Duration};
2
3use futures::{StreamExt, pin_mut};
4use ruma::{Mxc, OwnedMxcUri, OwnedUserId, UserId, http_headers::ContentDisposition};
5use tuwunel_core::{
6	Err, Result, debug, debug_info, err,
7	utils::{
8		ReadyExt, str_from_bytes,
9		stream::{TryExpect, TryIgnore},
10		string_from_bytes,
11	},
12};
13use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Map, serialize_key};
14
15use super::{preview::UrlPreviewData, thumbnail::Dim};
16
/// Handles to the database maps backing the media service.
pub(crate) struct Data {
	/// One entry per stored file variant. The key is the serialized
	/// `(mxc, [width, height], content_disposition, content_type)` tuple and
	/// the value is empty (see `create_file_metadata`).
	mediaid_file: Arc<Map>,
	/// Pending MXC URIs. Keyed by the MXC string; the value is
	/// `(expires_at, user)` (see `insert_pending_mxc`).
	mediaid_pending: Arc<Map>,
	/// Uploader bookkeeping. Keyed by `(mxc, user)` with the user as the value
	/// (see `create_file_metadata`).
	mediaid_user: Arc<Map>,
	/// Cached URL previews. Keyed by the URL bytes; the value is the packed
	/// record written by `set_url_preview`.
	url_previews: Arc<Map>,
}
23
/// Metadata recovered from a `mediaid_file` key by `search_file_metadata`.
#[derive(Debug)]
pub struct Metadata {
	/// Parsed content-disposition; `None` when the stored segment is empty.
	pub content_disposition: Option<ContentDisposition>,
	/// Content type taken from the last 0xFF-delimited segment of the key.
	pub content_type: Option<String>,
	/// The raw database key the metadata was parsed from.
	pub(super) key: Vec<u8>,
}
30
31impl Data {
32	pub(super) fn new(db: &Arc<Database>) -> Self {
33		Self {
34			mediaid_file: db["mediaid_file"].clone(),
35			mediaid_pending: db["mediaid_pending"].clone(),
36			mediaid_user: db["mediaid_user"].clone(),
37			url_previews: db["url_previews"].clone(),
38		}
39	}
40
41	pub(super) fn create_file_metadata(
42		&self,
43		mxc: &Mxc<'_>,
44		user: Option<&UserId>,
45		dim: &Dim,
46		content_disposition: Option<&ContentDisposition>,
47		content_type: Option<&str>,
48	) -> Result<Vec<u8>> {
49		let dim: &[u32] = &[dim.width, dim.height];
50		let key = (mxc, dim, content_disposition, content_type);
51		let key = serialize_key(key)?;
52		self.mediaid_file.insert(&key, []);
53		if let Some(user) = user {
54			let key = (mxc, user);
55			self.mediaid_user.put_raw(key, user);
56		}
57
58		Ok(key.to_vec())
59	}
60
61	/// Insert a pending MXC URI into the database
62	pub(super) fn insert_pending_mxc(
63		&self,
64		mxc: &Mxc<'_>,
65		user: &UserId,
66		unused_expires_at: u64,
67	) {
68		let value = (unused_expires_at, user);
69		debug!(?mxc, ?user, ?unused_expires_at, "Inserting pending");
70
71		self.mediaid_pending
72			.raw_put(mxc.to_string(), value);
73	}
74
75	/// Remove a pending MXC URI from the database
76	pub(super) fn remove_pending_mxc(&self, mxc: &Mxc<'_>) {
77		self.mediaid_pending.remove(&mxc.to_string());
78	}
79
80	/// Count the number of pending MXC URIs for a specific user
81	pub(super) async fn count_pending_mxc_for_user(&self, user_id: &UserId) -> (usize, u64) {
82		type KeyVal<'a> = (Ignore, (u64, &'a UserId));
83
84		self.mediaid_pending
85			.stream()
86			.expect_ok()
87			.ready_filter(|(_, (_, pending_user_id)): &KeyVal<'_>| user_id == *pending_user_id)
88			.ready_fold(
89				(0_usize, u64::MAX),
90				|(count, earliest_expiration), (_, (expires_at, _))| {
91					(count.saturating_add(1), earliest_expiration.min(expires_at))
92				},
93			)
94			.await
95	}
96
97	/// Search for a pending MXC URI in the database
98	pub(super) async fn search_pending_mxc(&self, mxc: &Mxc<'_>) -> Result<(OwnedUserId, u64)> {
99		type Value<'a> = (u64, OwnedUserId);
100
101		self.mediaid_pending
102			.get(&mxc.to_string())
103			.await
104			.deserialized()
105			.map(|(expires_at, user_id): Value<'_>| (user_id, expires_at))
106			.inspect(|(user_id, expires_at)| debug!(?mxc, ?user_id, ?expires_at, "Found pending"))
107			.map_err(|e| err!(Request(NotFound("Pending not found or error: {e}"))))
108	}
109
110	pub(super) async fn delete_file_mxc(&self, mxc: &Mxc<'_>) {
111		debug!("MXC URI: {mxc}");
112
113		let prefix = (mxc, Interfix);
114		self.mediaid_file
115			.keys_prefix_raw(&prefix)
116			.ignore_err()
117			.ready_for_each(|key| self.mediaid_file.remove(key))
118			.await;
119
120		self.mediaid_user
121			.stream_prefix_raw(&prefix)
122			.ignore_err()
123			.ready_for_each(|(key, val)| {
124				debug_assert!(
125					key.starts_with(mxc.to_string().as_bytes()),
126					"key should start with the mxc"
127				);
128
129				let user = str_from_bytes(val).unwrap_or_default();
130				debug_info!("Deleting key {key:?} which was uploaded by user {user}");
131
132				self.mediaid_user.remove(key);
133			})
134			.await;
135	}
136
137	/// Searches for all files with the given MXC
138	pub(super) async fn search_mxc_metadata_prefix(&self, mxc: &Mxc<'_>) -> Result<Vec<Vec<u8>>> {
139		debug!("MXC URI: {mxc}");
140
141		let prefix = (mxc, Interfix);
142		let keys: Vec<Vec<u8>> = self
143			.mediaid_file
144			.keys_prefix_raw(&prefix)
145			.ignore_err()
146			.map(<[u8]>::to_vec)
147			.collect()
148			.await;
149
150		if keys.is_empty() {
151			return Err!(Database("Failed to find any keys in database for `{mxc}`",));
152		}
153
154		debug!("Got the following keys: {keys:?}");
155
156		Ok(keys)
157	}
158
159	pub(super) async fn file_metadata_exists(&self, mxc: &Mxc<'_>, dim: &Dim) -> bool {
160		let dim: &[u32] = &[dim.width, dim.height];
161		let prefix = (mxc, dim, Interfix);
162		let keys = self
163			.mediaid_file
164			.keys_prefix_raw(&prefix)
165			.ignore_err();
166
167		pin_mut!(keys);
168		keys.next().await.is_some()
169	}
170
171	pub(super) async fn search_file_metadata(
172		&self,
173		mxc: &Mxc<'_>,
174		dim: &Dim,
175	) -> Result<Metadata> {
176		let dim: &[u32] = &[dim.width, dim.height];
177		let prefix = (mxc, dim, Interfix);
178
179		let keys = self
180			.mediaid_file
181			.keys_prefix_raw(&prefix)
182			.ignore_err()
183			.map(ToOwned::to_owned);
184
185		pin_mut!(keys);
186		let key = keys
187			.next()
188			.await
189			.ok_or_else(|| err!(Request(NotFound("Media not found"))))?;
190
191		let mut parts = key.rsplit(|&b| b == 0xFF);
192
193		let content_type = parts
194			.next()
195			.map(string_from_bytes)
196			.transpose()
197			.map_err(|e| err!(Database(error!(?mxc, "Content-type is invalid: {e}"))))?;
198
199		let content_disposition = parts
200			.next()
201			.map(Some)
202			.ok_or_else(|| err!(Database(error!(?mxc, "Media ID in db is invalid."))))?
203			.filter(|bytes| !bytes.is_empty())
204			.map(string_from_bytes)
205			.transpose()
206			.map_err(|e| err!(Database(error!(?mxc, "Content-disposition is invalid: {e}"))))?
207			.as_deref()
208			.map(str::parse)
209			.transpose()
210			.map_err(|e| err!(Database(error!(?mxc, "Content-disposition is invalid: {e}"))))?;
211
212		Ok(Metadata { content_disposition, content_type, key })
213	}
214
215	/// Gets all the MXCs associated with a user
216	pub(super) async fn get_all_user_mxcs(&self, user_id: &UserId) -> Vec<OwnedMxcUri> {
217		self.mediaid_user
218			.stream()
219			.ignore_err()
220			.ready_filter_map(|(key, user): (&str, &UserId)| {
221				(user == user_id).then(|| key.into())
222			})
223			.collect()
224			.await
225	}
226
227	/// Gets all the media keys in our database (this includes all the metadata
228	/// associated with it such as width, height, content-type, etc)
229	pub(crate) async fn get_all_media_keys(&self) -> Vec<Vec<u8>> {
230		self.mediaid_file
231			.raw_keys()
232			.ignore_err()
233			.map(<[u8]>::to_vec)
234			.collect()
235			.await
236	}
237
238	#[inline]
239	pub(super) fn remove_url_preview(&self, url: &str) -> Result {
240		self.url_previews.remove(url.as_bytes());
241		Ok(())
242	}
243
244	pub(super) fn set_url_preview(
245		&self,
246		url: &str,
247		data: &UrlPreviewData,
248		timestamp: Duration,
249	) -> Result {
250		let mut value = Vec::<u8>::new();
251		value.extend_from_slice(&timestamp.as_secs().to_be_bytes());
252		value.push(0xFF);
253		value.extend_from_slice(
254			data.title
255				.as_ref()
256				.map(String::as_bytes)
257				.unwrap_or_default(),
258		);
259		value.push(0xFF);
260		value.extend_from_slice(
261			data.description
262				.as_ref()
263				.map(String::as_bytes)
264				.unwrap_or_default(),
265		);
266		value.push(0xFF);
267		value.extend_from_slice(
268			data.image
269				.as_ref()
270				.map(String::as_bytes)
271				.unwrap_or_default(),
272		);
273		value.push(0xFF);
274		value.extend_from_slice(&data.image_size.unwrap_or(0).to_be_bytes());
275		value.push(0xFF);
276		value.extend_from_slice(&data.image_width.unwrap_or(0).to_be_bytes());
277		value.push(0xFF);
278		value.extend_from_slice(&data.image_height.unwrap_or(0).to_be_bytes());
279		value.push(0xFF);
280		value.extend_from_slice(
281			data.video
282				.as_ref()
283				.map(String::as_bytes)
284				.unwrap_or_default(),
285		);
286		value.push(0xFF);
287		value.extend_from_slice(&data.video_size.unwrap_or(0).to_be_bytes());
288		value.push(0xFF);
289		value.extend_from_slice(&data.video_width.unwrap_or(0).to_be_bytes());
290		value.push(0xFF);
291		value.extend_from_slice(&data.video_height.unwrap_or(0).to_be_bytes());
292		value.push(0xFF);
293		value.extend_from_slice(
294			data.audio
295				.as_ref()
296				.map(String::as_bytes)
297				.unwrap_or_default(),
298		);
299		value.push(0xFF);
300		value.extend_from_slice(&data.audio_size.unwrap_or(0).to_be_bytes());
301		value.push(0xFF);
302		value.extend_from_slice(
303			data.og_type
304				.as_ref()
305				.map(String::as_bytes)
306				.unwrap_or_default(),
307		);
308		value.push(0xFF);
309		value.extend_from_slice(
310			data.og_url
311				.as_ref()
312				.map(String::as_bytes)
313				.unwrap_or_default(),
314		);
315
316		self.url_previews.insert(url.as_bytes(), &value);
317
318		Ok(())
319	}
320
321	pub(super) async fn get_url_preview(&self, url: &str) -> Result<UrlPreviewData> {
322		let values = self.url_previews.get(url).await?;
323
324		let mut values = values.split(|&b| b == 0xFF);
325
326		let _ts = values.next();
327		/* if we ever decide to use timestamp, this is here.
328		match values.next().map(|b| u64::from_be_bytes(b.try_into().expect("valid BE array"))) {
329			Some(0) => None,
330			x => x,
331		};*/
332
333		let title = match values
334			.next()
335			.and_then(|b| String::from_utf8(b.to_vec()).ok())
336		{
337			| Some(s) if s.is_empty() => None,
338			| x => x,
339		};
340		let description = match values
341			.next()
342			.and_then(|b| String::from_utf8(b.to_vec()).ok())
343		{
344			| Some(s) if s.is_empty() => None,
345			| x => x,
346		};
347		let image = match values
348			.next()
349			.and_then(|b| String::from_utf8(b.to_vec()).ok())
350		{
351			| Some(s) if s.is_empty() => None,
352			| x => x,
353		};
354		let image_size = match values
355			.next()
356			.map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
357		{
358			| Some(0) => None,
359			| x => x,
360		};
361		let image_width = match values
362			.next()
363			.map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
364		{
365			| Some(0) => None,
366			| x => x,
367		};
368		let image_height = match values
369			.next()
370			.map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
371		{
372			| Some(0) => None,
373			| x => x,
374		};
375		let video = match values
376			.next()
377			.and_then(|b| String::from_utf8(b.to_vec()).ok())
378		{
379			| Some(s) if s.is_empty() => None,
380			| x => x,
381		};
382		let video_size = match values
383			.next()
384			.map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
385		{
386			| Some(0) => None,
387			| x => x,
388		};
389		let video_width = match values
390			.next()
391			.map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
392		{
393			| Some(0) => None,
394			| x => x,
395		};
396		let video_height = match values
397			.next()
398			.map(|b| u32::from_be_bytes(b.try_into().unwrap_or_default()))
399		{
400			| Some(0) => None,
401			| x => x,
402		};
403		let audio = match values
404			.next()
405			.and_then(|b| String::from_utf8(b.to_vec()).ok())
406		{
407			| Some(s) if s.is_empty() => None,
408			| x => x,
409		};
410		let audio_size = match values
411			.next()
412			.map(|b| usize::from_be_bytes(b.try_into().unwrap_or_default()))
413		{
414			| Some(0) => None,
415			| x => x,
416		};
417		let og_type = match values
418			.next()
419			.and_then(|b| String::from_utf8(b.to_vec()).ok())
420		{
421			| Some(s) if s.is_empty() => None,
422			| x => x,
423		};
424		let og_url = match values
425			.next()
426			.and_then(|b| String::from_utf8(b.to_vec()).ok())
427		{
428			| Some(s) if s.is_empty() => None,
429			| x => x,
430		};
431
432		Ok(UrlPreviewData {
433			title,
434			description,
435			image,
436			image_size,
437			image_width,
438			image_height,
439			video,
440			video_size,
441			video_width,
442			video_height,
443			audio,
444			audio_size,
445			og_type,
446			og_url,
447		})
448	}
449}