1use std::{
2 collections::BTreeMap,
3 iter::from_fn,
4 path::{Path, PathBuf},
5 pin::pin,
6 sync::Arc,
7 time::Duration,
8};
9
10use bytes::Bytes;
11use futures::{StreamExt, TryStreamExt};
12use object_store::Error as ObjectStoreError;
13use ruma::{
14 CanonicalJsonObject, CanonicalJsonValue, EventId, Mxc, OwnedRoomId, OwnedUserId, RoomId,
15 ServerName, UserId,
16};
17use serde::{Deserialize, de::IgnoredAny};
18use tokio::time::sleep;
19use tuwunel_core::{
20 Err, Error, Result, debug_warn, err, error, info,
21 itertools::Itertools,
22 utils,
23 utils::{ReadyExt, content_disposition::make_content_disposition, stream::TryIgnore},
24 warn,
25};
26use tuwunel_database::{Map, SEP};
27
28use crate::{Services, storage::Provider};
29
/// Deserialization probe that records only whether a JSON object carries a
/// `room_id` field; the field's value is discarded through `IgnoredAny`.
#[derive(Deserialize)]
struct HasRoomId {
	/// `Some` when a `room_id` value was found, `None` otherwise.
	room_id: Option<IgnoredAny>,
}
35
/// Where the original Conduit media files are read from during the import.
enum MediaSource {
	/// A local directory; files are read from paths built beneath it.
	Filesystem(PathBuf),
	/// A configured storage provider; objects are fetched from it by key.
	Provider(Arc<Provider>),
}
43
/// One parsed row of the Conduit media metadata column. Every field borrows
/// from the raw key or value bytes of that row.
struct ConduitMediaEntry<'a> {
	/// Key bytes before the first separator, validated as a server name.
	server_name: &'a ServerName,
	/// Key bytes after the first separator, as UTF-8.
	media_id: &'a str,
	/// First 32 bytes of the value; hex-encoded to locate the stored file.
	sha256: &'a [u8],
	/// Stored filename, or `None` when it is empty.
	filename: Option<&'a str>,
	/// Stored content type, or `None` when it is empty.
	content_type: Option<&'a str>,
}
52
53fn media_source(services: &Services) -> Result<MediaSource> {
57 let config = &services.server.config;
58
59 match config.conduit_source_media_provider.as_deref() {
60 | Some(name) => services
61 .storage
62 .provider(name)
63 .map(|provider| MediaSource::Provider(provider.clone())),
64 | None => {
65 let media_dir = config
66 .conduit_source_media_path
67 .clone()
68 .unwrap_or_else(|| config.database_path.join("media"));
69
70 Ok(MediaSource::Filesystem(media_dir))
71 },
72 }
73}
74
/// Number of times a single provider read is tried before the failure is
/// returned to the caller.
const PROVIDER_READ_ATTEMPTS: u32 = 2;

