Skip to main content

tuwunel_service/migrations/
conduit.rs

1use std::{
2	collections::BTreeMap,
3	iter::from_fn,
4	path::{Path, PathBuf},
5	pin::pin,
6	sync::Arc,
7	time::Duration,
8};
9
10use bytes::Bytes;
11use futures::{StreamExt, TryStreamExt};
12use object_store::Error as ObjectStoreError;
13use ruma::{
14	CanonicalJsonObject, CanonicalJsonValue, EventId, Mxc, OwnedRoomId, OwnedUserId, RoomId,
15	ServerName, UserId,
16};
17use serde::{Deserialize, de::IgnoredAny};
18use tokio::time::sleep;
19use tuwunel_core::{
20	Err, Error, Result, debug_warn, err, error, info,
21	itertools::Itertools,
22	utils,
23	utils::{ReadyExt, content_disposition::make_content_disposition, stream::TryIgnore},
24	warn,
25};
26use tuwunel_database::{Map, SEP};
27
28use crate::{Services, storage::Provider};
29
30/// Presence probe: `room_id` parses to `Some` when the stored PDU carries it.
31#[derive(Deserialize)]
32struct HasRoomId {
33	room_id: Option<IgnoredAny>,
34}
35
36/// Where a Conduit database kept its original media files: on the local
37/// filesystem (the default), or in an object store named by
38/// `conduit_source_media_provider` for a Conduit that backed its media with S3.
39enum MediaSource {
40	Filesystem(PathBuf),
41	Provider(Arc<Provider>),
42}
43
44/// One parsed `servernamemediaid_metadata` entry, borrowing the raw key/value.
45struct ConduitMediaEntry<'a> {
46	server_name: &'a ServerName,
47	media_id: &'a str,
48	sha256: &'a [u8],
49	filename: Option<&'a str>,
50	content_type: Option<&'a str>,
51}
52
53/// Resolves the configured Conduit media source. A named provider must already
54/// be defined under `[storage_provider]`; its absence is an operator error that
55/// aborts the import rather than silently dropping every file.
56fn media_source(services: &Services) -> Result<MediaSource> {
57	let config = &services.server.config;
58
59	match config.conduit_source_media_provider.as_deref() {
60		| Some(name) => services
61			.storage
62			.provider(name)
63			.map(|provider| MediaSource::Provider(provider.clone())),
64		| None => {
65			let media_dir = config
66				.conduit_source_media_path
67				.clone()
68				.unwrap_or_else(|| config.database_path.join("media"));
69
70			Ok(MediaSource::Filesystem(media_dir))
71		},
72	}
73}
74
/// How many times one original is requested from a source storage provider
/// before the import gives up: the first request plus a single retry. The
/// object store performs its own retries within each request, so a transient
/// provider blip rarely exhausts this limit.
const PROVIDER_READ_ATTEMPTS: u32 = 2;

