Skip to main content

tuwunel_api/client/room/
upgrade.rs

1use std::{cmp::max, iter::once};
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
5use ruma::{
6	CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UserId,
7	api::client::room::upgrade_room::v3,
8	events::{
9		StateEventType, TimelineEventType,
10		room::{
11			create::{PreviousRoom, RoomCreateEventContent},
12			member::{MembershipState, RoomMemberEventContent},
13			power_levels::RoomPowerLevelsEventContent,
14			tombstone::RoomTombstoneEventContent,
15		},
16	},
17	int,
18	room::RoomType,
19	room_version_rules::{RoomIdFormatVersion, RoomVersionRules},
20};
21use serde_json::{Value as JsonValue, json, value::to_raw_value};
22use tuwunel_core::{
23	Err, Result, debug_info, err, error, implement, info, is_equal_to, is_less_than,
24	matrix::{Event, StateKey, pdu::PduBuilder, room_version},
25	utils::{
26		ReadyExt,
27		future::TryExtExt,
28		stream::{IterStream, TryIgnore, WidebandExt},
29	},
30};
31use tuwunel_service::{Services, rooms::timeline::RoomMutexGuard};
32
33use crate::Ruma;
34
35//TODO: Upgrade Ruma
36const RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES: &[StateEventType; 9] = &[
37	StateEventType::RoomServerAcl,
38	StateEventType::RoomEncryption,
39	StateEventType::RoomName,
40	StateEventType::RoomAvatar,
41	StateEventType::RoomTopic,
42	StateEventType::RoomGuestAccess,
43	StateEventType::RoomHistoryVisibility,
44	StateEventType::RoomJoinRules,
45	StateEventType::RoomPowerLevels,
46];
47
48#[derive(Debug)]
49struct RoomUpgradeContext<'a> {
50	services: &'a Services,
51	sender_user: &'a UserId,
52	creator: &'a UserId,
53	old_room_id: &'a RoomId,
54	old_state_lock: &'a RoomMutexGuard,
55	old_version_rules: &'a RoomVersionRules,
56	new_room_id: &'a RoomId,
57	new_state_lock: &'a RoomMutexGuard,
58	new_version_rules: &'a RoomVersionRules,
59	additional_creators: &'a [OwnedUserId],
60}
61
62/// # `POST /_matrix/client/r0/rooms/{roomId}/upgrade`
63///
64/// Upgrades the room.
65///
66/// - Creates a replacement room
67/// - Sends a tombstone event into the current room
68/// - Sender user joins the room
69/// - Transfers some state events
70/// - Moves local aliases
71/// - Modifies old room power levels to prevent users from speaking
72#[tracing::instrument(level = "debug")]
73pub(crate) async fn upgrade_room_route(
74	State(services): State<crate::State>,
75	body: Ruma<v3::Request>,
76) -> Result<v3::Response> {
77	let sender_user = body.sender_user();
78	let new_version = &body.new_version;
79	let version_rules = room_version::rules(new_version)?;
80
81	if !services
82		.config
83		.supported_room_version(new_version)
84	{
85		return Err!(Request(UnsupportedRoomVersion(
86			"This server does not support that room version.",
87		)));
88	}
89
90	let old_room_id = &body.room_id;
91	let old_state_lock = services.state.mutex.lock(old_room_id).await;
92
93	if !services
94		.state_accessor
95		.user_can_tombstone(old_room_id, sender_user, &old_state_lock)
96		.await
97	{
98		return Err!(Request(Forbidden("You are not permitted to upgrade the room.")));
99	}
100
101	let latest_event = services
102		.timeline
103		.latest_pdu_in_room(old_room_id)
104		.await
105		.ok();
106
107	let predecessor = PreviousRoom {
108		room_id: old_room_id.to_owned(),
109		event_id: latest_event
110			.as_ref()
111			.map(Event::event_id)
112			.map(ToOwned::to_owned),
113	};
114
115	debug_info!(
116		%sender_user,
117		%old_room_id,
118		last_event = ?predecessor.event_id,
119		?new_version,
120		"Attempting upgrade of room..."
121	);
122
123	let creator = if services.admin.is_admin_room(&body.room_id).await {
124		&services.globals.server_user
125	} else {
126		sender_user
127	};
128
129	let (replacement_room, state_lock) = match version_rules.room_id_format {
130		| RoomIdFormatVersion::V2 =>
131			upgrade_room_create(
132				&services,
133				creator,
134				old_room_id,
135				new_version,
136				&version_rules,
137				predecessor,
138				body.additional_creators.clone(),
139			)
140			.await,
141
142		| RoomIdFormatVersion::V1 =>
143			upgrade_room_create_legacy(
144				&services,
145				creator,
146				old_room_id,
147				new_version,
148				&version_rules,
149				predecessor,
150			)
151			.await,
152	}
153	.inspect_err(|e| error!(?body, "Upgrade m.room.create event failed: {e}"))?;
154
155	let old_room_id = &body.room_id;
156	let old_version = services
157		.state
158		.get_room_version(old_room_id)
159		.await?;
160	let old_version_rules = room_version::rules(&old_version)?;
161
162	let context = RoomUpgradeContext {
163		services: &services,
164		sender_user,
165		creator,
166		old_room_id,
167		old_state_lock: &old_state_lock,
168		old_version_rules: &old_version_rules,
169		new_room_id: &replacement_room,
170		new_state_lock: &state_lock,
171		new_version_rules: &version_rules,
172		additional_creators: &body.additional_creators,
173	};
174
175	if let Err(e) = context.transfer_room().await {
176		error!(?e, ?context, "Room upgrade failed. Cleaning up incomplete room...");
177
178		if let Err(e) = services
179			.delete
180			.delete_room(&replacement_room, false, state_lock)
181			.await
182		{
183			error!("Additional errors while deleting incomplete room: {e}");
184		}
185
186		return Err(e);
187	}
188
189	info!(
190		old_room_id = %context.old_room_id,
191		new_room_id = %context.new_room_id,
192		upgraded_by = %sender_user,
193		"Room upgraded",
194	);
195
196	Ok(v3::Response { replacement_room })
197}
198
199#[tracing::instrument(level = "info")]
200async fn upgrade_room_create(
201	services: &Services,
202	sender_user: &UserId,
203	old_room_id: &RoomId,
204	new_version: &RoomVersionId,
205	version_rules: &RoomVersionRules,
206	predecessor: PreviousRoom,
207	mut additional_creators: Vec<OwnedUserId>,
208) -> Result<(OwnedRoomId, RoomMutexGuard)> {
209	// Get the old room creation event
210	let mut content: CanonicalJsonObject = services
211		.state_accessor
212		.room_state_get_content(old_room_id, &StateEventType::RoomCreate, "")
213		.await
214		.map_err(|_| err!(Database("Found room without m.room.create event.")))?;
215
216	content.remove("creator");
217	// MSC4291: v12 create events omit the deprecated predecessor.event_id.
218	let predecessor = PreviousRoom { event_id: None, ..predecessor };
219
220	content.insert("predecessor".into(), json!(predecessor).try_into()?);
221	content.insert("room_version".into(), json!(new_version).try_into()?);
222
223	if version_rules
224		.authorization
225		.additional_room_creators
226	{
227		additional_creators.sort();
228		additional_creators.dedup();
229		content.remove("additional_creators");
230		if !additional_creators.is_empty() {
231			content.insert("additional_creators".into(), json!(additional_creators).try_into()?);
232		}
233	}
234
235	// Validate creation event content
236	let raw_content = to_raw_value(&content)?;
237	if let Err(e) = serde_json::from_str::<CanonicalJsonObject>(raw_content.get()) {
238		return Err!(Request(BadJson("Error forming creation event: {e}")));
239	}
240
241	let room_id = ruma::room_id!("!thiswillbereplaced").to_owned();
242	let state_lock = services.state.mutex.lock(&room_id).await;
243	let create_event_id = services
244		.timeline
245		.build_and_append_pdu(
246			PduBuilder {
247				event_type: TimelineEventType::RoomCreate,
248				content: to_raw_value(&content)?.into(),
249				state_key: Some(StateKey::new()),
250				..Default::default()
251			},
252			sender_user,
253			&room_id,
254			&state_lock,
255		)
256		.boxed()
257		.await?;
258
259	drop(state_lock);
260
261	// The real room_id is now the event_id.
262	let room_id = OwnedRoomId::from_parts('!', create_event_id.localpart(), None)?;
263	let state_lock = services.state.mutex.lock(&room_id).await;
264
265	Ok((room_id, state_lock))
266}
267
268#[tracing::instrument(level = "info")]
269async fn upgrade_room_create_legacy(
270	services: &Services,
271	sender_user: &UserId,
272	old_room_id: &RoomId,
273	new_version: &RoomVersionId,
274	version_rules: &RoomVersionRules,
275	predecessor: PreviousRoom,
276) -> Result<(OwnedRoomId, RoomMutexGuard)> {
277	// Create a replacement room
278	let new_room_id = RoomId::new_v1(services.globals.server_name());
279	let state_lock = services.state.mutex.lock(&new_room_id).await;
280	let _short_id = services
281		.short
282		.get_or_create_shortroomid(&new_room_id)
283		.await;
284
285	// Get the old room creation event
286	let mut content: CanonicalJsonObject = services
287		.state_accessor
288		.room_state_get_content(old_room_id, &StateEventType::RoomCreate, "")
289		.await
290		.map_err(|_| err!(Database("Found room without m.room.create event.")))?;
291
292	// Send a m.room.create event containing a predecessor field and the applicable
293	// room_version. "creator" key no longer exists in V11+ rooms.
294	{
295		use RoomVersionId::*;
296		match new_version {
297			| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 =>
298				content.insert("creator".into(), json!(&sender_user).try_into()?),
299			| _ => content.remove("creator"),
300		}
301	};
302
303	content.insert("predecessor".into(), json!(predecessor).try_into()?);
304	content.insert("room_version".into(), json!(new_version).try_into()?);
305
306	// Validate creation event content
307	let raw_content = to_raw_value(&content)?;
308	if let Err(e) = serde_json::from_str::<CanonicalJsonObject>(raw_content.get()) {
309		return Err!(Request(BadJson("Error forming creation event: {e}")));
310	}
311
312	services
313		.timeline
314		.build_and_append_pdu(
315			PduBuilder {
316				event_type: TimelineEventType::RoomCreate,
317				content: to_raw_value(&content)?.into(),
318				state_key: Some(StateKey::new()),
319				..Default::default()
320			},
321			sender_user,
322			&new_room_id,
323			&state_lock,
324		)
325		.await?;
326
327	Ok((new_room_id, state_lock))
328}
329
330#[implement(RoomUpgradeContext, params = "<'_>")]
331#[tracing::instrument(level = "debug")]
332async fn transfer_room(&self) -> Result {
333	self.move_creator().await?;
334
335	self.move_state_events().await?;
336
337	self.move_space_state().await?;
338
339	self.move_sender_user().await?;
340
341	self.move_push_rules().await?;
342
343	self.move_local_aliases().await?;
344
345	self.tombstone_old_room().await?;
346
347	// After commitment to the tombstone above no more errors can propagate.
348	self.lockdown_old_room()
349		.await
350		.inspect_err(|e| error!(?self, "Failed to lockdown old room: {e}"))
351		.ok();
352
353	Ok(())
354}
355
356// Join the new room
357#[implement(RoomUpgradeContext, params = "<'_>")]
358#[tracing::instrument(level = "debug")]
359async fn move_creator(&self) -> Result {
360	self.move_member(self.creator).await?;
361
362	Ok(())
363}
364
365#[implement(RoomUpgradeContext, params = "<'_>")]
366#[tracing::instrument(level = "debug")]
367async fn move_sender_user(&self) -> Result {
368	if self.sender_user != self.creator {
369		self.services
370			.timeline
371			.build_and_append_pdu(
372				PduBuilder::state(
373					self.sender_user.as_str(),
374					&RoomMemberEventContent::new(MembershipState::Invite),
375				),
376				self.creator,
377				self.new_room_id,
378				self.new_state_lock,
379			)
380			.await?;
381
382		self.move_member(self.sender_user).await?;
383	}
384
385	Ok(())
386}
387
388#[implement(RoomUpgradeContext, params = "<'_>")]
389#[tracing::instrument(level = "debug")]
390async fn move_push_rules(&self) -> Result {
391	self.services
392		.account_data
393		.copy_room_push_rule(self.sender_user, self.old_room_id, self.new_room_id)
394		.await
395}
396
397#[implement(RoomUpgradeContext, params = "<'_>")]
398#[tracing::instrument(level = "debug")]
399async fn move_member(&self, user_id: &UserId) -> Result {
400	let old_content: RoomMemberEventContent = self
401		.services
402		.state_accessor
403		.room_state_get_content(self.old_room_id, &StateEventType::RoomMember, user_id.as_str())
404		.inspect_err(|e| error!(?self, "Missing room member event: {e}"))
405		.await?;
406
407	self.services
408		.timeline
409		.build_and_append_pdu(
410			PduBuilder::state(user_id.as_str(), &RoomMemberEventContent {
411				membership: MembershipState::Join,
412				join_authorized_via_users_server: None,
413				..old_content
414			}),
415			user_id,
416			self.new_room_id,
417			self.new_state_lock,
418		)
419		.await?;
420
421	Ok(())
422}
423
424// Replicate transferable state events to the new room
425#[implement(RoomUpgradeContext, params = "<'_>")]
426#[tracing::instrument(level = "debug")]
427async fn move_state_events(&self) -> Result {
428	RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES
429		.iter()
430		.rev()
431		.stream()
432		.wide_filter_map(|event_type| {
433			self.services
434				.state_accessor
435				.room_state_get(self.old_room_id, event_type, "")
436				.ok()
437		})
438		.map(Ok)
439		.try_for_each(async |event| {
440			self.services
441				.timeline
442				.build_and_append_pdu(
443					self.rebuild_state_event(&event).await?,
444					self.creator,
445					self.new_room_id,
446					self.new_state_lock,
447				)
448				.inspect_err(|e| {
449					error!(?event, ?self, "Failed to transfer state on upgrade: {e}");
450				})
451				.map_ok(|_| ())
452				.await
453		})
454		.await
455}
456
457// MSC4168: copy m.space.parent for any room, plus m.space.child when the
458// old room is itself a space, into the upgraded room.
459#[implement(RoomUpgradeContext, params = "<'_>")]
460#[tracing::instrument(level = "debug")]
461async fn move_space_state(&self) -> Result {
462	let old_room_is_space = self
463		.services
464		.state_accessor
465		.room_state_get_content::<RoomCreateEventContent>(
466			self.old_room_id,
467			&StateEventType::RoomCreate,
468			"",
469		)
470		.await
471		.ok()
472		.and_then(|c| c.room_type)
473		.is_some_and(|t| matches!(t, RoomType::Space));
474
475	let event_types: &[StateEventType] = match old_room_is_space {
476		| true => &[StateEventType::SpaceParent, StateEventType::SpaceChild],
477		| false => &[StateEventType::SpaceParent],
478	};
479
480	event_types
481		.iter()
482		.stream()
483		.map(Ok)
484		.try_for_each(|event_type| {
485			self.services
486				.state_accessor
487				.room_state_keys(self.old_room_id, event_type)
488				.ignore_err()
489				.map(Ok)
490				.try_for_each(move |state_key| async move {
491					let Ok(event) = self
492						.services
493						.state_accessor
494						.room_state_get(self.old_room_id, event_type, &state_key)
495						.await
496					else {
497						return Ok(());
498					};
499
500					self.services
501						.timeline
502						.build_and_append_pdu(
503							self.rebuild_state_event(&event).await?,
504							self.creator,
505							self.new_room_id,
506							self.new_state_lock,
507						)
508						.inspect_err(|e| {
509							error!(?event, ?self, "Failed to copy space state: {e}");
510						})
511						.await
512						.ok();
513
514					Ok(())
515				})
516		})
517		.await
518}
519
520#[implement(RoomUpgradeContext, params = "<'_>")]
521#[tracing::instrument(level = "debug")]
522async fn rebuild_state_event<Pdu: Event>(&self, event: &Pdu) -> Result<PduBuilder> {
523	let content = match event.kind() {
524		| TimelineEventType::RoomPowerLevels => {
525			let mut content = event.get_content_as_value();
526
527			if self
528				.new_version_rules
529				.authorization
530				.explicitly_privilege_room_creators
531			{
532				if let Some(users) = content
533					.get_mut("users")
534					.and_then(JsonValue::as_object_mut)
535				{
536					users.retain(|user_id, _pl| {
537						!self
538							.additional_creators
539							.iter()
540							.map(AsRef::as_ref)
541							.chain(once(self.creator))
542							.map(UserId::as_str)
543							.any(is_equal_to!(user_id.as_str()))
544					});
545				}
546
547				if self.creator == self.sender_user
548					&& content["events"]["m.room.tombstone"]
549						.as_i64()
550						.is_none_or(is_less_than!(150))
551				{
552					content["events"]["m.room.tombstone"] = json!(150);
553				}
554			} else if self
555				.old_version_rules
556				.authorization
557				.explicitly_privilege_room_creators
558			{
559				#[expect(clippy::collapsible_if)]
560				if let Some(users) = content
561					.as_object_mut()
562					.expect("power levels event content must be an object")
563					.entry("users")
564					.or_insert(json!({}))
565					.as_object_mut()
566				{
567					let level = json!(1000);
568
569					self.services
570						.state_accessor
571						.get_create(self.old_room_id)
572						.await?
573						.creators(&self.old_version_rules.authorization)?
574						.for_each(|user_id| {
575							users.insert(user_id.to_string(), level.clone());
576						});
577				}
578			}
579
580			to_raw_value(&content)?
581		},
582		// MSC4168: rewrite `via` to the upgrading server's name on copied
583		// space-graph state events, since the previous room's via list may
584		// no longer cover the upgraded room.
585		| TimelineEventType::SpaceChild | TimelineEventType::SpaceParent => {
586			let mut content = event.get_content_as_value();
587			if let Some(obj) = content.as_object_mut() {
588				obj.insert("via".to_owned(), json!([self.sender_user.server_name().as_str()]));
589			}
590
591			to_raw_value(&content)?
592		},
593		| _ => to_raw_value(event.content())?,
594	};
595
596	Ok(PduBuilder {
597		content: content.into(),
598		event_type: event.kind().clone(),
599		state_key: event.state_key().map(Into::into),
600		..Default::default()
601	})
602}
603
604// Moves any local aliases to the new room
605#[implement(RoomUpgradeContext, params = "<'_>")]
606#[tracing::instrument(level = "debug")]
607async fn move_local_aliases(&self) -> Result {
608	self.services
609		.alias
610		.local_aliases_for_room(self.old_room_id)
611		.ready_for_each(|alias| {
612			self.services
613				.alias
614				.set_alias_by(alias, self.new_room_id, self.creator)
615				.inspect_err(|e| error!(?self, "Failed to add alias: {e}"))
616				.ok();
617		})
618		.map(Ok)
619		.await
620}
621
622// Send a m.room.tombstone event to the old room to indicate that it is not
623// intended to be used any further Fail if the sender does not have the required
624// permissions.
625#[implement(RoomUpgradeContext, params = "<'_>")]
626#[tracing::instrument(level = "debug")]
627async fn tombstone_old_room(&self) -> Result<OwnedEventId> {
628	self.services
629		.timeline
630		.build_and_append_pdu(
631			PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
632				body: "This room has been upgraded.".to_owned(),
633				replacement_room: self.new_room_id.to_owned(),
634			}),
635			self.sender_user,
636			self.old_room_id,
637			self.old_state_lock,
638		)
639		.await
640}
641
642// Modify the power levels in the old room to prevent sending of events and
643// inviting new users. Though a Result is returned, the callsite above treats it
644// as infallible because the tombstone represents the commitment.
645#[implement(RoomUpgradeContext, params = "<'_>")]
646#[tracing::instrument(level = "debug")]
647async fn lockdown_old_room(&self) -> Result<OwnedEventId> {
648	// Get the old room power levels
649	let old_content: RoomPowerLevelsEventContent = self
650		.services
651		.state_accessor
652		.room_state_get_content(self.old_room_id, &StateEventType::RoomPowerLevels, "")
653		.await
654		.map_err(|_| err!(Database("Found room without m.room.power_levels event.")))?;
655
656	let old_users_default = old_content
657		.users_default
658		.checked_add(int!(1))
659		.ok_or_else(|| {
660			err!(Request(BadJson("users_default power levels event content is not valid")))
661		})?;
662
663	// Setting events_default and invite to the greater of 50 and users_default + 1
664	let new_level = max(int!(50), old_users_default);
665
666	self.services
667		.timeline
668		.build_and_append_pdu(
669			PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent {
670				events_default: new_level,
671				invite: new_level,
672				..old_content
673			}),
674			self.sender_user,
675			self.old_room_id,
676			self.old_state_lock,
677		)
678		.await
679}