Skip to main content

tuwunel_service/membership/
join.rs

1use std::{
2	borrow::Borrow,
3	collections::{HashMap, HashSet},
4	iter::once,
5	mem::take,
6	sync::Arc,
7};
8
9use futures::{
10	FutureExt, StreamExt, TryFutureExt, TryStreamExt,
11	future::{join3, join4},
12};
13use ruma::{
14	CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
15	RoomOrAliasId, RoomVersionId, UserId,
16	api::{error::ErrorKind, federation},
17	canonical_json::to_canonical_value,
18	events::{
19		StateEventType,
20		room::{
21			join_rules::RoomJoinRulesEventContent,
22			member::{MembershipState, RoomMemberEventContent},
23		},
24	},
25	room::{AllowRule, JoinRule},
26	room_version_rules::RoomVersionRules,
27};
28use serde_json::value::RawValue as RawJsonValue;
29use tuwunel_core::{
30	Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, implement, info,
31	matrix::{event::gen_event_id_canonical_json, room_version},
32	pdu::{Pdu, PduBuilder, check_rules},
33	trace,
34	utils::{self, BoolExt, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle},
35	warn,
36};
37
38use super::Service;
39use crate::{
40	Services,
41	rooms::{
42		state::RoomMutexGuard,
43		state_compressor::{CompressedState, HashSetCompressStateEvent},
44		state_res,
45	},
46};
47
48#[implement(Service)]
49#[tracing::instrument(
50	level = "debug",
51	skip_all,
52	fields(%sender_user, %room_id)
53)]
54pub async fn join(
55	&self,
56	sender_user: &UserId,
57	room_id: &RoomId,
58	orig_room_id: Option<&RoomOrAliasId>,
59	reason: Option<String>,
60	servers: &[OwnedServerName],
61	is_appservice: bool,
62) -> Result {
63	let state_lock = self.services.state.mutex.lock(room_id).await;
64
65	let servers =
66		get_servers_for_room(&self.services, sender_user, room_id, orig_room_id, servers).await?;
67
68	let user_is_guest = self
69		.services
70		.users
71		.is_deactivated(sender_user)
72		.await
73		.unwrap_or(false)
74		&& !is_appservice;
75
76	if user_is_guest
77		&& !self
78			.services
79			.state_accessor
80			.guest_can_join(room_id)
81			.await
82	{
83		return Err!(Request(Forbidden("Guests are not allowed to join this room")));
84	}
85
86	if self
87		.services
88		.state_cache
89		.is_joined(sender_user, room_id)
90		.await
91	{
92		debug_warn!("{sender_user} is already joined in {room_id}");
93		return Ok(());
94	}
95
96	if let Ok(membership) = self
97		.services
98		.state_accessor
99		.get_member(room_id, sender_user)
100		.await && membership.membership == MembershipState::Ban
101	{
102		debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
103		return Err!(Request(Forbidden("You are banned from the room.")));
104	}
105
106	let server_in_room = self
107		.services
108		.state_cache
109		.server_in_room(self.services.globals.server_name(), room_id)
110		.await;
111
112	let local_join = server_in_room
113		|| servers.is_empty()
114		|| (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));
115
116	if local_join {
117		self.join_local(sender_user, room_id, reason, &servers, state_lock)
118			.boxed()
119			.await?;
120	} else {
121		// Ask a remote server if we are not participating in this room
122		self.join_remote(sender_user, room_id, reason, &servers, state_lock)
123			.boxed()
124			.await?;
125	}
126
127	Ok(())
128}
129
130#[implement(Service)]
131#[tracing::instrument(
132	name = "remote",
133	level = "debug",
134	skip_all,
135	fields(?servers)
136)]
137pub async fn join_remote(
138	&self,
139	sender_user: &UserId,
140	room_id: &RoomId,
141	reason: Option<String>,
142	servers: &[OwnedServerName],
143	state_lock: RoomMutexGuard,
144) -> Result {
145	info!("Joining {room_id} over federation.");
146
147	let (make_join_response, remote_server) = self
148		.make_join_request(sender_user, room_id, servers)
149		.await?;
150
151	info!("make_join finished");
152
153	let Some(room_version_id) = make_join_response.room_version else {
154		return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
155	};
156
157	if !self
158		.services
159		.config
160		.supported_room_version(&room_version_id)
161	{
162		return Err!(BadServerResponse(
163			"Remote room version {room_version_id} is not supported by tuwunel"
164		));
165	}
166
167	let room_version_rules = room_version::rules(&room_version_id)?;
168	let (mut join_event, event_id, join_authorized_via_users_server) = self
169		.create_join_event(
170			room_id,
171			sender_user,
172			&make_join_response.event,
173			&room_version_id,
174			&room_version_rules,
175			reason,
176		)
177		.await?;
178
179	let send_join_request = federation::membership::create_join_event::v2::Request {
180		room_id: room_id.to_owned(),
181		event_id: event_id.clone(),
182		omit_members: true,
183		pdu: self
184			.services
185			.federation
186			.format_pdu_into(join_event.clone(), Some(&room_version_id))
187			.await,
188	};
189
190	// Once send_join hits the remote server it may start sending us events which
191	// have to be belayed until we process this response first.
192	let _federation_lock = self
193		.services
194		.event_handler
195		.mutex_federation
196		.lock(room_id)
197		.await;
198
199	info!("Asking {remote_server} for fast_join in room {room_id}");
200	let mut response = match self
201		.services
202		.federation
203		.execute(&remote_server, send_join_request)
204		.await
205		.inspect_err(|e| error!("send_join failed: {e}"))
206	{
207		| Err(e) => return Err(e),
208		| Ok(response) => response.room_state,
209	};
210
211	info!(
212		fast_join = response.members_omitted,
213		auth_chain = response.auth_chain.len(),
214		state = response.state.len(),
215		servers = response
216			.servers_in_room
217			.as_ref()
218			.map(Vec::len)
219			.unwrap_or(0),
220		"send_join finished"
221	);
222
223	if response.members_omitted {
224		use federation::event::get_room_state::v1::{Request, Response};
225
226		info!("Asking {remote_server} for state in room {room_id}");
227		match self
228			.services
229			.federation
230			.execute(&remote_server, Request {
231				room_id: room_id.to_owned(),
232				event_id: event_id.clone(),
233			})
234			.await
235			.inspect_err(|e| error!("state failed: {e}"))
236		{
237			| Err(e) => return Err(e),
238			| Ok(Response { mut auth_chain, mut pdus }) => {
239				response.auth_chain = take(&mut auth_chain);
240				response.state = take(&mut pdus);
241			},
242		}
243
244		info!(
245			auth_chain = response.auth_chain.len(),
246			state = response.state.len(),
247			"state finished"
248		);
249	}
250
251	if join_authorized_via_users_server.is_some()
252		&& let Some(signed_raw) = &response.event
253	{
254		debug_info!(
255			"There is a signed event with join_authorized_via_users_server. This room is \
256			 probably using restricted joins. Adding signature to our event"
257		);
258
259		let (signed_event_id, signed_value) =
260			gen_event_id_canonical_json(signed_raw, &room_version_id).map_err(|e| {
261				err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
262			})?;
263
264		if signed_event_id != event_id {
265			return Err!(Request(BadJson(warn!(
266				%signed_event_id, %event_id,
267				"Server {remote_server} sent event with wrong event ID"
268			))));
269		}
270
271		match signed_value["signatures"]
272			.as_object()
273			.ok_or_else(|| {
274				err!(BadServerResponse(warn!(
275					"Server {remote_server} sent invalid signatures type"
276				)))
277			})
278			.and_then(|e| {
279				e.get(remote_server.as_str()).ok_or_else(|| {
280					err!(BadServerResponse(warn!(
281						"Server {remote_server} did not send its signature for a restricted room"
282					)))
283				})
284			}) {
285			| Ok(signature) => {
286				join_event
287					.get_mut("signatures")
288					.expect("we created a valid pdu")
289					.as_object_mut()
290					.expect("we created a valid pdu")
291					.insert(remote_server.as_str().into(), signature.clone());
292			},
293			| Err(e) => {
294				warn!(
295					"Server {remote_server} sent invalid signature in send_join signatures for \
296					 event {signed_value:?}: {e:?}",
297				);
298			},
299		}
300	}
301
302	let shortroomid = self
303		.services
304		.short
305		.get_or_create_shortroomid(room_id)
306		.await;
307
308	info!(
309		%room_id,
310		%shortroomid,
311		"Initialized room. Parsing join event..."
312	);
313	let (parsed_join_pdu, join_event) =
314		Pdu::from_object_federation(room_id, &event_id, join_event, &room_version_rules)?;
315
316	let resp_state = &response.state;
317	let resp_auth = &response.auth_chain;
318	info!(
319		events = resp_state.len().expected_add(resp_auth.len()),
320		"Acquiring server signing keys for response events..."
321	);
322	self.services
323		.server_keys
324		.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
325		.await;
326
327	info!(events = response.state.len(), "Going through send_join response room_state...");
328	let cork = self.services.db.cork_and_flush();
329	let state = response
330		.state
331		.iter()
332		.stream()
333		.then(|pdu| {
334			self.services
335				.server_keys
336				.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
337		})
338		.inspect_err(|e| debug_error!("Invalid send_join state event: {e:?}"))
339		.ready_filter_map(Result::ok)
340		.ready_filter_map(|(event_id, value)| {
341			Pdu::from_object_federation(room_id, &event_id, value, &room_version_rules)
342				.inspect_err(|e| {
343					debug_warn!("Invalid PDU {event_id:?} in send_join response: {e:?}");
344				})
345				.map(move |(pdu, value)| (event_id, pdu, value))
346				.ok()
347		})
348		.fold(HashMap::new(), async |mut state, (event_id, pdu, value)| {
349			self.services
350				.timeline
351				.add_pdu_outlier(&event_id, &value);
352
353			if let Some(state_key) = &pdu.state_key {
354				let shortstatekey = self
355					.services
356					.short
357					.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
358					.await;
359
360				state.insert(shortstatekey, pdu.event_id.clone());
361			}
362
363			state
364		})
365		.await;
366
367	drop(cork);
368
369	info!(
370		events = response.auth_chain.len(),
371		"Going through send_join response auth_chain..."
372	);
373	let cork = self.services.db.cork_and_flush();
374	response
375		.auth_chain
376		.iter()
377		.stream()
378		.then(|pdu| {
379			self.services
380				.server_keys
381				.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
382		})
383		.inspect_err(|e| debug_error!("Invalid send_join auth_chain event: {e:?}"))
384		.ready_filter_map(Result::ok)
385		.ready_for_each(|(event_id, mut value)| {
386			if !room_version_rules
387				.event_format
388				.require_room_create_room_id
389				&& value["type"] == "m.room.create"
390			{
391				let room_id = CanonicalJsonValue::String(room_id.as_str().into());
392				value.insert("room_id".into(), room_id);
393			}
394
395			self.services
396				.timeline
397				.add_pdu_outlier(&event_id, &value);
398		})
399		.await;
400
401	drop(cork);
402
403	debug!("Running send_join auth check...");
404	state_res::auth_check(
405		&room_version_rules,
406		&parsed_join_pdu,
407		&async |event_id| self.services.timeline.get_pdu(&event_id).await,
408		&async |event_type, state_key| {
409			let shortstatekey = self
410				.services
411				.short
412				.get_shortstatekey(&event_type, state_key.as_str())
413				.await?;
414
415			let event_id = state.get(&shortstatekey).ok_or_else(|| {
416				err!(Request(NotFound("Missing fetch_state {shortstatekey:?}")))
417			})?;
418
419			self.services.timeline.get_pdu(event_id).await
420		},
421	)
422	.inspect_err(|e| error!("send_join auth check failed: {e:?}"))
423	.boxed()
424	.await?;
425
426	info!(events = state.len(), "Compressing state from send_join...");
427	let compressed: CompressedState = self
428		.services
429		.state_compressor
430		.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
431		.collect()
432		.await;
433
434	debug!("Saving compressed state...");
435	let HashSetCompressStateEvent {
436		shortstatehash: statehash_before_join,
437		added,
438		removed,
439	} = self
440		.services
441		.state_compressor
442		.save_state(room_id, Arc::new(compressed))
443		.await?;
444
445	debug!(
446		state_hash = ?statehash_before_join,
447		"Forcing state for new room..."
448	);
449	self.services
450		.state
451		.force_state(room_id, statehash_before_join, added, removed, &state_lock)
452		.await?;
453
454	self.services
455		.state_cache
456		.update_joined_count(room_id)
457		.await;
458
459	// We append to state before appending the pdu, so we don't have a moment in
460	// time with the pdu without it's state. This is okay because append_pdu can't
461	// fail.
462	let statehash_after_join = self
463		.services
464		.state
465		.append_to_state(&parsed_join_pdu)
466		.await?;
467
468	info!(
469		event_id = %parsed_join_pdu.event_id,
470		"Appending new room join event..."
471	);
472
473	self.services
474		.timeline
475		.append_pdu(
476			&parsed_join_pdu,
477			join_event,
478			once(parsed_join_pdu.event_id.borrow()),
479			&state_lock,
480		)
481		.await?;
482
483	// We set the room state after inserting the pdu, so that we never have a moment
484	// in time where events in the current room state do not exist
485	self.services
486		.state
487		.set_room_state(room_id, statehash_after_join, &state_lock);
488
489	info!(
490		statehash = %statehash_after_join,
491		"Set final room state for new room."
492	);
493
494	Ok(())
495}
496
497#[implement(Service)]
498#[tracing::instrument(name = "local", level = "debug", skip_all)]
499pub async fn join_local(
500	&self,
501	sender_user: &UserId,
502	room_id: &RoomId,
503	reason: Option<String>,
504	servers: &[OwnedServerName],
505	state_lock: RoomMutexGuard,
506) -> Result {
507	debug_info!("We can join locally");
508
509	let join_rules_event_content = self
510		.services
511		.state_accessor
512		.room_state_get_content::<RoomJoinRulesEventContent>(
513			room_id,
514			&StateEventType::RoomJoinRules,
515			"",
516		)
517		.await;
518
519	let restriction_rooms = match join_rules_event_content {
520		| Ok(RoomJoinRulesEventContent {
521			join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted),
522		}) => restricted
523			.allow
524			.into_iter()
525			.filter_map(|a| match a {
526				| AllowRule::RoomMembership(r) => Some(r.room_id),
527				| _ => None,
528			})
529			.collect(),
530		| _ => Vec::new(),
531	};
532
533	let is_joined_restricted_rooms = restriction_rooms
534		.iter()
535		.stream()
536		.any(|restriction_room_id| {
537			self.services
538				.state_cache
539				.is_joined(sender_user, restriction_room_id)
540		})
541		.await;
542
543	let join_authorized_via_users_server = is_joined_restricted_rooms.then_async(async || {
544		self.services
545			.state_cache
546			.local_users_in_room(room_id)
547			.filter(|user| {
548				self.services.state_accessor.user_can_invite(
549					room_id,
550					user,
551					sender_user,
552					&state_lock,
553				)
554			})
555			.map(ToOwned::to_owned)
556			.boxed()
557			.next()
558			.await
559	});
560
561	let displayname = self.services.users.displayname(sender_user).ok();
562
563	let avatar_url = self.services.users.avatar_url(sender_user).ok();
564
565	let blurhash = self.services.users.blurhash(sender_user).ok();
566
567	let (displayname, avatar_url, blurhash, join_authorized_via_users_server) = join4(
568		displayname,
569		avatar_url,
570		blurhash,
571		join_authorized_via_users_server.map(Option::flatten),
572	)
573	.await;
574
575	let content = RoomMemberEventContent {
576		displayname,
577		avatar_url,
578		blurhash,
579		reason: reason.clone(),
580		join_authorized_via_users_server,
581		..RoomMemberEventContent::new(MembershipState::Join)
582	};
583
584	// Try normal join first
585	let Err(error) = self
586		.services
587		.timeline
588		.build_and_append_pdu(
589			PduBuilder::state(sender_user.to_string(), &content),
590			sender_user,
591			room_id,
592			&state_lock,
593		)
594		.await
595	else {
596		return Ok(());
597	};
598
599	if restriction_rooms.is_empty()
600		&& (servers.is_empty()
601			|| servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]))
602	{
603		return Err(error);
604	}
605
606	warn!(
607		"We couldn't do the join locally, maybe federation can help to satisfy the restricted \
608		 join requirements"
609	);
610
611	// Drop before the federation fallback: handle_incoming_pdu re-acquires
612	// the same per-room state mutex while ingesting prev_events; deadlock.
613	drop(state_lock);
614
615	let Ok((make_join_response, remote_server)) = self
616		.make_join_request(sender_user, room_id, servers)
617		.await
618	else {
619		return Err(error);
620	};
621
622	let Some(room_version_id) = make_join_response.room_version else {
623		return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
624	};
625
626	if !self
627		.services
628		.config
629		.supported_room_version(&room_version_id)
630	{
631		return Err!(BadServerResponse(
632			"Remote room version {room_version_id} is not supported by tuwunel"
633		));
634	}
635
636	let room_version_rules = room_version::rules(&room_version_id)?;
637	let (join_event, event_id, _) = self
638		.create_join_event(
639			room_id,
640			sender_user,
641			&make_join_response.event,
642			&room_version_id,
643			&room_version_rules,
644			reason,
645		)
646		.await?;
647
648	let send_join_request = federation::membership::create_join_event::v2::Request {
649		room_id: room_id.to_owned(),
650		event_id: event_id.clone(),
651		omit_members: true,
652		pdu: self
653			.services
654			.federation
655			.format_pdu_into(join_event.clone(), Some(&room_version_id))
656			.await,
657	};
658
659	let send_join_response = self
660		.services
661		.federation
662		.execute(&remote_server, send_join_request)
663		.await?;
664
665	let Some(signed_raw) = send_join_response.room_state.event else {
666		return Err(error);
667	};
668
669	let (signed_event_id, signed_value) =
670		gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
671			err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
672		})?;
673
674	if signed_event_id != event_id {
675		return Err!(Request(BadJson(warn!(
676			%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID"
677		))));
678	}
679
680	self.services
681		.event_handler
682		.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
683		.boxed()
684		.await?;
685
686	Ok(())
687}
688
689#[implement(Service)]
690#[tracing::instrument(name = "make_join", level = "debug", skip_all)]
691async fn create_join_event(
692	&self,
693	room_id: &RoomId,
694	sender_user: &UserId,
695	join_event_stub: &RawJsonValue,
696	room_version_id: &RoomVersionId,
697	room_version_rules: &RoomVersionRules,
698	reason: Option<String>,
699) -> Result<(CanonicalJsonObject, OwnedEventId, Option<OwnedUserId>)> {
700	let mut event: CanonicalJsonObject =
701		serde_json::from_str(join_event_stub.get()).map_err(|e| {
702			err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
703		})?;
704
705	let join_authorized_via_users_server = room_version_rules
706		.authorization
707		.restricted_join_rule
708		.then(|| event.get("content"))
709		.flatten()
710		.and_then(|s| {
711			s.as_object()?
712				.get("join_authorised_via_users_server")
713		})
714		.and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok());
715
716	let displayname = self.services.users.displayname(sender_user).ok();
717
718	let avatar_url = self.services.users.avatar_url(sender_user).ok();
719
720	let blurhash = self.services.users.blurhash(sender_user).ok();
721
722	let (displayname, avatar_url, blurhash) = join3(displayname, avatar_url, blurhash).await;
723
724	event.insert(
725		"content".into(),
726		to_canonical_value(RoomMemberEventContent {
727			displayname,
728			avatar_url,
729			blurhash,
730			reason,
731			join_authorized_via_users_server: join_authorized_via_users_server.clone(),
732			..RoomMemberEventContent::new(MembershipState::Join)
733		})?,
734	);
735
736	event.insert(
737		"origin".into(),
738		CanonicalJsonValue::String(
739			self.services
740				.globals
741				.server_name()
742				.as_str()
743				.to_owned(),
744		),
745	);
746
747	event.insert(
748		"origin_server_ts".into(),
749		CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
750	);
751
752	event.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
753
754	event.insert("sender".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
755
756	event.insert("state_key".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
757
758	event.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
759
760	let event_id = self
761		.services
762		.server_keys
763		.gen_id_hash_and_sign_event(&mut event, room_version_id)?;
764
765	check_rules(&event, &room_version_rules.event_format)?;
766
767	Ok((event, event_id, join_authorized_via_users_server))
768}
769
770#[implement(Service)]
771#[tracing::instrument(
772	name = "make_join",
773	level = "debug",
774	skip_all,
775	fields(?servers)
776)]
777async fn make_join_request(
778	&self,
779	sender_user: &UserId,
780	room_id: &RoomId,
781	servers: &[OwnedServerName],
782) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
783	let mut make_join_response_and_server =
784		Err!(BadServerResponse("No server available to assist in joining."));
785
786	let mut make_join_counter: usize = 0;
787	let mut incompatible_room_version_count: usize = 0;
788
789	for remote_server in servers {
790		if self
791			.services
792			.globals
793			.server_is_ours(remote_server)
794		{
795			continue;
796		}
797		info!("Asking {remote_server} for make_join ({make_join_counter})");
798		let make_join_response = self
799			.services
800			.federation
801			.execute(remote_server, federation::membership::prepare_join_event::v1::Request {
802				room_id: room_id.to_owned(),
803				user_id: sender_user.to_owned(),
804				ver: self
805					.services
806					.config
807					.supported_room_versions()
808					.map(at!(0))
809					.collect(),
810			})
811			.await;
812
813		trace!("make_join response: {make_join_response:?}");
814		make_join_counter = make_join_counter.saturating_add(1);
815
816		if let Err(ref e) = make_join_response {
817			if matches!(
818				e.kind(),
819				ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion
820			) {
821				incompatible_room_version_count =
822					incompatible_room_version_count.saturating_add(1);
823			}
824
825			if incompatible_room_version_count > 15 {
826				info!(
827					"15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \
828					 M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
829					 room version {room_id}: {e}"
830				);
831
832				make_join_response_and_server =
833					Err!(BadServerResponse("Room version is not supported by tuwunel"));
834
835				return make_join_response_and_server;
836			}
837
838			let max_attempts = self
839				.services
840				.config
841				.max_make_join_attempts_per_join_attempt;
842
843			if make_join_counter >= max_attempts {
844				warn!(?remote_server, "last make_join failure reason: {e}");
845				warn!(
846					"{max_attempts} servers failed to provide valid make_join response, \
847					 assuming no server can assist in joining."
848				);
849
850				make_join_response_and_server =
851					Err!(BadServerResponse("No server available to assist in joining."));
852
853				return make_join_response_and_server;
854			}
855		}
856
857		make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
858
859		if make_join_response_and_server.is_ok() {
860			break;
861		}
862	}
863
864	make_join_response_and_server
865}
866
867pub(super) async fn get_servers_for_room(
868	services: &Services,
869	user_id: &UserId,
870	room_id: &RoomId,
871	orig_room_id: Option<&RoomOrAliasId>,
872	via: &[OwnedServerName],
873) -> Result<Vec<OwnedServerName>> {
874	// add invited vias
875	let mut additional_servers = services
876		.state_cache
877		.servers_invite_via(room_id)
878		.map(ToOwned::to_owned)
879		.collect::<Vec<_>>()
880		.await;
881
882	// add invite senders' servers
883	additional_servers.extend(
884		services
885			.state_cache
886			.invite_state(user_id, room_id)
887			.await
888			.unwrap_or_default()
889			.iter()
890			.filter_map(|event| event.get_field("sender").ok().flatten())
891			.filter_map(|sender: &str| UserId::parse(sender).ok())
892			.map(|user| user.server_name().to_owned()),
893	);
894
895	let mut servers = Vec::from(via);
896	shuffle(&mut servers);
897
898	// Strict via: an explicit remote server in via must not be padded with
899	// the room owner, otherwise failover-probe semantics break.
900	let has_remote_via = via
901		.iter()
902		.any(|s| !services.globals.server_is_ours(s));
903
904	if !has_remote_via {
905		if let Some(server_name) = room_id.server_name() {
906			servers.insert(0, server_name.to_owned());
907		}
908
909		if let Some(orig_room_id) = orig_room_id
910			&& let Some(orig_server_name) = orig_room_id.server_name()
911		{
912			servers.insert(0, orig_server_name.to_owned());
913		}
914	}
915
916	shuffle(&mut additional_servers);
917
918	servers.extend_from_slice(&additional_servers);
919
920	// 1. (room alias server)?
921	// 2. (room id server)?
922	// 3. shuffle [via query + resolve servers]?
923	// 4. shuffle [invited via, inviters servers]?
924	debug!(?servers);
925
926	// dedup preserving order
927	let mut set = HashSet::new();
928	servers.retain(|x| set.insert(x.clone()));
929	debug!(?servers);
930
931	// sort deprioritized servers last
932	if !servers.is_empty() {
933		for i in 0..servers.len() {
934			if services
935				.server
936				.config
937				.deprioritize_joins_through_servers
938				.is_match(servers[i].host())
939			{
940				let server = servers.remove(i);
941				servers.push(server);
942			}
943		}
944	}
945
946	debug_info!(?servers);
947	Ok(servers)
948}