1use std::borrow::Borrow;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join4};
5use ruma::{
6 CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
7 RoomVersionId, ServerName, UserId,
8 api::federation::membership::create_join_event,
9 events::{
10 StateEventType,
11 room::member::{MembershipState, RoomMemberEventContent},
12 },
13};
14use serde_json::value::RawValue as RawJsonValue;
15use tuwunel_core::{
16 Err, Result, at, debug_error, err,
17 itertools::Itertools,
18 matrix::event::gen_event_id_canonical_json,
19 utils::{
20 BoolExt,
21 future::{BoolExt as _, ReadyBoolExt},
22 stream::{BroadbandExt, IterStream, TryBroadbandExt, TryReadyExt},
23 },
24 warn,
25};
26use tuwunel_service::Services;
27
28use crate::{Ruma, client::sync::calculate_heroes};
29
30pub(crate) async fn create_join_event_v2_route(
34 State(services): State<crate::State>,
35 body: Ruma<create_join_event::v2::Request>,
36) -> Result<create_join_event::v2::Response> {
37 let room_id = &body.room_id;
38 let origin = body.origin();
39 let members_omitted = body.omit_members;
40
41 if let Some(server) = room_id.server_name()
42 && services
43 .config
44 .is_forbidden_remote_server_name(server)
45 {
46 warn!(
47 "Server {origin} tried joining {room_id} through us which has a server name that is \
48 globally forbidden. Rejecting.",
49 );
50
51 return Err!(Request(Forbidden(warn!(
52 "Room ID server name {server} is banned on this homeserver."
53 ))));
54 }
55
56 let servers_in_room = members_omitted
58 .then_async(|| {
59 services
60 .state_cache
61 .room_servers(room_id)
62 .map(ToOwned::to_owned)
63 .collect::<Vec<_>>()
64 })
65 .await;
66
67 let mut room_state =
68 create_join_event(&services, origin, room_id, &body.pdu, members_omitted)
69 .boxed()
70 .await?;
71
72 room_state.members_omitted = members_omitted;
73 room_state.servers_in_room =
74 servers_in_room.map(|servers| servers.into_iter().map(Into::into).collect());
75
76 Ok(create_join_event::v2::Response { room_state })
77}
78
/// Processes a join event submitted over federation and builds the room state
/// that is returned to the joining server.
///
/// Steps, in order:
/// 1. Verify the room exists locally and that `origin` passes the room's ACL
///    check.
/// 2. Capture the room's `shortstatehash` and room version *before* the join
///    is handled; the returned `state` is read from that captured hash.
/// 3. Convert the raw PDU to canonical JSON, validate its shape
///    ([`validate_join_event_shape`]) and, if the content names an authorising
///    user, validate the restricted join ([`validate_restricted_join`]).
/// 4. Hash and sign the event, then hand it to the event handler while
///    holding the per-room federation mutex.
/// 5. Concurrently assemble the auth chain, the state events and the
///    federation-formatted join event, and call `send_pdu_room` for the
///    accepted PDU.
///
/// # Parameters
///
/// * `origin` - server that made the request; used for the ACL check and the
///   shape validation. Note it is shadowed later by the `origin` field of the
///   event itself.
/// * `pdu` - raw JSON of the join event.
/// * `omit_members` - when `true`, membership events other than the joining
///   user's and the computed heroes' are dropped from `state`, and auth events
///   already present in `state` are dropped from `auth_chain`.
///
/// # Errors
///
/// * `NotFound` - the room is unknown or has no state.
/// * `BadJson` - the PDU cannot be converted to canonical JSON, or its
///   `origin` field is missing/invalid.
/// * `InvalidParam` - signing failed, or the event handler did not accept the
///   event as a timeline event.
/// * Any error propagated from the ACL check, the validation helpers, the
///   event handler, or from building/sending the response parts.
async fn create_join_event(
    services: &Services,
    origin: &ServerName,
    room_id: &RoomId,
    pdu: &RawJsonValue,
    omit_members: bool,
) -> Result<create_join_event::v2::RoomState> {
    if !services.metadata.exists(room_id).await {
        return Err!(Request(NotFound("Room is unknown to this server.")));
    }

    // ACL check for the requesting server.
    services
        .event_handler
        .acl_check(origin, room_id)
        .await?;

    // Captured before the join event is handled below; the state returned to
    // the caller is read from this hash.
    let shortstatehash = services
        .state
        .get_room_shortstatehash(room_id)
        .await
        .map_err(|e| err!(Request(NotFound(error!("Room has no state: {e}")))))?;

    let room_version = services.state.get_room_version(room_id).await?;

    // Derive the event ID and the canonical JSON form of the submitted PDU.
    let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version) else {
        return Err!(Request(BadJson("Could not convert event to canonical json.")));
    };

    let (content, joining_user) =
        validate_join_event_shape(services, &value, origin, room_id).await?;

    // Extra validation only applies when the event names an authorising user.
    if let Some(authorising_user) = content.join_authorized_via_users_server {
        validate_restricted_join(
            services,
            &authorising_user,
            &joining_user,
            room_id,
            &room_version,
        )
        .await?;
    }

    // Mutates `value` in place; presumably adds this server's hash/signature
    // (inferred from the method name and error message).
    services
        .server_keys
        .hash_and_sign_event(&mut value, &room_version)
        .map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;

    // Shadows the `origin` parameter: from here on `origin` is the server name
    // stated inside the event, not the requesting server.
    let origin: OwnedServerName = serde_json::from_value(
        value
            .get("origin")
            .ok_or_else(|| err!(Request(BadJson("Event does not have an origin server name."))))?
            .clone()
            .into(),
    )
    .map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;

    // Heroes are only looked up when members are being omitted; otherwise the
    // default (empty) value is used.
    //
    // NOTE(review): the closure passed to `.then` ignores its argument (`|_|`).
    // If `.then` here is `futures::FutureExt::then`, the combined
    // "no name and no canonical alias" result is discarded and
    // `calculate_heroes` runs regardless of it — confirm whether heroes are
    // meant to be computed only when both events are absent.
    let heroes = omit_members
        .then_async(|| {
            let has_name = services.state_accessor.state_contains(
                shortstatehash,
                &StateEventType::RoomName,
                "",
            );

            let has_alias = services.state_accessor.state_contains(
                shortstatehash,
                &StateEventType::RoomCanonicalAlias,
                "",
            );

            has_name
                .is_false()
                .and(has_alias.is_false())
                .then(|_| calculate_heroes(services, room_id, &joining_user))
        })
        .await
        .unwrap_or_default();

    // Not awaited yet: this only builds the future. It is awaited after the
    // join event has been handled (see below).
    let state_ids = services
        .state_accessor
        .state_full_ids(shortstatehash)
        .broad_filter_map(async |(ssk, event_id)| {
            // With `omit_members`, drop membership events of every user except
            // the joining user and the heroes. Entries whose state key cannot
            // be resolved or parsed as a user ID are kept.
            if omit_members
                && let Ok((kind, sk)) = services.short.get_statekey_from_short(ssk).await
                && kind == StateEventType::RoomMember
                && let Ok(user_id) = sk.as_str().try_into()
                && joining_user != user_id
                && !heroes.contains(&user_id)
            {
                return None;
            }

            Some(event_id)
        })
        .collect::<Vec<_>>();

    // Hold the per-room federation mutex while the join event is handled.
    let mutex_lock = services
        .event_handler
        .mutex_federation
        .lock(room_id)
        .await;

    // `value` is cloned because it is still needed for the response below.
    // A `None` result from the handler is reported as `InvalidParam`;
    // `at!(0)` presumably selects the first element of the returned tuple.
    let pdu_id = services
        .event_handler
        .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true)
        .boxed()
        .await?
        .map(at!(0))
        .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;

    drop(mutex_lock);

    // Sorted so that `binary_search` in `include_auth_event` is valid.
    let state_ids = state_ids
        .await
        .into_iter()
        .sorted_unstable()
        .collect::<Vec<_>>();

    // Shared formatter used for auth-chain and state events alike.
    let into_federation_format = |pdu: CanonicalJsonObject| {
        services
            .federation
            .format_pdu_into(pdu, Some(&room_version))
            .map(Ok)
    };

    // With `omit_members`, auth events that already appear in `state_ids` are
    // left out of the auth chain; without it every auth event is kept.
    let include_auth_event =
        |event_id: &OwnedEventId| !omit_members || state_ids.binary_search(event_id).is_err();

    let auth_heads = state_ids.iter().map(Borrow::borrow);

    // Lazily built; driven by `try_join4` below.
    let auth_chain = services
        .auth_chain
        .event_ids_iter(room_id, &room_version, auth_heads)
        .ready_try_filter(include_auth_event)
        .broad_and_then(async |event_id| {
            services
                .timeline
                .get_pdu_json(&event_id)
                .and_then(into_federation_format)
                .inspect_err(|e| debug_error!(?event_id, "auth_chain event not found: {e}"))
                .await
        })
        .try_collect();

    // Lazily built; driven by `try_join4` below.
    let state = state_ids
        .iter()
        .try_stream()
        .broad_and_then(async |event_id| {
            services
                .timeline
                .get_pdu_json(event_id)
                .and_then(into_federation_format)
                .inspect_err(|e| debug_error!(?event_id, "state event not found: {e}"))
                .await
        })
        .try_collect();

    // The signed join event itself, in federation format.
    let event = services
        .federation
        .format_pdu_into(value, Some(&room_version))
        .map(Some)
        .map(Ok);

    // Hands the accepted PDU to the sending service for this room.
    let broadcast = services.sending.send_pdu_room(room_id, &pdu_id);

    // Drive all four futures together; the first error aborts the whole join.
    let (auth_chain, state, event, ()) = try_join4(auth_chain, state, event, broadcast)
        .boxed()
        .await?;

    // Remaining fields are left at their defaults here.
    Ok(create_join_event::v2::RoomState {
        auth_chain,
        state,
        event,
        ..Default::default()
    })
}
271
272async fn validate_join_event_shape(
273 services: &Services,
274 value: &CanonicalJsonObject,
275 origin: &ServerName,
276 room_id: &RoomId,
277) -> Result<(RoomMemberEventContent, OwnedUserId)> {
278 let event_room_id: OwnedRoomId = serde_json::from_value(
279 value
280 .get("room_id")
281 .ok_or_else(|| err!(Request(BadJson("Event missing room_id property."))))?
282 .clone()
283 .into(),
284 )
285 .map_err(|e| err!(Request(BadJson(warn!("room_id field is not a valid room ID: {e}")))))?;
286
287 if event_room_id != room_id {
288 return Err!(Request(BadJson("Event room_id does not match request path room ID.")));
289 }
290
291 let event_type: StateEventType = serde_json::from_value(
292 value
293 .get("type")
294 .ok_or_else(|| err!(Request(BadJson("Event missing type property."))))?
295 .clone()
296 .into(),
297 )
298 .map_err(|e| err!(Request(BadJson(warn!("Event has invalid state event type: {e}")))))?;
299
300 if event_type != StateEventType::RoomMember {
301 return Err!(Request(BadJson(
302 "Not allowed to send non-membership state event to join endpoint."
303 )));
304 }
305
306 let content: RoomMemberEventContent = serde_json::from_value(
307 value
308 .get("content")
309 .ok_or_else(|| err!(Request(BadJson("Event missing content property"))))?
310 .clone()
311 .into(),
312 )
313 .map_err(|e| err!(Request(BadJson(warn!("Event content is empty or invalid: {e}")))))?;
314
315 if content.membership != MembershipState::Join {
316 return Err!(Request(BadJson(
317 "Not allowed to send a non-join membership event to join endpoint."
318 )));
319 }
320
321 let sender: OwnedUserId = serde_json::from_value(
323 value
324 .get("sender")
325 .ok_or_else(|| err!(Request(BadJson("Event missing sender property."))))?
326 .clone()
327 .into(),
328 )
329 .map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
330
331 services
332 .event_handler
333 .acl_check(sender.server_name(), room_id)
334 .await?;
335
336 if sender.server_name() != origin {
338 return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
339 }
340
341 let joining_user: OwnedUserId = serde_json::from_value(
342 value
343 .get("state_key")
344 .ok_or_else(|| err!(Request(BadJson("Event missing state_key property."))))?
345 .clone()
346 .into(),
347 )
348 .map_err(|e| err!(Request(BadJson(warn!("State key is not a valid user ID: {e}")))))?;
349
350 if joining_user != sender {
351 return Err!(Request(BadJson("State key does not match sender user.")));
352 }
353
354 Ok((content, joining_user))
355}
356
357async fn validate_restricted_join(
358 services: &Services,
359 authorising_user: &UserId,
360 joining_user: &UserId,
361 room_id: &RoomId,
362 room_version: &RoomVersionId,
363) -> Result {
364 use RoomVersionId::*;
365
366 if matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
367 return Err!(Request(InvalidParam(
368 "Room version {room_version} does not support restricted rooms but \
369 join_authorised_via_users_server ({authorising_user}) was found in the event."
370 )));
371 }
372
373 if !services.globals.user_is_local(authorising_user) {
374 return Err!(Request(InvalidParam(
375 "Cannot authorise membership event through {authorising_user} as they do not belong \
376 to this homeserver"
377 )));
378 }
379
380 if !services
381 .state_cache
382 .is_joined(authorising_user, room_id)
383 .await
384 {
385 return Err!(Request(InvalidParam(
386 "Authorising user {authorising_user} is not in the room you are trying to join, \
387 they cannot authorise your join."
388 )));
389 }
390
391 if !super::user_can_perform_restricted_join(services, joining_user, room_id, room_version)
392 .await?
393 {
394 return Err!(Request(UnableToAuthorizeJoin(
395 "Joining user did not pass restricted room's rules."
396 )));
397 }
398
399 Ok(())
400}