/// Pause between a failed provider read and the next attempt.
const PROVIDER_READ_RETRY_DELAY: Duration = Duration::from_secs(2);
83
84pub(super) async fn migrate_conduit_media(services: &Services) -> Result {
92 let db = &services.db;
93 let config = &services.server.config;
94
95 let Some(metadata) = db.open_cf("servernamemediaid_metadata")? else {
96 warn!("Conduit database has no media metadata; nothing to import.");
97 return Ok(());
98 };
99
100 let owners = db.open_cf("servernamemediaid_userlocalpart")?;
101 let owners = owners.as_ref();
102
103 let blocklist = db.open_cf("blocked_servername_mediaid")?;
104 let blocklist = blocklist.as_ref();
105
106 let depth = config.conduit_media_directory_depth;
107 let length = config.conduit_media_directory_length;
108 let source = media_source(services)?;
109
110 warn!("Importing Conduit media originals into tuwunel's key-addressed store...");
111
112 let cork = db.cork_and_sync();
113 let (imported, skipped, blocked) = metadata
114 .raw_stream()
115 .ignore_err()
116 .map(Ok::<_, Error>)
117 .try_fold(
118 (0_usize, 0_usize, 0_usize),
119 async |(imported, skipped, blocked), (key, value)| {
120 if conduit_media_blocked(blocklist, key).await? {
121 return Ok((imported, skipped, blocked.saturating_add(1)));
122 }
123
124 let imported_entry =
125 import_conduit_original(services, owners, &source, depth, length, key, value)
126 .await?;
127
128 Ok(if imported_entry {
129 (imported.saturating_add(1), skipped, blocked)
130 } else {
131 (imported, skipped.saturating_add(1), blocked)
132 })
133 },
134 )
135 .await?;
136
137 drop(cork);
138
139 if blocked > 0 {
140 warn!(%blocked, "Skipped Conduit media blocked by a moderator; not imported");
141 }
142
143 if skipped > 0 {
144 warn!(%imported, %skipped, "Imported Conduit media originals; some files were skipped");
145 } else {
146 info!(%imported, "Imported Conduit media originals");
147 }
148
149 Ok(())
150}
151
152async fn import_conduit_original(
156 services: &Services,
157 owners: Option<&Arc<Map>>,
158 source: &MediaSource,
159 depth: u8,
160 length: u8,
161 key: &[u8],
162 value: &[u8],
163) -> Result<bool> {
164 let entry = match parse_conduit_media_entry(key, value) {
165 | Ok(entry) => entry,
166 | Err(e) => {
167 debug_warn!(error = %e, "skipping unimportable Conduit media entry");
168 return Ok(false);
169 },
170 };
171
172 let Some(file) = read_conduit_original(source, depth, length, entry.sha256).await? else {
173 return Ok(false);
174 };
175
176 let content_disposition = make_content_disposition(None, entry.content_type, entry.filename);
177 let owner = conduit_media_owner(owners, key, entry.server_name).await;
178 let mxc = Mxc {
179 server_name: entry.server_name,
180 media_id: entry.media_id,
181 };
182
183 match services
184 .media
185 .create(&mxc, owner.as_deref(), Some(&content_disposition), entry.content_type, &file)
186 .await
187 {
188 | Ok(()) => Ok(true),
189 | Err(e) => {
190 debug_warn!(error = %e, "skipping Conduit media entry that failed to store");
191 Ok(false)
192 },
193 }
194}
195
196fn parse_conduit_media_entry<'a>(
200 key: &'a [u8],
201 value: &'a [u8],
202) -> Result<ConduitMediaEntry<'a>> {
203 let Some(sep) = key.iter().position(|&byte| byte == SEP) else {
204 return Err!(Database("Conduit media key has no server-name separator"));
205 };
206 let server_name = <&ServerName>::try_from(str::from_utf8(&key[..sep])?)
207 .map_err(|_| err!(Database("Conduit media key has an invalid server name")))?;
208
209 let media_id = str::from_utf8(&key[sep.saturating_add(1)..])?;
210
211 let (sha256, filename, content_type) = parse_conduit_media_value(value)?;
212
213 Ok(ConduitMediaEntry {
214 server_name,
215 media_id,
216 sha256,
217 filename,
218 content_type,
219 })
220}
221
222async fn read_conduit_original(
228 source: &MediaSource,
229 depth: u8,
230 length: u8,
231 sha256: &[u8],
232) -> Result<Option<Bytes>> {
233 let sha256_hex = sha256_hex(sha256);
234 match source {
235 | MediaSource::Filesystem(media_dir) => {
236 let path = conduit_media_path(media_dir, depth, length, &sha256_hex);
237 match tokio::fs::read(&path).await {
238 | Ok(file) => Ok(Some(file.into())),
239 | Err(e) => {
240 debug_warn!(?path, error = %e, "skipping unreadable Conduit media file");
241 Ok(None)
242 },
243 }
244 },
245 | MediaSource::Provider(provider) =>
246 read_provider_original(provider, &conduit_media_key(depth, length, &sha256_hex)).await,
247 }
248}
249
250async fn read_provider_original(provider: &Arc<Provider>, key: &str) -> Result<Option<Bytes>> {
255 let mut attempt = 0_u32;
256 loop {
257 attempt = attempt.saturating_add(1);
258 match provider.get(key).await {
259 | Ok(file) => return Ok(Some(file)),
260 | Err(e) if is_missing_object(&e) => {
261 debug_warn!(%key, error = %e, "skipping missing Conduit media object");
262 return Ok(None);
263 },
264 | Err(e) if attempt >= PROVIDER_READ_ATTEMPTS => {
265 error!(
266 %key,
267 attempts = PROVIDER_READ_ATTEMPTS,
268 error = %e,
269 "Aborting the Conduit media import: source storage provider unreachable. No \
270 media has been imported in a way that needs cleanup; once the provider is \
271 reachable, restart tuwunel to resume the import from the beginning."
272 );
273 return Err(e);
274 },
275 | Err(e) => {
276 warn!(%key, attempt, error = %e, "Reading Conduit media object failed; retrying");
277 sleep(PROVIDER_READ_RETRY_DELAY).await;
278 },
279 }
280 }
281}
282
/// Returns `true` when `error` wraps the object store's `NotFound` variant,
/// i.e. the requested object does not exist rather than the read failing.
fn is_missing_object(error: &Error) -> bool {
	matches!(error, Error::ObjectStore(ObjectStoreError::NotFound { .. }))
}
290
291fn parse_conduit_media_value(value: &[u8]) -> Result<(&[u8], Option<&str>, Option<&str>)> {
296 let (sha256, rest) = value
297 .split_at_checked(32)
298 .ok_or_else(|| err!(Database("Conduit media value shorter than a SHA-256 digest")))?;
299
300 let mut parts = rest.split(|&byte| byte == SEP);
302 let filename = parts.next().unwrap_or_default();
303 let Some(content_type) = parts.next() else {
304 return Err!(Database("Conduit media value has no content-type separator"));
305 };
306 let filename = str::from_utf8(filename)?;
307 let content_type = str::from_utf8(content_type)?;
308 let filename = (!filename.is_empty()).then_some(filename);
309 let content_type = (!content_type.is_empty()).then_some(content_type);
310
311 Ok((sha256, filename, content_type))
312}
313
314async fn conduit_media_owner(
318 owners: Option<&Arc<Map>>,
319 key: &[u8],
320 server_name: &ServerName,
321) -> Option<OwnedUserId> {
322 let localpart = owners?.get(key).await.ok()?;
323
324 UserId::parse_with_server_name(str::from_utf8(&localpart).ok()?, server_name).ok()
325}
326
327async fn conduit_media_blocked(blocklist: Option<&Arc<Map>>, key: &[u8]) -> Result<bool> {
335 let Some(blocklist) = blocklist else {
336 return Ok(false);
337 };
338
339 match blocklist.exists(key).await {
340 | Ok(()) => Ok(true),
341 | Err(e) if e.is_not_found() => Ok(false),
342 | Err(e) => Err(e),
343 }
344}
345
346fn conduit_media_path(media_dir: &Path, depth: u8, length: u8, sha256_hex: &str) -> PathBuf {
350 let mut path = media_dir.to_path_buf();
351 path.extend(conduit_shards(depth, length, sha256_hex));
352 path
353}
354
355fn conduit_media_key(depth: u8, length: u8, sha256_hex: &str) -> String {
359 conduit_shards(depth, length, sha256_hex).join("/")
360}
361
/// Splits a hex digest into directory shards followed by the remainder.
///
/// Yields up to `depth` leading segments of `length` characters each, then
/// whatever is left of the digest as the final segment. With `depth == 0` the
/// whole digest is the only segment. If the digest runs out before a full
/// shard can be cut, the leftover is yielded and iteration stops.
///
/// A `length` of zero is treated like `depth == 0`. A zero-width shard can
/// only produce empty segments, which would turn the provider key into
/// `//<digest>` while the filesystem path silently collapses to the flat
/// layout. Treating it as flat keeps both sources in agreement.
fn conduit_shards(depth: u8, length: u8, sha256_hex: &str) -> impl Iterator<Item = &str> {
	let mut remaining = if length == 0 { 0 } else { depth };
	let mut rest = Some(sha256_hex);
	from_fn(move || {
		// `take` leaves `None` behind, so every early exit ends the iteration.
		let current = rest.take()?;
		if remaining == 0 {
			return Some(current);
		}

		remaining = remaining.saturating_sub(1);
		match current.split_at_checked(length.into()) {
			| Some((segment, next)) => {
				rest = Some(next);
				Some(segment)
			},
			| None => Some(current),
		}
	})
}
389
/// Encodes `digest` as lowercase hexadecimal, two characters per byte.
fn sha256_hex(digest: &[u8]) -> String {
	const HEX: &[u8; 16] = b"0123456789abcdef";

	// A nibble is always below 16, so the table lookup cannot go out of bounds.
	let nibble = |value: u8| char::from(HEX[usize::from(value)]);

	let mut out = String::with_capacity(digest.len().saturating_mul(2));
	out.extend(
		digest
			.iter()
			.flat_map(|&byte| [nibble(byte >> 4), nibble(byte & 0x0F)]),
	);

	out
}
402
403pub(super) async fn migrate_conduit_pdus(services: &Services) -> Result {
411 let db = &services.db;
412
413 let rooms: BTreeMap<u64, OwnedRoomId> = db["roomid_shortroomid"]
416 .stream()
417 .ignore_err()
418 .map(|(room_id, short): (&RoomId, u64)| (short, room_id.to_owned()))
419 .collect()
420 .await;
421
422 warn!("Ensuring stored PDUs carry their room_id field...");
423 let cork = db.cork_and_sync();
424
425 let pduid_pdu = &db["pduid_pdu"];
426 let timeline = pduid_pdu
427 .raw_stream()
428 .ignore_err()
429 .ready_fold((0_usize, 0_usize), |acc, (key, value)| {
430 tally(acc, inject_room_id(pduid_pdu, key, value, |_| pduid_room(&rooms, key)))
431 })
432 .await;
433
434 let outlier = &db["eventid_outlierpdu"];
435 let outliers = outlier
436 .raw_stream()
437 .ignore_err()
438 .ready_fold((0_usize, 0_usize), |acc, (key, value)| {
439 tally(acc, inject_room_id(outlier, key, value, |pdu| outlier_room(key, pdu)))
440 })
441 .await;
442
443 drop(cork);
444
445 let fixed = timeline.0.saturating_add(outliers.0);
446 let skipped = timeline.1.saturating_add(outliers.1);
447 if skipped > 0 {
448 warn!(%fixed, %skipped, "Injected room_id into stored PDUs; some were skipped");
449 } else {
450 info!(%fixed, "Ensured stored PDUs carry room_id");
451 }
452
453 Ok(())
454}
455
456fn tally((fixed, skipped): (usize, usize), result: Result<bool>) -> (usize, usize) {
457 match result {
458 | Ok(true) => (fixed.saturating_add(1), skipped),
459 | Ok(false) => (fixed, skipped),
460 | Err(e) => {
461 debug_warn!(error = %e, "skipping unreconcilable Conduit PDU");
462 (fixed, skipped.saturating_add(1))
463 },
464 }
465}
466
467fn inject_room_id(
472 map: &Arc<Map>,
473 key: &[u8],
474 value: &[u8],
475 resolve: impl FnOnce(&CanonicalJsonObject) -> Result<OwnedRoomId>,
476) -> Result<bool> {
477 let probe: HasRoomId = serde_json::from_slice(value)
478 .map_err(|e| err!(Database("Conduit PDU is not canonical JSON: {e}")))?;
479
480 if probe.room_id.is_some() {
481 return Ok(false);
482 }
483
484 let mut pdu: CanonicalJsonObject = serde_json::from_slice(value)
485 .map_err(|e| err!(Database("Conduit PDU is not canonical JSON: {e}")))?;
486
487 let room_id = resolve(&pdu)?;
488 pdu.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
489
490 let bytes = serde_json::to_vec(&pdu)
491 .map_err(|e| err!(Database("re-serializing reconciled Conduit PDU: {e}")))?;
492
493 map.insert(key, bytes);
494
495 Ok(true)
496}
497
498fn pduid_room(rooms: &BTreeMap<u64, OwnedRoomId>, key: &[u8]) -> Result<OwnedRoomId> {
500 let short = key
501 .get(..8)
502 .ok_or_else(|| err!(Database("Conduit pduid is shorter than a short room id")))?;
503
504 rooms
505 .get(&utils::u64_from_u8(short))
506 .cloned()
507 .ok_or_else(|| err!(Database("Conduit pduid short room id maps to no room")))
508}
509
510fn outlier_room(key: &[u8], pdu: &CanonicalJsonObject) -> Result<OwnedRoomId> {
514 let is_create = matches!(
515 pdu.get("type"),
516 Some(CanonicalJsonValue::String(kind)) if kind == "m.room.create"
517 );
518
519 if !is_create {
520 return Err!(Database("Conduit outlier lacks room_id and is not a create event"));
521 }
522
523 let event_id = <&EventId>::try_from(str::from_utf8(key)?)
524 .map_err(|_| err!(Database("Conduit outlier key is not a valid event id")))?;
525
526 RoomId::new_v2(event_id.localpart())
527 .map_err(|e| err!(Database("deriving room id from create event id: {e}")))
528}
529
530pub(super) async fn migrate_conduit_knocks(services: &Services) -> Result {
537 let knocks = copy_cf(services, "roomuserid_knockcount", "roomuserid_knockedcount").await?;
538 copy_cf(services, "userroomid_knockstate", "userroomid_knockedstate").await?;
539
540 if knocks > 0 {
541 warn!(%knocks, "Imported Conduit knocks");
542 }
543
544 Ok(())
545}
546
547pub(super) async fn migrate_conduit_highlight_split(services: &Services) -> Result {
558 let db = &services.db;
559 let highlight = db["userroomid_highlightcount"].clone();
560
561 if pin!(highlight.raw_keys_prefix(b"!"))
564 .next()
565 .await
566 .is_none()
567 {
568 return Ok(());
569 }
570
571 let lastread = db["roomuserid_lastnotificationread"].clone();
572 let cork = db.cork_and_sync();
573 let moved = highlight
574 .raw_stream()
575 .ignore_err()
576 .ready_fold(0_usize, |moved, (key, value)| {
577 if key.first() == Some(&b'!') {
578 lastread.insert(key, value);
579 highlight.remove(key);
580 moved.saturating_add(1)
581 } else {
582 moved
583 }
584 })
585 .await;
586
587 drop(cork);
588
589 if moved > 0 {
590 warn!(%moved, "Split Conduit last-notification-read rows out of the highlight-count column");
591 }
592
593 Ok(())
594}
595
596async fn copy_cf(
599 services: &Services,
600 source_name: &'static str,
601 target_name: &'static str,
602) -> Result<usize> {
603 let db = &services.db;
604 let Some(source) = db.open_cf(source_name)? else {
605 return Ok(0);
606 };
607
608 let target = &db[target_name];
609 let cork = db.cork_and_sync();
610 let copied = source
611 .raw_stream()
612 .ignore_err()
613 .ready_fold(0_usize, |copied, (key, value)| {
614 target.insert(key, value);
615 copied.saturating_add(1)
616 })
617 .await;
618
619 drop(cork);
620
621 Ok(copied)
622}
623
/// Unit tests for the pure helpers: path/key sharding, hex encoding, media
/// value parsing, and the `room_id` presence probe.
#[cfg(test)]
mod tests {
	use std::path::Path;