/// Delay before re-requesting an original after a failed provider read, giving
/// a transient fault time to clear.
const PROVIDER_READ_RETRY_DELAY: Duration = Duration::from_millis(2_000);
83
84/// Imports the original media files of a Conduit database, re-uploading each
85/// `servernamemediaid_metadata` entry through `media.create`. A malformed entry
86/// or a missing source file is logged and skipped, but a source storage
87/// provider that stays unreachable aborts the import: nothing is committed in a
88/// way that needs cleanup, so the operator can fix the provider and restart to
89/// resume from the beginning (re-importing an already-copied original is
90/// idempotent).
91pub(super) async fn migrate_conduit_media(services: &Services) -> Result {
92	let db = &services.db;
93	let config = &services.server.config;
94
95	let Some(metadata) = db.open_cf("servernamemediaid_metadata")? else {
96		warn!("Conduit database has no media metadata; nothing to import.");
97		return Ok(());
98	};
99
100	let owners = db.open_cf("servernamemediaid_userlocalpart")?;
101	let owners = owners.as_ref();
102
103	let blocklist = db.open_cf("blocked_servername_mediaid")?;
104	let blocklist = blocklist.as_ref();
105
106	let depth = config.conduit_media_directory_depth;
107	let length = config.conduit_media_directory_length;
108	let source = media_source(services)?;
109
110	warn!("Importing Conduit media originals into tuwunel's key-addressed store...");
111
112	let cork = db.cork_and_sync();
113	let (imported, skipped, blocked) = metadata
114		.raw_stream()
115		.ignore_err()
116		.map(Ok::<_, Error>)
117		.try_fold(
118			(0_usize, 0_usize, 0_usize),
119			async |(imported, skipped, blocked), (key, value)| {
120				if conduit_media_blocked(blocklist, key).await? {
121					return Ok((imported, skipped, blocked.saturating_add(1)));
122				}
123
124				let imported_entry =
125					import_conduit_original(services, owners, &source, depth, length, key, value)
126						.await?;
127
128				Ok(if imported_entry {
129					(imported.saturating_add(1), skipped, blocked)
130				} else {
131					(imported, skipped.saturating_add(1), blocked)
132				})
133			},
134		)
135		.await?;
136
137	drop(cork);
138
139	if blocked > 0 {
140		warn!(%blocked, "Skipped Conduit media blocked by a moderator; not imported");
141	}
142
143	if skipped > 0 {
144		warn!(%imported, %skipped, "Imported Conduit media originals; some files were skipped");
145	} else {
146		info!(%imported, "Imported Conduit media originals");
147	}
148
149	Ok(())
150}
151
152/// Imports one `servernamemediaid_metadata` entry, returning whether an
153/// original was imported (`false` = skipped). The skip/abort contract is
154/// decided in [`read_conduit_original`].
155async fn import_conduit_original(
156	services: &Services,
157	owners: Option<&Arc<Map>>,
158	source: &MediaSource,
159	depth: u8,
160	length: u8,
161	key: &[u8],
162	value: &[u8],
163) -> Result<bool> {
164	let entry = match parse_conduit_media_entry(key, value) {
165		| Ok(entry) => entry,
166		| Err(e) => {
167			debug_warn!(error = %e, "skipping unimportable Conduit media entry");
168			return Ok(false);
169		},
170	};
171
172	let Some(file) = read_conduit_original(source, depth, length, entry.sha256).await? else {
173		return Ok(false);
174	};
175
176	let content_disposition = make_content_disposition(None, entry.content_type, entry.filename);
177	let owner = conduit_media_owner(owners, key, entry.server_name).await;
178	let mxc = Mxc {
179		server_name: entry.server_name,
180		media_id: entry.media_id,
181	};
182
183	match services
184		.media
185		.create(&mxc, owner.as_deref(), Some(&content_disposition), entry.content_type, &file)
186		.await
187	{
188		| Ok(()) => Ok(true),
189		| Err(e) => {
190			debug_warn!(error = %e, "skipping Conduit media entry that failed to store");
191			Ok(false)
192		},
193	}
194}
195
196/// Parses a `servernamemediaid_metadata` entry: the key is
197/// `servername 0xff media_id`, the value is the digest, filename and
198/// content type.
199fn parse_conduit_media_entry<'a>(
200	key: &'a [u8],
201	value: &'a [u8],
202) -> Result<ConduitMediaEntry<'a>> {
203	let Some(sep) = key.iter().position(|&byte| byte == SEP) else {
204		return Err!(Database("Conduit media key has no server-name separator"));
205	};
206	let server_name = <&ServerName>::try_from(str::from_utf8(&key[..sep])?)
207		.map_err(|_| err!(Database("Conduit media key has an invalid server name")))?;
208
209	let media_id = str::from_utf8(&key[sep.saturating_add(1)..])?;
210
211	let (sha256, filename, content_type) = parse_conduit_media_value(value)?;
212
213	Ok(ConduitMediaEntry {
214		server_name,
215		media_id,
216		sha256,
217		filename,
218		content_type,
219	})
220}
221
222/// Reads one Conduit original, named by its content digest, from the configured
223/// media source. A filesystem file that cannot be read is reported as `None`
224/// (skipped, like a dangling metadata row). A source storage provider is
225/// retried on a transient fault; a persistent one returns `Err` so the import
226/// aborts instead of dropping reachable media.
227async fn read_conduit_original(
228	source: &MediaSource,
229	depth: u8,
230	length: u8,
231	sha256: &[u8],
232) -> Result<Option<Bytes>> {
233	let sha256_hex = sha256_hex(sha256);
234	match source {
235		| MediaSource::Filesystem(media_dir) => {
236			let path = conduit_media_path(media_dir, depth, length, &sha256_hex);
237			match tokio::fs::read(&path).await {
238				| Ok(file) => Ok(Some(file.into())),
239				| Err(e) => {
240					debug_warn!(?path, error = %e, "skipping unreadable Conduit media file");
241					Ok(None)
242				},
243			}
244		},
245		| MediaSource::Provider(provider) =>
246			read_provider_original(provider, &conduit_media_key(depth, length, &sha256_hex)).await,
247	}
248}
249
250/// Reads one original from the source storage provider. An absent object is
251/// skipped (`Ok(None)`) like a dangling filesystem row; a transient fault is
252/// retried up to `PROVIDER_READ_ATTEMPTS` times, and a persistent one aborts
253/// the import with an `Err`.
254async fn read_provider_original(provider: &Arc<Provider>, key: &str) -> Result<Option<Bytes>> {
255	let mut attempt = 0_u32;
256	loop {
257		attempt = attempt.saturating_add(1);
258		match provider.get(key).await {
259			| Ok(file) => return Ok(Some(file)),
260			| Err(e) if is_missing_object(&e) => {
261				debug_warn!(%key, error = %e, "skipping missing Conduit media object");
262				return Ok(None);
263			},
264			| Err(e) if attempt >= PROVIDER_READ_ATTEMPTS => {
265				error!(
266					%key,
267					attempts = PROVIDER_READ_ATTEMPTS,
268					error = %e,
269					"Aborting the Conduit media import: source storage provider unreachable. No \
270					 media has been imported in a way that needs cleanup; once the provider is \
271					 reachable, restart tuwunel to resume the import from the beginning."
272				);
273				return Err(e);
274			},
275			| Err(e) => {
276				warn!(%key, attempt, error = %e, "Reading Conduit media object failed; retrying");
277				sleep(PROVIDER_READ_RETRY_DELAY).await;
278			},
279		}
280	}
281}
282
283/// Whether a provider read failed because the object is absent (a 404 or a
284/// dangling metadata row), which is skipped rather than retried. tuwunel's
285/// `Error::is_not_found` does not cover the object-store variant, so match it
286/// directly.
287fn is_missing_object(error: &Error) -> bool {
288	matches!(error, Error::ObjectStore(ObjectStoreError::NotFound { .. }))
289}
290
291/// Splits a `servernamemediaid_metadata` value into its digest, filename, and
292/// content type. The value is `sha256(32) | filename | 0xff | content_type`
293/// with an optional trailing `0xff` that Conduit's media-auth migration appends
294/// to flag unauthenticated access; that flag is ignored.
295fn parse_conduit_media_value(value: &[u8]) -> Result<(&[u8], Option<&str>, Option<&str>)> {
296	let (sha256, rest) = value
297		.split_at_checked(32)
298		.ok_or_else(|| err!(Database("Conduit media value shorter than a SHA-256 digest")))?;
299
300	// Take filename and content_type, ignoring the optional trailing 0xff flag.
301	let mut parts = rest.split(|&byte| byte == SEP);
302	let filename = parts.next().unwrap_or_default();
303	let Some(content_type) = parts.next() else {
304		return Err!(Database("Conduit media value has no content-type separator"));
305	};
306	let filename = str::from_utf8(filename)?;
307	let content_type = str::from_utf8(content_type)?;
308	let filename = (!filename.is_empty()).then_some(filename);
309	let content_type = (!content_type.is_empty()).then_some(content_type);
310
311	Ok((sha256, filename, content_type))
312}
313
314/// The local owner of a Conduit media entry from
315/// `servernamemediaid_userlocalpart`; `None` for remote media, which has no
316/// such entry.
317async fn conduit_media_owner(
318	owners: Option<&Arc<Map>>,
319	key: &[u8],
320	server_name: &ServerName,
321) -> Option<OwnedUserId> {
322	let localpart = owners?.get(key).await.ok()?;
323
324	UserId::parse_with_server_name(str::from_utf8(&localpart).ok()?, server_name).ok()
325}
326
327/// Whether a Conduit media entry was blocked by a moderator. Conduit keeps the
328/// file and refuses it only at read time (`blocked_servername_mediaid`);
329/// tuwunel has no per-media blocklist, so importing a blocked original would
330/// serve it again. The blocklist key is `server_name 0xff media_id`, the same
331/// bytes as the `servernamemediaid_metadata` key, so the entry's raw key probes
332/// it directly. Only a clean miss imports; a hard read error aborts the import
333/// (like an unreachable source) rather than silently re-serving blocked media.
334async fn conduit_media_blocked(blocklist: Option<&Arc<Map>>, key: &[u8]) -> Result<bool> {
335	let Some(blocklist) = blocklist else {
336		return Ok(false);
337	};
338
339	match blocklist.exists(key).await {
340		| Ok(()) => Ok(true),
341		| Err(e) if e.is_not_found() => Ok(false),
342		| Err(e) => Err(e),
343	}
344}
345
346/// Reconstructs the on-disk path of a Conduit content-addressed media file from
347/// the lowercase SHA-256 hex digest naming it, matching Conduit's
348/// `split_media_path`: `media_dir` joined with the digest's shard segments.
349fn conduit_media_path(media_dir: &Path, depth: u8, length: u8, sha256_hex: &str) -> PathBuf {
350	let mut path = media_dir.to_path_buf();
351	path.extend(conduit_shards(depth, length, sha256_hex));
352	path
353}
354
355/// The object-store key of a Conduit content-addressed media object, the same
356/// shard segments as `conduit_media_path` joined by `/`. The source provider's
357/// `base_path` supplies any bucket prefix (Conduit's `media.path`).
358fn conduit_media_key(depth: u8, length: u8, sha256_hex: &str) -> String {
359	conduit_shards(depth, length, sha256_hex).join("/")
360}
361
/// Splits a lowercase SHA-256 hex digest into Conduit's shard segments.
///
/// The result is `depth` segments of `length` characters followed by the
/// remainder, or the whole digest when `depth` is zero (a flat layout). A
/// remainder shorter than `length` ends the sequence early.
///
/// Empty segments are never yielded. They can only come from a degenerate
/// layout, such as a zero `length` or a `depth * length` that consumes the
/// whole digest. If kept, `conduit_media_path` would collapse them into a
/// separator while `conduit_media_key` would emit `//` or a trailing `/`, so
/// the two would disagree on where the same object lives.
///
/// `config::check` bounds `depth * length` below the digest length, so valid
/// configurations never hit this case.
fn conduit_shards(depth: u8, length: u8, sha256_hex: &str) -> impl Iterator<Item = &str> {
	let mut rest = Some(sha256_hex);
	let mut remaining = depth;
	from_fn(move || {
		let current = rest?;
		if remaining == 0 {
			rest = None;
			return Some(current);
		}

		remaining = remaining.saturating_sub(1);
		match current.split_at_checked(length.into()) {
			| Some((segment, next)) => {
				rest = Some(next);
				Some(segment)
			},
			| None => {
				rest = None;
				Some(current)
			},
		}
	})
	// Drop degenerate empty segments so path and object key stay in agreement.
	.filter(|segment| !segment.is_empty())
}
389
/// Encodes a digest as lowercase hexadecimal, two characters per byte. Conduit
/// names its media files this way.
fn sha256_hex(digest: &[u8]) -> String {
	const NIBBLES: [char; 16] = [
		'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
	];

	let mut hex = String::with_capacity(digest.len().saturating_mul(2));
	hex.extend(
		digest
			.iter()
			.flat_map(|&byte| [NIBBLES[usize::from(byte >> 4)], NIBBLES[usize::from(byte & 0x0F)]]),
	);

	hex
}
402
403/// Injects `room_id` into stored PDUs that lack it. Runs once on every database
404/// (marker-gated by the caller); a native tuwunel DB always serializes the
405/// field, so it no-ops there. Only a room v12 (`hydra`) create event imported
406/// from Conduit omits it, deriving its room from the event's own id per
407/// MSC4291. Scans the `pduid_pdu` timeline (room from the key's leading short
408/// room id) and `eventid_outlierpdu` (room from the create event's own id, the
409/// outlier key).
410pub(super) async fn migrate_conduit_pdus(services: &Services) -> Result {
411	let db = &services.db;
412
413	// shortroomid -> room_id, inverted once so resolving each timeline PDU's
414	// room is a lookup rather than a scan of roomid_shortroomid.
415	let rooms: BTreeMap<u64, OwnedRoomId> = db["roomid_shortroomid"]
416		.stream()
417		.ignore_err()
418		.map(|(room_id, short): (&RoomId, u64)| (short, room_id.to_owned()))
419		.collect()
420		.await;
421
422	warn!("Ensuring stored PDUs carry their room_id field...");
423	let cork = db.cork_and_sync();
424
425	let pduid_pdu = &db["pduid_pdu"];
426	let timeline = pduid_pdu
427		.raw_stream()
428		.ignore_err()
429		.ready_fold((0_usize, 0_usize), |acc, (key, value)| {
430			tally(acc, inject_room_id(pduid_pdu, key, value, |_| pduid_room(&rooms, key)))
431		})
432		.await;
433
434	let outlier = &db["eventid_outlierpdu"];
435	let outliers = outlier
436		.raw_stream()
437		.ignore_err()
438		.ready_fold((0_usize, 0_usize), |acc, (key, value)| {
439			tally(acc, inject_room_id(outlier, key, value, |pdu| outlier_room(key, pdu)))
440		})
441		.await;
442
443	drop(cork);
444
445	let fixed = timeline.0.saturating_add(outliers.0);
446	let skipped = timeline.1.saturating_add(outliers.1);
447	if skipped > 0 {
448		warn!(%fixed, %skipped, "Injected room_id into stored PDUs; some were skipped");
449	} else {
450		info!(%fixed, "Ensured stored PDUs carry room_id");
451	}
452
453	Ok(())
454}
455
456fn tally((fixed, skipped): (usize, usize), result: Result<bool>) -> (usize, usize) {
457	match result {
458		| Ok(true) => (fixed.saturating_add(1), skipped),
459		| Ok(false) => (fixed, skipped),
460		| Err(e) => {
461			debug_warn!(error = %e, "skipping unreconcilable Conduit PDU");
462			(fixed, skipped.saturating_add(1))
463		},
464	}
465}
466
467/// Injects `room_id` into one PDU value that lacks it, sourcing the room from
468/// `resolve`. Returns whether the value was rewritten; `false` means it already
469/// carried a `room_id`. A cheap `HasRoomId` probe short-circuits that common
470/// case, so only the rewritten PDUs pay the full parse and re-serialize.
471fn inject_room_id(
472	map: &Arc<Map>,
473	key: &[u8],
474	value: &[u8],
475	resolve: impl FnOnce(&CanonicalJsonObject) -> Result<OwnedRoomId>,
476) -> Result<bool> {
477	let probe: HasRoomId = serde_json::from_slice(value)
478		.map_err(|e| err!(Database("Conduit PDU is not canonical JSON: {e}")))?;
479
480	if probe.room_id.is_some() {
481		return Ok(false);
482	}
483
484	let mut pdu: CanonicalJsonObject = serde_json::from_slice(value)
485		.map_err(|e| err!(Database("Conduit PDU is not canonical JSON: {e}")))?;
486
487	let room_id = resolve(&pdu)?;
488	pdu.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
489
490	let bytes = serde_json::to_vec(&pdu)
491		.map_err(|e| err!(Database("re-serializing reconciled Conduit PDU: {e}")))?;
492
493	map.insert(key, bytes);
494
495	Ok(true)
496}
497
498/// The room of a `pduid_pdu` entry, from the short room id leading its key.
499fn pduid_room(rooms: &BTreeMap<u64, OwnedRoomId>, key: &[u8]) -> Result<OwnedRoomId> {
500	let short = key
501		.get(..8)
502		.ok_or_else(|| err!(Database("Conduit pduid is shorter than a short room id")))?;
503
504	rooms
505		.get(&utils::u64_from_u8(short))
506		.cloned()
507		.ok_or_else(|| err!(Database("Conduit pduid short room id maps to no room")))
508}
509
510/// The room of an `eventid_outlierpdu` entry that lacks `room_id`. Only a v12
511/// create event omits it, and its room id derives from the create event's own
512/// id, which is this outlier's key.
513fn outlier_room(key: &[u8], pdu: &CanonicalJsonObject) -> Result<OwnedRoomId> {
514	let is_create = matches!(
515		pdu.get("type"),
516		Some(CanonicalJsonValue::String(kind)) if kind == "m.room.create"
517	);
518
519	if !is_create {
520		return Err!(Database("Conduit outlier lacks room_id and is not a create event"));
521	}
522
523	let event_id = <&EventId>::try_from(str::from_utf8(key)?)
524		.map_err(|_| err!(Database("Conduit outlier key is not a valid event id")))?;
525
526	RoomId::new_v2(event_id.localpart())
527		.map_err(|e| err!(Database("deriving room id from create event id: {e}")))
528}
529
530/// Imports Conduit's pending knocks. Conduit names the columns
531/// `roomuserid_knockcount` / `userroomid_knockstate`; tuwunel renamed them to
532/// `*knocked*` but kept the byte layout (`room_id 0xff user_id` -> u64 count;
533/// `user_id 0xff room_id` -> JSON stripped state), so each row copies verbatim.
534/// Imported once: tuwunel clears a knock on the user's join or leave, so a
535/// re-import would resurrect a knock the user has already resolved.
536pub(super) async fn migrate_conduit_knocks(services: &Services) -> Result {
537	let knocks = copy_cf(services, "roomuserid_knockcount", "roomuserid_knockedcount").await?;
538	copy_cf(services, "userroomid_knockstate", "userroomid_knockedstate").await?;
539
540	if knocks > 0 {
541		warn!(%knocks, "Imported Conduit knocks");
542	}
543
544	Ok(())
545}
546
547/// Splits Conduit's conflated highlight-count column. Conduit opens
548/// `roomuserid_lastnotificationread` against the `userroomid_highlightcount`
549/// tree (a copy-paste in its schema), so one column holds both stores:
550/// highlight counts keyed `user_id 0xff room_id` and last-notification-read
551/// tokens keyed `room_id 0xff user_id`. tuwunel keeps the two in separate
552/// columns with those same byte layouts, so every room-keyed (last-read) row
553/// moves verbatim into `roomuserid_lastnotificationread`, leaving the
554/// user-keyed highlight rows in place. The orderings never collide: a user id
555/// leads with `@`, a room id with `!`. Absent any room-keyed row the column is
556/// not aliased, so this returns early and is safe to run on a native database.
557pub(super) async fn migrate_conduit_highlight_split(services: &Services) -> Result {
558	let db = &services.db;
559	let highlight = db["userroomid_highlightcount"].clone();
560
561	// A room-keyed (last-read) row leads with '!'; without one the column is a
562	// plain highlight column needing no split.
563	if pin!(highlight.raw_keys_prefix(b"!"))
564		.next()
565		.await
566		.is_none()
567	{
568		return Ok(());
569	}
570
571	let lastread = db["roomuserid_lastnotificationread"].clone();
572	let cork = db.cork_and_sync();
573	let moved = highlight
574		.raw_stream()
575		.ignore_err()
576		.ready_fold(0_usize, |moved, (key, value)| {
577			if key.first() == Some(&b'!') {
578				lastread.insert(key, value);
579				highlight.remove(key);
580				moved.saturating_add(1)
581			} else {
582				moved
583			}
584		})
585		.await;
586
587	drop(cork);
588
589	if moved > 0 {
590		warn!(%moved, "Split Conduit last-notification-read rows out of the highlight-count column");
591	}
592
593	Ok(())
594}
595
596/// Copies every row of one column verbatim into another whose key and value
597/// share the same byte layout, so neither needs reserialization.
598async fn copy_cf(
599	services: &Services,
600	source_name: &'static str,
601	target_name: &'static str,
602) -> Result<usize> {
603	let db = &services.db;
604	let Some(source) = db.open_cf(source_name)? else {
605		return Ok(0);
606	};
607
608	let target = &db[target_name];
609	let cork = db.cork_and_sync();
610	let copied = source
611		.raw_stream()
612		.ignore_err()
613		.ready_fold(0_usize, |copied, (key, value)| {
614			target.insert(key, value);
615			copied.saturating_add(1)
616		})
617		.await;
618
619	drop(cork);
620
621	Ok(copied)
622}
623
#[cfg(test)]
mod tests {
	use std::path::Path;

