1use std::{
2 borrow::Borrow,
3 collections::{HashMap, HashSet},
4 iter::once,
5 mem::take,
6 sync::Arc,
7};
8
9use futures::{
10 FutureExt, StreamExt, TryFutureExt, TryStreamExt,
11 future::{join3, join4},
12};
13use ruma::{
14 CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
15 RoomOrAliasId, RoomVersionId, UserId,
16 api::{error::ErrorKind, federation},
17 canonical_json::to_canonical_value,
18 events::{
19 StateEventType,
20 room::{
21 join_rules::RoomJoinRulesEventContent,
22 member::{MembershipState, RoomMemberEventContent},
23 },
24 },
25 room::{AllowRule, JoinRule},
26 room_version_rules::RoomVersionRules,
27};
28use serde_json::value::RawValue as RawJsonValue;
29use tuwunel_core::{
30 Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, implement, info,
31 matrix::{event::gen_event_id_canonical_json, room_version},
32 pdu::{Pdu, PduBuilder, check_rules},
33 trace,
34 utils::{self, BoolExt, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle},
35 warn,
36};
37
38use super::Service;
39use crate::{
40 Services,
41 rooms::{
42 state::RoomMutexGuard,
43 state_compressor::{CompressedState, HashSetCompressStateEvent},
44 state_res,
45 },
46};
47
48#[implement(Service)]
49#[tracing::instrument(
50 level = "debug",
51 skip_all,
52 fields(%sender_user, %room_id)
53)]
54pub async fn join(
55 &self,
56 sender_user: &UserId,
57 room_id: &RoomId,
58 orig_room_id: Option<&RoomOrAliasId>,
59 reason: Option<String>,
60 servers: &[OwnedServerName],
61 is_appservice: bool,
62) -> Result {
63 let state_lock = self.services.state.mutex.lock(room_id).await;
64
65 let servers =
66 get_servers_for_room(&self.services, sender_user, room_id, orig_room_id, servers).await?;
67
68 let user_is_guest = self
69 .services
70 .users
71 .is_deactivated(sender_user)
72 .await
73 .unwrap_or(false)
74 && !is_appservice;
75
76 if user_is_guest
77 && !self
78 .services
79 .state_accessor
80 .guest_can_join(room_id)
81 .await
82 {
83 return Err!(Request(Forbidden("Guests are not allowed to join this room")));
84 }
85
86 if self
87 .services
88 .state_cache
89 .is_joined(sender_user, room_id)
90 .await
91 {
92 debug_warn!("{sender_user} is already joined in {room_id}");
93 return Ok(());
94 }
95
96 if let Ok(membership) = self
97 .services
98 .state_accessor
99 .get_member(room_id, sender_user)
100 .await && membership.membership == MembershipState::Ban
101 {
102 debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
103 return Err!(Request(Forbidden("You are banned from the room.")));
104 }
105
106 let server_in_room = self
107 .services
108 .state_cache
109 .server_in_room(self.services.globals.server_name(), room_id)
110 .await;
111
112 let local_join = server_in_room
113 || servers.is_empty()
114 || (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));
115
116 if local_join {
117 self.join_local(sender_user, room_id, reason, &servers, state_lock)
118 .boxed()
119 .await?;
120 } else {
121 self.join_remote(sender_user, room_id, reason, &servers, state_lock)
123 .boxed()
124 .await?;
125 }
126
127 Ok(())
128}
129
130#[implement(Service)]
131#[tracing::instrument(
132 name = "remote",
133 level = "debug",
134 skip_all,
135 fields(?servers)
136)]
137pub async fn join_remote(
138 &self,
139 sender_user: &UserId,
140 room_id: &RoomId,
141 reason: Option<String>,
142 servers: &[OwnedServerName],
143 state_lock: RoomMutexGuard,
144) -> Result {
145 info!("Joining {room_id} over federation.");
146
147 let (make_join_response, remote_server) = self
148 .make_join_request(sender_user, room_id, servers)
149 .await?;
150
151 info!("make_join finished");
152
153 let Some(room_version_id) = make_join_response.room_version else {
154 return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
155 };
156
157 if !self
158 .services
159 .config
160 .supported_room_version(&room_version_id)
161 {
162 return Err!(BadServerResponse(
163 "Remote room version {room_version_id} is not supported by tuwunel"
164 ));
165 }
166
167 let room_version_rules = room_version::rules(&room_version_id)?;
168 let (mut join_event, event_id, join_authorized_via_users_server) = self
169 .create_join_event(
170 room_id,
171 sender_user,
172 &make_join_response.event,
173 &room_version_id,
174 &room_version_rules,
175 reason,
176 )
177 .await?;
178
179 let send_join_request = federation::membership::create_join_event::v2::Request {
180 room_id: room_id.to_owned(),
181 event_id: event_id.clone(),
182 omit_members: true,
183 pdu: self
184 .services
185 .federation
186 .format_pdu_into(join_event.clone(), Some(&room_version_id))
187 .await,
188 };
189
190 let _federation_lock = self
193 .services
194 .event_handler
195 .mutex_federation
196 .lock(room_id)
197 .await;
198
199 info!("Asking {remote_server} for fast_join in room {room_id}");
200 let mut response = match self
201 .services
202 .federation
203 .execute(&remote_server, send_join_request)
204 .await
205 .inspect_err(|e| error!("send_join failed: {e}"))
206 {
207 | Err(e) => return Err(e),
208 | Ok(response) => response.room_state,
209 };
210
211 info!(
212 fast_join = response.members_omitted,
213 auth_chain = response.auth_chain.len(),
214 state = response.state.len(),
215 servers = response
216 .servers_in_room
217 .as_ref()
218 .map(Vec::len)
219 .unwrap_or(0),
220 "send_join finished"
221 );
222
223 if response.members_omitted {
224 use federation::event::get_room_state::v1::{Request, Response};
225
226 info!("Asking {remote_server} for state in room {room_id}");
227 match self
228 .services
229 .federation
230 .execute(&remote_server, Request {
231 room_id: room_id.to_owned(),
232 event_id: event_id.clone(),
233 })
234 .await
235 .inspect_err(|e| error!("state failed: {e}"))
236 {
237 | Err(e) => return Err(e),
238 | Ok(Response { mut auth_chain, mut pdus }) => {
239 response.auth_chain = take(&mut auth_chain);
240 response.state = take(&mut pdus);
241 },
242 }
243
244 info!(
245 auth_chain = response.auth_chain.len(),
246 state = response.state.len(),
247 "state finished"
248 );
249 }
250
251 if join_authorized_via_users_server.is_some()
252 && let Some(signed_raw) = &response.event
253 {
254 debug_info!(
255 "There is a signed event with join_authorized_via_users_server. This room is \
256 probably using restricted joins. Adding signature to our event"
257 );
258
259 let (signed_event_id, signed_value) =
260 gen_event_id_canonical_json(signed_raw, &room_version_id).map_err(|e| {
261 err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
262 })?;
263
264 if signed_event_id != event_id {
265 return Err!(Request(BadJson(warn!(
266 %signed_event_id, %event_id,
267 "Server {remote_server} sent event with wrong event ID"
268 ))));
269 }
270
271 match signed_value["signatures"]
272 .as_object()
273 .ok_or_else(|| {
274 err!(BadServerResponse(warn!(
275 "Server {remote_server} sent invalid signatures type"
276 )))
277 })
278 .and_then(|e| {
279 e.get(remote_server.as_str()).ok_or_else(|| {
280 err!(BadServerResponse(warn!(
281 "Server {remote_server} did not send its signature for a restricted room"
282 )))
283 })
284 }) {
285 | Ok(signature) => {
286 join_event
287 .get_mut("signatures")
288 .expect("we created a valid pdu")
289 .as_object_mut()
290 .expect("we created a valid pdu")
291 .insert(remote_server.as_str().into(), signature.clone());
292 },
293 | Err(e) => {
294 warn!(
295 "Server {remote_server} sent invalid signature in send_join signatures for \
296 event {signed_value:?}: {e:?}",
297 );
298 },
299 }
300 }
301
302 let shortroomid = self
303 .services
304 .short
305 .get_or_create_shortroomid(room_id)
306 .await;
307
308 info!(
309 %room_id,
310 %shortroomid,
311 "Initialized room. Parsing join event..."
312 );
313 let (parsed_join_pdu, join_event) =
314 Pdu::from_object_federation(room_id, &event_id, join_event, &room_version_rules)?;
315
316 let resp_state = &response.state;
317 let resp_auth = &response.auth_chain;
318 info!(
319 events = resp_state.len().expected_add(resp_auth.len()),
320 "Acquiring server signing keys for response events..."
321 );
322 self.services
323 .server_keys
324 .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
325 .await;
326
327 info!(events = response.state.len(), "Going through send_join response room_state...");
328 let cork = self.services.db.cork_and_flush();
329 let state = response
330 .state
331 .iter()
332 .stream()
333 .then(|pdu| {
334 self.services
335 .server_keys
336 .validate_and_add_event_id_no_fetch(pdu, &room_version_id)
337 })
338 .inspect_err(|e| debug_error!("Invalid send_join state event: {e:?}"))
339 .ready_filter_map(Result::ok)
340 .ready_filter_map(|(event_id, value)| {
341 Pdu::from_object_federation(room_id, &event_id, value, &room_version_rules)
342 .inspect_err(|e| {
343 debug_warn!("Invalid PDU {event_id:?} in send_join response: {e:?}");
344 })
345 .map(move |(pdu, value)| (event_id, pdu, value))
346 .ok()
347 })
348 .fold(HashMap::new(), async |mut state, (event_id, pdu, value)| {
349 self.services
350 .timeline
351 .add_pdu_outlier(&event_id, &value);
352
353 if let Some(state_key) = &pdu.state_key {
354 let shortstatekey = self
355 .services
356 .short
357 .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
358 .await;
359
360 state.insert(shortstatekey, pdu.event_id.clone());
361 }
362
363 state
364 })
365 .await;
366
367 drop(cork);
368
369 info!(
370 events = response.auth_chain.len(),
371 "Going through send_join response auth_chain..."
372 );
373 let cork = self.services.db.cork_and_flush();
374 response
375 .auth_chain
376 .iter()
377 .stream()
378 .then(|pdu| {
379 self.services
380 .server_keys
381 .validate_and_add_event_id_no_fetch(pdu, &room_version_id)
382 })
383 .inspect_err(|e| debug_error!("Invalid send_join auth_chain event: {e:?}"))
384 .ready_filter_map(Result::ok)
385 .ready_for_each(|(event_id, mut value)| {
386 if !room_version_rules
387 .event_format
388 .require_room_create_room_id
389 && value["type"] == "m.room.create"
390 {
391 let room_id = CanonicalJsonValue::String(room_id.as_str().into());
392 value.insert("room_id".into(), room_id);
393 }
394
395 self.services
396 .timeline
397 .add_pdu_outlier(&event_id, &value);
398 })
399 .await;
400
401 drop(cork);
402
403 debug!("Running send_join auth check...");
404 state_res::auth_check(
405 &room_version_rules,
406 &parsed_join_pdu,
407 &async |event_id| self.services.timeline.get_pdu(&event_id).await,
408 &async |event_type, state_key| {
409 let shortstatekey = self
410 .services
411 .short
412 .get_shortstatekey(&event_type, state_key.as_str())
413 .await?;
414
415 let event_id = state.get(&shortstatekey).ok_or_else(|| {
416 err!(Request(NotFound("Missing fetch_state {shortstatekey:?}")))
417 })?;
418
419 self.services.timeline.get_pdu(event_id).await
420 },
421 )
422 .inspect_err(|e| error!("send_join auth check failed: {e:?}"))
423 .boxed()
424 .await?;
425
426 info!(events = state.len(), "Compressing state from send_join...");
427 let compressed: CompressedState = self
428 .services
429 .state_compressor
430 .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
431 .collect()
432 .await;
433
434 debug!("Saving compressed state...");
435 let HashSetCompressStateEvent {
436 shortstatehash: statehash_before_join,
437 added,
438 removed,
439 } = self
440 .services
441 .state_compressor
442 .save_state(room_id, Arc::new(compressed))
443 .await?;
444
445 debug!(
446 state_hash = ?statehash_before_join,
447 "Forcing state for new room..."
448 );
449 self.services
450 .state
451 .force_state(room_id, statehash_before_join, added, removed, &state_lock)
452 .await?;
453
454 self.services
455 .state_cache
456 .update_joined_count(room_id)
457 .await;
458
459 let statehash_after_join = self
463 .services
464 .state
465 .append_to_state(&parsed_join_pdu)
466 .await?;
467
468 info!(
469 event_id = %parsed_join_pdu.event_id,
470 "Appending new room join event..."
471 );
472
473 self.services
474 .timeline
475 .append_pdu(
476 &parsed_join_pdu,
477 join_event,
478 once(parsed_join_pdu.event_id.borrow()),
479 &state_lock,
480 )
481 .await?;
482
483 self.services
486 .state
487 .set_room_state(room_id, statehash_after_join, &state_lock);
488
489 info!(
490 statehash = %statehash_after_join,
491 "Set final room state for new room."
492 );
493
494 Ok(())
495}
496
497#[implement(Service)]
498#[tracing::instrument(name = "local", level = "debug", skip_all)]
499pub async fn join_local(
500 &self,
501 sender_user: &UserId,
502 room_id: &RoomId,
503 reason: Option<String>,
504 servers: &[OwnedServerName],
505 state_lock: RoomMutexGuard,
506) -> Result {
507 debug_info!("We can join locally");
508
509 let join_rules_event_content = self
510 .services
511 .state_accessor
512 .room_state_get_content::<RoomJoinRulesEventContent>(
513 room_id,
514 &StateEventType::RoomJoinRules,
515 "",
516 )
517 .await;
518
519 let restriction_rooms = match join_rules_event_content {
520 | Ok(RoomJoinRulesEventContent {
521 join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted),
522 }) => restricted
523 .allow
524 .into_iter()
525 .filter_map(|a| match a {
526 | AllowRule::RoomMembership(r) => Some(r.room_id),
527 | _ => None,
528 })
529 .collect(),
530 | _ => Vec::new(),
531 };
532
533 let is_joined_restricted_rooms = restriction_rooms
534 .iter()
535 .stream()
536 .any(|restriction_room_id| {
537 self.services
538 .state_cache
539 .is_joined(sender_user, restriction_room_id)
540 })
541 .await;
542
543 let join_authorized_via_users_server = is_joined_restricted_rooms.then_async(async || {
544 self.services
545 .state_cache
546 .local_users_in_room(room_id)
547 .filter(|user| {
548 self.services.state_accessor.user_can_invite(
549 room_id,
550 user,
551 sender_user,
552 &state_lock,
553 )
554 })
555 .map(ToOwned::to_owned)
556 .boxed()
557 .next()
558 .await
559 });
560
561 let displayname = self.services.users.displayname(sender_user).ok();
562
563 let avatar_url = self.services.users.avatar_url(sender_user).ok();
564
565 let blurhash = self.services.users.blurhash(sender_user).ok();
566
567 let (displayname, avatar_url, blurhash, join_authorized_via_users_server) = join4(
568 displayname,
569 avatar_url,
570 blurhash,
571 join_authorized_via_users_server.map(Option::flatten),
572 )
573 .await;
574
575 let content = RoomMemberEventContent {
576 displayname,
577 avatar_url,
578 blurhash,
579 reason: reason.clone(),
580 join_authorized_via_users_server,
581 ..RoomMemberEventContent::new(MembershipState::Join)
582 };
583
584 let Err(error) = self
586 .services
587 .timeline
588 .build_and_append_pdu(
589 PduBuilder::state(sender_user.to_string(), &content),
590 sender_user,
591 room_id,
592 &state_lock,
593 )
594 .await
595 else {
596 return Ok(());
597 };
598
599 if restriction_rooms.is_empty()
600 && (servers.is_empty()
601 || servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]))
602 {
603 return Err(error);
604 }
605
606 warn!(
607 "We couldn't do the join locally, maybe federation can help to satisfy the restricted \
608 join requirements"
609 );
610
611 drop(state_lock);
614
615 let Ok((make_join_response, remote_server)) = self
616 .make_join_request(sender_user, room_id, servers)
617 .await
618 else {
619 return Err(error);
620 };
621
622 let Some(room_version_id) = make_join_response.room_version else {
623 return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
624 };
625
626 if !self
627 .services
628 .config
629 .supported_room_version(&room_version_id)
630 {
631 return Err!(BadServerResponse(
632 "Remote room version {room_version_id} is not supported by tuwunel"
633 ));
634 }
635
636 let room_version_rules = room_version::rules(&room_version_id)?;
637 let (join_event, event_id, _) = self
638 .create_join_event(
639 room_id,
640 sender_user,
641 &make_join_response.event,
642 &room_version_id,
643 &room_version_rules,
644 reason,
645 )
646 .await?;
647
648 let send_join_request = federation::membership::create_join_event::v2::Request {
649 room_id: room_id.to_owned(),
650 event_id: event_id.clone(),
651 omit_members: true,
652 pdu: self
653 .services
654 .federation
655 .format_pdu_into(join_event.clone(), Some(&room_version_id))
656 .await,
657 };
658
659 let send_join_response = self
660 .services
661 .federation
662 .execute(&remote_server, send_join_request)
663 .await?;
664
665 let Some(signed_raw) = send_join_response.room_state.event else {
666 return Err(error);
667 };
668
669 let (signed_event_id, signed_value) =
670 gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
671 err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
672 })?;
673
674 if signed_event_id != event_id {
675 return Err!(Request(BadJson(warn!(
676 %signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID"
677 ))));
678 }
679
680 self.services
681 .event_handler
682 .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
683 .boxed()
684 .await?;
685
686 Ok(())
687}
688
689#[implement(Service)]
690#[tracing::instrument(name = "make_join", level = "debug", skip_all)]
691async fn create_join_event(
692 &self,
693 room_id: &RoomId,
694 sender_user: &UserId,
695 join_event_stub: &RawJsonValue,
696 room_version_id: &RoomVersionId,
697 room_version_rules: &RoomVersionRules,
698 reason: Option<String>,
699) -> Result<(CanonicalJsonObject, OwnedEventId, Option<OwnedUserId>)> {
700 let mut event: CanonicalJsonObject =
701 serde_json::from_str(join_event_stub.get()).map_err(|e| {
702 err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
703 })?;
704
705 let join_authorized_via_users_server = room_version_rules
706 .authorization
707 .restricted_join_rule
708 .then(|| event.get("content"))
709 .flatten()
710 .and_then(|s| {
711 s.as_object()?
712 .get("join_authorised_via_users_server")
713 })
714 .and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok());
715
716 let displayname = self.services.users.displayname(sender_user).ok();
717
718 let avatar_url = self.services.users.avatar_url(sender_user).ok();
719
720 let blurhash = self.services.users.blurhash(sender_user).ok();
721
722 let (displayname, avatar_url, blurhash) = join3(displayname, avatar_url, blurhash).await;
723
724 event.insert(
725 "content".into(),
726 to_canonical_value(RoomMemberEventContent {
727 displayname,
728 avatar_url,
729 blurhash,
730 reason,
731 join_authorized_via_users_server: join_authorized_via_users_server.clone(),
732 ..RoomMemberEventContent::new(MembershipState::Join)
733 })?,
734 );
735
736 event.insert(
737 "origin".into(),
738 CanonicalJsonValue::String(
739 self.services
740 .globals
741 .server_name()
742 .as_str()
743 .to_owned(),
744 ),
745 );
746
747 event.insert(
748 "origin_server_ts".into(),
749 CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
750 );
751
752 event.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
753
754 event.insert("sender".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
755
756 event.insert("state_key".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
757
758 event.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
759
760 let event_id = self
761 .services
762 .server_keys
763 .gen_id_hash_and_sign_event(&mut event, room_version_id)?;
764
765 check_rules(&event, &room_version_rules.event_format)?;
766
767 Ok((event, event_id, join_authorized_via_users_server))
768}
769
770#[implement(Service)]
771#[tracing::instrument(
772 name = "make_join",
773 level = "debug",
774 skip_all,
775 fields(?servers)
776)]
777async fn make_join_request(
778 &self,
779 sender_user: &UserId,
780 room_id: &RoomId,
781 servers: &[OwnedServerName],
782) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
783 let mut make_join_response_and_server =
784 Err!(BadServerResponse("No server available to assist in joining."));
785
786 let mut make_join_counter: usize = 0;
787 let mut incompatible_room_version_count: usize = 0;
788
789 for remote_server in servers {
790 if self
791 .services
792 .globals
793 .server_is_ours(remote_server)
794 {
795 continue;
796 }
797 info!("Asking {remote_server} for make_join ({make_join_counter})");
798 let make_join_response = self
799 .services
800 .federation
801 .execute(remote_server, federation::membership::prepare_join_event::v1::Request {
802 room_id: room_id.to_owned(),
803 user_id: sender_user.to_owned(),
804 ver: self
805 .services
806 .config
807 .supported_room_versions()
808 .map(at!(0))
809 .collect(),
810 })
811 .await;
812
813 trace!("make_join response: {make_join_response:?}");
814 make_join_counter = make_join_counter.saturating_add(1);
815
816 if let Err(ref e) = make_join_response {
817 if matches!(
818 e.kind(),
819 ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion
820 ) {
821 incompatible_room_version_count =
822 incompatible_room_version_count.saturating_add(1);
823 }
824
825 if incompatible_room_version_count > 15 {
826 info!(
827 "15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \
828 M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
829 room version {room_id}: {e}"
830 );
831
832 make_join_response_and_server =
833 Err!(BadServerResponse("Room version is not supported by tuwunel"));
834
835 return make_join_response_and_server;
836 }
837
838 let max_attempts = self
839 .services
840 .config
841 .max_make_join_attempts_per_join_attempt;
842
843 if make_join_counter >= max_attempts {
844 warn!(?remote_server, "last make_join failure reason: {e}");
845 warn!(
846 "{max_attempts} servers failed to provide valid make_join response, \
847 assuming no server can assist in joining."
848 );
849
850 make_join_response_and_server =
851 Err!(BadServerResponse("No server available to assist in joining."));
852
853 return make_join_response_and_server;
854 }
855 }
856
857 make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
858
859 if make_join_response_and_server.is_ok() {
860 break;
861 }
862 }
863
864 make_join_response_and_server
865}
866
867pub(super) async fn get_servers_for_room(
868 services: &Services,
869 user_id: &UserId,
870 room_id: &RoomId,
871 orig_room_id: Option<&RoomOrAliasId>,
872 via: &[OwnedServerName],
873) -> Result<Vec<OwnedServerName>> {
874 let mut additional_servers = services
876 .state_cache
877 .servers_invite_via(room_id)
878 .map(ToOwned::to_owned)
879 .collect::<Vec<_>>()
880 .await;
881
882 additional_servers.extend(
884 services
885 .state_cache
886 .invite_state(user_id, room_id)
887 .await
888 .unwrap_or_default()
889 .iter()
890 .filter_map(|event| event.get_field("sender").ok().flatten())
891 .filter_map(|sender: &str| UserId::parse(sender).ok())
892 .map(|user| user.server_name().to_owned()),
893 );
894
895 let mut servers = Vec::from(via);
896 shuffle(&mut servers);
897
898 let has_remote_via = via
901 .iter()
902 .any(|s| !services.globals.server_is_ours(s));
903
904 if !has_remote_via {
905 if let Some(server_name) = room_id.server_name() {
906 servers.insert(0, server_name.to_owned());
907 }
908
909 if let Some(orig_room_id) = orig_room_id
910 && let Some(orig_server_name) = orig_room_id.server_name()
911 {
912 servers.insert(0, orig_server_name.to_owned());
913 }
914 }
915
916 shuffle(&mut additional_servers);
917
918 servers.extend_from_slice(&additional_servers);
919
920 debug!(?servers);
925
926 let mut set = HashSet::new();
928 servers.retain(|x| set.insert(x.clone()));
929 debug!(?servers);
930
931 if !servers.is_empty() {
933 for i in 0..servers.len() {
934 if services
935 .server
936 .config
937 .deprioritize_joins_through_servers
938 .is_match(servers[i].host())
939 {
940 let server = servers.remove(i);
941 servers.push(server);
942 }
943 }
944 }
945
946 debug_info!(?servers);
947 Ok(servers)
948}