1use std::borrow::Borrow;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join4};
5use ruma::{
6 CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
7 ServerName,
8 api::federation::membership::create_join_event,
9 events::{
10 StateEventType,
11 room::member::{MembershipState, RoomMemberEventContent},
12 },
13};
14use serde_json::value::RawValue as RawJsonValue;
15use tuwunel_core::{
16 Err, Result, at, debug_error, err,
17 itertools::Itertools,
18 matrix::event::gen_event_id_canonical_json,
19 utils::{
20 BoolExt,
21 future::{BoolExt as _, ReadyBoolExt},
22 stream::{BroadbandExt, IterStream, TryBroadbandExt, TryReadyExt},
23 },
24 warn,
25};
26use tuwunel_service::Services;
27
28use crate::{Ruma, client::sync::calculate_heroes};
29
30pub(crate) async fn create_join_event_v2_route(
34 State(services): State<crate::State>,
35 body: Ruma<create_join_event::v2::Request>,
36) -> Result<create_join_event::v2::Response> {
37 let room_id = &body.room_id;
38 let origin = body.origin();
39 let members_omitted = body.omit_members;
40
41 if let Some(server) = room_id.server_name()
42 && services
43 .config
44 .is_forbidden_remote_server_name(server)
45 {
46 warn!(
47 "Server {origin} tried joining {room_id} through us which has a server name that is \
48 globally forbidden. Rejecting.",
49 );
50
51 return Err!(Request(Forbidden(warn!(
52 "Room ID server name {server} is banned on this homeserver."
53 ))));
54 }
55
56 let servers_in_room = members_omitted
58 .then_async(|| {
59 services
60 .state_cache
61 .room_servers(room_id)
62 .map(ToOwned::to_owned)
63 .collect::<Vec<_>>()
64 })
65 .await;
66
67 let mut room_state =
68 create_join_event(&services, origin, room_id, &body.pdu, members_omitted)
69 .boxed()
70 .await?;
71
72 room_state.members_omitted = members_omitted;
73 room_state.servers_in_room =
74 servers_in_room.map(|servers| servers.into_iter().map(Into::into).collect());
75
76 Ok(create_join_event::v2::Response { room_state })
77}
78
79async fn create_join_event(
80 services: &Services,
81 origin: &ServerName,
82 room_id: &RoomId,
83 pdu: &RawJsonValue,
84 omit_members: bool,
85) -> Result<create_join_event::v2::RoomState> {
86 if !services.metadata.exists(room_id).await {
87 return Err!(Request(NotFound("Room is unknown to this server.")));
88 }
89
90 services
92 .event_handler
93 .acl_check(origin, room_id)
94 .await?;
95
96 let shortstatehash = services
99 .state
100 .get_room_shortstatehash(room_id)
101 .await
102 .map_err(|e| err!(Request(NotFound(error!("Room has no state: {e}")))))?;
103
104 let room_version = services.state.get_room_version(room_id).await?;
107
108 let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version) else {
109 return Err!(Request(BadJson("Could not convert event to canonical json.")));
111 };
112
113 let event_room_id: OwnedRoomId = serde_json::from_value(
114 value
115 .get("room_id")
116 .ok_or_else(|| err!(Request(BadJson("Event missing room_id property."))))?
117 .clone()
118 .into(),
119 )
120 .map_err(|e| err!(Request(BadJson(warn!("room_id field is not a valid room ID: {e}")))))?;
121
122 if event_room_id != room_id {
123 return Err!(Request(BadJson("Event room_id does not match request path room ID.")));
124 }
125
126 let event_type: StateEventType = serde_json::from_value(
127 value
128 .get("type")
129 .ok_or_else(|| err!(Request(BadJson("Event missing type property."))))?
130 .clone()
131 .into(),
132 )
133 .map_err(|e| err!(Request(BadJson(warn!("Event has invalid state event type: {e}")))))?;
134
135 if event_type != StateEventType::RoomMember {
136 return Err!(Request(BadJson(
137 "Not allowed to send non-membership state event to join endpoint."
138 )));
139 }
140
141 let content: RoomMemberEventContent = serde_json::from_value(
142 value
143 .get("content")
144 .ok_or_else(|| err!(Request(BadJson("Event missing content property"))))?
145 .clone()
146 .into(),
147 )
148 .map_err(|e| err!(Request(BadJson(warn!("Event content is empty or invalid: {e}")))))?;
149
150 if content.membership != MembershipState::Join {
151 return Err!(Request(BadJson(
152 "Not allowed to send a non-join membership event to join endpoint."
153 )));
154 }
155
156 let sender: OwnedUserId = serde_json::from_value(
158 value
159 .get("sender")
160 .ok_or_else(|| err!(Request(BadJson("Event missing sender property."))))?
161 .clone()
162 .into(),
163 )
164 .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
165
166 services
167 .event_handler
168 .acl_check(sender.server_name(), room_id)
169 .await?;
170
171 if sender.server_name() != origin {
173 return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
174 }
175
176 let joining_user: OwnedUserId = serde_json::from_value(
177 value
178 .get("state_key")
179 .ok_or_else(|| err!(Request(BadJson("Event missing state_key property."))))?
180 .clone()
181 .into(),
182 )
183 .map_err(|e| err!(Request(BadJson(warn!("State key is not a valid user ID: {e}")))))?;
184
185 if joining_user != sender {
186 return Err!(Request(BadJson("State key does not match sender user.")));
187 }
188
189 if let Some(authorising_user) = content.join_authorized_via_users_server {
190 use ruma::RoomVersionId::*;
191
192 if matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
193 return Err!(Request(InvalidParam(
194 "Room version {room_version} does not support restricted rooms but \
195 join_authorised_via_users_server ({authorising_user}) was found in the event."
196 )));
197 }
198
199 if !services.globals.user_is_local(&authorising_user) {
200 return Err!(Request(InvalidParam(
201 "Cannot authorise membership event through {authorising_user} as they do not \
202 belong to this homeserver"
203 )));
204 }
205
206 if !services
207 .state_cache
208 .is_joined(&authorising_user, room_id)
209 .await
210 {
211 return Err!(Request(InvalidParam(
212 "Authorising user {authorising_user} is not in the room you are trying to join, \
213 they cannot authorise your join."
214 )));
215 }
216
217 if !super::user_can_perform_restricted_join(
218 services,
219 &joining_user,
220 room_id,
221 &room_version,
222 )
223 .await?
224 {
225 return Err!(Request(UnableToAuthorizeJoin(
226 "Joining user did not pass restricted room's rules."
227 )));
228 }
229 }
230
231 services
232 .server_keys
233 .hash_and_sign_event(&mut value, &room_version)
234 .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
235
236 let origin: OwnedServerName = serde_json::from_value(
237 value
238 .get("origin")
239 .ok_or_else(|| err!(Request(BadJson("Event does not have an origin server name."))))?
240 .clone()
241 .into(),
242 )
243 .map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;
244
245 let heroes = omit_members
248 .then_async(|| {
249 let has_name = services.state_accessor.state_contains(
250 shortstatehash,
251 &StateEventType::RoomName,
252 "",
253 );
254
255 let has_alias = services.state_accessor.state_contains(
256 shortstatehash,
257 &StateEventType::RoomCanonicalAlias,
258 "",
259 );
260
261 has_name
262 .is_false()
263 .and(has_alias.is_false())
264 .then(|_| calculate_heroes(services, room_id, &joining_user))
265 })
266 .await
267 .unwrap_or_default();
268
269 let state_ids = services
271 .state_accessor
272 .state_full_ids(shortstatehash)
273 .broad_filter_map(async |(ssk, event_id)| {
274 if omit_members
278 && let Ok((kind, sk)) = services.short.get_statekey_from_short(ssk).await
279 && kind == StateEventType::RoomMember
280 && let Ok(user_id) = sk.as_str().try_into()
281 && joining_user != user_id
282 && !heroes.contains(&user_id)
283 {
284 return None;
285 }
286
287 Some(event_id)
288 })
289 .collect::<Vec<_>>();
290
291 let mutex_lock = services
292 .event_handler
293 .mutex_federation
294 .lock(room_id)
295 .await;
296
297 let pdu_id = services
298 .event_handler
299 .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true)
300 .boxed()
301 .await?
302 .map(at!(0))
303 .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
304
305 drop(mutex_lock);
306
307 let state_ids = state_ids
309 .await
310 .into_iter()
311 .sorted_unstable()
312 .collect::<Vec<_>>();
313
314 let into_federation_format = |pdu: CanonicalJsonObject| {
315 services
316 .federation
317 .format_pdu_into(pdu, Some(&room_version))
318 .map(Ok)
319 };
320
321 let include_auth_event =
323 |event_id: &OwnedEventId| !omit_members || state_ids.binary_search(event_id).is_err();
324
325 let auth_heads = state_ids.iter().map(Borrow::borrow);
326
327 let auth_chain = services
328 .auth_chain
329 .event_ids_iter(room_id, &room_version, auth_heads)
330 .ready_try_filter(include_auth_event)
331 .broad_and_then(async |event_id| {
332 services
333 .timeline
334 .get_pdu_json(&event_id)
335 .and_then(into_federation_format)
336 .inspect_err(|e| debug_error!(?event_id, "auth_chain event not found: {e}"))
337 .await
338 })
339 .try_collect();
340
341 let state = state_ids
342 .iter()
343 .try_stream()
344 .broad_and_then(async |event_id| {
345 services
346 .timeline
347 .get_pdu_json(event_id)
348 .and_then(into_federation_format)
349 .inspect_err(|e| debug_error!(?event_id, "state event not found: {e}"))
350 .await
351 })
352 .try_collect();
353
354 let event = services
356 .federation
357 .format_pdu_into(value, Some(&room_version))
358 .map(Some)
359 .map(Ok);
360
361 let broadcast = services.sending.send_pdu_room(room_id, &pdu_id);
363
364 let (auth_chain, state, event, ()) = try_join4(auth_chain, state, event, broadcast)
365 .boxed()
366 .await?;
367
368 Ok(create_join_event::v2::RoomState {
369 auth_chain,
370 state,
371 event,
372 ..Default::default()
373 })
374}