Skip to main content

tuwunel_service/migrations/
mod.rs

1use std::cmp::{self, Ordering};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5	MilliSecondsSinceUnixEpoch, MxcUri, OwnedRoomId, OwnedUserId, RoomId, UserId,
6	events::room::member::MembershipState,
7};
8use serde::{Deserialize, de::IgnoredAny};
9use tuwunel_core::{
10	Err, Result, debug, debug_info, debug_warn, err, info,
11	itertools::Itertools,
12	matrix::{PduCount, pdu::RawPduId},
13	result::NotFound,
14	utils,
15	utils::{
16		BoolExt, IterStream, ReadyExt,
17		stream::{TryExpect, TryIgnore},
18	},
19	warn,
20};
21use tuwunel_database::{Deserialized, Json, SEP};
22
23use crate::{Services, media, rooms::timeline::bias_count};
24
25mod conduit;
26mod moderation;
27
/// The current schema version.
/// - If database is opened at greater version we reject with error. The
///   software must be updated for backward-incompatible changes. (Exceptions:
///   a database recognized as foreign lineage, or `force_migration` set; see
///   `check_database_version`.)
/// - If database is opened at lesser version we apply migrations up to this.
///   Versions below 13 are refused outright by `check_database_version`.
///   Note that named-feature migrations may also be performed when opening at
///   equal or lesser version. These are expected to be backward-compatible.
pub(crate) const DATABASE_VERSION: u64 = 17;

/// Key in the `global` column recording the server name that owns this
/// database. Written by `fresh` for new databases and by
/// `backfill_server_name` for older ones; compared against the configured
/// name by `check_server_name`. Its absence is also one of the signals read
/// by `is_foreign_lineage`.
const SERVER_NAME_KEY: &[u8] = b"server_name";

