1use std::{
2 borrow::Borrow,
3 collections::{BTreeSet, HashMap, HashSet},
4 iter::once,
5 mem::take,
6 sync::Arc,
7};
8
9use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
10use ruma::{
11 CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
12 RoomOrAliasId, RoomVersionId, UserId,
13 api::{error::ErrorKind, federation},
14 canonical_json::to_canonical_value,
15 events::{
16 StateEventType,
17 room::{
18 create::RoomCreateEventContent,
19 join_rules::RoomJoinRulesEventContent,
20 member::{MembershipState, RoomMemberEventContent},
21 },
22 },
23 room::{AllowRule, JoinRule},
24 room_version_rules::RoomVersionRules,
25};
26use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
27use tuwunel_core::{
28 Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, implement, info,
29 matrix::{event::gen_event_id_canonical_json, room_version},
30 pdu::{Pdu, PduBuilder, check_rules},
31 trace,
32 utils::{self, BoolExt, IterStream, ReadyExt, math::Expected, shuffle},
33 warn,
34};
35
36use super::Service;
37use crate::{
38 Services,
39 federation::{Candidates, WhenAllBackedOff},
40 rooms::{
41 state::RoomMutexGuard,
42 state_compressor::{CompressedState, HashSetCompressStateEvent},
43 state_res,
44 },
45};
46
47#[implement(Service)]
48#[expect(clippy::too_many_arguments)]
49#[tracing::instrument(
50 level = "debug",
51 skip_all,
52 fields(%sender_user, %room_id)
53)]
54pub async fn join(
55 &self,
56 sender_user: &UserId,
57 room_id: &RoomId,
58 orig_room_id: Option<&RoomOrAliasId>,
59 reason: Option<String>,
60 servers: &[OwnedServerName],
61 is_appservice: bool,
62 extra_content: Option<CanonicalJsonObject>,
63) -> Result {
64 let state_lock = self.services.state.mutex.lock(room_id).await;
65
66 let servers =
67 get_servers_for_room(&self.services, sender_user, room_id, orig_room_id, servers).await?;
68
69 let user_is_guest = self
70 .services
71 .users
72 .is_deactivated(sender_user)
73 .await
74 .unwrap_or(false)
75 && !is_appservice;
76
77 if user_is_guest
78 && !self
79 .services
80 .state_accessor
81 .guest_can_join(room_id)
82 .await
83 {
84 return Err!(Request(Forbidden("Guests are not allowed to join this room")));
85 }
86
87 if self
88 .services
89 .state_cache
90 .is_joined(sender_user, room_id)
91 .await
92 {
93 debug_warn!("{sender_user} is already joined in {room_id}");
94 return Ok(());
95 }
96
97 if let Ok(membership) = self
99 .services
100 .state_accessor
101 .get_member(room_id, sender_user)
102 .await && membership.membership == MembershipState::Ban
103 && !self
104 .services
105 .state_cache
106 .is_invited(sender_user, room_id)
107 .await
108 {
109 debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
110 return Err!(Request(Forbidden("You are banned from the room.")));
111 }
112
113 let server_in_room = self
114 .services
115 .state_cache
116 .server_in_room(self.services.globals.server_name(), room_id)
117 .await;
118
119 let local_join = server_in_room
120 || servers.is_empty()
121 || (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));
122
123 if local_join {
124 self.join_local(sender_user, room_id, reason, &servers, state_lock, extra_content)
125 .boxed()
126 .await?;
127 } else {
128 self.join_remote(sender_user, room_id, reason, &servers, state_lock, extra_content)
130 .boxed()
131 .await?;
132 }
133
134 self.copy_predecessor_push_rules(sender_user, room_id)
135 .await;
136
137 Ok(())
138}
139
140#[implement(Service)]
141async fn copy_predecessor_push_rules(&self, user_id: &UserId, room_id: &RoomId) {
142 let Ok(create): Result<RoomCreateEventContent> = self
143 .services
144 .state_accessor
145 .room_state_get_content(room_id, &StateEventType::RoomCreate, "")
146 .await
147 else {
148 return;
149 };
150
151 let Some(predecessor) = create.predecessor else {
152 return;
153 };
154
155 self.services
156 .account_data
157 .copy_room_push_rule(user_id, &predecessor.room_id, room_id)
158 .await
159 .ok();
160}
161
162#[implement(Service)]
163#[tracing::instrument(
164 name = "remote",
165 level = "debug",
166 skip_all,
167 fields(?servers)
168)]
169pub async fn join_remote(
170 &self,
171 sender_user: &UserId,
172 room_id: &RoomId,
173 reason: Option<String>,
174 servers: &[OwnedServerName],
175 state_lock: RoomMutexGuard,
176 extra_content: Option<CanonicalJsonObject>,
177) -> Result {
178 info!("Joining {room_id} over federation.");
179
180 let (make_join_response, remote_server) = self
181 .make_join_request(sender_user, room_id, servers)
182 .await?;
183
184 info!("make_join finished");
185
186 let room_version_id = self.require_supported_remote_room_version(&make_join_response)?;
187 let room_version_rules = room_version::rules(&room_version_id)?;
188 let (mut join_event, event_id, join_authorized_via_users_server) = self
189 .create_join_event(
190 room_id,
191 sender_user,
192 &make_join_response.event,
193 &room_version_id,
194 &room_version_rules,
195 reason,
196 extra_content,
197 )
198 .await?;
199
200 let _federation_lock = self
203 .services
204 .event_handler
205 .mutex_federation
206 .lock(room_id)
207 .await;
208
209 let mut response = self
210 .execute_send_join(
211 &remote_server,
212 room_id,
213 &event_id,
214 join_event.clone(),
215 &room_version_id,
216 )
217 .await?;
218
219 if response.members_omitted {
220 self.fetch_omitted_state(&remote_server, room_id, &event_id, servers, &mut response)
221 .await?;
222 }
223
224 if join_authorized_via_users_server.is_some() {
225 merge_restricted_signature(
226 &remote_server,
227 &event_id,
228 &room_version_id,
229 &response,
230 &mut join_event,
231 )?;
232 }
233
234 let shortroomid = self
235 .services
236 .short
237 .get_or_create_shortroomid(room_id)
238 .await;
239
240 info!(
241 %room_id,
242 %shortroomid,
243 "Initialized room. Parsing join event..."
244 );
245 let (parsed_join_pdu, join_event) =
246 Pdu::from_object_federation(room_id, &event_id, join_event, &room_version_rules)?;
247
248 info!(
249 events = response
250 .state
251 .len()
252 .expected_add(response.auth_chain.len()),
253 "Acquiring server signing keys for response events..."
254 );
255 self.services
256 .server_keys
257 .acquire_events_pubkeys(
258 response
259 .auth_chain
260 .iter()
261 .chain(response.state.iter()),
262 )
263 .await;
264
265 let state = self
266 .ingest_send_join_state(room_id, &room_version_id, &room_version_rules, &response.state)
267 .await;
268
269 self.ingest_send_join_auth_chain(
270 room_id,
271 &room_version_id,
272 &room_version_rules,
273 &response.auth_chain,
274 )
275 .await;
276
277 debug!("Running send_join auth check...");
278 state_res::auth_check(
279 &room_version_rules,
280 &parsed_join_pdu,
281 &async |event_id| self.services.timeline.get_pdu(&event_id).await,
282 &async |event_type, state_key| {
283 let shortstatekey = self
284 .services
285 .short
286 .get_shortstatekey(&event_type, state_key.as_str())
287 .await?;
288
289 let event_id = state.get(&shortstatekey).ok_or_else(|| {
290 err!(Request(NotFound("Missing fetch_state {shortstatekey:?}")))
291 })?;
292
293 self.services.timeline.get_pdu(event_id).await
294 },
295 )
296 .inspect_err(|e| error!("send_join auth check failed: {e:?}"))
297 .boxed()
298 .await?;
299
300 self.apply_send_join_state(room_id, &state, &state_lock)
301 .await?;
302
303 let statehash_after_join = self
307 .services
308 .state
309 .append_to_state(&parsed_join_pdu)
310 .await?;
311
312 info!(
313 event_id = %parsed_join_pdu.event_id,
314 "Appending new room join event..."
315 );
316
317 self.services
318 .timeline
319 .append_pdu(
320 &parsed_join_pdu,
321 join_event,
322 once(parsed_join_pdu.event_id.borrow()),
323 &state_lock,
324 )
325 .await?;
326
327 self.services
330 .state
331 .set_room_state(room_id, statehash_after_join, &state_lock);
332
333 info!(
334 statehash = %statehash_after_join,
335 "Set final room state for new room."
336 );
337
338 Ok(())
339}
340
341#[implement(Service)]
342fn require_supported_remote_room_version(
343 &self,
344 make_join_response: &federation::membership::prepare_join_event::v1::Response,
345) -> Result<RoomVersionId> {
346 let Some(room_version_id) = make_join_response.room_version.clone() else {
347 return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
348 };
349
350 if !self
351 .services
352 .config
353 .supported_room_version(&room_version_id)
354 {
355 return Err!(BadServerResponse(
356 "Remote room version {room_version_id} is not supported by tuwunel"
357 ));
358 }
359
360 Ok(room_version_id)
361}
362
363#[implement(Service)]
364async fn execute_send_join(
365 &self,
366 remote_server: &OwnedServerName,
367 room_id: &RoomId,
368 event_id: &OwnedEventId,
369 join_event: CanonicalJsonObject,
370 room_version_id: &RoomVersionId,
371) -> Result<federation::membership::create_join_event::v2::RoomState> {
372 let send_join_request = federation::membership::create_join_event::v2::Request {
373 room_id: room_id.to_owned(),
374 event_id: event_id.clone(),
375 omit_members: true,
376 pdu: self
377 .services
378 .federation
379 .format_pdu_into(join_event, Some(room_version_id))
380 .await,
381 };
382
383 info!("Asking {remote_server} for fast_join in room {room_id}");
384 let response = self
385 .services
386 .federation
387 .execute(remote_server, send_join_request)
388 .await
389 .inspect_err(|e| error!("send_join failed: {e}"))?
390 .room_state;
391
392 info!(
393 fast_join = response.members_omitted,
394 auth_chain = response.auth_chain.len(),
395 state = response.state.len(),
396 servers = response
397 .servers_in_room
398 .as_ref()
399 .map(Vec::len)
400 .unwrap_or(0),
401 "send_join finished"
402 );
403
404 Ok(response)
405}
406
407#[implement(Service)]
408async fn fetch_omitted_state(
409 &self,
410 remote_server: &OwnedServerName,
411 room_id: &RoomId,
412 event_id: &OwnedEventId,
413 servers: &[OwnedServerName],
414 response: &mut federation::membership::create_join_event::v2::RoomState,
415) -> Result {
416 use federation::event::get_room_state::v1::{Request, Response};
417
418 let eligible =
419 self.omitted_state_servers(remote_server, servers, response.servers_in_room.as_deref());
420
421 let candidates = self
422 .services
423 .federation
424 .rank_candidates(eligible, WhenAllBackedOff::Attempt)
425 .await;
426
427 let mut last_error = Err!(BadServerResponse("No server provided omitted send_join state."));
428 for server in candidates {
429 info!("Asking {server} for state in room {room_id}");
430 let result = self
431 .services
432 .federation
433 .execute(&server, Request {
434 room_id: room_id.to_owned(),
435 event_id: event_id.clone(),
436 })
437 .await;
438
439 match result {
440 | Err(e) => {
441 debug_warn!(?server, "state fetch failed: {e}");
442 last_error = Err(e);
443 },
444 | Ok(Response { mut auth_chain, mut pdus }) => {
445 response.auth_chain = take(&mut auth_chain);
446 response.state = take(&mut pdus);
447
448 info!(
449 auth_chain = response.auth_chain.len(),
450 state = response.state.len(),
451 "state finished"
452 );
453
454 return Ok(());
455 },
456 }
457 }
458
459 last_error
460}
461
462#[implement(Service)]
463fn omitted_state_servers(
464 &self,
465 remote_server: &OwnedServerName,
466 servers: &[OwnedServerName],
467 servers_in_room: Option<&[String]>,
468) -> Candidates {
469 let extracted = servers_in_room
470 .into_iter()
471 .flatten()
472 .filter_map(|server| OwnedServerName::parse(server.as_str()).ok());
473
474 let mut seen = BTreeSet::new();
475 once(remote_server.clone())
476 .chain(extracted)
477 .chain(servers.iter().cloned())
478 .filter(|server| !self.services.globals.server_is_ours(server))
479 .filter(move |server| seen.insert(server.clone()))
480 .take(
481 self.services
482 .config
483 .max_make_join_attempts_per_join_attempt,
484 )
485 .collect()
486}
487
488fn merge_restricted_signature(
489 remote_server: &OwnedServerName,
490 event_id: &OwnedEventId,
491 room_version_id: &RoomVersionId,
492 response: &federation::membership::create_join_event::v2::RoomState,
493 join_event: &mut CanonicalJsonObject,
494) -> Result {
495 let Some(signed_raw) = &response.event else {
496 return Ok(());
497 };
498
499 debug_info!(
500 "There is a signed event with join_authorized_via_users_server. This room is probably \
501 using restricted joins. Adding signature to our event"
502 );
503
504 let (signed_event_id, signed_value) =
505 gen_event_id_canonical_json(signed_raw, room_version_id).map_err(|e| {
506 err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
507 })?;
508
509 if signed_event_id != *event_id {
510 return Err!(Request(BadJson(warn!(
511 %signed_event_id, %event_id,
512 "Server {remote_server} sent event with wrong event ID"
513 ))));
514 }
515
516 let signature = signed_value["signatures"]
517 .as_object()
518 .ok_or_else(|| {
519 err!(BadServerResponse(warn!("Server {remote_server} sent invalid signatures type")))
520 })
521 .and_then(|e| {
522 e.get(remote_server.as_str()).ok_or_else(|| {
523 err!(BadServerResponse(warn!(
524 "Server {remote_server} did not send its signature for a restricted room"
525 )))
526 })
527 });
528
529 match signature {
530 | Ok(signature) => {
531 join_event
532 .get_mut("signatures")
533 .expect("we created a valid pdu")
534 .as_object_mut()
535 .expect("we created a valid pdu")
536 .insert(remote_server.as_str().into(), signature.clone());
537 },
538 | Err(e) => {
539 warn!(
540 "Server {remote_server} sent invalid signature in send_join signatures for \
541 event {signed_value:?}: {e:?}",
542 );
543 },
544 }
545
546 Ok(())
547}
548
549#[implement(Service)]
550async fn ingest_send_join_state(
551 &self,
552 room_id: &RoomId,
553 room_version_id: &RoomVersionId,
554 room_version_rules: &RoomVersionRules,
555 state_pdus: &[Box<RawJsonValue>],
556) -> HashMap<u64, OwnedEventId> {
557 info!(events = state_pdus.len(), "Going through send_join response room_state...");
558 let cork = self.services.db.cork_and_flush();
559 let state = state_pdus
560 .iter()
561 .stream()
562 .then(|pdu| {
563 self.services
564 .server_keys
565 .validate_and_add_event_id_no_fetch(pdu, room_version_id)
566 })
567 .inspect_err(|e| debug_error!("Invalid send_join state event: {e:?}"))
568 .ready_filter_map(Result::ok)
569 .ready_filter_map(|(event_id, value)| {
570 Pdu::from_object_federation(room_id, &event_id, value, room_version_rules)
571 .inspect_err(|e| {
572 debug_warn!("Invalid PDU {event_id:?} in send_join response: {e:?}");
573 })
574 .map(move |(pdu, value)| (event_id, pdu, value))
575 .ok()
576 })
577 .fold(HashMap::new(), async |mut state, (event_id, pdu, value)| {
578 self.services
579 .timeline
580 .add_pdu_outlier(&event_id, &value);
581
582 if let Some(state_key) = &pdu.state_key {
583 let shortstatekey = self
584 .services
585 .short
586 .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
587 .await;
588
589 state.insert(shortstatekey, pdu.event_id.clone());
590 }
591
592 state
593 })
594 .await;
595
596 drop(cork);
597 state
598}
599
600#[implement(Service)]
601async fn ingest_send_join_auth_chain(
602 &self,
603 room_id: &RoomId,
604 room_version_id: &RoomVersionId,
605 room_version_rules: &RoomVersionRules,
606 auth_chain: &[Box<RawJsonValue>],
607) {
608 info!(events = auth_chain.len(), "Going through send_join response auth_chain...");
609 let cork = self.services.db.cork_and_flush();
610 auth_chain
611 .iter()
612 .stream()
613 .then(|pdu| {
614 self.services
615 .server_keys
616 .validate_and_add_event_id_no_fetch(pdu, room_version_id)
617 })
618 .inspect_err(|e| debug_error!("Invalid send_join auth_chain event: {e:?}"))
619 .ready_filter_map(Result::ok)
620 .ready_for_each(|(event_id, mut value)| {
621 if !room_version_rules
622 .event_format
623 .require_room_create_room_id
624 && value["type"] == "m.room.create"
625 {
626 let room_id = CanonicalJsonValue::String(room_id.as_str().into());
627 value.insert("room_id".into(), room_id);
628 }
629
630 self.services
631 .timeline
632 .add_pdu_outlier(&event_id, &value);
633 })
634 .await;
635
636 drop(cork);
637}
638
639#[implement(Service)]
640async fn apply_send_join_state(
641 &self,
642 room_id: &RoomId,
643 state: &HashMap<u64, OwnedEventId>,
644 state_lock: &RoomMutexGuard,
645) -> Result {
646 info!(events = state.len(), "Compressing state from send_join...");
647 let compressed: CompressedState = self
648 .services
649 .state_compressor
650 .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
651 .collect()
652 .await;
653
654 debug!("Saving compressed state...");
655 let HashSetCompressStateEvent {
656 shortstatehash: statehash_before_join,
657 added,
658 removed,
659 } = self
660 .services
661 .state_compressor
662 .save_state(room_id, Arc::new(compressed))
663 .await?;
664
665 debug!(
666 state_hash = ?statehash_before_join,
667 "Forcing state for new room..."
668 );
669 self.services
670 .state
671 .force_state(room_id, statehash_before_join, added, removed, state_lock)
672 .await?;
673
674 self.services
675 .state_cache
676 .update_joined_count(room_id)
677 .await;
678
679 Ok(())
680}
681
682#[implement(Service)]
683#[tracing::instrument(name = "local", level = "debug", skip_all)]
684pub async fn join_local(
685 &self,
686 sender_user: &UserId,
687 room_id: &RoomId,
688 reason: Option<String>,
689 servers: &[OwnedServerName],
690 state_lock: RoomMutexGuard,
691 extra_content: Option<CanonicalJsonObject>,
692) -> Result {
693 debug_info!("We can join locally");
694
695 let join_rules_event_content = self
696 .services
697 .state_accessor
698 .room_state_get_content::<RoomJoinRulesEventContent>(
699 room_id,
700 &StateEventType::RoomJoinRules,
701 "",
702 )
703 .await;
704
705 let restriction_rooms = match join_rules_event_content {
706 | Ok(RoomJoinRulesEventContent {
707 join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted),
708 }) => restricted
709 .allow
710 .into_iter()
711 .filter_map(|a| match a {
712 | AllowRule::RoomMembership(r) => Some(r.room_id),
713 | _ => None,
714 })
715 .collect(),
716 | _ => Vec::new(),
717 };
718
719 let is_joined_restricted_rooms = restriction_rooms
720 .iter()
721 .stream()
722 .any(|restriction_room_id| {
723 self.services
724 .state_cache
725 .is_joined(sender_user, restriction_room_id)
726 })
727 .await;
728
729 let join_authorized_via_users_server = is_joined_restricted_rooms
730 .then_async(async || {
731 self.services
732 .state_cache
733 .local_users_in_room(room_id)
734 .filter(|user| {
735 self.services.state_accessor.user_can_invite(
736 room_id,
737 user,
738 sender_user,
739 &state_lock,
740 )
741 })
742 .map(ToOwned::to_owned)
743 .boxed()
744 .next()
745 .await
746 })
747 .map(Option::flatten)
748 .await;
749
750 let mut content = RoomMemberEventContent {
751 reason: reason.clone(),
752 join_authorized_via_users_server,
753 ..RoomMemberEventContent::new(MembershipState::Join)
754 };
755
756 self.services
757 .profile
758 .fill_profile_data(sender_user, &mut content)
759 .await;
760
761 let content = merge_member_content(content, extra_content.as_ref())?;
762
763 let pdu_builder = PduBuilder {
764 event_type: StateEventType::RoomMember.into(),
765 content: to_raw_value(&content).map(Into::into)?,
766 state_key: Some(sender_user.to_string().into()),
767 ..Default::default()
768 };
769
770 let Err(error) = self
772 .services
773 .timeline
774 .build_and_append_pdu(pdu_builder, sender_user, room_id, &state_lock)
775 .await
776 else {
777 return Ok(());
778 };
779
780 if restriction_rooms.is_empty()
781 && (servers.is_empty()
782 || servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]))
783 {
784 return Err(error);
785 }
786
787 warn!(
788 "We couldn't do the join locally, maybe federation can help to satisfy the restricted \
789 join requirements"
790 );
791
792 drop(state_lock);
795
796 let Ok((make_join_response, remote_server)) = self
797 .make_join_request(sender_user, room_id, servers)
798 .await
799 else {
800 return Err(error);
801 };
802
803 let room_version_id = self.require_supported_remote_room_version(&make_join_response)?;
804 let room_version_rules = room_version::rules(&room_version_id)?;
805 let (join_event, event_id, _) = self
806 .create_join_event(
807 room_id,
808 sender_user,
809 &make_join_response.event,
810 &room_version_id,
811 &room_version_rules,
812 reason,
813 extra_content,
814 )
815 .await?;
816
817 let send_join_response = self
818 .execute_send_join(&remote_server, room_id, &event_id, join_event, &room_version_id)
819 .await?;
820
821 let Some(signed_raw) = send_join_response.event else {
822 return Err(error);
823 };
824
825 let (signed_event_id, signed_value) =
826 gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
827 err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
828 })?;
829
830 if signed_event_id != event_id {
831 return Err!(Request(BadJson(warn!(
832 %signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID"
833 ))));
834 }
835
836 self.services
837 .event_handler
838 .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
839 .boxed()
840 .await?;
841
842 Ok(())
843}
844
845#[implement(Service)]
846#[expect(clippy::too_many_arguments)]
847#[tracing::instrument(name = "make_join", level = "debug", skip_all)]
848async fn create_join_event(
849 &self,
850 room_id: &RoomId,
851 sender_user: &UserId,
852 join_event_stub: &RawJsonValue,
853 room_version_id: &RoomVersionId,
854 room_version_rules: &RoomVersionRules,
855 reason: Option<String>,
856 extra_content: Option<CanonicalJsonObject>,
857) -> Result<(CanonicalJsonObject, OwnedEventId, Option<OwnedUserId>)> {
858 let mut event: CanonicalJsonObject =
859 serde_json::from_str(join_event_stub.get()).map_err(|e| {
860 err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
861 })?;
862
863 let join_authorized_via_users_server = room_version_rules
864 .authorization
865 .restricted_join_rule
866 .then(|| event.get("content"))
867 .flatten()
868 .and_then(|s| {
869 s.as_object()?
870 .get("join_authorised_via_users_server")
871 })
872 .and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok());
873
874 let mut content = RoomMemberEventContent {
875 reason,
876 join_authorized_via_users_server: join_authorized_via_users_server.clone(),
877 ..RoomMemberEventContent::new(MembershipState::Join)
878 };
879
880 self.services
881 .profile
882 .fill_profile_data(sender_user, &mut content)
883 .await;
884
885 let content = merge_member_content(content, extra_content.as_ref())?;
886
887 event.insert("content".into(), content);
888
889 event.insert(
890 "origin".into(),
891 CanonicalJsonValue::String(
892 self.services
893 .globals
894 .server_name()
895 .as_str()
896 .to_owned(),
897 ),
898 );
899
900 event.insert(
901 "origin_server_ts".into(),
902 CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
903 );
904
905 event.insert("room_id".into(), CanonicalJsonValue::String(room_id.as_str().into()));
906
907 event.insert("sender".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
908
909 event.insert("state_key".into(), CanonicalJsonValue::String(sender_user.as_str().into()));
910
911 event.insert("type".into(), CanonicalJsonValue::String("m.room.member".into()));
912
913 let event_id = self
914 .services
915 .server_keys
916 .gen_id_hash_and_sign_event(&mut event, room_version_id)?;
917
918 check_rules(&event, &room_version_rules.event_format)?;
919
920 Ok((event, event_id, join_authorized_via_users_server))
921}
922
923fn merge_member_content(
925 content: RoomMemberEventContent,
926 extra_content: Option<&CanonicalJsonObject>,
927) -> Result<CanonicalJsonValue> {
928 let mut content = to_canonical_value(content)?;
929
930 if let (CanonicalJsonValue::Object(content), Some(extra_content)) =
931 (&mut content, extra_content)
932 {
933 for (key, value) in extra_content {
934 content
935 .entry(key.clone())
936 .or_insert_with(|| value.clone());
937 }
938 }
939
940 Ok(content)
941}
942
943#[implement(Service)]
944#[tracing::instrument(
945 name = "make_join",
946 level = "debug",
947 skip_all,
948 fields(?servers)
949)]
950async fn make_join_request(
951 &self,
952 sender_user: &UserId,
953 room_id: &RoomId,
954 servers: &[OwnedServerName],
955) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
956 let mut make_join_response_and_server =
957 Err!(BadServerResponse("No server available to assist in joining."));
958
959 let mut make_join_counter: usize = 0;
960 let mut incompatible_room_version_count: usize = 0;
961
962 for remote_server in servers {
963 if self
964 .services
965 .globals
966 .server_is_ours(remote_server)
967 {
968 continue;
969 }
970 info!("Asking {remote_server} for make_join ({make_join_counter})");
971 let make_join_response = self
972 .services
973 .federation
974 .execute(remote_server, federation::membership::prepare_join_event::v1::Request {
975 room_id: room_id.to_owned(),
976 user_id: sender_user.to_owned(),
977 ver: self
978 .services
979 .config
980 .supported_room_versions()
981 .map(at!(0))
982 .collect(),
983 })
984 .await;
985
986 trace!("make_join response: {make_join_response:?}");
987 make_join_counter = make_join_counter.saturating_add(1);
988
989 if let Err(ref e) = make_join_response {
990 if matches!(
991 e.kind(),
992 ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion
993 ) {
994 incompatible_room_version_count =
995 incompatible_room_version_count.saturating_add(1);
996 }
997
998 if incompatible_room_version_count > 15 {
999 info!(
1000 "15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \
1001 M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
1002 room version {room_id}: {e}"
1003 );
1004
1005 make_join_response_and_server =
1006 Err!(BadServerResponse("Room version is not supported by tuwunel"));
1007
1008 return make_join_response_and_server;
1009 }
1010
1011 let max_attempts = self
1012 .services
1013 .config
1014 .max_make_join_attempts_per_join_attempt;
1015
1016 if make_join_counter >= max_attempts {
1017 warn!(?remote_server, "last make_join failure reason: {e}");
1018 warn!(
1019 "{max_attempts} servers failed to provide valid make_join response, \
1020 assuming no server can assist in joining."
1021 );
1022
1023 make_join_response_and_server =
1024 Err!(BadServerResponse("No server available to assist in joining."));
1025
1026 return make_join_response_and_server;
1027 }
1028 }
1029
1030 make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));
1031
1032 if make_join_response_and_server.is_ok() {
1033 break;
1034 }
1035 }
1036
1037 make_join_response_and_server
1038}
1039
1040pub(super) async fn get_servers_for_room(
1041 services: &Services,
1042 user_id: &UserId,
1043 room_id: &RoomId,
1044 orig_room_id: Option<&RoomOrAliasId>,
1045 via: &[OwnedServerName],
1046) -> Result<Vec<OwnedServerName>> {
1047 let mut additional_servers = services
1049 .state_cache
1050 .servers_invite_via(room_id)
1051 .map(ToOwned::to_owned)
1052 .collect::<Vec<_>>()
1053 .await;
1054
1055 additional_servers.extend(
1057 services
1058 .state_cache
1059 .invite_state(user_id, room_id)
1060 .await
1061 .unwrap_or_default()
1062 .iter()
1063 .filter_map(|event| event.get_field("sender").ok().flatten())
1064 .filter_map(|sender: &str| UserId::parse(sender).ok())
1065 .map(|user| user.server_name().to_owned()),
1066 );
1067
1068 let mut servers = Vec::from(via);
1069 shuffle(&mut servers);
1070
1071 let has_remote_via = via
1074 .iter()
1075 .any(|s| !services.globals.server_is_ours(s));
1076
1077 if !has_remote_via {
1078 if let Some(server_name) = room_id.server_name() {
1079 servers.insert(0, server_name.to_owned());
1080 }
1081
1082 if let Some(orig_room_id) = orig_room_id
1083 && let Some(orig_server_name) = orig_room_id.server_name()
1084 {
1085 servers.insert(0, orig_server_name.to_owned());
1086 }
1087 }
1088
1089 shuffle(&mut additional_servers);
1090
1091 servers.extend_from_slice(&additional_servers);
1092
1093 debug!(?servers);
1098
1099 let mut set = HashSet::new();
1101 servers.retain(|x| set.insert(x.clone()));
1102 debug!(?servers);
1103
1104 if !servers.is_empty() {
1106 for i in 0..servers.len() {
1107 if services
1108 .server
1109 .config
1110 .deprioritize_joins_through_servers
1111 .is_match(servers[i].host())
1112 {
1113 let server = servers.remove(i);
1114 servers.push(server);
1115 }
1116 }
1117 }
1118
1119 debug_info!(?servers);
1120 Ok(servers)
1121}