Skip to main content

tuwunel_admin/debug/
force_set_room_state_from_server.rs

1use std::collections::HashMap;
2
3use ruma::{
4	CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
5	api::federation::event::get_room_state,
6};
7use tuwunel_core::{
8	Err, Result, debug_error, err, info,
9	matrix::{Event, pdu::PduEvent},
10	warn,
11};
12use tuwunel_service::rooms::state_compressor::HashSetCompressStateEvent;
13
14use crate::admin_command;
15
16#[admin_command]
17#[tracing::instrument(level = "debug", skip(self))]
18pub(super) async fn force_set_room_state_from_server(
19	&self,
20	room_id: OwnedRoomId,
21	server_name: OwnedServerName,
22) -> Result {
23	// TODO: diverged from join remote
24
25	if !self
26		.services
27		.state_cache
28		.server_in_room(&self.services.server.name, &room_id)
29		.await
30	{
31		return Err!("We are not participating in the room / we don't know about the room ID.");
32	}
33
34	let first_pdu = self
35		.services
36		.timeline
37		.latest_pdu_in_room(&room_id)
38		.await
39		.map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
40
41	let room_version = self
42		.services
43		.state
44		.get_room_version(&room_id)
45		.await?;
46
47	let mut state: HashMap<u64, OwnedEventId> = HashMap::new();
48
49	let remote_state_response = self
50		.services
51		.federation
52		.execute(&server_name, get_room_state::v1::Request {
53			room_id: room_id.clone(),
54			event_id: first_pdu.event_id().to_owned(),
55		})
56		.await?;
57
58	for pdu in remote_state_response.pdus.clone() {
59		match self
60			.services
61			.event_handler
62			.parse_incoming_pdu(&pdu)
63			.await
64		{
65			| Ok(t) => t,
66			| Err(e) => {
67				warn!("Could not parse PDU, ignoring: {e}");
68				continue;
69			},
70		};
71	}
72
73	info!("Going through room_state response PDUs");
74	for result in remote_state_response.pdus.iter().map(|pdu| {
75		self.services
76			.server_keys
77			.validate_and_add_event_id(pdu, &room_version)
78	}) {
79		let Ok((event_id, mut value)) = result.await else {
80			continue;
81		};
82
83		let invalid_pdu_err = |e| {
84			debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
85			err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
86		};
87
88		let pdu = if value["type"] == "m.room.create" {
89			PduEvent::from_object_and_roomid_and_eventid(&room_id, &event_id, value.clone())
90				.map_err(invalid_pdu_err)?
91		} else {
92			PduEvent::from_object_and_eventid(&event_id, value.clone())
93				.map_err(invalid_pdu_err)?
94		};
95
96		if !value.contains_key("room_id") {
97			let room_id = CanonicalJsonValue::String(room_id.as_str().into());
98			value.insert("room_id".into(), room_id);
99		}
100
101		self.services
102			.timeline
103			.add_pdu_outlier(&event_id, &value);
104
105		if let Some(state_key) = &pdu.state_key {
106			let shortstatekey = self
107				.services
108				.short
109				.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
110				.await;
111
112			state.insert(shortstatekey, pdu.event_id.clone());
113		}
114	}
115
116	info!("Going through auth_chain response");
117	for result in remote_state_response
118		.auth_chain
119		.iter()
120		.map(|pdu| {
121			self.services
122				.server_keys
123				.validate_and_add_event_id(pdu, &room_version)
124		}) {
125		let Ok((event_id, value)) = result.await else {
126			continue;
127		};
128
129		self.services
130			.timeline
131			.add_pdu_outlier(&event_id, &value);
132	}
133
134	let new_room_state = self
135		.services
136		.event_handler
137		.resolve_state(&room_id, &room_version, state)
138		.await?;
139
140	info!("Forcing new room state");
141	let HashSetCompressStateEvent {
142		shortstatehash: short_state_hash,
143		added,
144		removed,
145	} = self
146		.services
147		.state_compressor
148		.save_state(room_id.clone().as_ref(), new_room_state)
149		.await?;
150
151	let state_lock = self.services.state.mutex.lock(&*room_id).await;
152
153	self.services
154		.state
155		.force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
156		.await?;
157
158	info!(
159		"Updating joined counts for room just in case (e.g. we may have found a difference in \
160		 the room's m.room.member state"
161	);
162	self.services
163		.state_cache
164		.update_joined_count(&room_id)
165		.await;
166
167	self.write_str("Successfully forced the room state from the requested remote server.")
168		.await
169}