Skip to main content

tuwunel_service/appservice/
append.rs

1use futures::{FutureExt, StreamExt, TryFutureExt};
2use ruma::{UserId, events::TimelineEventType};
3use tuwunel_core::{
4	Result, error, implement,
5	matrix::{
6		event::Event,
7		pdu::{Pdu, RawPduId},
8	},
9	utils::{
10		ReadyExt,
11		stream::{IterStream, automatic_width},
12	},
13};
14
15use super::RegistrationInfo;
16
17/// Called by timeline::append() after accepting new PDU.
18#[implement(super::Service)]
19#[tracing::instrument(name = "append", level = "debug", skip_all)]
20pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
21	let guard = self.read().await;
22
23	guard
24		.values()
25		.stream()
26		.for_each_concurrent(automatic_width(), |appservice| {
27			self.append_pdu_to(appservice, pdu_id, pdu)
28				.inspect_err(|e| {
29					error!(
30						event_id = %pdu.event_id(),
31						appservice = ?appservice.registration.id,
32						"Failed to send PDU to appservice: {e}"
33					);
34				})
35				.map(drop)
36		})
37		.await;
38
39	Ok(())
40}
41
42#[implement(super::Service)]
43#[tracing::instrument(
44	name = "append_to",
45	level = "debug",
46	skip_all,
47	fields(id = %appservice.registration.id),
48)]
49async fn append_pdu_to(
50	&self,
51	appservice: &RegistrationInfo,
52	pdu_id: RawPduId,
53	pdu: &Pdu,
54) -> Result {
55	if self.should_append_to(appservice, pdu).await {
56		self.services
57			.sending
58			.send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?;
59	}
60
61	Ok(())
62}
63
64#[implement(super::Service)]
65async fn should_append_to(&self, appservice: &RegistrationInfo, pdu: &Pdu) -> bool {
66	if self
67		.services
68		.state_cache
69		.appservice_in_room(pdu.room_id(), appservice)
70		.await
71	{
72		return true;
73	}
74
75	if appservice.is_user_match(pdu.sender()) {
76		return true;
77	}
78
79	if *pdu.kind() == TimelineEventType::RoomMember
80		&& pdu
81			.state_key
82			.as_ref()
83			.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
84			.is_some_and(|user_id| appservice.is_user_match(&user_id))
85	{
86		return true;
87	}
88
89	if self
90		.services
91		.alias
92		.local_aliases_for_room(pdu.room_id())
93		.ready_any(|room_alias| appservice.aliases.is_match(room_alias.as_str()))
94		.await
95	{
96		return true;
97	}
98
99	if appservice.rooms.is_match(pdu.room_id().as_str()) {
100		return true;
101	}
102
103	false
104}