Skip to main content

tuwunel_api/server/
send_join.rs

1use std::borrow::Borrow;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join4};
5use ruma::{
6	CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId,
7	RoomVersionId, ServerName, UserId,
8	api::federation::membership::create_join_event,
9	events::{
10		StateEventType,
11		room::member::{MembershipState, RoomMemberEventContent},
12	},
13};
14use serde_json::value::RawValue as RawJsonValue;
15use tuwunel_core::{
16	Err, Result, at, debug_error, err,
17	itertools::Itertools,
18	matrix::event::gen_event_id_canonical_json,
19	utils::{
20		BoolExt,
21		future::{BoolExt as _, ReadyBoolExt},
22		stream::{BroadbandExt, IterStream, TryBroadbandExt, TryReadyExt},
23	},
24	warn,
25};
26use tuwunel_service::Services;
27
28use crate::{Ruma, client::sync::calculate_heroes};
29
30/// # `PUT /_matrix/federation/v2/send_join/{roomId}/{eventId}`
31///
32/// Submits a signed join event.
33pub(crate) async fn create_join_event_v2_route(
34	State(services): State<crate::State>,
35	body: Ruma<create_join_event::v2::Request>,
36) -> Result<create_join_event::v2::Response> {
37	let room_id = &body.room_id;
38	let origin = body.origin();
39	let members_omitted = body.omit_members;
40
41	if let Some(server) = room_id.server_name()
42		&& services
43			.config
44			.is_forbidden_remote_server_name(server)
45	{
46		warn!(
47			"Server {origin} tried joining {room_id} through us which has a server name that is \
48			 globally forbidden. Rejecting.",
49		);
50
51		return Err!(Request(Forbidden(warn!(
52			"Room ID server name {server} is banned on this homeserver."
53		))));
54	}
55
56	// Get the servers in the room BEFORE the join
57	let servers_in_room = members_omitted
58		.then_async(|| {
59			services
60				.state_cache
61				.room_servers(room_id)
62				.map(ToOwned::to_owned)
63				.collect::<Vec<_>>()
64		})
65		.await;
66
67	let mut room_state =
68		create_join_event(&services, origin, room_id, &body.pdu, members_omitted)
69			.boxed()
70			.await?;
71
72	room_state.members_omitted = members_omitted;
73	room_state.servers_in_room =
74		servers_in_room.map(|servers| servers.into_iter().map(Into::into).collect());
75
76	Ok(create_join_event::v2::Response { room_state })
77}
78
79async fn create_join_event(
80	services: &Services,
81	origin: &ServerName,
82	room_id: &RoomId,
83	pdu: &RawJsonValue,
84	omit_members: bool,
85) -> Result<create_join_event::v2::RoomState> {
86	if !services.metadata.exists(room_id).await {
87		return Err!(Request(NotFound("Room is unknown to this server.")));
88	}
89
90	// ACL check origin server
91	services
92		.event_handler
93		.acl_check(origin, room_id)
94		.await?;
95
96	// We need to return the state prior to joining, let's keep a reference to that
97	// here
98	let shortstatehash = services
99		.state
100		.get_room_shortstatehash(room_id)
101		.await
102		.map_err(|e| err!(Request(NotFound(error!("Room has no state: {e}")))))?;
103
104	// We do not add the event_id field to the pdu here because of signature and
105	// hashes checks
106	let room_version = services.state.get_room_version(room_id).await?;
107
108	let Ok((event_id, mut value)) = gen_event_id_canonical_json(pdu, &room_version) else {
109		// Event could not be converted to canonical json
110		return Err!(Request(BadJson("Could not convert event to canonical json.")));
111	};
112
113	let (content, joining_user) =
114		validate_join_event_shape(services, &value, origin, room_id).await?;
115
116	if let Some(authorising_user) = content.join_authorized_via_users_server {
117		validate_restricted_join(
118			services,
119			&authorising_user,
120			&joining_user,
121			room_id,
122			&room_version,
123		)
124		.await?;
125	}
126
127	services
128		.server_keys
129		.hash_and_sign_event(&mut value, &room_version)
130		.map_err(|e| err!(Request(InvalidParam(warn!("Failed to sign send_join event: {e}")))))?;
131
132	let origin: OwnedServerName = serde_json::from_value(
133		value
134			.get("origin")
135			.ok_or_else(|| err!(Request(BadJson("Event does not have an origin server name."))))?
136			.clone()
137			.into(),
138	)
139	.map_err(|e| err!(Request(BadJson("Event has an invalid origin server name: {e}"))))?;
140
141	// MSC3943: Only include heroes when the room has no name and no
142	// canonical alias (matching Synapse's behavior in PR #14442).
143	let heroes = omit_members
144		.then_async(|| {
145			let has_name = services.state_accessor.state_contains(
146				shortstatehash,
147				&StateEventType::RoomName,
148				"",
149			);
150
151			let has_alias = services.state_accessor.state_contains(
152				shortstatehash,
153				&StateEventType::RoomCanonicalAlias,
154				"",
155			);
156
157			has_name
158				.is_false()
159				.and(has_alias.is_false())
160				.then(|_| calculate_heroes(services, room_id, &joining_user))
161		})
162		.await
163		.unwrap_or_default();
164
165	// Prestart state gather here since it doesn't involve the new join event.
166	let state_ids = services
167		.state_accessor
168		.state_full_ids(shortstatehash)
169		.broad_filter_map(async |(ssk, event_id)| {
170			// Filter state: keep all non-member events, the joining user's
171			// member event, and hero member events. If get_statekey_from_short
172			// fails, keep the event (safe default, matching original behavior).
173			if omit_members
174				&& let Ok((kind, sk)) = services.short.get_statekey_from_short(ssk).await
175				&& kind == StateEventType::RoomMember
176				&& let Ok(user_id) = sk.as_str().try_into()
177				&& joining_user != user_id
178				&& !heroes.contains(&user_id)
179			{
180				return None;
181			}
182
183			Some(event_id)
184		})
185		.collect::<Vec<_>>();
186
187	let mutex_lock = services
188		.event_handler
189		.mutex_federation
190		.lock(room_id)
191		.await;
192
193	let pdu_id = services
194		.event_handler
195		.handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true)
196		.boxed()
197		.await?
198		.map(at!(0))
199		.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
200
201	drop(mutex_lock);
202
203	// Wait for state gather which the remaining operations depend on.
204	let state_ids = state_ids
205		.await
206		.into_iter()
207		.sorted_unstable()
208		.collect::<Vec<_>>();
209
210	let into_federation_format = |pdu: CanonicalJsonObject| {
211		services
212			.federation
213			.format_pdu_into(pdu, Some(&room_version))
214			.map(Ok)
215	};
216
217	// MSC3706: Any events returned within state can be omitted from auth_chain.
218	let include_auth_event =
219		|event_id: &OwnedEventId| !omit_members || state_ids.binary_search(event_id).is_err();
220
221	let auth_heads = state_ids.iter().map(Borrow::borrow);
222
223	let auth_chain = services
224		.auth_chain
225		.event_ids_iter(room_id, &room_version, auth_heads)
226		.ready_try_filter(include_auth_event)
227		.broad_and_then(async |event_id| {
228			services
229				.timeline
230				.get_pdu_json(&event_id)
231				.and_then(into_federation_format)
232				.inspect_err(|e| debug_error!(?event_id, "auth_chain event not found: {e}"))
233				.await
234		})
235		.try_collect();
236
237	let state = state_ids
238		.iter()
239		.try_stream()
240		.broad_and_then(async |event_id| {
241			services
242				.timeline
243				.get_pdu_json(event_id)
244				.and_then(into_federation_format)
245				.inspect_err(|e| debug_error!(?event_id, "state event not found: {e}"))
246				.await
247		})
248		.try_collect();
249
250	// Join event for new server.
251	let event = services
252		.federation
253		.format_pdu_into(value, Some(&room_version))
254		.map(Some)
255		.map(Ok);
256
257	// Join event revealed to existing servers.
258	let broadcast = services.sending.send_pdu_room(room_id, &pdu_id);
259
260	let (auth_chain, state, event, ()) = try_join4(auth_chain, state, event, broadcast)
261		.boxed()
262		.await?;
263
264	Ok(create_join_event::v2::RoomState {
265		auth_chain,
266		state,
267		event,
268		..Default::default()
269	})
270}
271
272async fn validate_join_event_shape(
273	services: &Services,
274	value: &CanonicalJsonObject,
275	origin: &ServerName,
276	room_id: &RoomId,
277) -> Result<(RoomMemberEventContent, OwnedUserId)> {
278	let event_room_id: OwnedRoomId = serde_json::from_value(
279		value
280			.get("room_id")
281			.ok_or_else(|| err!(Request(BadJson("Event missing room_id property."))))?
282			.clone()
283			.into(),
284	)
285	.map_err(|e| err!(Request(BadJson(warn!("room_id field is not a valid room ID: {e}")))))?;
286
287	if event_room_id != room_id {
288		return Err!(Request(BadJson("Event room_id does not match request path room ID.")));
289	}
290
291	let event_type: StateEventType = serde_json::from_value(
292		value
293			.get("type")
294			.ok_or_else(|| err!(Request(BadJson("Event missing type property."))))?
295			.clone()
296			.into(),
297	)
298	.map_err(|e| err!(Request(BadJson(warn!("Event has invalid state event type: {e}")))))?;
299
300	if event_type != StateEventType::RoomMember {
301		return Err!(Request(BadJson(
302			"Not allowed to send non-membership state event to join endpoint."
303		)));
304	}
305
306	let content: RoomMemberEventContent = serde_json::from_value(
307		value
308			.get("content")
309			.ok_or_else(|| err!(Request(BadJson("Event missing content property"))))?
310			.clone()
311			.into(),
312	)
313	.map_err(|e| err!(Request(BadJson(warn!("Event content is empty or invalid: {e}")))))?;
314
315	if content.membership != MembershipState::Join {
316		return Err!(Request(BadJson(
317			"Not allowed to send a non-join membership event to join endpoint."
318		)));
319	}
320
321	// ACL check sender user server name
322	let sender: OwnedUserId = serde_json::from_value(
323		value
324			.get("sender")
325			.ok_or_else(|| err!(Request(BadJson("Event missing sender property."))))?
326			.clone()
327			.into(),
328	)
329	.map_err(|e| err!(Request(BadJson(warn!("sender property is not a valid user ID: {e}")))))?;
330
331	services
332		.event_handler
333		.acl_check(sender.server_name(), room_id)
334		.await?;
335
336	// check if origin server is trying to send for another server
337	if sender.server_name() != origin {
338		return Err!(Request(Forbidden("Not allowed to join on behalf of another server.")));
339	}
340
341	let joining_user: OwnedUserId = serde_json::from_value(
342		value
343			.get("state_key")
344			.ok_or_else(|| err!(Request(BadJson("Event missing state_key property."))))?
345			.clone()
346			.into(),
347	)
348	.map_err(|e| err!(Request(BadJson(warn!("State key is not a valid user ID: {e}")))))?;
349
350	if joining_user != sender {
351		return Err!(Request(BadJson("State key does not match sender user.")));
352	}
353
354	Ok((content, joining_user))
355}
356
357async fn validate_restricted_join(
358	services: &Services,
359	authorising_user: &UserId,
360	joining_user: &UserId,
361	room_id: &RoomId,
362	room_version: &RoomVersionId,
363) -> Result {
364	use RoomVersionId::*;
365
366	if matches!(room_version, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
367		return Err!(Request(InvalidParam(
368			"Room version {room_version} does not support restricted rooms but \
369			 join_authorised_via_users_server ({authorising_user}) was found in the event."
370		)));
371	}
372
373	if !services.globals.user_is_local(authorising_user) {
374		return Err!(Request(InvalidParam(
375			"Cannot authorise membership event through {authorising_user} as they do not belong \
376			 to this homeserver"
377		)));
378	}
379
380	if !services
381		.state_cache
382		.is_joined(authorising_user, room_id)
383		.await
384	{
385		return Err!(Request(InvalidParam(
386			"Authorising user {authorising_user} is not in the room you are trying to join, \
387			 they cannot authorise your join."
388		)));
389	}
390
391	if !super::user_can_perform_restricted_join(services, joining_user, room_id, room_version)
392		.await?
393	{
394		return Err!(Request(UnableToAuthorizeJoin(
395			"Joining user did not pass restricted room's rules."
396		)));
397	}
398
399	Ok(())
400}