tuwunel_admin/debug/
force_set_room_state_from_server.rs1use std::collections::HashMap;
2
3use ruma::{
4 CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
5 api::federation::event::get_room_state,
6};
7use tuwunel_core::{
8 Err, Result, debug_error, err, info,
9 matrix::{Event, pdu::PduEvent},
10 warn,
11};
12use tuwunel_service::rooms::state_compressor::HashSetCompressStateEvent;
13
14use crate::admin_command;
15
16#[admin_command]
17#[tracing::instrument(level = "debug", skip(self))]
18pub(super) async fn force_set_room_state_from_server(
19 &self,
20 room_id: OwnedRoomId,
21 server_name: OwnedServerName,
22) -> Result {
23 if !self
26 .services
27 .state_cache
28 .server_in_room(&self.services.server.name, &room_id)
29 .await
30 {
31 return Err!("We are not participating in the room / we don't know about the room ID.");
32 }
33
34 let first_pdu = self
35 .services
36 .timeline
37 .latest_pdu_in_room(&room_id)
38 .await
39 .map_err(|_| err!(Database("Failed to find the latest PDU in database")))?;
40
41 let room_version = self
42 .services
43 .state
44 .get_room_version(&room_id)
45 .await?;
46
47 let mut state: HashMap<u64, OwnedEventId> = HashMap::new();
48
49 let remote_state_response = self
50 .services
51 .federation
52 .execute(&server_name, get_room_state::v1::Request {
53 room_id: room_id.clone(),
54 event_id: first_pdu.event_id().to_owned(),
55 })
56 .await?;
57
58 for pdu in remote_state_response.pdus.clone() {
59 match self
60 .services
61 .event_handler
62 .parse_incoming_pdu(&pdu)
63 .await
64 {
65 | Ok(t) => t,
66 | Err(e) => {
67 warn!("Could not parse PDU, ignoring: {e}");
68 continue;
69 },
70 };
71 }
72
73 info!("Going through room_state response PDUs");
74 for result in remote_state_response.pdus.iter().map(|pdu| {
75 self.services
76 .server_keys
77 .validate_and_add_event_id(pdu, &room_version)
78 }) {
79 let Ok((event_id, mut value)) = result.await else {
80 continue;
81 };
82
83 let invalid_pdu_err = |e| {
84 debug_error!("Invalid PDU in fetching remote room state PDUs response: {value:#?}");
85 err!(BadServerResponse(debug_error!("Invalid PDU in send_join response: {e:?}")))
86 };
87
88 let pdu = if value["type"] == "m.room.create" {
89 PduEvent::from_object_and_roomid_and_eventid(&room_id, &event_id, value.clone())
90 .map_err(invalid_pdu_err)?
91 } else {
92 PduEvent::from_object_and_eventid(&event_id, value.clone())
93 .map_err(invalid_pdu_err)?
94 };
95
96 if !value.contains_key("room_id") {
97 let room_id = CanonicalJsonValue::String(room_id.as_str().into());
98 value.insert("room_id".into(), room_id);
99 }
100
101 self.services
102 .timeline
103 .add_pdu_outlier(&event_id, &value);
104
105 if let Some(state_key) = &pdu.state_key {
106 let shortstatekey = self
107 .services
108 .short
109 .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
110 .await;
111
112 state.insert(shortstatekey, pdu.event_id.clone());
113 }
114 }
115
116 info!("Going through auth_chain response");
117 for result in remote_state_response
118 .auth_chain
119 .iter()
120 .map(|pdu| {
121 self.services
122 .server_keys
123 .validate_and_add_event_id(pdu, &room_version)
124 }) {
125 let Ok((event_id, value)) = result.await else {
126 continue;
127 };
128
129 self.services
130 .timeline
131 .add_pdu_outlier(&event_id, &value);
132 }
133
134 let new_room_state = self
135 .services
136 .event_handler
137 .resolve_state(&room_id, &room_version, state)
138 .await?;
139
140 info!("Forcing new room state");
141 let HashSetCompressStateEvent {
142 shortstatehash: short_state_hash,
143 added,
144 removed,
145 } = self
146 .services
147 .state_compressor
148 .save_state(room_id.clone().as_ref(), new_room_state)
149 .await?;
150
151 let state_lock = self.services.state.mutex.lock(&*room_id).await;
152
153 self.services
154 .state
155 .force_state(room_id.clone().as_ref(), short_state_hash, added, removed, &state_lock)
156 .await?;
157
158 info!(
159 "Updating joined counts for room just in case (e.g. we may have found a difference in \
160 the room's m.room.member state"
161 );
162 self.services
163 .state_cache
164 .update_joined_count(&room_id)
165 .await;
166
167 self.write_str("Successfully forced the room state from the requested remote server.")
168 .await
169}