Skip to main content

tuwunel_service/migrations/
moderation.rs

1use std::sync::Arc;
2
3use ruma::{MilliSecondsSinceUnixEpoch, OwnedUserId, UserId};
4use serde::Deserialize;
5use tuwunel_core::{
6	Result, debug_warn, err,
7	utils::{ReadyExt, stream::TryIgnore},
8	warn,
9};
10use tuwunel_database::{Json, Map};
11
12use crate::{Services, users::Moderation};
13
14/// Some databases store suspension and lock state under differently-named
15/// columns and a richer value; tuwunel keeps only the forensic `when`/`by`,
16/// with row presence as the flag. `suspended_by` is an unvalidated string in
17/// the source, so it is parsed leniently when copied.
18#[derive(Deserialize)]
19struct ForeignModeration {
20	suspended: bool,
21	suspended_at: MilliSecondsSinceUnixEpoch,
22	suspended_by: String,
23}
24
25/// Copies foreign suspension and lock columns into tuwunel's own. Runs every
26/// boot and is non-destructive: the source columns are read, never removed, so
27/// a database also opened by its origin server keeps working. A no-op when the
28/// source columns are absent.
29pub(super) async fn migrate_moderation(services: &Services) -> Result {
30	copy_moderation(services, "userid_suspension", "userid_suspended").await?;
31	copy_moderation(services, "userid_lock", "userid_locked").await?;
32
33	Ok(())
34}
35
36async fn copy_moderation(
37	services: &Services,
38	source_name: &'static str,
39	target_name: &'static str,
40) -> Result {
41	let db = &services.db;
42
43	let Some(source) = db.open_cf(source_name)? else {
44		return Ok(());
45	};
46
47	let target = &db[target_name];
48	let server_user = services.globals.server_user.as_ref();
49
50	let cork = db.cork_and_sync();
51	let (copied, skipped) = source
52		.raw_stream()
53		.ignore_err()
54		.ready_fold((0_usize, 0_usize), |acc, (key, value)| {
55			tally(acc, copy_one(target, server_user, key, value))
56		})
57		.await;
58
59	drop(cork);
60
61	if skipped > 0 {
62		warn!(%copied, %skipped, source = %source_name, "Imported moderation entries; some skipped");
63	}
64
65	Ok(())
66}
67
68fn tally((copied, skipped): (usize, usize), result: Result<bool>) -> (usize, usize) {
69	match result {
70		| Ok(true) => (copied.saturating_add(1), skipped),
71		| Ok(false) => (copied, skipped),
72		| Err(e) => {
73			debug_warn!(error = %e, "skipping unreadable moderation entry");
74			(copied, skipped.saturating_add(1))
75		},
76	}
77}
78
79/// Writes one `Moderation` into the target column. A `false` return is a
80/// cleared entry (`suspended == false`), which has no tuwunel representation.
81fn copy_one(target: &Arc<Map>, server_user: &UserId, key: &[u8], value: &[u8]) -> Result<bool> {
82	let entry: ForeignModeration = serde_json::from_slice(value)
83		.map_err(|e| err!(Database("moderation entry is not JSON: {e}")))?;
84
85	if !entry.suspended {
86		return Ok(false);
87	}
88
89	let moderation = to_moderation(entry, server_user);
90
91	target.raw_put(key, Json(moderation));
92
93	Ok(true)
94}
95
96/// Maps a foreign entry to tuwunel's `Moderation`. The suspension is preserved
97/// even when the recorded actor is unparsable, attributing it to the importing
98/// server; the actor is forensic only, while the row's presence is the flag.
99fn to_moderation(entry: ForeignModeration, fallback: &UserId) -> Moderation {
100	Moderation {
101		when: entry.suspended_at,
102		by: OwnedUserId::try_from(entry.suspended_by).unwrap_or_else(|_| fallback.to_owned()),
103	}
104}
105
#[cfg(test)]
mod tests {
	use ruma::user_id;

	use super::{ForeignModeration, to_moderation};

	/// Decodes a foreign JSON value, panicking with `why` when it is rejected.
	fn decode(json: &[u8], why: &str) -> ForeignModeration {
		serde_json::from_slice(json).expect(why)
	}

	/// A well-formed entry keeps both its timestamp and its actor.
	#[test]
	fn foreign_suspension_maps_to_moderation() {
		let entry = decode(
			br#"{"suspended":true,"suspended_at":1700000000000,"suspended_by":"@mod:example.org"}"#,
			"foreign moderation deserializes",
		);
		assert!(entry.suspended);

		let moderation = to_moderation(entry, user_id!("@import:localhost"));

		assert_eq!(u64::from(moderation.when.get()), 1_700_000_000_000);
		assert_eq!(moderation.by.as_str(), "@mod:example.org");
	}

	/// An actor that is not a user id is replaced by the fallback user; the
	/// suspension itself is still carried.
	#[test]
	fn unparsable_actor_falls_back_to_server() {
		let entry = decode(
			br#"{"suspended":true,"suspended_at":1,"suspended_by":"not-a-user-id"}"#,
			"foreign moderation deserializes",
		);

		let moderation = to_moderation(entry, user_id!("@import:localhost"));

		assert_eq!(moderation.by.as_str(), "@import:localhost");
	}

	/// A cleared entry decodes with its flag unset.
	#[test]
	fn cleared_entry_is_recognized() {
		let entry = decode(
			br#"{"suspended":false,"suspended_at":1,"suspended_by":"@a:b.c"}"#,
			"foreign moderation deserializes",
		);

		assert!(!entry.suspended);
	}

	/// Extra fields in the richer source value do not prevent decoding.
	#[test]
	fn unknown_fields_are_ignored() {
		let entry = decode(
			br#"{"suspended":true,"suspended_at":1,"suspended_by":"@a:b.c","reason":"spam"}"#,
			"unknown fields are ignored",
		);

		assert!(entry.suspended);
	}
}