	use super::{
		HasRoomId, conduit_media_key, conduit_media_path, parse_conduit_media_value, sha256_hex,
	};

	#[test]
	fn conduit_media_path_deep_matches_conduit_default() {
		// Conduit default Deep { length: 2, depth: 2 }: two 2-char segments of the
		// 64-char digest, then the remaining 60 characters.
		let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
		let path = conduit_media_path(Path::new("/db/media"), 2, 2, hex);

		assert_eq!(
			path,
			Path::new(
				"/db/media/01/23/456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
			)
		);
	}

	#[test]
	fn conduit_media_path_flat_is_unsharded() {
		let path = conduit_media_path(Path::new("/db/media"), 0, 2, "abcdef");

		assert_eq!(path, Path::new("/db/media/abcdef"));
	}

	#[test]
	fn conduit_media_key_deep_joins_shards_with_slash() {
		// The object key carries no media_dir; the source provider's base_path
		// supplies any prefix. Same shard segments as the Deep on-disk path.
		let hex = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
		let key = conduit_media_key(2, 2, hex);

		assert_eq!(key, "01/23/456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef");
	}

	#[test]
	fn conduit_media_key_flat_is_bare_digest() {
		assert_eq!(conduit_media_key(0, 2, "abcdef"), "abcdef");
	}

