tuwunel_service/membership/
leave.rs1use std::collections::HashSet;
2
3use futures::{
4 FutureExt, StreamExt, TryFutureExt,
5 future::{join3, ready},
6 pin_mut,
7};
8use ruma::{
9 CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, UserId,
10 api::federation,
11 canonical_json::to_canonical_value,
12 events::{
13 StateEventType,
14 room::member::{MembershipState, RoomMemberEventContent},
15 },
16};
17use tuwunel_core::{
18 Err, Result, debug_info, debug_warn, err, implement,
19 matrix::{PduCount, pdu::check_rules, room_version},
20 pdu::PduBuilder,
21 utils::{
22 self, FutureBoolExt,
23 future::{ReadyBoolExt, TryExtExt},
24 },
25 warn,
26};
27
28use super::Service;
29use crate::rooms::timeline::RoomMutexGuard;
30
31#[implement(Service)]
32#[tracing::instrument(
33 level = "debug",
34 skip_all,
35 fields(%room_id, %user_id)
36)]
37pub async fn leave(
38 &self,
39 user_id: &UserId,
40 room_id: &RoomId,
41 reason: Option<String>,
42 remote_leave_now: bool,
43 state_lock: &RoomMutexGuard,
44) -> Result {
45 let default_member_content = RoomMemberEventContent {
46 membership: MembershipState::Leave,
47 reason: reason.clone(),
48 join_authorized_via_users_server: None,
49 is_direct: None,
50 avatar_url: None,
51 displayname: None,
52 third_party_invite: None,
53 blurhash: None,
54 };
55
56 let is_banned = self.services.metadata.is_banned(room_id);
57 let is_disabled = self.services.metadata.is_disabled(room_id);
58
59 pin_mut!(is_banned, is_disabled);
60 if is_banned.or(is_disabled).await {
61 let count = self.services.globals.next_count();
64 self.services
65 .state_cache
66 .update_membership(
67 room_id,
68 user_id,
69 default_member_content,
70 user_id,
71 None,
72 None,
73 true,
74 PduCount::Normal(*count),
75 )
76 .await?;
77
78 return Ok(());
79 }
80
81 let member_event = self
82 .services
83 .state_accessor
84 .room_state_get_content::<RoomMemberEventContent>(
85 room_id,
86 &StateEventType::RoomMember,
87 user_id.as_str(),
88 )
89 .await;
90
91 let dont_have_room = self
92 .services
93 .state_cache
94 .server_in_room(self.services.globals.server_name(), room_id)
95 .is_false()
96 .and(ready(member_event.as_ref().is_err()));
97
98 let not_knocked = self
99 .services
100 .state_cache
101 .is_knocked(user_id, room_id)
102 .is_false();
103
104 if remote_leave_now || dont_have_room.and(not_knocked).await {
106 if let Err(e) = self
107 .remote_leave(user_id, room_id, reason)
108 .boxed()
109 .await
110 {
111 warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
112 }
114
115 let last_state = self
116 .services
117 .state_cache
118 .invite_state(user_id, room_id)
119 .or_else(|_| {
120 self.services
121 .state_cache
122 .knock_state(user_id, room_id)
123 })
124 .or_else(|_| {
125 self.services
126 .state_cache
127 .left_state(user_id, room_id)
128 })
129 .await
130 .ok();
131
132 let count = self.services.globals.next_count();
134 self.services
135 .state_cache
136 .update_membership(
137 room_id,
138 user_id,
139 default_member_content,
140 user_id,
141 last_state,
142 None,
143 true,
144 PduCount::Normal(*count),
145 )
146 .await?;
147 } else {
148 let Ok(event) = member_event else {
149 debug_warn!(
150 "Trying to leave a room you are not a member of, marking room as left locally."
151 );
152
153 let count = self.services.globals.next_count();
154 return self
155 .services
156 .state_cache
157 .update_membership(
158 room_id,
159 user_id,
160 default_member_content,
161 user_id,
162 None,
163 None,
164 true,
165 PduCount::Normal(*count),
166 )
167 .await;
168 };
169
170 self.services
171 .timeline
172 .build_and_append_pdu(
173 PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
174 membership: MembershipState::Leave,
175 reason,
176 join_authorized_via_users_server: None,
177 is_direct: None,
178 ..event
179 }),
180 user_id,
181 room_id,
182 state_lock,
183 )
184 .await?;
185 }
186
187 Ok(())
188}
189
190#[implement(Service)]
191#[tracing::instrument(name = "remote", level = "debug", skip_all)]
192async fn remote_leave(
193 &self,
194 user_id: &UserId,
195 room_id: &RoomId,
196 reason: Option<String>,
197) -> Result {
198 let mut make_leave_response_and_server =
199 Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
200
201 let mut servers: HashSet<OwnedServerName> = self
202 .services
203 .state_cache
204 .servers_invite_via(room_id)
205 .chain(self.services.state_cache.room_servers(room_id))
206 .map(ToOwned::to_owned)
207 .collect()
208 .await;
209
210 match self
211 .services
212 .state_cache
213 .invite_state(user_id, room_id)
214 .await
215 {
216 | Ok(invite_state) => {
217 servers.extend(
218 invite_state
219 .iter()
220 .filter_map(|event| event.get_field("sender").ok().flatten())
221 .filter_map(|sender: &str| UserId::parse(sender).ok())
222 .map(|user| user.server_name().to_owned()),
223 );
224 },
225 | _ => {
226 match self
227 .services
228 .state_cache
229 .knock_state(user_id, room_id)
230 .await
231 {
232 | Ok(knock_state) => {
233 servers.extend(
234 knock_state
235 .iter()
236 .filter_map(|event| event.get_field("sender").ok().flatten())
237 .filter_map(|sender: &str| UserId::parse(sender).ok())
238 .filter_map(|sender| {
239 if !self.services.globals.user_is_local(&sender) {
240 Some(sender.server_name().to_owned())
241 } else {
242 None
243 }
244 }),
245 );
246 },
247 | _ => {},
248 }
249 },
250 }
251
252 servers.insert(user_id.server_name().to_owned());
253 if let Some(room_id_server_name) = room_id.server_name() {
254 servers.insert(room_id_server_name.to_owned());
255 }
256
257 debug_info!("servers in remote_leave_room: {servers:?}");
258
259 for remote_server in servers
260 .into_iter()
261 .filter(|server| !self.services.globals.server_is_ours(server))
262 {
263 let make_leave_response = self
264 .services
265 .federation
266 .execute(&remote_server, federation::membership::prepare_leave_event::v1::Request {
267 room_id: room_id.to_owned(),
268 user_id: user_id.to_owned(),
269 })
270 .await;
271
272 make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));
273
274 if make_leave_response_and_server.is_ok() {
275 break;
276 }
277 }
278
279 let (make_leave_response, remote_server) = make_leave_response_and_server?;
280
281 let Some(room_version_id) = make_leave_response.room_version else {
282 return Err!(BadServerResponse(warn!(
283 "No room version was returned by {remote_server} for {room_id}, room version is \
284 likely not supported by tuwunel"
285 )));
286 };
287
288 if !self
289 .services
290 .config
291 .supported_room_version(&room_version_id)
292 {
293 return Err!(BadServerResponse(warn!(
294 "Remote room version {room_version_id} for {room_id} is not supported by conduwuit",
295 )));
296 }
297
298 let room_version_rules = room_version::rules(&room_version_id)?;
299
300 let mut event = serde_json::from_str::<CanonicalJsonObject>(make_leave_response.event.get())
301 .map_err(|e| {
302 err!(BadServerResponse(warn!(
303 "Invalid make_leave event json received from {remote_server} for {room_id}: \
304 {e:?}"
305 )))
306 })?;
307
308 let displayname = self.services.users.displayname(user_id).ok();
309
310 let avatar_url = self.services.users.avatar_url(user_id).ok();
311
312 let blurhash = self.services.users.blurhash(user_id).ok();
313
314 let (displayname, avatar_url, blurhash) = join3(displayname, avatar_url, blurhash).await;
315
316 event.insert(
317 "content".into(),
318 to_canonical_value(RoomMemberEventContent {
319 displayname,
320 avatar_url,
321 blurhash,
322 reason,
323 ..RoomMemberEventContent::new(MembershipState::Leave)
324 })?,
325 );
326
327 event.insert(
328 "origin".into(),
329 CanonicalJsonValue::String(
330 self.services
331 .globals
332 .server_name()
333 .as_str()
334 .to_owned(),
335 ),
336 );
337
338 event.insert(
339 "origin_server_ts".into(),
340 CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
341 );
342
343 event.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
344
345 event.insert("state_key".into(), CanonicalJsonValue::String(user_id.as_str().into()));
346
347 event.insert("sender".into(), CanonicalJsonValue::String(user_id.as_str().into()));
348
349 event.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
350
351 let event_id = self
352 .services
353 .server_keys
354 .gen_id_hash_and_sign_event(&mut event, &room_version_id)?;
355
356 check_rules(&event, &room_version_rules.event_format)?;
357
358 self.services
359 .federation
360 .execute(&remote_server, federation::membership::create_leave_event::v2::Request {
361 room_id: room_id.to_owned(),
362 event_id,
363 pdu: self
364 .services
365 .federation
366 .format_pdu_into(event.clone(), Some(&room_version_id))
367 .await,
368 })
369 .await?;
370
371 Ok(())
372}