1use std::cmp::{self, Ordering};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5 MilliSecondsSinceUnixEpoch, MxcUri, OwnedRoomId, OwnedUserId, RoomId, UserId,
6 events::room::member::MembershipState,
7};
8use serde::{Deserialize, de::IgnoredAny};
9use tuwunel_core::{
10 Err, Result, debug, debug_info, debug_warn, err, info,
11 itertools::Itertools,
12 matrix::{PduCount, pdu::RawPduId},
13 result::NotFound,
14 utils,
15 utils::{
16 BoolExt, IterStream, ReadyExt,
17 stream::{TryExpect, TryIgnore},
18 },
19 warn,
20};
21use tuwunel_database::{Deserialized, Json, SEP};
22
23use crate::{Services, media, rooms::timeline::bias_count};
24
25mod conduit;
26mod moderation;
27
28pub(crate) const DATABASE_VERSION: u64 = 17;
35
36const SERVER_NAME_KEY: &[u8] = b"server_name";
37
38const FOREIGN_LINEAGE_MARKER: &[u8] = b"populate_userroomid_leftstate_table";
43
44pub(crate) async fn migrations(services: &Services) -> Result {
45 if !services.config.database_migrations {
46 warn!("Skipping database migrations due to configuration...");
47 return Ok(());
48 }
49
50 let users_count = services.users.count().await;
51 if users_count == 0 {
52 return fresh(services).await;
53 }
54
55 let foreign_lineage = is_foreign_lineage(services).await;
58
59 check_database_version(services, foreign_lineage).await?;
60 check_server_name(services).await?;
61 migrate(services, foreign_lineage).await
62}
63
64async fn is_foreign_lineage(services: &Services) -> bool {
69 let global = &services.db["global"];
70
71 global.get(SERVER_NAME_KEY).await.is_not_found()
72 || global.get(FOREIGN_LINEAGE_MARKER).await.is_ok()
73}
74
75async fn check_database_version(services: &Services, foreign_lineage: bool) -> Result {
83 let discovered = services.globals.db.database_version().await;
84
85 if discovered < 13 {
86 return Err!(Database("Database schema version {discovered} is no longer supported"));
87 }
88
89 if discovered > DATABASE_VERSION && !foreign_lineage && !services.config.force_migration {
90 return Err!(Database(
91 "Database schema version {discovered} is newer than this build supports \
92 ({DATABASE_VERSION}). Upgrade tuwunel, or set force_migration = true to open it \
93 anyway; a downgrade may cause permanent data loss."
94 ));
95 }
96
97 Ok(())
98}
99
100async fn check_server_name(services: &Services) -> Result {
105 let server_name = &services.server.name;
106
107 let existing = services.db["global"]
108 .get(SERVER_NAME_KEY)
109 .await
110 .deserialized::<String>();
111
112 match existing {
113 | Err(_) => backfill_server_name(services).await,
114 | Ok(existing) if existing.eq(server_name) => Ok(()),
115 | Ok(existing) => Err!(Database(
116 "Database belongs to {existing}; configured server name is {server_name}. Cannot \
117 reuse."
118 )),
119 }
120}
121
122async fn backfill_server_name(services: &Services) -> Result {
126 let server_name = &services.server.name;
127
128 services
129 .users
130 .stream()
131 .ready_any(|user_id| services.globals.user_is_local(user_id))
132 .await
133 .into_option()
134 .ok_or_else(|| {
135 err!(Database(
136 "Database has no users from {server_name}; refusing to reuse with this \
137 server_name."
138 ))
139 })?;
140
141 services.db["global"].insert(SERVER_NAME_KEY, server_name.as_str());
142 info!(%server_name, "Stamped server_name marker on upgraded database");
143
144 Ok(())
145}
146
147async fn fresh(services: &Services) -> Result {
148 let db = &services.db;
149
150 services
151 .globals
152 .db
153 .bump_database_version(DATABASE_VERSION);
154
155 db["global"].insert(SERVER_NAME_KEY, services.server.name.as_str());
156 db["global"].insert(b"feat_sha256_media", []);
157 db["global"].insert(b"fix_pdu_missing_room_id", []);
158 db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
159 db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
160 db["global"].insert(b"fix_referencedevents_missing_sep", []);
161 db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
162 db["global"].insert(b"fix_hashed_sentinel_passwords", []);
163 db["global"].insert(b"upgrade_legacy_mediaid_user", []);
164 db["global"].insert(b"remove_remote_media_userid", []);
165 db["global"].insert(b"rebuild_roomid_tscount_pducount", []);
166 db["global"].insert(b"rebuild_relatesto_typed", []);
167 db["global"].insert(b"migrate_profile_keys_to_useridprofilekey", []);
168
169 if services.config.create_admin_room {
171 crate::admin::create_admin_room(services)
172 .boxed()
173 .await?;
174 }
175
176 warn!("Created new RocksDB database with version {DATABASE_VERSION}");
177
178 Ok(())
179}
180
181#[expect(clippy::too_many_lines)]
183async fn migrate(services: &Services, foreign_lineage: bool) -> Result {
184 let db = &services.db;
185
186 let target_version = DATABASE_VERSION;
187 let discovered = services.globals.db.database_version().await;
188
189 if foreign_lineage && discovered > target_version {
196 services
197 .globals
198 .db
199 .bump_database_version(target_version);
200 }
201
202 migrate_media(services).await?;
203
204 if db["global"]
205 .get(b"fix_pdu_missing_room_id")
206 .await
207 .is_not_found()
208 {
209 conduit::migrate_conduit_pdus(services).await?;
210 db["global"].insert(b"fix_pdu_missing_room_id", []);
211 }
212
213 import_conduit_knocks(services).await?;
214 split_conduit_highlight_counts(services).await?;
215
216 if db["global"]
217 .get(b"fix_bad_double_separator_in_state_cache")
218 .await
219 .is_not_found()
220 {
221 fix_bad_double_separator_in_state_cache(services).await?;
222 }
223
224 if db["global"]
225 .get(b"retroactively_fix_bad_data_from_roomuserid_joined")
226 .await
227 .is_not_found()
228 {
229 retroactively_fix_bad_data_from_roomuserid_joined(services).await?;
230 }
231
232 if db["global"]
233 .get(b"fix_referencedevents_missing_sep")
234 .await
235 .is_not_found()
236 {
237 fix_referencedevents_missing_sep(services).await?;
238 }
239
240 if db["global"]
241 .get(b"fix_readreceiptid_readreceipt_duplicates")
242 .await
243 .is_not_found()
244 {
245 fix_readreceiptid_readreceipt_duplicates(services).await?;
246 }
247
248 if db["global"]
249 .get(b"fix_hashed_sentinel_passwords")
250 .await
251 .is_not_found()
252 {
253 fix_hashed_sentinel_passwords(services).await?;
254 }
255
256 if db["global"]
257 .get(b"upgrade_legacy_mediaid_user")
258 .await
259 .is_not_found()
260 {
261 upgrade_legacy_mediaid_user(services).await?;
262 }
263
264 if db["global"]
265 .get(b"remove_remote_media_userid")
266 .await
267 .is_not_found()
268 {
269 remove_remote_media_userid(services).await?;
270 }
271
272 if db["global"]
273 .get(b"rebuild_roomid_tscount_pducount")
274 .await
275 .is_not_found()
276 {
277 rebuild_roomid_tscount_pducount(services).await?;
278 }
279
280 if db["global"]
281 .get(b"rebuild_relatesto_typed")
282 .await
283 .is_not_found()
284 {
285 services
286 .pdu_metadata
287 .rebuild_typed_relations()
288 .await?;
289
290 db["global"].insert(b"rebuild_relatesto_typed", []);
291 }
292
293 if db["global"]
294 .get(b"migrate_profile_keys_to_useridprofilekey")
295 .await
296 .is_not_found()
297 {
298 migrate_profile_keys(services).await?;
299 }
300
301 moderation::migrate_moderation(services).await?;
305
306 services
310 .globals
311 .db
312 .bump_database_version(target_version);
313
314 match discovered.cmp(&target_version) {
315 | Ordering::Less =>
316 info!("Database: migrated schema version from {discovered} to {target_version}."),
317 | Ordering::Greater => warn!(
318 "Database: stamped schema version {target_version} over a higher discovered version \
319 {discovered} (forced downgrade or foreign import)."
320 ),
321 | Ordering::Equal => {},
322 }
323
324 if !services.config.forbidden_usernames.is_empty() {
325 services
326 .users
327 .stream()
328 .filter(|user_id| services.users.is_active_local(user_id))
329 .ready_filter_map(|user_id| {
330 let patterns = &services.config.forbidden_usernames;
331 let matches = patterns.matches(user_id.localpart());
332 let matched = matches
333 .iter()
334 .map(|x| &patterns.patterns()[x])
335 .join(", ");
336
337 matches
338 .matched_any()
339 .then_some((user_id, matched))
340 })
341 .ready_for_each(|(user_id, matched)| {
342 warn!("User {user_id} matches forbidden username patterns: {matched:#?}");
343 })
344 .await;
345 }
346
347 if !services.config.forbidden_alias_names.is_empty() {
348 services
349 .metadata
350 .iter_ids()
351 .map(|room_id| {
352 services
353 .alias
354 .local_aliases_for_room(room_id)
355 .map(move |alias| (room_id, alias))
356 })
357 .flatten()
358 .ready_filter_map(|(room_id, room_alias)| {
359 let patterns = &services.config.forbidden_alias_names;
360 let matches = patterns.matches(room_alias.alias());
361 let matched = matches
362 .iter()
363 .map(|x| &patterns.patterns()[x])
364 .join(", ");
365
366 matches
367 .matched_any()
368 .then_some((room_id, room_alias, matched))
369 })
370 .ready_for_each(|(room_id, room_alias, matched)| {
371 warn!(
372 "Room {room_id} with alias {room_alias} matches the following forbidden \
373 room name patterns: {matched}"
374 );
375 })
376 .boxed()
377 .await;
378 }
379
380 info!("Loaded RocksDB database with schema version {DATABASE_VERSION}");
381
382 Ok(())
383}
384
385async fn import_conduit_knocks(services: &Services) -> Result {
390 let db = &services.db;
391
392 let pending = db["global"]
393 .get(b"imported_conduit_knocks")
394 .await
395 .is_not_found();
396
397 if pending && db.open_cf("roomuserid_knockcount")?.is_some() {
398 conduit::migrate_conduit_knocks(services).await?;
399 db["global"].insert(b"imported_conduit_knocks", []);
400 }
401
402 Ok(())
403}
404
405async fn split_conduit_highlight_counts(services: &Services) -> Result {
412 let db = &services.db;
413
414 if db["global"]
415 .get(b"split_conduit_highlight")
416 .await
417 .is_not_found()
418 {
419 conduit::migrate_conduit_highlight_split(services).await?;
420 db["global"].insert(b"split_conduit_highlight", []);
421 }
422
423 Ok(())
424}
425
426async fn migrate_media(services: &Services) -> Result {
430 let db = &services.db;
431 let config = &services.server.config;
432
433 let sha256_done = !db["global"]
434 .get(b"feat_sha256_media")
435 .await
436 .is_not_found();
437
438 if !sha256_done
440 && db
441 .open_cf("servernamemediaid_metadata")?
442 .is_some()
443 {
444 conduit::migrate_conduit_media(services).await?;
445 db["global"].insert(b"feat_sha256_media", []);
446 return Ok(());
447 }
448
449 if !sha256_done {
450 media::migrations::migrate_sha256_media(services).await?;
451 } else if config.media_startup_check {
452 media::migrations::checkup_sha256_media(services).await?;
453 }
454
455 Ok(())
456}
457
458async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result {
459 warn!("Fixing bad double separator in state_cache roomuserid_joined");
460
461 let db = &services.db;
462 let roomuserid_joined = &db["roomuserid_joined"];
463 let _cork = db.cork_and_sync();
464
465 let mut iter_count: usize = 0;
466 roomuserid_joined
467 .raw_stream()
468 .ignore_err()
469 .ready_for_each(|(key, value)| {
470 let mut key = key.to_vec();
471 iter_count = iter_count.saturating_add(1);
472 debug_info!(%iter_count);
473 let Some(first_sep_index) = key.iter().position(|&i| i == 0xFF) else {
474 debug_warn!(?key, "roomuserid_joined key has no 0xFF separator; skipping");
475 return;
476 };
477
478 if key
479 .iter()
480 .get(first_sep_index..=first_sep_index.saturating_add(1))
481 .copied()
482 .collect_vec()
483 == vec![0xFF, 0xFF]
484 {
485 debug_warn!("Found bad key: {key:?}");
486 roomuserid_joined.remove(&key);
487
488 key.remove(first_sep_index);
489 debug_warn!("Fixed key: {key:?}");
490 roomuserid_joined.insert(&key, value);
491 }
492 })
493 .await;
494
495 db.engine.sort()?;
496 db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
497
498 info!("Finished fixing");
499 Ok(())
500}
501
502async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result {
503 warn!("Retroactively fixing bad data from broken roomuserid_joined");
504
505 let db = &services.db;
506 let _cork = db.cork_and_sync();
507
508 let room_ids = services
509 .metadata
510 .iter_ids()
511 .map(ToOwned::to_owned)
512 .collect::<Vec<_>>()
513 .await;
514
515 for room_id in &room_ids {
516 debug_info!("Fixing room {room_id}");
517
518 let users_in_room: Vec<OwnedUserId> = services
519 .state_cache
520 .room_members(room_id)
521 .map(ToOwned::to_owned)
522 .collect()
523 .await;
524
525 let joined_members = users_in_room
526 .iter()
527 .stream()
528 .filter(|user_id| {
529 services
530 .state_accessor
531 .get_member(room_id, user_id)
532 .map(|member| {
533 member.is_ok_and(|member| member.membership == MembershipState::Join)
534 })
535 })
536 .collect::<Vec<_>>()
537 .await;
538
539 let non_joined_members = users_in_room
540 .iter()
541 .stream()
542 .filter(|user_id| {
543 services
544 .state_accessor
545 .get_member(room_id, user_id)
546 .map(|member| {
547 member.is_ok_and(|member| member.membership != MembershipState::Join)
548 })
549 })
550 .collect::<Vec<_>>()
551 .await;
552
553 for user_id in &joined_members {
554 debug_info!("User is joined, marking as joined");
555 let count = services.globals.next_count();
556 services
557 .state_cache
558 .mark_as_joined(user_id, room_id, PduCount::Normal(*count));
559 }
560
561 for user_id in &non_joined_members {
562 debug_info!("User is left or banned, marking as left");
563 let count = services.globals.next_count();
564 services
565 .state_cache
566 .mark_as_left(user_id, room_id, PduCount::Normal(*count));
567 }
568 }
569
570 for room_id in &room_ids {
571 debug_info!(
572 "Updating joined count for room {room_id} to fix servers in room after correcting \
573 membership states"
574 );
575
576 services
577 .state_cache
578 .update_joined_count(room_id)
579 .await;
580 }
581
582 db.engine.sort()?;
583 db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
584
585 info!("Finished fixing");
586 Ok(())
587}
588
589async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
590 warn!("Fixing missing record separator between room_id and event_id in referencedevents");
591
592 let db = &services.db;
593 let cork = db.cork_and_sync();
594
595 let referencedevents = db["referencedevents"].clone();
596
597 let totals: (usize, usize) = (0, 0);
598 let (total, fixed) = referencedevents
599 .raw_stream()
600 .expect_ok()
601 .enumerate()
602 .ready_fold(totals, |mut a, (i, (key, val))| {
603 debug_assert!(val.is_empty(), "expected no value");
604
605 let has_sep = key.contains(&SEP);
606
607 if !has_sep {
608 let key_str = std::str::from_utf8(key).expect("key not utf-8");
609 let room_id_len = key_str.find('$').expect("missing '$' in key");
610 let (room_id, event_id) = key_str.split_at(room_id_len);
611 debug!(?a, "fixing {room_id}, {event_id}");
612
613 let new_key = (room_id, event_id);
614 referencedevents.put_raw(new_key, val);
615 referencedevents.remove(key);
616 }
617
618 a.0 = cmp::max(i, a.0);
619 a.1 = a.1.saturating_add((!has_sep).into());
620 a
621 })
622 .await;
623
624 drop(cork);
625 info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'.");
626
627 db["global"].insert(b"fix_referencedevents_missing_sep", []);
628 db.engine.sort()
629}
630
631async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result {
632 use ruma::identifiers_validation::ID_MAX_BYTES;
633 use tuwunel_core::arrayvec::ArrayString;
634
635 type ArrayId = ArrayString<ID_MAX_BYTES>;
636 type Key<'a> = (&'a RoomId, u64, &'a UserId);
637
638 warn!("Fixing undeleted entries in readreceiptid_readreceipt...");
639
640 let db = &services.db;
641 let cork = db.cork_and_sync();
642 let readreceiptid_readreceipt = db["readreceiptid_readreceipt"].clone();
643
644 let mut cur_room: Option<ArrayId> = None;
645 let mut cur_user: Option<ArrayId> = None;
646 let (mut total, mut fixed): (usize, usize) = (0, 0);
647 readreceiptid_readreceipt
648 .keys()
649 .expect_ok()
650 .ready_for_each(|key: Key<'_>| {
651 let (room_id, _, user_id) = key;
652 let last_room = cur_room.replace(
653 room_id
654 .as_str()
655 .try_into()
656 .expect("invalid room_id in database"),
657 );
658
659 let last_user = cur_user.replace(
660 user_id
661 .as_str()
662 .try_into()
663 .expect("invalid user_id in database"),
664 );
665
666 let is_dup = cur_room == last_room && cur_user == last_user;
667 if is_dup {
668 readreceiptid_readreceipt.del(key);
669 }
670
671 fixed = fixed.saturating_add(is_dup.into());
672 total = total.saturating_add(1);
673 })
674 .await;
675
676 drop(cork);
677 info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt.");
678
679 db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
680 db.engine.sort()
681}
682
683async fn fix_hashed_sentinel_passwords(services: &Services) -> Result {
684 use tuwunel_core::utils::hash::verify_password;
685
686 const PASSWORD_SENTINEL: &str = "*";
687
688 if services.config.identity_provider.is_empty() {
689 debug!("Skipping sentinel password migration since no SSO IdP configured.");
690 return Ok(());
691 }
692
693 let db = &services.db;
694 let cork = db.cork_and_sync();
695 let userid_password = db["userid_password"].clone();
696 let hashed_sentinel = utils::hash::password(PASSWORD_SENTINEL).map_err(|e| {
697 err!("Could not apply migration: failed to hash sentinel password: {e:?}")
698 })?;
699
700 warn!(
701 "Fixing occurrences of password-hash {hashed_sentinel:?} generated from \
702 {PASSWORD_SENTINEL:?}"
703 );
704
705 let (checked, good, bad) = userid_password
706 .stream()
707 .expect_ok()
708 .ready_fold(
709 (0, 0, 0),
710 |(mut checked, mut good, mut bad): (usize, usize, usize),
711 (key, val): (&str, &str)| {
712 let good_sentinel = val == PASSWORD_SENTINEL;
713 let bad_sentinel = !val.is_empty()
714 && !good_sentinel
715 && verify_password(PASSWORD_SENTINEL, val).is_ok();
716
717 checked = checked.saturating_add(usize::from(true));
718 good = good.saturating_add(usize::from(good_sentinel));
719 bad = bad.saturating_add(usize::from(bad_sentinel));
720
721 if bad_sentinel {
722 userid_password.insert(key, PASSWORD_SENTINEL);
723 }
724
725 (checked, good, bad)
726 },
727 )
728 .await;
729
730 drop(cork);
731 info!(?checked, ?good, ?bad, "Fixed any occurrences of hashed sentinel passwords");
732
733 db["global"].insert(b"fix_hashed_sentinel_passwords", []);
734 db.engine.sort()
735}
736
737async fn upgrade_legacy_mediaid_user(services: &Services) -> Result {
738 let db = &services.db;
739 let cork = db.cork_and_sync();
740 let mediaid_user = db["mediaid_user"].clone();
741
742 warn!("Upgrading legacy mediaid_user keys to composite (mxc, user_id) layout");
743
744 let (checked, upgraded, removed_invalid) = mediaid_user
745 .raw_stream()
746 .ignore_err()
747 .ready_fold(
748 (0_usize, 0_usize, 0_usize),
749 |(mut checked, mut upgraded, mut removed_invalid), (raw_key, raw_val)| {
750 checked = checked.saturating_add(1);
751
752 let has_sep = raw_key.contains(&SEP);
753 let user_id = str::from_utf8(raw_val)
754 .ok()
755 .and_then(|s| <&UserId>::try_from(s).ok());
756
757 match (has_sep, user_id) {
758 | (true, _) => {},
759 | (false, None) => {
760 warn!(
761 ?raw_key,
762 ?raw_val,
763 "Legacy entry has unparsable user_id, removing"
764 );
765
766 mediaid_user.remove(raw_key);
767 removed_invalid = removed_invalid.saturating_add(1);
768 },
769 | (false, Some(user_id)) => {
770 let mut new_key = raw_key.to_vec();
771 new_key.push(SEP);
772 new_key.extend_from_slice(user_id.as_bytes());
773
774 mediaid_user.put_raw(new_key, user_id.as_str());
775 mediaid_user.remove(raw_key);
776
777 upgraded = upgraded.saturating_add(1);
778 },
779 }
780
781 (checked, upgraded, removed_invalid)
782 },
783 )
784 .await;
785
786 drop(cork);
787 info!(
788 %checked,
789 %upgraded,
790 %removed_invalid,
791 "Upgraded legacy mediaid_user keys"
792 );
793
794 db["global"].insert(b"upgrade_legacy_mediaid_user", []);
795 db.engine.sort()
796}
797
798async fn remove_remote_media_userid(services: &Services) -> Result {
799 let db = &services.db;
800 let cork = db.cork_and_sync();
801 let mediaid_user = db["mediaid_user"].clone();
802
803 warn!("Removing stored user id for remote media");
804
805 let (checked, removed_remote, removed_invalid) = mediaid_user
806 .keys()
807 .expect_ok()
808 .ready_fold(
809 (0, 0, 0),
810 |(mut checked, mut removed_remote, mut removed_invalid): (usize, usize, usize),
811 (mxc_uri, user_id): (&MxcUri, &UserId)| {
812 checked = checked.saturating_add(1);
813
814 let Ok(mxc) = mxc_uri.parts() else {
815 warn!(?mxc_uri, "Invalid MXC URL, removing it");
816
817 mediaid_user.del((mxc_uri, user_id));
818
819 removed_invalid = removed_invalid.saturating_add(1);
820
821 return (checked, removed_remote, removed_invalid);
822 };
823
824 if !services.globals.server_is_ours(mxc.server_name) {
825 mediaid_user.del((mxc_uri, user_id));
826
827 removed_remote = removed_remote.saturating_add(1);
828
829 return (checked, removed_remote, removed_invalid);
830 }
831
832 (checked, removed_remote, removed_invalid)
833 },
834 )
835 .await;
836
837 drop(cork);
838 info!(
839 %checked,
840 %removed_remote,
841 %removed_invalid,
842 "Removed stored user id for remote media"
843 );
844
845 db["global"].insert(b"remove_remote_media_userid", []);
846 db.engine.sort()
847}
848
849#[derive(Deserialize)]
850struct PduRoomTs {
851 room_id: OwnedRoomId,
852 origin_server_ts: MilliSecondsSinceUnixEpoch,
853}
854
855async fn rebuild_roomid_tscount_pducount(services: &Services) -> Result {
856 let db = &services.db;
857 let cork = db.cork_and_sync();
858 let pduid_pdu = db["pduid_pdu"].clone();
859 let roomid_tscount_pducount = db["roomid_tscount_pducount"].clone();
860
861 warn!("Rebuilding roomid_tscount_pducount index for same-timestamp event ordering");
862
863 let count = pduid_pdu
864 .raw_stream()
865 .ignore_err()
866 .ready_fold(0_usize, |count, (key, value)| {
867 let Ok(pdu) = serde_json::from_slice::<PduRoomTs>(value) else {
868 return count;
869 };
870
871 let ts = u64::from(pdu.origin_server_ts.get());
872 let pdu_id = RawPduId::from(key);
873 let count_key = bias_count(pdu_id.count());
874 let room_id: &RoomId = &pdu.room_id;
875
876 roomid_tscount_pducount.put_raw((room_id, ts, count_key), pdu_id.count());
877
878 count.saturating_add(1)
879 })
880 .await;
881
882 drop(cork);
883 info!(%count, "Rebuilt roomid_tscount_pducount index");
884
885 db["global"].insert(b"rebuild_roomid_tscount_pducount", []);
886 db.engine.sort()
887}
888
889async fn migrate_profile_keys(services: &Services) -> Result {
894 use ruma::profile::ProfileFieldName;
895
896 let db = &services.db;
897 let cork = db.cork_and_sync();
898
899 let userid_displayname = db["userid_displayname"].clone();
900 let userid_avatarurl = db["userid_avatarurl"].clone();
901 let userid_blurhash = db["userid_blurhash"].clone();
902 let useridprofilekey_value = db["useridprofilekey_value"].clone();
903
904 warn!(
905 "Relocating displaynames, avatar_urls and blurhashes into the unified profile-key store"
906 );
907
908 let displaynames = userid_displayname
909 .stream()
910 .expect_ok()
911 .ready_fold(0_usize, |count, (user_id, displayname): (&UserId, &str)| {
912 let key = (user_id, ProfileFieldName::DisplayName.as_str());
913 let value = displayname.to_owned();
914
915 useridprofilekey_value.put(key, Json(value));
916
917 count.saturating_add(1)
918 })
919 .await;
920
921 let avatar_urls = userid_avatarurl
922 .stream()
923 .expect_ok()
924 .ready_fold(0_usize, |count, (user_id, avatar_url): (&UserId, &str)| {
925 let key = (user_id, ProfileFieldName::AvatarUrl.as_str());
926 let value = avatar_url.to_owned();
927
928 useridprofilekey_value.put(key, Json(value));
929
930 count.saturating_add(1)
931 })
932 .await;
933
934 let blurhashes = userid_blurhash
935 .stream()
936 .expect_ok()
937 .ready_fold(0_usize, |count, (user_id, blurhash): (&UserId, &str)| {
938 let key = (user_id, "xyz.amorgan.blurhash");
939 let value = blurhash.to_owned();
940
941 useridprofilekey_value.put(key, Json(value));
942
943 count.saturating_add(1)
944 })
945 .await;
946
947 let fixed_strings = useridprofilekey_value
948 .raw_stream()
949 .expect_ok()
950 .ready_fold(0_usize, |count, (key, value)| {
951 if serde_json::from_slice::<IgnoredAny>(value).is_err() {
952 let Ok(string) = str::from_utf8(value) else {
953 warn!("Non-UTF8 data in profile value: {key:?} => {value:?}");
954 useridprofilekey_value.remove(key);
955 return count;
956 };
957 useridprofilekey_value.raw_put(key, Json(string));
958 return count.saturating_add(1);
959 }
960
961 count
962 })
963 .await;
964
965 drop(cork);
966 info!(%displaynames, %avatar_urls, %blurhashes, %fixed_strings, "Relocated profile keys into useridprofilekey_value");
967
968 db["global"].insert(b"migrate_profile_keys_to_useridprofilekey", []);
969 db.engine.sort()
970}