	#[test]
	fn sha256_hex_encodes_lowercase_padded() {
		assert_eq!(sha256_hex(&[0x00, 0x0F, 0xFF, 0xA5]), "000fffa5");
	}

	#[test]
	fn conduit_media_value_ignores_unauthenticated_flag() {
		// Conduit's media-auth migration appends a trailing 0xff after content_type.
		let mut value = vec![7_u8; 32];
		value.extend_from_slice(b"pic.png");
		value.push(0xFF);
		value.extend_from_slice(b"image/png");
		value.push(0xFF);

		let (sha256, filename, content_type) = parse_conduit_media_value(&value).unwrap();

		assert_eq!(sha256, [7_u8; 32].as_slice());
		assert_eq!(filename, Some("pic.png"));
		assert_eq!(content_type, Some("image/png"));
	}

	#[test]
	fn conduit_media_value_empty_filename_is_none() {
		let mut value = vec![0_u8; 32];
		value.push(0xFF);
		value.extend_from_slice(b"image/png");

		let (_, filename, content_type) = parse_conduit_media_value(&value).unwrap();

		assert_eq!(filename, None);
		assert_eq!(content_type, Some("image/png"));
	}

	#[test]
	fn conduit_media_value_digest_may_contain_separator() {
		// The digest is split off by length before any 0xff scan, so digest bytes
		// equal to the separator must not shift the filename boundary.
		let mut value = vec![0xFF_u8; 32];
		value.extend_from_slice(b"a.txt");
		value.push(0xFF);
		value.extend_from_slice(b"text/plain");

		let (sha256, filename, content_type) = parse_conduit_media_value(&value).unwrap();

		assert_eq!(sha256, [0xFF_u8; 32].as_slice());
		assert_eq!(filename, Some("a.txt"));
		assert_eq!(content_type, Some("text/plain"));
	}