/// A marker written by a sibling conduwuit-lineage server but never by tuwunel.
/// Its presence identifies a foreign database at a higher schema number even
/// after tuwunel has stamped its own `server_name`, so a database opened by
/// both servers in turn keeps booting rather than being refused as too new.
const FOREIGN_LINEAGE_MARKER: &[u8] = b"populate_userroomid_leftstate_table";
43
44pub(crate) async fn migrations(services: &Services) -> Result {
45	if !services.config.database_migrations {
46		warn!("Skipping database migrations due to configuration...");
47		return Ok(());
48	}
49
50	let users_count = services.users.count().await;
51	if users_count == 0 {
52		return fresh(services).await;
53	}
54
55	// Computed before check_server_name backfills SERVER_NAME_KEY, which would
56	// otherwise mask a Conduit-lineage database (it carries no foreign marker).
57	let foreign_lineage = is_foreign_lineage(services).await;
58
59	check_database_version(services, foreign_lineage).await?;
60	check_server_name(services).await?;
61	migrate(services, foreign_lineage).await
62}
63
64/// Whether the database comes from a foreign (non-tuwunel) lineage: it predates
65/// our SERVER_NAME_KEY stamp, or carries a conduwuit-lineage migration marker
66/// that persists even after we stamp ours. Must be read before the server_name
67/// backfill, which removes the first signal.
68async fn is_foreign_lineage(services: &Services) -> bool {
69	let global = &services.db["global"];
70
71	global.get(SERVER_NAME_KEY).await.is_not_found()
72		|| global.get(FOREIGN_LINEAGE_MARKER).await.is_ok()
73}
74
75/// Gate the discovered schema version before migrations and the server_name
76/// backfill run. The integer is comparable only within tuwunel's own lineage; a
77/// foreign database (Conduit and forks) numbers schema on a colliding ladder
78/// and is recognized as foreign by [`is_foreign_lineage`], so its number is not
79/// gated. Within our lineage a version below 13 is refused as unmigratable and
80/// one above this build as too new to open safely; force_migration overrides
81/// the latter for a deliberate downgrade.
82async fn check_database_version(services: &Services, foreign_lineage: bool) -> Result {
83	let discovered = services.globals.db.database_version().await;
84
85	if discovered < 13 {
86		return Err!(Database("Database schema version {discovered} is no longer supported"));
87	}
88
89	if discovered > DATABASE_VERSION && !foreign_lineage && !services.config.force_migration {
90		return Err!(Database(
91			"Database schema version {discovered} is newer than this build supports \
92			 ({DATABASE_VERSION}). Upgrade tuwunel, or set force_migration = true to open it \
93			 anyway; a downgrade may cause permanent data loss."
94		));
95	}
96
97	Ok(())
98}
99
100/// Matrix resource ownership is based on the server name; changing it
101/// requires recreating the database from scratch. The marker is stamped
102/// once in fresh(); pre-marker databases are backfilled by probing for
103/// any user from the configured server.
104async fn check_server_name(services: &Services) -> Result {
105	let server_name = &services.server.name;
106
107	let existing = services.db["global"]
108		.get(SERVER_NAME_KEY)
109		.await
110		.deserialized::<String>();
111
112	match existing {
113		| Err(_) => backfill_server_name(services).await,
114		| Ok(existing) if existing.eq(server_name) => Ok(()),
115		| Ok(existing) => Err!(Database(
116			"Database belongs to {existing}; configured server name is {server_name}. Cannot \
117			 reuse."
118		)),
119	}
120}
121
122/// Stamp the marker on a database that pre-dates SERVER_NAME_KEY by probing
123/// for any user from the configured server. If none, the database belongs
124/// to a different server and reuse is refused.
125async fn backfill_server_name(services: &Services) -> Result {
126	let server_name = &services.server.name;
127
128	services
129		.users
130		.stream()
131		.ready_any(|user_id| services.globals.user_is_local(user_id))
132		.await
133		.into_option()
134		.ok_or_else(|| {
135			err!(Database(
136				"Database has no users from {server_name}; refusing to reuse with this \
137				 server_name."
138			))
139		})?;
140
141	services.db["global"].insert(SERVER_NAME_KEY, server_name.as_str());
142	info!(%server_name, "Stamped server_name marker on upgraded database");
143
144	Ok(())
145}
146
147async fn fresh(services: &Services) -> Result {
148	let db = &services.db;
149
150	services
151		.globals
152		.db
153		.bump_database_version(DATABASE_VERSION);
154
155	db["global"].insert(SERVER_NAME_KEY, services.server.name.as_str());
156	db["global"].insert(b"feat_sha256_media", []);
157	db["global"].insert(b"fix_pdu_missing_room_id", []);
158	db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
159	db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
160	db["global"].insert(b"fix_referencedevents_missing_sep", []);
161	db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
162	db["global"].insert(b"fix_hashed_sentinel_passwords", []);
163	db["global"].insert(b"upgrade_legacy_mediaid_user", []);
164	db["global"].insert(b"remove_remote_media_userid", []);
165	db["global"].insert(b"rebuild_roomid_tscount_pducount", []);
166	db["global"].insert(b"rebuild_relatesto_typed", []);
167	db["global"].insert(b"migrate_profile_keys_to_useridprofilekey", []);
168
169	// Create the admin room and server user on first run
170	if services.config.create_admin_room {
171		crate::admin::create_admin_room(services)
172			.boxed()
173			.await?;
174	}
175
176	warn!("Created new RocksDB database with version {DATABASE_VERSION}");
177
178	Ok(())
179}
180
/// Apply any pending migrations to an existing (non-fresh) database.
///
/// Called by `migrations` after the schema-version and server_name checks,
/// with `foreign_lineage` as computed before the server_name backfill. Steps
/// run in a fixed order; each one-shot step is gated on its own marker key in
/// the `global` column, so an interrupted run resumes where it left off. After
/// the steps, the schema version is stamped to [`DATABASE_VERSION`] and local
/// users / room aliases matching the configured forbidden patterns are logged.
#[expect(clippy::too_many_lines)]
async fn migrate(services: &Services, foreign_lineage: bool) -> Result {
	let db = &services.db;

	let target_version = DATABASE_VERSION;
	let discovered = services.globals.db.database_version().await;

	// Claim our schema version up front when importing a foreign database
	// numbered above ours (e.g. Conduit at 18). Stamping only at the end would
	// leave an aborted import unbootable: the server_name backfill has already
	// run, so a restart no longer sees the database as foreign and the version
	// gate refuses it. The per-step markers below remain the real idempotency
	// gates, so an aborted import still resumes where it left off.
	if foreign_lineage && discovered > target_version {
		services
			.globals
			.db
			.bump_database_version(target_version);
	}

	// Media first: either imports Conduit's media column or runs (or
	// re-checks) the sha256 media migration; see `migrate_media`.
	migrate_media(services).await?;

	// Each one-shot step below runs only while its marker is absent from
	// `global`. Where the marker is not inserted inline here, the called
	// function records it itself on success.
	if db["global"]
		.get(b"fix_pdu_missing_room_id")
		.await
		.is_not_found()
	{
		conduit::migrate_conduit_pdus(services).await?;
		db["global"].insert(b"fix_pdu_missing_room_id", []);
	}

	// Conduit imports; each helper checks and writes its own marker.
	import_conduit_knocks(services).await?;
	split_conduit_highlight_counts(services).await?;

	if db["global"]
		.get(b"fix_bad_double_separator_in_state_cache")
		.await
		.is_not_found()
	{
		fix_bad_double_separator_in_state_cache(services).await?;
	}

	if db["global"]
		.get(b"retroactively_fix_bad_data_from_roomuserid_joined")
		.await
		.is_not_found()
	{
		retroactively_fix_bad_data_from_roomuserid_joined(services).await?;
	}

	if db["global"]
		.get(b"fix_referencedevents_missing_sep")
		.await
		.is_not_found()
	{
		fix_referencedevents_missing_sep(services).await?;
	}

	if db["global"]
		.get(b"fix_readreceiptid_readreceipt_duplicates")
		.await
		.is_not_found()
	{
		fix_readreceiptid_readreceipt_duplicates(services).await?;
	}

	// The callee returns without writing its marker when no identity provider
	// is configured, so this is re-checked on every boot until one is.
	if db["global"]
		.get(b"fix_hashed_sentinel_passwords")
		.await
		.is_not_found()
	{
		fix_hashed_sentinel_passwords(services).await?;
	}

	if db["global"]
		.get(b"upgrade_legacy_mediaid_user")
		.await
		.is_not_found()
	{
		upgrade_legacy_mediaid_user(services).await?;
	}

	if db["global"]
		.get(b"remove_remote_media_userid")
		.await
		.is_not_found()
	{
		remove_remote_media_userid(services).await?;
	}

	if db["global"]
		.get(b"rebuild_roomid_tscount_pducount")
		.await
		.is_not_found()
	{
		rebuild_roomid_tscount_pducount(services).await?;
	}

	if db["global"]
		.get(b"rebuild_relatesto_typed")
		.await
		.is_not_found()
	{
		services
			.pdu_metadata
			.rebuild_typed_relations()
			.await?;

		db["global"].insert(b"rebuild_relatesto_typed", []);
	}

	if db["global"]
		.get(b"migrate_profile_keys_to_useridprofilekey")
		.await
		.is_not_found()
	{
		migrate_profile_keys(services).await?;
	}

	// Non-destructive and idempotent, so it runs every boot rather than once: a
	// suspension added by an origin server after a prior tuwunel boot still
	// carries on the next one.
	moderation::migrate_moderation(services).await?;

	// A newer same-lineage database was already refused; stamping ours is safe. A
	// foreign import above our version was already stamped down before the import
	// ran, so this is a no-op for it.
	services
		.globals
		.db
		.bump_database_version(target_version);

	match discovered.cmp(&target_version) {
		| Ordering::Less =>
			info!("Database: migrated schema version from {discovered} to {target_version}."),
		| Ordering::Greater => warn!(
			"Database: stamped schema version {target_version} over a higher discovered version \
			 {discovered} (forced downgrade or foreign import)."
		),
		| Ordering::Equal => {},
	}

	// Report-only audits: the two passes below log matches against the
	// configured forbidden patterns and do not modify anything.
	if !services.config.forbidden_usernames.is_empty() {
		services
			.users
			.stream()
			.filter(|user_id| services.users.is_active_local(user_id))
			.ready_filter_map(|user_id| {
				let patterns = &services.config.forbidden_usernames;
				let matches = patterns.matches(user_id.localpart());
				let matched = matches
					.iter()
					.map(|x| &patterns.patterns()[x])
					.join(", ");

				matches
					.matched_any()
					.then_some((user_id, matched))
			})
			.ready_for_each(|(user_id, matched)| {
				warn!("User {user_id} matches forbidden username patterns: {matched:#?}");
			})
			.await;
	}

	if !services.config.forbidden_alias_names.is_empty() {
		services
			.metadata
			.iter_ids()
			.map(|room_id| {
				services
					.alias
					.local_aliases_for_room(room_id)
					.map(move |alias| (room_id, alias))
			})
			.flatten()
			.ready_filter_map(|(room_id, room_alias)| {
				let patterns = &services.config.forbidden_alias_names;
				let matches = patterns.matches(room_alias.alias());
				let matched = matches
					.iter()
					.map(|x| &patterns.patterns()[x])
					.join(", ");

				matches
					.matched_any()
					.then_some((room_id, room_alias, matched))
			})
			.ready_for_each(|(room_id, room_alias, matched)| {
				warn!(
					"Room {room_id} with alias {room_alias} matches the following forbidden \
					 room name patterns: {matched}"
				);
			})
			.boxed()
			.await;
	}

	info!("Loaded RocksDB database with schema version {DATABASE_VERSION}");

	Ok(())
}
384
385/// Imports a Conduit database's pending knocks once. Gated on its own marker
386/// and the source column's presence, so it runs only for a Conduit database and
387/// only the first time; a re-import would resurrect a knock the user later
388/// resolved.
389async fn import_conduit_knocks(services: &Services) -> Result {
390	let db = &services.db;
391
392	let pending = db["global"]
393		.get(b"imported_conduit_knocks")
394		.await
395		.is_not_found();
396
397	if pending && db.open_cf("roomuserid_knockcount")?.is_some() {
398		conduit::migrate_conduit_knocks(services).await?;
399		db["global"].insert(b"imported_conduit_knocks", []);
400	}
401
402	Ok(())
403}
404
405/// Splits a Conduit database's conflated highlight-count column once. Conduit
406/// aliased `roomuserid_lastnotificationread` onto the
407/// `userroomid_highlightcount` tree, so one column holds both stores; tuwunel
408/// keeps them apart. Gated on its own marker; the split itself returns early
409/// unless a room-keyed row is present, so it is a cheap no-op on a native
410/// database.
411async fn split_conduit_highlight_counts(services: &Services) -> Result {
412	let db = &services.db;
413
414	if db["global"]
415		.get(b"split_conduit_highlight")
416		.await
417		.is_not_found()
418	{
419		conduit::migrate_conduit_highlight_split(services).await?;
420		db["global"].insert(b"split_conduit_highlight", []);
421	}
422
423	Ok(())
424}
425
426/// Imports a Conduit database's content-addressed media into tuwunel's
427/// key-addressed store when it is present and not yet imported; otherwise runs
428/// the key-addressed media migrations.
429async fn migrate_media(services: &Services) -> Result {
430	let db = &services.db;
431	let config = &services.server.config;
432
433	let sha256_done = !db["global"]
434		.get(b"feat_sha256_media")
435		.await
436		.is_not_found();
437
438	// The foreign CF persists, so the marker (not its presence) is the latch.
439	if !sha256_done
440		&& db
441			.open_cf("servernamemediaid_metadata")?
442			.is_some()
443	{
444		conduit::migrate_conduit_media(services).await?;
445		db["global"].insert(b"feat_sha256_media", []);
446		return Ok(());
447	}
448
449	if !sha256_done {
450		media::migrations::migrate_sha256_media(services).await?;
451	} else if config.media_startup_check {
452		media::migrations::checkup_sha256_media(services).await?;
453	}
454
455	Ok(())
456}
457
458async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result {
459	warn!("Fixing bad double separator in state_cache roomuserid_joined");
460
461	let db = &services.db;
462	let roomuserid_joined = &db["roomuserid_joined"];
463	let _cork = db.cork_and_sync();
464
465	let mut iter_count: usize = 0;
466	roomuserid_joined
467		.raw_stream()
468		.ignore_err()
469		.ready_for_each(|(key, value)| {
470			let mut key = key.to_vec();
471			iter_count = iter_count.saturating_add(1);
472			debug_info!(%iter_count);
473			let Some(first_sep_index) = key.iter().position(|&i| i == 0xFF) else {
474				debug_warn!(?key, "roomuserid_joined key has no 0xFF separator; skipping");
475				return;
476			};
477
478			if key
479				.iter()
480				.get(first_sep_index..=first_sep_index.saturating_add(1))
481				.copied()
482				.collect_vec()
483				== vec![0xFF, 0xFF]
484			{
485				debug_warn!("Found bad key: {key:?}");
486				roomuserid_joined.remove(&key);
487
488				key.remove(first_sep_index);
489				debug_warn!("Fixed key: {key:?}");
490				roomuserid_joined.insert(&key, value);
491			}
492		})
493		.await;
494
495	db.engine.sort()?;
496	db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
497
498	info!("Finished fixing");
499	Ok(())
500}
501
502async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result {
503	warn!("Retroactively fixing bad data from broken roomuserid_joined");
504
505	let db = &services.db;
506	let _cork = db.cork_and_sync();
507
508	let room_ids = services
509		.metadata
510		.iter_ids()
511		.map(ToOwned::to_owned)
512		.collect::<Vec<_>>()
513		.await;
514
515	for room_id in &room_ids {
516		debug_info!("Fixing room {room_id}");
517
518		let users_in_room: Vec<OwnedUserId> = services
519			.state_cache
520			.room_members(room_id)
521			.map(ToOwned::to_owned)
522			.collect()
523			.await;
524
525		let joined_members = users_in_room
526			.iter()
527			.stream()
528			.filter(|user_id| {
529				services
530					.state_accessor
531					.get_member(room_id, user_id)
532					.map(|member| {
533						member.is_ok_and(|member| member.membership == MembershipState::Join)
534					})
535			})
536			.collect::<Vec<_>>()
537			.await;
538
539		let non_joined_members = users_in_room
540			.iter()
541			.stream()
542			.filter(|user_id| {
543				services
544					.state_accessor
545					.get_member(room_id, user_id)
546					.map(|member| {
547						member.is_ok_and(|member| member.membership != MembershipState::Join)
548					})
549			})
550			.collect::<Vec<_>>()
551			.await;
552
553		for user_id in &joined_members {
554			debug_info!("User is joined, marking as joined");
555			let count = services.globals.next_count();
556			services
557				.state_cache
558				.mark_as_joined(user_id, room_id, PduCount::Normal(*count));
559		}
560
561		for user_id in &non_joined_members {
562			debug_info!("User is left or banned, marking as left");
563			let count = services.globals.next_count();
564			services
565				.state_cache
566				.mark_as_left(user_id, room_id, PduCount::Normal(*count));
567		}
568	}
569
570	for room_id in &room_ids {
571		debug_info!(
572			"Updating joined count for room {room_id} to fix servers in room after correcting \
573			 membership states"
574		);
575
576		services
577			.state_cache
578			.update_joined_count(room_id)
579			.await;
580	}
581
582	db.engine.sort()?;
583	db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
584
585	info!("Finished fixing");
586	Ok(())
587}
588
589async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
590	warn!("Fixing missing record separator between room_id and event_id in referencedevents");
591
592	let db = &services.db;
593	let cork = db.cork_and_sync();
594
595	let referencedevents = db["referencedevents"].clone();
596
597	let totals: (usize, usize) = (0, 0);
598	let (total, fixed) = referencedevents
599		.raw_stream()
600		.expect_ok()
601		.enumerate()
602		.ready_fold(totals, |mut a, (i, (key, val))| {
603			debug_assert!(val.is_empty(), "expected no value");
604
605			let has_sep = key.contains(&SEP);
606
607			if !has_sep {
608				let key_str = std::str::from_utf8(key).expect("key not utf-8");
609				let room_id_len = key_str.find('$').expect("missing '$' in key");
610				let (room_id, event_id) = key_str.split_at(room_id_len);
611				debug!(?a, "fixing {room_id}, {event_id}");
612
613				let new_key = (room_id, event_id);
614				referencedevents.put_raw(new_key, val);
615				referencedevents.remove(key);
616			}
617
618			a.0 = cmp::max(i, a.0);
619			a.1 = a.1.saturating_add((!has_sep).into());
620			a
621		})
622		.await;
623
624	drop(cork);
625	info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'.");
626
627	db["global"].insert(b"fix_referencedevents_missing_sep", []);
628	db.engine.sort()
629}
630
/// Deletes duplicate read-receipt rows left undeleted in
/// `readreceiptid_readreceipt`.
///
/// Keys are decoded as `(room_id, count, user_id)` and visited in stream
/// order. A key is treated as a duplicate when its room and user both equal
/// those of the key visited immediately before it, and that later key is
/// deleted. Logs totals, records the
/// `fix_readreceiptid_readreceipt_duplicates` marker, then sorts the database.
///
/// NOTE(review): only *consecutive* same-room/same-user keys are detected; if
/// keys sort by room then count, another user's receipt in between hides a
/// duplicate — confirm this is intended.
/// NOTE(review): the later-visited key is the one deleted; if counts sort
/// ascending this keeps the older receipt — confirm.
///
/// # Panics
/// Panics if a room or user ID does not fit in `ArrayString<ID_MAX_BYTES>`.
async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result {
	use ruma::identifiers_validation::ID_MAX_BYTES;
	use tuwunel_core::arrayvec::ArrayString;

	type ArrayId = ArrayString<ID_MAX_BYTES>;
	type Key<'a> = (&'a RoomId, u64, &'a UserId);

	warn!("Fixing undeleted entries in readreceiptid_readreceipt...");

	let db = &services.db;
	let cork = db.cork_and_sync();
	let readreceiptid_readreceipt = db["readreceiptid_readreceipt"].clone();

	// Room and user of the previously visited key, kept in fixed-capacity
	// strings rather than borrowing from the stream.
	let mut cur_room: Option<ArrayId> = None;
	let mut cur_user: Option<ArrayId> = None;
	let (mut total, mut fixed): (usize, usize) = (0, 0);
	readreceiptid_readreceipt
		.keys()
		.expect_ok()
		.ready_for_each(|key: Key<'_>| {
			let (room_id, _, user_id) = key;
			// `replace` stores the current IDs and hands back the previous ones.
			let last_room = cur_room.replace(
				room_id
					.as_str()
					.try_into()
					.expect("invalid room_id in database"),
			);

			let last_user = cur_user.replace(
				user_id
					.as_str()
					.try_into()
					.expect("invalid user_id in database"),
			);

			let is_dup = cur_room == last_room && cur_user == last_user;
			if is_dup {
				readreceiptid_readreceipt.del(key);
			}

			fixed = fixed.saturating_add(is_dup.into());
			total = total.saturating_add(1);
		})
		.await;

	drop(cork);
	info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt.");

	db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
	db.engine.sort()
}
682
683async fn fix_hashed_sentinel_passwords(services: &Services) -> Result {
684	use tuwunel_core::utils::hash::verify_password;
685
686	const PASSWORD_SENTINEL: &str = "*";
687
688	if services.config.identity_provider.is_empty() {
689		debug!("Skipping sentinel password migration since no SSO IdP configured.");
690		return Ok(());
691	}
692
693	let db = &services.db;
694	let cork = db.cork_and_sync();
695	let userid_password = db["userid_password"].clone();
696	let hashed_sentinel = utils::hash::password(PASSWORD_SENTINEL).map_err(|e| {
697		err!("Could not apply migration: failed to hash sentinel password: {e:?}")
698	})?;
699
700	warn!(
701		"Fixing occurrences of password-hash {hashed_sentinel:?} generated from \
702		 {PASSWORD_SENTINEL:?}"
703	);
704
705	let (checked, good, bad) = userid_password
706		.stream()
707		.expect_ok()
708		.ready_fold(
709			(0, 0, 0),
710			|(mut checked, mut good, mut bad): (usize, usize, usize),
711			 (key, val): (&str, &str)| {
712				let good_sentinel = val == PASSWORD_SENTINEL;
713				let bad_sentinel = !val.is_empty()
714					&& !good_sentinel
715					&& verify_password(PASSWORD_SENTINEL, val).is_ok();
716
717				checked = checked.saturating_add(usize::from(true));
718				good = good.saturating_add(usize::from(good_sentinel));
719				bad = bad.saturating_add(usize::from(bad_sentinel));
720
721				if bad_sentinel {
722					userid_password.insert(key, PASSWORD_SENTINEL);
723				}
724
725				(checked, good, bad)
726			},
727		)
728		.await;
729
730	drop(cork);
731	info!(?checked, ?good, ?bad, "Fixed any occurrences of hashed sentinel passwords");
732
733	db["global"].insert(b"fix_hashed_sentinel_passwords", []);
734	db.engine.sort()
735}
736
737async fn upgrade_legacy_mediaid_user(services: &Services) -> Result {
738	let db = &services.db;
739	let cork = db.cork_and_sync();
740	let mediaid_user = db["mediaid_user"].clone();
741
742	warn!("Upgrading legacy mediaid_user keys to composite (mxc, user_id) layout");
743
744	let (checked, upgraded, removed_invalid) = mediaid_user
745		.raw_stream()
746		.ignore_err()
747		.ready_fold(
748			(0_usize, 0_usize, 0_usize),
749			|(mut checked, mut upgraded, mut removed_invalid), (raw_key, raw_val)| {
750				checked = checked.saturating_add(1);
751
752				let has_sep = raw_key.contains(&SEP);
753				let user_id = str::from_utf8(raw_val)
754					.ok()
755					.and_then(|s| <&UserId>::try_from(s).ok());
756
757				match (has_sep, user_id) {
758					| (true, _) => {},
759					| (false, None) => {
760						warn!(
761							?raw_key,
762							?raw_val,
763							"Legacy entry has unparsable user_id, removing"
764						);
765
766						mediaid_user.remove(raw_key);
767						removed_invalid = removed_invalid.saturating_add(1);
768					},
769					| (false, Some(user_id)) => {
770						let mut new_key = raw_key.to_vec();
771						new_key.push(SEP);
772						new_key.extend_from_slice(user_id.as_bytes());
773
774						mediaid_user.put_raw(new_key, user_id.as_str());
775						mediaid_user.remove(raw_key);
776
777						upgraded = upgraded.saturating_add(1);
778					},
779				}
780
781				(checked, upgraded, removed_invalid)
782			},
783		)
784		.await;
785
786	drop(cork);
787	info!(
788		%checked,
789		%upgraded,
790		%removed_invalid,
791		"Upgraded legacy mediaid_user keys"
792	);
793
794	db["global"].insert(b"upgrade_legacy_mediaid_user", []);
795	db.engine.sort()
796}
797
798async fn remove_remote_media_userid(services: &Services) -> Result {
799	let db = &services.db;
800	let cork = db.cork_and_sync();
801	let mediaid_user = db["mediaid_user"].clone();
802
803	warn!("Removing stored user id for remote media");
804
805	let (checked, removed_remote, removed_invalid) = mediaid_user
806		.keys()
807		.expect_ok()
808		.ready_fold(
809			(0, 0, 0),
810			|(mut checked, mut removed_remote, mut removed_invalid): (usize, usize, usize),
811			 (mxc_uri, user_id): (&MxcUri, &UserId)| {
812				checked = checked.saturating_add(1);
813
814				let Ok(mxc) = mxc_uri.parts() else {
815					warn!(?mxc_uri, "Invalid MXC URL, removing it");
816
817					mediaid_user.del((mxc_uri, user_id));
818
819					removed_invalid = removed_invalid.saturating_add(1);
820
821					return (checked, removed_remote, removed_invalid);
822				};
823
824				if !services.globals.server_is_ours(mxc.server_name) {
825					mediaid_user.del((mxc_uri, user_id));
826
827					removed_remote = removed_remote.saturating_add(1);
828
829					return (checked, removed_remote, removed_invalid);
830				}
831
832				(checked, removed_remote, removed_invalid)
833			},
834		)
835		.await;
836
837	drop(cork);
838	info!(
839		%checked,
840		%removed_remote,
841		%removed_invalid,
842		"Removed stored user id for remote media"
843	);
844
845	db["global"].insert(b"remove_remote_media_userid", []);
846	db.engine.sort()
847}
848
/// Minimal projection of a stored PDU's JSON used by
/// `rebuild_roomid_tscount_pducount`: only the room and origin timestamp are
/// read, and all other fields of the event are ignored during deserialization.
#[derive(Deserialize)]
struct PduRoomTs {
	/// Room the PDU belongs to.
	room_id: OwnedRoomId,
	/// The PDU's `origin_server_ts`, used as the index's timestamp component.
	origin_server_ts: MilliSecondsSinceUnixEpoch,
}
854
855async fn rebuild_roomid_tscount_pducount(services: &Services) -> Result {
856	let db = &services.db;
857	let cork = db.cork_and_sync();
858	let pduid_pdu = db["pduid_pdu"].clone();
859	let roomid_tscount_pducount = db["roomid_tscount_pducount"].clone();
860
861	warn!("Rebuilding roomid_tscount_pducount index for same-timestamp event ordering");
862
863	let count = pduid_pdu
864		.raw_stream()
865		.ignore_err()
866		.ready_fold(0_usize, |count, (key, value)| {
867			let Ok(pdu) = serde_json::from_slice::<PduRoomTs>(value) else {
868				return count;
869			};
870
871			let ts = u64::from(pdu.origin_server_ts.get());
872			let pdu_id = RawPduId::from(key);
873			let count_key = bias_count(pdu_id.count());
874			let room_id: &RoomId = &pdu.room_id;
875
876			roomid_tscount_pducount.put_raw((room_id, ts, count_key), pdu_id.count());
877
878			count.saturating_add(1)
879		})
880		.await;
881
882	drop(cork);
883	info!(%count, "Rebuilt roomid_tscount_pducount index");
884
885	db["global"].insert(b"rebuild_roomid_tscount_pducount", []);
886	db.engine.sort()
887}
888
/// Relocates the per-user displayname, avatar_url and blurhash out of their
/// dedicated columns into the unified useridprofilekey_value store, where the
/// profile service now reads them. Displayname and avatar_url are keyed by
/// their MSC4133 field name; blurhash is keyed `xyz.amorgan.blurhash`. Each
/// value is written JSON-encoded. A final pass over useridprofilekey_value
/// re-encodes any value that is not valid JSON as a JSON string, and removes
/// values that are not even UTF-8. The dedicated columns are left intact, so
/// an older binary opening the same database still resolves. Records the
/// `migrate_profile_keys_to_useridprofilekey` marker, then sorts the database.
async fn migrate_profile_keys(services: &Services) -> Result {
	use ruma::profile::ProfileFieldName;

	let db = &services.db;
	let cork = db.cork_and_sync();

	let userid_displayname = db["userid_displayname"].clone();
	let userid_avatarurl = db["userid_avatarurl"].clone();
	let userid_blurhash = db["userid_blurhash"].clone();
	let useridprofilekey_value = db["useridprofilekey_value"].clone();

	warn!(
		"Relocating displaynames, avatar_urls and blurhashes into the unified profile-key store"
	);

	let displaynames = userid_displayname
		.stream()
		.expect_ok()
		.ready_fold(0_usize, |count, (user_id, displayname): (&UserId, &str)| {
			let key = (user_id, ProfileFieldName::DisplayName.as_str());
			let value = displayname.to_owned();

			useridprofilekey_value.put(key, Json(value));

			count.saturating_add(1)
		})
		.await;

	let avatar_urls = userid_avatarurl
		.stream()
		.expect_ok()
		.ready_fold(0_usize, |count, (user_id, avatar_url): (&UserId, &str)| {
			let key = (user_id, ProfileFieldName::AvatarUrl.as_str());
			let value = avatar_url.to_owned();

			useridprofilekey_value.put(key, Json(value));

			count.saturating_add(1)
		})
		.await;

	let blurhashes = userid_blurhash
		.stream()
		.expect_ok()
		.ready_fold(0_usize, |count, (user_id, blurhash): (&UserId, &str)| {
			let key = (user_id, "xyz.amorgan.blurhash");
			let value = blurhash.to_owned();

			useridprofilekey_value.put(key, Json(value));

			count.saturating_add(1)
		})
		.await;

	// Repair pass: any stored value that does not parse as JSON is wrapped as a
	// JSON string if it is UTF-8, or dropped if it is not.
	let fixed_strings = useridprofilekey_value
		.raw_stream()
		.expect_ok()
		.ready_fold(0_usize, |count, (key, value)| {
			if serde_json::from_slice::<IgnoredAny>(value).is_err() {
				let Ok(string) = str::from_utf8(value) else {
					warn!("Non-UTF8 data in profile value: {key:?} => {value:?}");
					useridprofilekey_value.remove(key);
					return count;
				};
				useridprofilekey_value.raw_put(key, Json(string));
				return count.saturating_add(1);
			}

			count
		})
		.await;

	drop(cork);
	info!(%displaynames, %avatar_urls, %blurhashes, %fixed_strings, "Relocated profile keys into useridprofilekey_value");

	db["global"].insert(b"migrate_profile_keys_to_useridprofilekey", []);
	db.engine.sort()
}