Skip to main content

tuwunel_service/membership/
join.rs

1use std::{
2	borrow::Borrow,
3	collections::{BTreeSet, HashMap, HashSet},
4	iter::once,
5	mem::take,
6	sync::Arc,
7};
8
9use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
10use ruma::{
11	CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
12	RoomOrAliasId, RoomVersionId, UserId,
13	api::{error::ErrorKind, federation},
14	canonical_json::to_canonical_value,
15	events::{
16		StateEventType,
17		room::{
18			create::RoomCreateEventContent,
19			join_rules::RoomJoinRulesEventContent,
20			member::{MembershipState, RoomMemberEventContent},
21		},
22	},
23	room::{AllowRule, JoinRule},
24	room_version_rules::RoomVersionRules,
25};
26use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
27use tuwunel_core::{
28	Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, implement, info,
29	matrix::{event::gen_event_id_canonical_json, room_version},
30	pdu::{Pdu, PduBuilder, check_rules},
31	trace,
32	utils::{self, BoolExt, IterStream, ReadyExt, math::Expected, shuffle},
33	warn,
34};
35
36use super::Service;
37use crate::{
38	Services,
39	federation::{Candidates, WhenAllBackedOff},
40	rooms::{
41		state::RoomMutexGuard,
42		state_compressor::{CompressedState, HashSetCompressStateEvent},
43		state_res,
44	},
45};
46
47#[implement(Service)]
48#[expect(clippy::too_many_arguments)]
49#[tracing::instrument(
50	level = "debug",
51	skip_all,
52	fields(%sender_user, %room_id)
53)]
54pub async fn join(
55	&self,
56	sender_user: &UserId,
57	room_id: &RoomId,
58	orig_room_id: Option<&RoomOrAliasId>,
59	reason: Option<String>,
60	servers: &[OwnedServerName],
61	is_appservice: bool,
62	extra_content: Option<CanonicalJsonObject>,
63) -> Result {
64	let state_lock = self.services.state.mutex.lock(room_id).await;
65
66	let servers =
67		get_servers_for_room(&self.services, sender_user, room_id, orig_room_id, servers).await?;
68
69	let user_is_guest = self
70		.services
71		.users
72		.is_deactivated(sender_user)
73		.await
74		.unwrap_or(false)
75		&& !is_appservice;
76
77	if user_is_guest
78		&& !self
79			.services
80			.state_accessor
81			.guest_can_join(room_id)
82			.await
83	{
84		return Err!(Request(Forbidden("Guests are not allowed to join this room")));
85	}
86
87	if self
88		.services
89		.state_cache
90		.is_joined(sender_user, room_id)
91		.await
92	{
93		debug_warn!("{sender_user} is already joined in {room_id}");
94		return Ok(());
95	}
96
97	// Resolved state can lag a federated re-invite; trust the invite index.
98	if let Ok(membership) = self
99		.services
100		.state_accessor
101		.get_member(room_id, sender_user)
102		.await && membership.membership == MembershipState::Ban
103		&& !self
104			.services
105			.state_cache
106			.is_invited(sender_user, room_id)
107			.await
108	{
109		debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
110		return Err!(Request(Forbidden("You are banned from the room.")));
111	}
112
113	let server_in_room = self
114		.services
115		.state_cache
116		.server_in_room(self.services.globals.server_name(), room_id)
117		.await;
118
119	let local_join = server_in_room
120		|| servers.is_empty()
121		|| (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));
122
123	if local_join {
124		self.join_local(sender_user, room_id, reason, &servers, state_lock, extra_content)
125			.boxed()
126			.await?;
127	} else {
128		// Ask a remote server if we are not participating in this room
129		self.join_remote(sender_user, room_id, reason, &servers, state_lock, extra_content)
130			.boxed()
131			.await?;
132	}
133
134	self.copy_predecessor_push_rules(sender_user, room_id)
135		.await;
136
137	Ok(())
138}
139
140#[implement(Service)]
141async fn copy_predecessor_push_rules(&self, user_id: &UserId, room_id: &RoomId) {
142	let Ok(create): Result<RoomCreateEventContent> = self
143		.services
144		.state_accessor
145		.room_state_get_content(room_id, &StateEventType::RoomCreate, "")
146		.await
147	else {
148		return;
149	};
150
151	let Some(predecessor) = create.predecessor else {
152		return;
153	};
154
155	self.services
156		.account_data
157		.copy_room_push_rule(user_id, &predecessor.room_id, room_id)
158		.await
159		.ok();
160}
161
162#[implement(Service)]
163#[tracing::instrument(
164	name = "remote",
165	level = "debug",
166	skip_all,
167	fields(?servers)
168)]
169pub async fn join_remote(
170	&self,
171	sender_user: &UserId,
172	room_id: &RoomId,
173	reason: Option<String>,
174	servers: &[OwnedServerName],
175	state_lock: RoomMutexGuard,
176	extra_content: Option<CanonicalJsonObject>,
177) -> Result {
178	info!("Joining {room_id} over federation.");
179
180	let (make_join_response, remote_server) = self
181		.make_join_request(sender_user, room_id, servers)
182		.await?;
183
184	info!("make_join finished");
185
186	let room_version_id = self.require_supported_remote_room_version(&make_join_response)?;
187	let room_version_rules = room_version::rules(&room_version_id)?;
188	let (mut join_event, event_id, join_authorized_via_users_server) = self
189		.create_join_event(
190			room_id,
191			sender_user,
192			&make_join_response.event,
193			&room_version_id,
194			&room_version_rules,
195			reason,
196			extra_content,
197		)
198		.await?;
199
200	// Once send_join hits the remote server it may start sending us events which
201	// have to be belayed until we process this response first.
202	let _federation_lock = self
203		.services
204		.event_handler
205		.mutex_federation
206		.lock(room_id)
207		.await;
208
209	let mut response = self
210		.execute_send_join(
211			&remote_server,
212			room_id,
213			&event_id,
214			join_event.clone(),
215			&room_version_id,
216		)
217		.await?;
218
219	if response.members_omitted {
220		self.fetch_omitted_state(&remote_server, room_id, &event_id, servers, &mut response)
221			.await?;
222	}
223
224	if join_authorized_via_users_server.is_some() {
225		merge_restricted_signature(
226			&remote_server,
227			&event_id,
228			&room_version_id,
229			&response,
230			&mut join_event,
231		)?;
232	}
233
234	let shortroomid = self
235		.services
236		.short
237		.get_or_create_shortroomid(room_id)
238		.await;
239
240	info!(
241		%room_id,
242		%shortroomid,
243		"Initialized room. Parsing join event..."
244	);
245	let (parsed_join_pdu, join_event) =
246		Pdu::from_object_federation(room_id, &event_id, join_event, &room_version_rules)?;
247
248	info!(
249		events = response
250			.state
251			.len()
252			.expected_add(response.auth_chain.len()),
253		"Acquiring server signing keys for response events..."
254	);
255	self.services
256		.server_keys
257		.acquire_events_pubkeys(
258			response
259				.auth_chain
260				.iter()
261				.chain(response.state.iter()),
262		)
263		.await;
264
265	let state = self
266		.ingest_send_join_state(room_id, &room_version_id, &room_version_rules, &response.state)
267		.await;
268
269	self.ingest_send_join_auth_chain(
270		room_id,
271		&room_version_id,
272		&room_version_rules,
273		&response.auth_chain,
274	)
275	.await;
276
277	debug!("Running send_join auth check...");
278	state_res::auth_check(
279		&room_version_rules,
280		&parsed_join_pdu,
281		&async |event_id| self.services.timeline.get_pdu(&event_id).await,
282		&async |event_type, state_key| {
283			let shortstatekey = self
284				.services
285				.short
286				.get_shortstatekey(&event_type, state_key.as_str())
287				.await?;
288
289			let event_id = state.get(&shortstatekey).ok_or_else(|| {
290				err!(Request(NotFound("Missing fetch_state {shortstatekey:?}")))
291			})?;
292
293			self.services.timeline.get_pdu(event_id).await
294		},
295	)
296	.inspect_err(|e| error!("send_join auth check failed: {e:?}"))
297	.boxed()
298	.await?;
299
300	self.apply_send_join_state(room_id, &state, &state_lock)
301		.await?;
302
303	// We append to state before appending the pdu, so we don't have a moment in
304	// time with the pdu without it's state. This is okay because append_pdu can't
305	// fail.
306	let statehash_after_join = self
307		.services
308		.state
309		.append_to_state(&parsed_join_pdu)
310		.await?;
311
312	info!(
313		event_id = %parsed_join_pdu.event_id,
314		"Appending new room join event..."
315	);
316
317	self.services
318		.timeline
319		.append_pdu(
320			&parsed_join_pdu,
321			join_event,
322			once(parsed_join_pdu.event_id.borrow()),
323			&state_lock,
324		)
325		.await?;
326
327	// We set the room state after inserting the pdu, so that we never have a moment
328	// in time where events in the current room state do not exist
329	self.services
330		.state
331		.set_room_state(room_id, statehash_after_join, &state_lock);
332
333	info!(
334		statehash = %statehash_after_join,
335		"Set final room state for new room."
336	);
337
338	Ok(())
339}
340
341#[implement(Service)]
342fn require_supported_remote_room_version(
343	&self,
344	make_join_response: &federation::membership::prepare_join_event::v1::Response,
345) -> Result<RoomVersionId> {
346	let Some(room_version_id) = make_join_response.room_version.clone() else {
347		return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
348	};
349
350	if !self
351		.services
352		.config
353		.supported_room_version(&room_version_id)
354	{
355		return Err!(BadServerResponse(
356			"Remote room version {room_version_id} is not supported by tuwunel"
357		));
358	}
359
360	Ok(room_version_id)
361}
362
363#[implement(Service)]
364async fn execute_send_join(
365	&self,
366	remote_server: &OwnedServerName,
367	room_id: &RoomId,
368	event_id: &OwnedEventId,
369	join_event: CanonicalJsonObject,
370	room_version_id: &RoomVersionId,
371) -> Result<federation::membership::create_join_event::v2::RoomState> {
372	let send_join_request = federation::membership::create_join_event::v2::Request {
373		room_id: room_id.to_owned(),
374		event_id: event_id.clone(),
375		omit_members: true,
376		pdu: self
377			.services
378			.federation
379			.format_pdu_into(join_event, Some(room_version_id))
380			.await,
381	};
382
383	info!("Asking {remote_server} for fast_join in room {room_id}");
384	let response = self
385		.services
386		.federation
387		.execute(remote_server, send_join_request)
388		.await
389		.inspect_err(|e| error!("send_join failed: {e}"))?
390		.room_state;
391
392	info!(
393		fast_join = response.members_omitted,
394		auth_chain = response.auth_chain.len(),
395		state = response.state.len(),
396		servers = response
397			.servers_in_room
398			.as_ref()
399			.map(Vec::len)
400			.unwrap_or(0),
401		"send_join finished"
402	);
403
404	Ok(response)
405}
406
407#[implement(Service)]
408async fn fetch_omitted_state(
409	&self,
410	remote_server: &OwnedServerName,
411	room_id: &RoomId,
412	event_id: &OwnedEventId,
413	servers: &[OwnedServerName],
414	response: &mut federation::membership::create_join_event::v2::RoomState,
415) -> Result {
416	use federation::event::get_room_state::v1::{Request, Response};
417
418	let eligible =
419		self.omitted_state_servers(remote_server, servers, response.servers_in_room.as_deref());
420
421	let candidates = self
422		.services
423		.federation
424		.rank_candidates(eligible, WhenAllBackedOff::Attempt)
425		.await;
426
427	let mut last_error = Err!(BadServerResponse("No server provided omitted send_join state."));
428	for server in candidates {
429		info!("Asking {server} for state in room {room_id}");
430		let result = self
431			.services
432			.federation
433			.execute(&server, Request {
434				room_id: room_id.to_owned(),
435				event_id: event_id.clone(),
436			})
437			.await;
438
439		match result {
440			| Err(e) => {
441				debug_warn!(?server, "state fetch failed: {e}");
442				last_error = Err(e);
443			},
444			| Ok(Response { mut auth_chain, mut pdus }) => {
445				response.auth_chain = take(&mut auth_chain);
446				response.state = take(&mut pdus);
447
448				info!(
449					auth_chain = response.auth_chain.len(),
450					state = response.state.len(),
451					"state finished"
452				);
453
454				return Ok(());
455			},
456		}
457	}
458
459	last_error
460}
461
462#[implement(Service)]
463fn omitted_state_servers(
464	&self,
465	remote_server: &OwnedServerName,
466	servers: &[OwnedServerName],
467	servers_in_room: Option<&[String]>,
468) -> Candidates {
469	let extracted = servers_in_room
470		.into_iter()
471		.flatten()
472		.filter_map(|server| OwnedServerName::parse(server.as_str()).ok());
473
474	let mut seen = BTreeSet::new();
475	once(remote_server.clone())
476		.chain(extracted)
477		.chain(servers.iter().cloned())
478		.filter(|server| !self.services.globals.server_is_ours(server))
479		.filter(move |server| seen.insert(server.clone()))
480		.take(
481			self.services
482				.config
483				.max_make_join_attempts_per_join_attempt,
484		)
485		.collect()
486}
487
488fn merge_restricted_signature(
489	remote_server: &OwnedServerName,
490	event_id: &OwnedEventId,
491	room_version_id: &RoomVersionId,
492	response: &federation::membership::create_join_event::v2::RoomState,
493	join_event: &mut CanonicalJsonObject,
494) -> Result {
495	let Some(signed_raw) = &response.event else {
496		return Ok(());
497	};
498
499	debug_info!(
500		"There is a signed event with join_authorized_via_users_server. This room is probably \
501		 using restricted joins. Adding signature to our event"
502	);
503
504	let (signed_event_id, signed_value) =
505		gen_event_id_canonical_json(signed_raw, room_version_id).map_err(|e| {
506			err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
507		})?;
508
509	if signed_event_id != *event_id {
510		return Err!(Request(BadJson(warn!(
511			%signed_event_id, %event_id,
512			"Server {remote_server} sent event with wrong event ID"
513		))));
514	}
515
516	let signature = signed_value["signatures"]
517		.as_object()
518		.ok_or_else(|| {
519			err!(BadServerResponse(warn!("Server {remote_server} sent invalid signatures type")))
520		})
521		.and_then(|e| {
522			e.get(remote_server.as_str()).ok_or_else(|| {
523				err!(BadServerResponse(warn!(
524					"Server {remote_server} did not send its signature for a restricted room"
525				)))
526			})
527		});
528
529	match signature {
530		| Ok(signature) => {
531			join_event
532				.get_mut("signatures")
533				.expect("we created a valid pdu")
534				.as_object_mut()
535				.expect("we created a valid pdu")
536				.insert(remote_server.as_str().into(), signature.clone());
537		},
538		| Err(e) => {
539			warn!(
540				"Server {remote_server} sent invalid signature in send_join signatures for \
541				 event {signed_value:?}: {e:?}",
542			);
543		},
544	}
545
546	Ok(())
547}
548
549#[implement(Service)]
550async fn ingest_send_join_state(
551	&self,
552	room_id: &RoomId,
553	room_version_id: &RoomVersionId,
554	room_version_rules: &RoomVersionRules,
555	state_pdus: &[Box<RawJsonValue>],
556) -> HashMap<u64, OwnedEventId> {
557	info!(events = state_pdus.len(), "Going through send_join response room_state...");
558	let cork = self.services.db.cork_and_flush();
559	let state = state_pdus
560		.iter()
561		.stream()
562		.then(|pdu| {
563			self.services
564				.server_keys
565				.validate_and_add_event_id_no_fetch(pdu, room_version_id)
566		})
567		.inspect_err(|e| debug_error!("Invalid send_join state event: {e:?}"))
568		.ready_filter_map(Result::ok)
569		.ready_filter_map(|(event_id, value)| {
570			Pdu::from_object_federation(room_id, &event_id, value, room_version_rules)
571				.inspect_err(|e| {
572					debug_warn!("Invalid PDU {event_id:?} in send_join response: {e:?}");
573				})
574				.map(move |(pdu, value)| (event_id, pdu, value))
575				.ok()
576		})
577		.fold(HashMap::new(), async |mut state, (event_id, pdu, value)| {
578			self.services
579				.timeline
580				.add_pdu_outlier(&event_id, &value);
581
582			if let Some(state_key) = &pdu.state_key {
583				let shortstatekey = self
584					.services
585					.short
586					.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
587					.await;
588
589				state.insert(shortstatekey, pdu.event_id.clone());
590			}
591
592			state
593		})
594		.await;
595
596	drop(cork);
597	state
598}
599
600#[implement(Service)]
601async fn ingest_send_join_auth_chain(
602	&self,
603	room_id: &RoomId,
604	room_version_id: &RoomVersionId,
605	room_version_rules: &RoomVersionRules,
606	auth_chain: &[Box<RawJsonValue>],
607) {
608	info!(events = auth_chain.len(), "Going through send_join response auth_chain...");
609	let cork = self.services.db.cork_and_flush();
610	auth_chain
611		.iter()
612		.stream()
613		.then(|pdu| {
614			self.services
615				.server_keys
616				.validate_and_add_event_id_no_fetch(pdu, room_version_id)
617		})
618		.inspect_err(|e| debug_error!("Invalid send_join auth_chain event: {e:?}"))
619		.ready_filter_map(Result::ok)
620		.ready_for_each(|(event_id, mut value)| {
621			if !room_version_rules
622				.event_format
623				.require_room_create_room_id
624				&& value["type"] == "m.room.create"
625			{
626				let room_id = CanonicalJsonValue::String(room_id.as_str().into());
627				value.insert("room_id".into(), room_id);
628			}
629
630			self.services
631				.timeline
632				.add_pdu_outlier(&event_id, &value);
633		})
634		.await;
635
636	drop(cork);
637}
638
639#[implement(Service)]
640async fn apply_send_join_state(
641	&self,
642	room_id: &RoomId,
643	state: &HashMap<u64, OwnedEventId>,
644	state_lock: &RoomMutexGuard,
645) -> Result {
646	info!(events = state.len(), "Compressing state from send_join...");
647	let compressed: CompressedState = self
648		.services
649		.state_compressor
650		.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
651		.collect()
652		.await;
653
654	debug!("Saving compressed state...");
655	let HashSetCompressStateEvent {
656		shortstatehash: statehash_before_join,
657		added,
658		removed,
659	} = self
660		.services
661		.state_compressor
662		.save_state(room_id, Arc::new(compressed))
663		.await?;
664
665	debug!(
666		state_hash = ?statehash_before_join,
667		"Forcing state for new room..."
668	);
669	self.services
670		.state
671		.force_state(room_id, statehash_before_join, added, removed, state_lock)
672		.await?;
673
674	self.services
675		.state_cache
676		.update_joined_count(room_id)
677		.await;
678
679	Ok(())
680}
681
682#[implement(Service)]
683#[tracing::instrument(name = "local", level = "debug", skip_all)]
684pub async fn join_local(
685	&self,
686	sender_user: &UserId,
687	room_id: &RoomId,
688	reason: Option<String>,
689	servers: &[OwnedServerName],
690	state_lock: RoomMutexGuard,
691	extra_content: Option<CanonicalJsonObject>,
692) -> Result {
693	debug_info!("We can join locally");
694
695	let join_rules_event_content = self
696		.services
697		.state_accessor
698		.room_state_get_content::<RoomJoinRulesEventContent>(
699			room_id,
700			&StateEventType::RoomJoinRules,
701			"",
702		)
703		.await;
704
705	let restriction_rooms = match join_rules_event_content {
706		| Ok(RoomJoinRulesEventContent {
707			join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted),
708		}) => restricted
709			.allow
710			.into_iter()
711			.filter_map(|a| match a {
712				| AllowRule::RoomMembership(r) => Some(r.room_id),
713				| _ => None,
714			})
715			.collect(),
716		| _ => Vec::new(),
717	};
718
719	let is_joined_restricted_rooms = restriction_rooms
720		.iter()
721		.stream()
722		.any(|restriction_room_id| {
723			self.services
724				.state_cache
725				.is_joined(sender_user, restriction_room_id)
726		})
727		.await;
728
729	let join_authorized_via_users_server = is_joined_restricted_rooms
730		.then_async(async || {
731			self.services
732				.state_cache
733				.local_users_in_room(room_id)
734				.filter(|user| {
735					self.services.state_accessor.user_can_invite(
736						room_id,
737						user,
738						sender_user,
739						&state_lock,
740					)
741				})
742				.map(ToOwned::to_owned)
743				.boxed()
744				.next()
745				.await
746		})
747		.map(Option::flatten)
748		.await;
749
750	let mut content = RoomMemberEventContent {
751		reason: reason.clone(),
752		join_authorized_via_users_server,
753		..RoomMemberEventContent::new(MembershipState::Join)
754	};
755
756	self.services
757		.profile
758		.fill_profile_data(sender_user, &mut content)
759		.await;
760
761	let content = merge_member_content(content, extra_content.as_ref())?;
762
763	let pdu_builder = PduBuilder {
764		event_type: StateEventType::RoomMember.into(),
765		content: to_raw_value(&content).map(Into::into)?,
766		state_key: Some(sender_user.to_string().into()),
767		..Default::default()
768	};
769
770	// Try normal join first
771	let Err(error) = self
772		.services
773		.timeline
774		.build_and_append_pdu(pdu_builder, sender_user, room_id, &state_lock)
775		.await
776	else {
777		return Ok(());
778	};
779
780	if restriction_rooms.is_empty()
781		&& (servers.is_empty()
782			|| servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]))
783	{
784		return Err(error);
785	}
786
787	warn!(
788		"We couldn't do the join locally, maybe federation can help to satisfy the restricted \
789		 join requirements"
790	);
791
792	// Drop before the federation fallback: handle_incoming_pdu re-acquires
793	// the same per-room state mutex while ingesting prev_events; deadlock.
794	drop(state_lock);
795
796	let Ok((make_join_response, remote_server)) = self
797		.make_join_request(sender_user, room_id, servers)
798		.await
799	else {
800		return Err(error);
801	};
802
803	let room_version_id = self.require_supported_remote_room_version(&make_join_response)?;
804	let room_version_rules = room_version::rules(&room_version_id)?;
805	let (join_event, event_id, _) = self
806		.create_join_event(
807			room_id,
808			sender_user,
809			&make_join_response.event,
810			&room_version_id,
811			&room_version_rules,
812			reason,
813			extra_content,
814		)
815		.await?;
816
817	let send_join_response = self
818		.execute_send_join(&remote_server, room_id, &event_id, join_event, &room_version_id)
819		.await?;
820
821	let Some(signed_raw) = send_join_response.event else {
822		return Err(error);
823	};
824
825	let (signed_event_id, signed_value) =
826		gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
827			err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
828		})?;
829
830	if signed_event_id != event_id {
831		return Err!(Request(BadJson(warn!(
832			%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID"
833		))));
834	}
835
836	self.services
837		.event_handler
838		.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
839		.boxed()
840		.await?;
841
842	Ok(())
843}
844
845#[implement(Service)]
846#[expect(clippy::too_many_arguments)]
847#[tracing::instrument(name = "make_join", level = "debug", skip_all)]
848async fn create_join_event(
849	&self,
850	room_id: &RoomId,
851	sender_user: &UserId,
852	join_event_stub: &RawJsonValue,
853	room_version_id: &RoomVersionId,
854	room_version_rules: &RoomVersionRules,
855	reason: Option<String>,
856	extra_content: Option<CanonicalJsonObject>,
857) -> Result<(CanonicalJsonObject, OwnedEventId, Option<OwnedUserId>)> {
858	let mut event: CanonicalJsonObject =
859		serde_json::from_str(join_event_stub.get()).map_err(|e| {
860			err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
861		})?;
862
863	let join_authorized_via_users_server = room_version_rules
864		.authorization
865		.restricted_join_rule
866		.then(|| event.get("content"))
867		.flatten()
868		.and_then(|s| {
869			s.as_object()?
870				.get("join_authorised_via_users_server")
871		})
872		.and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok());
873
874	let mut content = RoomMemberEventContent {
875		reason,
876		join_authorized_via_users_server: join_authorized_via_users_server.clone(),
877		..RoomMemberEventContent::new(MembershipState::Join)
878	};
879
880	self.services
881		.profile
882		.fill_profile_data(sender_user, &mut content)
883		.await;
884
885	let content = merge_member_content(content, extra_content.as_ref())?;
886
887	event.insert("content".into(), content);
888
889	event.insert(
890		"origin".into(),
891		CanonicalJsonValue::String(
892			self.services
893				.globals
894				.server_name()
895				.as_str()
896				.to_owned(),
897		),
898	);
899
900	event.insert(
901		"origin_server_ts".into(),
902		CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
903	);
904
905	event.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
906
907	event.insert("sender".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
908
909	event.insert("state_key".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
910
911	event.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
912
913	let event_id = self
914		.services
915		.server_keys
916		.gen_id_hash_and_sign_event(&mut event, room_version_id)?;
917
918	check_rules(&event, &room_version_rules.event_format)?;
919
920	Ok((event, event_id, join_authorized_via_users_server))
921}
922
923// Server-computed membership fields win; client custom keys only fill the gaps.
924fn merge_member_content(
925	content: RoomMemberEventContent,
926	extra_content: Option<&CanonicalJsonObject>,
927) -> Result<CanonicalJsonValue> {
928	let mut content = to_canonical_value(content)?;
929
930	if let (CanonicalJsonValue::Object(content), Some(extra_content)) =
931		(&mut content, extra_content)
932	{
933		for (key, value) in extra_content {
934			content
935				.entry(key.clone())
936				.or_insert_with(|| value.clone());
937		}
938	}
939
940	Ok(content)
941}
942
943#[implement(Service)]
944#[tracing::instrument(
945	name = "make_join",
946	level = "debug",
947	skip_all,
948	fields(?servers)
949)]
950async fn make_join_request(
951	&self,
952	sender_user: &UserId,
953	room_id: &RoomId,
954	servers: &[OwnedServerName],
955) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
956	let mut make_join_response_and_server =
957		Err!(BadServerResponse("No server available to assist in joining."));
958
959	let mut make_join_counter: usize = 0;
960	let mut incompatible_room_version_count: usize = 0;
961
962	for remote_server in servers {
963		if self
964			.services
965			.globals
966			.server_is_ours(remote_server)
967		{
968			continue;
969		}
970		info!("Asking {remote_server} for make_join ({make_join_counter})");
971		let make_join_response = self
972			.services
973			.federation
974			.execute(remote_server, federation::membership::prepare_join_event::v1::Request {
975				room_id: room_id.to_owned(),
976				user_id: sender_user.to_owned(),
977				ver: self
978					.services
979					.config
980					.supported_room_versions()
981					.map(at!(0))
982					.collect(),
983			})
984			.await;
985
986		trace!("make_join response: {make_join_response:?}");
987		make_join_counter = make_join_counter.saturating_add(1);
988
989		if let Err(ref e) = make_join_response {
990			if matches!(
991				e.kind(),
992				ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion
993			) {
994				incompatible_room_version_count =
995					incompatible_room_version_count.saturating_add(1);
996			}
997
998			if incompatible_room_version_count > 15 {
999				info!(
1000					"15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \
1001					 M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
1002					 room version {room_id}: {e}"
1003				);
1004
1005				make_join_response_and_server =
1006					Err!(BadServerResponse("Room version is not supported by tuwunel"));
1007
1008				return make_join_response_and_server;
1009			}
1010
1011			let max_attempts = self
1012				.services
1013				.config
1014				.max_make_join_attempts_per_join_attempt;
1015
1016			if make_join_counter >= max_attempts {
1017				warn!(?remote_server, "last make_join failure reason: {e}");
1018				warn!(
1019					"{max_attempts} servers failed to provide valid make_join response, \
1020					 assuming no server can assist in joining."
1021				);
1022
1023				make_join_response_and_server =
1024					Err!(BadServerResponse("No server available to assist in joining."));
1025
1026				return make_join_response_and_server;
1027			}
1028		}
1029
1030		make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
1031
1032		if make_join_response_and_server.is_ok() {
1033			break;
1034		}
1035	}
1036
1037	make_join_response_and_server
1038}
1039
1040pub(super) async fn get_servers_for_room(
1041	services: &Services,
1042	user_id: &UserId,
1043	room_id: &RoomId,
1044	orig_room_id: Option<&RoomOrAliasId>,
1045	via: &[OwnedServerName],
1046) -> Result<Vec<OwnedServerName>> {
1047	// add invited vias
1048	let mut additional_servers = services
1049		.state_cache
1050		.servers_invite_via(room_id)
1051		.map(ToOwned::to_owned)
1052		.collect::<Vec<_>>()
1053		.await;
1054
1055	// add invite senders' servers
1056	additional_servers.extend(
1057		services
1058			.state_cache
1059			.invite_state(user_id, room_id)
1060			.await
1061			.unwrap_or_default()
1062			.iter()
1063			.filter_map(|event| event.get_field("sender").ok().flatten())
1064			.filter_map(|sender: &str| UserId::parse(sender).ok())
1065			.map(|user| user.server_name().to_owned()),
1066	);
1067
1068	let mut servers = Vec::from(via);
1069	shuffle(&mut servers);
1070
1071	// Strict via: an explicit remote server in via must not be padded with
1072	// the room owner, otherwise failover-probe semantics break.
1073	let has_remote_via = via
1074		.iter()
1075		.any(|s| !services.globals.server_is_ours(s));
1076
1077	if !has_remote_via {
1078		if let Some(server_name) = room_id.server_name() {
1079			servers.insert(0, server_name.to_owned());
1080		}
1081
1082		if let Some(orig_room_id) = orig_room_id
1083			&& let Some(orig_server_name) = orig_room_id.server_name()
1084		{
1085			servers.insert(0, orig_server_name.to_owned());
1086		}
1087	}
1088
1089	shuffle(&mut additional_servers);
1090
1091	servers.extend_from_slice(&additional_servers);
1092
1093	// 1. (room alias server)?
1094	// 2. (room id server)?
1095	// 3. shuffle [via query + resolve servers]?
1096	// 4. shuffle [invited via, inviters servers]?
1097	debug!(?servers);
1098
1099	// dedup preserving order
1100	let mut set = HashSet::new();
1101	servers.retain(|x| set.insert(x.clone()));
1102	debug!(?servers);
1103
1104	// sort deprioritized servers last
1105	if !servers.is_empty() {
1106		for i in 0..servers.len() {
1107			if services
1108				.server
1109				.config
1110				.deprioritize_joins_through_servers
1111				.is_match(servers[i].host())
1112			{
1113				let server = servers.remove(i);
1114				servers.push(server);
1115			}
1116		}
1117	}
1118
1119	debug_info!(?servers);
1120	Ok(servers)
1121}