	#[test]
	fn conduit_media_value_shorter_than_digest_is_rejected() {
		assert!(parse_conduit_media_value(&[0_u8; 31]).is_err());
	}

	#[test]
	fn conduit_media_value_without_separator_is_rejected() {
		let mut value = vec![0_u8; 32];
		value.extend_from_slice(b"pic.png");

		assert!(parse_conduit_media_value(&value).is_err());
	}

	#[test]
	fn conduit_media_value_non_utf8_filename_is_rejected() {
		let mut value = vec![0_u8; 32];
		// A lone UTF-8 lead byte is not valid UTF-8 (and is not the 0xff separator).
		value.push(0xC3);
		value.push(0xFF);
		value.extend_from_slice(b"image/png");

		assert!(parse_conduit_media_value(&value).is_err());
	}

	#[test]
	fn has_room_id_probe_detects_presence() {
		let with_room_id = br#"{"room_id":"!r:server","type":"m.room.message"}"#;
		let without_room_id = br#"{"type":"m.room.create","sender":"@u:server"}"#;

		let present: HasRoomId = serde_json::from_slice(with_room_id).unwrap();
		let absent: HasRoomId = serde_json::from_slice(without_room_id).unwrap();

		assert!(present.room_id.is_some());
		assert!(absent.room_id.is_none());
	}

	#[test]
	fn has_room_id_probe_treats_null_as_absent() {
		// A `null` room_id is treated like a missing one, so it gets injected.
		let null_room_id = br#"{"room_id":null,"type":"m.room.create"}"#;

		let probe: HasRoomId = serde_json::from_slice(null_room_id).unwrap();

		assert!(probe.room_id.is_none());
	}
}