	use super::{
		HasRoomId, conduit_media_key, conduit_media_path, parse_conduit_media_value, sha256_hex,
	};

	/// Depth 2 / length 2 peels two 2-character directories off the digest.
	#[test]
	fn conduit_media_path_deep_matches_conduit_default() {
		let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
		let path = conduit_media_path(Path::new("/db/media"), 2, 2, hex);

		assert_eq!(
			path,
			Path::new(
				"/db/media/01/23/456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
			)
		);
	}

	/// Depth 0 places the file directly under the media directory.
	#[test]
	fn conduit_media_path_flat_is_unsharded() {
		let path = conduit_media_path(Path::new("/db/media"), 0, 2, "abcdef");

		assert_eq!(path, Path::new("/db/media/abcdef"));
	}

	/// The provider key uses the same shards as the path, joined with `/`.
	#[test]
	fn conduit_media_key_deep_joins_shards_with_slash() {
		let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
		let key = conduit_media_key(2, 2, hex);

		assert_eq!(key, "01/23/456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef");
	}

	/// Depth 0 yields the bare digest with no separators.
	#[test]
	fn conduit_media_key_flat_is_bare_digest() {
		assert_eq!(conduit_media_key(0, 2, "abcdef"), "abcdef");
	}

	/// Every byte becomes two lowercase hex digits, including a leading zero.
	#[test]
	fn sha256_hex_encodes_lowercase_padded() {
		assert_eq!(sha256_hex(&[0x00, 0x0F, 0xFF, 0xA5]), "000fffa5");
	}

