Skip to main content

tuwunel_api/client/room/
upgrade.rs

1use std::{cmp::max, iter::once};
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use ruma::{
6	CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UserId,
7	api::client::room::upgrade_room::v3,
8	events::{
9		StateEventType, TimelineEventType,
10		room::{
11			create::{PreviousRoom, RoomCreateEventContent},
12			member::{MembershipState, RoomMemberEventContent},
13			power_levels::RoomPowerLevelsEventContent,
14			tombstone::RoomTombstoneEventContent,
15		},
16	},
17	int,
18	room::RoomType,
19	room_version_rules::{RoomIdFormatVersion, RoomVersionRules},
20};
21use serde_json::{Value as JsonValue, json, value::to_raw_value};
22use tuwunel_core::{
23	Err, Result, debug_info, err, error, implement, info, is_equal_to, is_less_than,
24	matrix::{Event, StateKey, pdu::PduBuilder, room_version},
25	utils::{
26		ReadyExt,
27		future::TryExtExt,
28		stream::{IterStream, TryIgnore, WidebandExt},
29	},
30};
31use tuwunel_service::{Services, rooms::timeline::RoomMutexGuard};
32
33use crate::Ruma;
34
//TODO: Upgrade Ruma
/// State event types that `move_state_events` copies from the old room into
/// the replacement room during an upgrade. Each type is looked up with an
/// empty state key, and the list is walked in reverse, so the last entry
/// here is the first one sent into the new room.
const RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES: &[StateEventType; 9] = &[
	StateEventType::RoomServerAcl,
	StateEventType::RoomEncryption,
	StateEventType::RoomName,
	StateEventType::RoomAvatar,
	StateEventType::RoomTopic,
	StateEventType::RoomGuestAccess,
	StateEventType::RoomHistoryVisibility,
	StateEventType::RoomJoinRules,
	StateEventType::RoomPowerLevels,
];
47
/// Borrowed state shared by every step of a single room upgrade. It is built
/// by `upgrade_room_route` once the replacement room's create event exists.
#[derive(Debug)]
struct RoomUpgradeContext<'a> {
	/// Handle to the server's services.
	services: &'a Services,
	/// The user who requested the upgrade.
	sender_user: &'a UserId,
	/// The user who sent the replacement room's create event: the server user
	/// when the admin room is upgraded, otherwise the same as `sender_user`.
	creator: &'a UserId,
	/// The room being upgraded.
	old_room_id: &'a RoomId,
	/// State mutex guard held for the old room.
	old_state_lock: &'a RoomMutexGuard,
	/// Version rules of the old room.
	old_version_rules: &'a RoomVersionRules,
	/// The replacement room.
	new_room_id: &'a RoomId,
	/// State mutex guard held for the replacement room.
	new_state_lock: &'a RoomMutexGuard,
	/// Version rules of the replacement room.
	new_version_rules: &'a RoomVersionRules,
	/// Additional creators supplied in the upgrade request.
	additional_creators: &'a [OwnedUserId],
}
61
62/// # `POST /_matrix/client/r0/rooms/{roomId}/upgrade`
63///
64/// Upgrades the room.
65///
66/// - Creates a replacement room
67/// - Sends a tombstone event into the current room
68/// - Sender user joins the room
69/// - Transfers some state events
70/// - Moves local aliases
71/// - Modifies old room power levels to prevent users from speaking
72#[tracing::instrument(level = "debug")]
73pub(crate) async fn upgrade_room_route(
74	State(services): State<crate::State>,
75	body: Ruma<v3::Request>,
76) -> Result<v3::Response> {
77	let sender_user = body.sender_user();
78	let new_version = &body.new_version;
79	let version_rules = room_version::rules(new_version)?;
80
81	if !services
82		.config
83		.supported_room_version(new_version)
84	{
85		return Err!(Request(UnsupportedRoomVersion(
86			"This server does not support that room version.",
87		)));
88	}
89
90	let old_room_id = &body.room_id;
91	let old_state_lock = services.state.mutex.lock(old_room_id).await;
92
93	if !services
94		.state_accessor
95		.user_can_tombstone(old_room_id, sender_user, &old_state_lock)
96		.await
97	{
98		return Err!(Request(Forbidden("You are not permitted to upgrade the room.")));
99	}
100
101	let latest_event = services
102		.timeline
103		.latest_pdu_in_room(old_room_id)
104		.await
105		.ok();
106
107	let predecessor = PreviousRoom {
108		room_id: old_room_id.to_owned(),
109		event_id: latest_event
110			.as_ref()
111			.map(Event::event_id)
112			.map(ToOwned::to_owned),
113	};
114
115	debug_info!(
116		%sender_user,
117		%old_room_id,
118		last_event = ?predecessor.event_id,
119		?new_version,
120		"Attempting upgrade of room..."
121	);
122
123	let creator = if services.admin.is_admin_room(&body.room_id).await {
124		&services.globals.server_user
125	} else {
126		sender_user
127	};
128
129	let (replacement_room, state_lock) = match version_rules.room_id_format {
130		| RoomIdFormatVersion::V2 =>
131			upgrade_room_create(
132				&services,
133				creator,
134				old_room_id,
135				new_version,
136				&version_rules,
137				predecessor,
138				body.additional_creators.clone(),
139			)
140			.await,
141
142		| RoomIdFormatVersion::V1 =>
143			upgrade_room_create_legacy(
144				&services,
145				creator,
146				old_room_id,
147				new_version,
148				&version_rules,
149				predecessor,
150			)
151			.await,
152	}
153	.inspect_err(|e| error!(?body, "Upgrade m.room.create event failed: {e}"))?;
154
155	let old_room_id = &body.room_id;
156	let old_version = services
157		.state
158		.get_room_version(old_room_id)
159		.await?;
160	let old_version_rules = room_version::rules(&old_version)?;
161
162	let context = RoomUpgradeContext {
163		services: &services,
164		sender_user,
165		creator,
166		old_room_id,
167		old_state_lock: &old_state_lock,
168		old_version_rules: &old_version_rules,
169		new_room_id: &replacement_room,
170		new_state_lock: &state_lock,
171		new_version_rules: &version_rules,
172		additional_creators: &body.additional_creators,
173	};
174
175	if let Err(e) = context.transfer_room().await {
176		error!(?e, ?context, "Room upgrade failed. Cleaning up incomplete room...");
177
178		if let Err(e) = services
179			.delete
180			.delete_room(&replacement_room, false, state_lock)
181			.await
182		{
183			error!("Additional errors while deleting incomplete room: {e}");
184		}
185
186		return Err(e);
187	}
188
189	info!(
190		old_room_id = %context.old_room_id,
191		new_room_id = %context.new_room_id,
192		upgraded_by = %sender_user,
193		"Room upgraded",
194	);
195
196	Ok(v3::Response { replacement_room })
197}
198
199#[tracing::instrument(level = "info")]
200async fn upgrade_room_create(
201	services: &Services,
202	sender_user: &UserId,
203	old_room_id: &RoomId,
204	new_version: &RoomVersionId,
205	version_rules: &RoomVersionRules,
206	predecessor: PreviousRoom,
207	mut additional_creators: Vec<OwnedUserId>,
208) -> Result<(OwnedRoomId, RoomMutexGuard)> {
209	// Get the old room creation event
210	let mut content: CanonicalJsonObject = services
211		.state_accessor
212		.room_state_get_content(old_room_id, &StateEventType::RoomCreate, "")
213		.await
214		.map_err(|_| err!(Database("Found room without m.room.create event.")))?;
215
216	content.remove("creator");
217	content.insert("predecessor".into(), json!(predecessor).try_into()?);
218	content.insert("room_version".into(), json!(new_version).try_into()?);
219
220	if version_rules
221		.authorization
222		.additional_room_creators
223	{
224		additional_creators.sort();
225		additional_creators.dedup();
226		content.remove("additional_creators");
227		if !additional_creators.is_empty() {
228			content.insert("additional_creators".into(), json!(additional_creators).try_into()?);
229		}
230	}
231
232	// Validate creation event content
233	let raw_content = to_raw_value(&content)?;
234	if let Err(e) = serde_json::from_str::<CanonicalJsonObject>(raw_content.get()) {
235		return Err!(Request(BadJson("Error forming creation event: {e}")));
236	}
237
238	let room_id = ruma::room_id!("!thiswillbereplaced").to_owned();
239	let state_lock = services.state.mutex.lock(&room_id).await;
240	let create_event_id = services
241		.timeline
242		.build_and_append_pdu(
243			PduBuilder {
244				event_type: TimelineEventType::RoomCreate,
245				content: to_raw_value(&content)?,
246				state_key: Some(StateKey::new()),
247				..Default::default()
248			},
249			sender_user,
250			&room_id,
251			&state_lock,
252		)
253		.boxed()
254		.await?;
255
256	drop(state_lock);
257
258	// The real room_id is now the event_id.
259	let room_id = OwnedRoomId::from_parts('!', create_event_id.localpart(), None)?;
260	let state_lock = services.state.mutex.lock(&room_id).await;
261
262	Ok((room_id, state_lock))
263}
264
265#[tracing::instrument(level = "info")]
266async fn upgrade_room_create_legacy(
267	services: &Services,
268	sender_user: &UserId,
269	old_room_id: &RoomId,
270	new_version: &RoomVersionId,
271	version_rules: &RoomVersionRules,
272	predecessor: PreviousRoom,
273) -> Result<(OwnedRoomId, RoomMutexGuard)> {
274	// Create a replacement room
275	let new_room_id = RoomId::new_v1(services.globals.server_name());
276	let state_lock = services.state.mutex.lock(&new_room_id).await;
277	let _short_id = services
278		.short
279		.get_or_create_shortroomid(&new_room_id)
280		.await;
281
282	// Get the old room creation event
283	let mut content: CanonicalJsonObject = services
284		.state_accessor
285		.room_state_get_content(old_room_id, &StateEventType::RoomCreate, "")
286		.await
287		.map_err(|_| err!(Database("Found room without m.room.create event.")))?;
288
289	// Send a m.room.create event containing a predecessor field and the applicable
290	// room_version. "creator" key no longer exists in V11+ rooms.
291	{
292		use RoomVersionId::*;
293		match new_version {
294			| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 =>
295				content.insert("creator".into(), json!(&sender_user).try_into()?),
296			| _ => content.remove("creator"),
297		}
298	};
299
300	content.insert("predecessor".into(), json!(predecessor).try_into()?);
301	content.insert("room_version".into(), json!(new_version).try_into()?);
302
303	// Validate creation event content
304	let raw_content = to_raw_value(&content)?;
305	if let Err(e) = serde_json::from_str::<CanonicalJsonObject>(raw_content.get()) {
306		return Err!(Request(BadJson("Error forming creation event: {e}")));
307	}
308
309	services
310		.timeline
311		.build_and_append_pdu(
312			PduBuilder {
313				event_type: TimelineEventType::RoomCreate,
314				content: to_raw_value(&content)?,
315				state_key: Some(StateKey::new()),
316				..Default::default()
317			},
318			sender_user,
319			&new_room_id,
320			&state_lock,
321		)
322		.await?;
323
324	Ok((new_room_id, state_lock))
325}
326
327#[implement(RoomUpgradeContext, params = "<'_>")]
328#[tracing::instrument(level = "debug")]
329async fn transfer_room(&self) -> Result {
330	self.move_creator().await?;
331
332	self.move_state_events().await?;
333
334	self.move_space_state().await?;
335
336	self.move_sender_user().await?;
337
338	self.move_local_aliases().await?;
339
340	self.tombstone_old_room().await?;
341
342	// After commitment to the tombstone above no more errors can propagate.
343	self.lockdown_old_room()
344		.await
345		.inspect_err(|e| error!(?self, "Failed to lockdown old room: {e}"))
346		.ok();
347
348	Ok(())
349}
350
351// Join the new room
352#[implement(RoomUpgradeContext, params = "<'_>")]
353#[tracing::instrument(level = "debug")]
354async fn move_creator(&self) -> Result {
355	self.move_member(self.creator).await?;
356
357	Ok(())
358}
359
360#[implement(RoomUpgradeContext, params = "<'_>")]
361#[tracing::instrument(level = "debug")]
362async fn move_sender_user(&self) -> Result {
363	if self.sender_user != self.creator {
364		self.services
365			.timeline
366			.build_and_append_pdu(
367				PduBuilder::state(
368					self.sender_user.as_str(),
369					&RoomMemberEventContent::new(MembershipState::Invite),
370				),
371				self.creator,
372				self.new_room_id,
373				self.new_state_lock,
374			)
375			.await?;
376
377		self.move_member(self.sender_user).await?;
378	}
379
380	Ok(())
381}
382
383#[implement(RoomUpgradeContext, params = "<'_>")]
384#[tracing::instrument(level = "debug")]
385async fn move_member(&self, user_id: &UserId) -> Result {
386	let old_content: RoomMemberEventContent = self
387		.services
388		.state_accessor
389		.room_state_get_content(self.old_room_id, &StateEventType::RoomMember, user_id.as_str())
390		.inspect_err(|e| error!(?self, "Missing room member event: {e}"))
391		.await?;
392
393	self.services
394		.timeline
395		.build_and_append_pdu(
396			PduBuilder::state(user_id.as_str(), &RoomMemberEventContent {
397				membership: MembershipState::Join,
398				join_authorized_via_users_server: None,
399				..old_content
400			}),
401			user_id,
402			self.new_room_id,
403			self.new_state_lock,
404		)
405		.await?;
406
407	Ok(())
408}
409
410// Replicate transferable state events to the new room
411#[implement(RoomUpgradeContext, params = "<'_>")]
412#[tracing::instrument(level = "debug")]
413async fn move_state_events(&self) -> Result {
414	RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES
415		.iter()
416		.rev()
417		.stream()
418		.wide_filter_map(|event_type| {
419			self.services
420				.state_accessor
421				.room_state_get(self.old_room_id, event_type, "")
422				.ok()
423		})
424		.map(Ok)
425		.try_for_each(async |event| {
426			self.services
427				.timeline
428				.build_and_append_pdu(
429					self.rebuild_state_event(&event).await?,
430					self.creator,
431					self.new_room_id,
432					self.new_state_lock,
433				)
434				.inspect_err(|e| {
435					error!(?event, ?self, "Failed to transfer state on upgrade: {e}");
436				})
437				.map_ok(|_| ())
438				.await
439		})
440		.await
441}
442
443// MSC4168: copy m.space.parent for any room, plus m.space.child when the
444// old room is itself a space, into the upgraded room.
445#[implement(RoomUpgradeContext, params = "<'_>")]
446#[tracing::instrument(level = "debug")]
447async fn move_space_state(&self) -> Result {
448	let old_room_is_space = self
449		.services
450		.state_accessor
451		.room_state_get_content::<RoomCreateEventContent>(
452			self.old_room_id,
453			&StateEventType::RoomCreate,
454			"",
455		)
456		.await
457		.ok()
458		.and_then(|c| c.room_type)
459		.is_some_and(|t| matches!(t, RoomType::Space));
460
461	let event_types: &[StateEventType] = match old_room_is_space {
462		| true => &[StateEventType::SpaceParent, StateEventType::SpaceChild],
463		| false => &[StateEventType::SpaceParent],
464	};
465
466	for event_type in event_types {
467		let state_keys: Vec<StateKey> = self
468			.services
469			.state_accessor
470			.room_state_keys(self.old_room_id, event_type)
471			.ignore_err()
472			.collect()
473			.await;
474
475		for state_key in state_keys {
476			let Ok(event) = self
477				.services
478				.state_accessor
479				.room_state_get(self.old_room_id, event_type, &state_key)
480				.await
481			else {
482				continue;
483			};
484
485			self.services
486				.timeline
487				.build_and_append_pdu(
488					self.rebuild_state_event(&event).await?,
489					self.creator,
490					self.new_room_id,
491					self.new_state_lock,
492				)
493				.inspect_err(|e| error!(?event, ?self, "Failed to copy space state: {e}"))
494				.await
495				.ok();
496		}
497	}
498
499	Ok(())
500}
501
502#[implement(RoomUpgradeContext, params = "<'_>")]
503#[tracing::instrument(level = "debug")]
504async fn rebuild_state_event<Pdu: Event>(&self, event: &Pdu) -> Result<PduBuilder> {
505	let content = match event.kind() {
506		| TimelineEventType::RoomPowerLevels => {
507			let mut content = event.get_content_as_value();
508
509			if self
510				.new_version_rules
511				.authorization
512				.explicitly_privilege_room_creators
513			{
514				if let Some(users) = content
515					.get_mut("users")
516					.and_then(JsonValue::as_object_mut)
517				{
518					users.retain(|user_id, _pl| {
519						!self
520							.additional_creators
521							.iter()
522							.map(AsRef::as_ref)
523							.chain(once(self.creator))
524							.map(UserId::as_str)
525							.any(is_equal_to!(user_id.as_str()))
526					});
527				}
528
529				if self.creator == self.sender_user
530					&& content["events"]["m.room.tombstone"]
531						.as_i64()
532						.is_none_or(is_less_than!(150))
533				{
534					content["events"]["m.room.tombstone"] = json!(150);
535				}
536			} else if self
537				.old_version_rules
538				.authorization
539				.explicitly_privilege_room_creators
540			{
541				#[expect(clippy::collapsible_if)]
542				if let Some(users) = content
543					.as_object_mut()
544					.expect("power levels event content must be an object")
545					.entry("users")
546					.or_insert(json!({}))
547					.as_object_mut()
548				{
549					let level = json!(1000);
550
551					self.services
552						.state_accessor
553						.get_create(self.old_room_id)
554						.await?
555						.creators(&self.old_version_rules.authorization)?
556						.for_each(|user_id| {
557							users.insert(user_id.to_string(), level.clone());
558						});
559				}
560			}
561
562			to_raw_value(&content)?
563		},
564		// MSC4168: rewrite `via` to the upgrading server's name on copied
565		// space-graph state events, since the previous room's via list may
566		// no longer cover the upgraded room.
567		| TimelineEventType::SpaceChild | TimelineEventType::SpaceParent => {
568			let mut content = event.get_content_as_value();
569			if let Some(obj) = content.as_object_mut() {
570				obj.insert("via".to_owned(), json!([self.sender_user.server_name().as_str()]));
571			}
572
573			to_raw_value(&content)?
574		},
575		| _ => to_raw_value(event.content())?,
576	};
577
578	Ok(PduBuilder {
579		content,
580		event_type: event.kind().clone(),
581		state_key: event.state_key().map(Into::into),
582		..Default::default()
583	})
584}
585
586// Moves any local aliases to the new room
587#[implement(RoomUpgradeContext, params = "<'_>")]
588#[tracing::instrument(level = "debug")]
589async fn move_local_aliases(&self) -> Result {
590	self.services
591		.alias
592		.local_aliases_for_room(self.old_room_id)
593		.ready_for_each(|alias| {
594			self.services
595				.alias
596				.set_alias_by(alias, self.new_room_id, self.creator)
597				.inspect_err(|e| error!(?self, "Failed to add alias: {e}"))
598				.ok();
599		})
600		.map(Ok)
601		.await
602}
603
604// Send a m.room.tombstone event to the old room to indicate that it is not
605// intended to be used any further Fail if the sender does not have the required
606// permissions.
607#[implement(RoomUpgradeContext, params = "<'_>")]
608#[tracing::instrument(level = "debug")]
609async fn tombstone_old_room(&self) -> Result<OwnedEventId> {
610	self.services
611		.timeline
612		.build_and_append_pdu(
613			PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
614				body: "This room has been upgraded.".to_owned(),
615				replacement_room: self.new_room_id.to_owned(),
616			}),
617			self.sender_user,
618			self.old_room_id,
619			self.old_state_lock,
620		)
621		.await
622}
623
624// Modify the power levels in the old room to prevent sending of events and
625// inviting new users. Though a Result is returned, the callsite above treats it
626// as infallible because the tombstone represents the commitment.
627#[implement(RoomUpgradeContext, params = "<'_>")]
628#[tracing::instrument(level = "debug")]
629async fn lockdown_old_room(&self) -> Result<OwnedEventId> {
630	// Get the old room power levels
631	let old_content: RoomPowerLevelsEventContent = self
632		.services
633		.state_accessor
634		.room_state_get_content(self.old_room_id, &StateEventType::RoomPowerLevels, "")
635		.await
636		.map_err(|_| err!(Database("Found room without m.room.power_levels event.")))?;
637
638	let old_users_default = old_content
639		.users_default
640		.checked_add(int!(1))
641		.ok_or_else(|| {
642			err!(Request(BadJson("users_default power levels event content is not valid")))
643		})?;
644
645	// Setting events_default and invite to the greater of 50 and users_default + 1
646	let new_level = max(int!(50), old_users_default);
647
648	self.services
649		.timeline
650		.build_and_append_pdu(
651			PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent {
652				events_default: new_level,
653				invite: new_level,
654				..old_content
655			}),
656			self.sender_user,
657			self.old_room_id,
658			self.old_state_lock,
659		)
660		.await
661}