	/// A separator (and anything after it) following the content type is ignored.
	#[test]
	fn conduit_media_value_ignores_unauthenticated_flag() {
		let mut value = vec![7_u8; 32];
		value.extend_from_slice(b"pic.png");
		value.push(0xFF);
		value.extend_from_slice(b"image/png");
		value.push(0xFF);

		let (sha256, filename, content_type) = parse_conduit_media_value(&value).unwrap();

		assert_eq!(sha256, [7_u8; 32].as_slice());
		assert_eq!(filename, Some("pic.png"));
		assert_eq!(content_type, Some("image/png"));
	}

	/// An empty filename field parses as `None`; the content type is kept.
	#[test]
	fn conduit_media_value_empty_filename_is_none() {
		let mut value = vec![0_u8; 32];
		value.push(0xFF);
		value.extend_from_slice(b"image/png");

		let (_, filename, content_type) = parse_conduit_media_value(&value).unwrap();

		assert_eq!(filename, None);
		assert_eq!(content_type, Some("image/png"));
	}

	/// The probe reports `Some` only when the object has a `room_id` key.
	#[test]
	fn has_room_id_probe_detects_presence() {
		let with_room_id = br#"{"room_id":"!r:server","type":"m.room.message"}"#;
		let without_room_id = br#"{"type":"m.room.create","sender":"@u:server"}"#;

		let present: HasRoomId = serde_json::from_slice(with_room_id).unwrap();
		let absent: HasRoomId = serde_json::from_slice(without_room_id).unwrap();

		assert!(present.room_id.is_some());
		assert!(absent.room_id.is_none());
	}
}