Skip to main content

tuwunel_service/rooms/timeline/
build.rs

1use std::{collections::HashSet, iter::once};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5	OwnedEventId, OwnedServerName, RoomId, UserId,
6	events::{
7		TimelineEventType,
8		room::member::{MembershipState, RoomMemberEventContent},
9	},
10};
11use serde_json::value::to_raw_value;
12use tuwunel_core::{
13	Err, Result, implement,
14	matrix::{event::Event, pdu::PduBuilder, room_version},
15	utils::{IterStream, ReadyExt},
16};
17
18use super::RoomMutexGuard;
19
20/// Creates a new persisted data unit and adds it to a room. This function
21/// takes a roomid_mutex_state, meaning that only this function is able to
22/// mutate the room state.
23#[implement(super::Service)]
24#[tracing::instrument(
25	name = "build_and_append"
26	level = "debug",
27	skip(self, state_lock),
28	ret,
29)]
30pub async fn build_and_append_pdu(
31	&self,
32	mut pdu_builder: PduBuilder,
33	sender: &UserId,
34	room_id: &RoomId,
35	state_lock: &RoomMutexGuard,
36) -> Result<OwnedEventId> {
37	if pdu_builder.event_type == TimelineEventType::RoomMember {
38		self.sanitize_member_authorisation(&mut pdu_builder, room_id)
39			.boxed()
40			.await?;
41	}
42
43	let (pdu, mut pdu_json) = self
44		.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
45		.await?;
46
47	self.check_pdu_for_suspended_sender(&pdu)
48		.boxed()
49		.await?;
50
51	//TODO: Use proper room version here
52	if *pdu.kind() == TimelineEventType::RoomCreate && pdu.room_id().server_name().is_none() {
53		let _short_id = self
54			.services
55			.short
56			.get_or_create_shortroomid(pdu.room_id())
57			.await;
58	}
59
60	if self
61		.services
62		.admin
63		.is_admin_room(pdu.room_id())
64		.await
65	{
66		self.check_pdu_for_admin_room(&pdu, sender)
67			.boxed()
68			.await?;
69	}
70
71	// If redaction event is not authorized, do not append it to the timeline
72	if *pdu.kind() == TimelineEventType::RoomRedaction {
73		let room_version = self
74			.services
75			.state
76			.get_room_version(pdu.room_id())
77			.await?;
78
79		let room_rules = room_version::rules(&room_version)?;
80
81		let redacts_id = pdu.redacts_id(&room_rules);
82
83		if let Some(redacts_id) = &redacts_id
84			&& !self
85				.services
86				.state_accessor
87				.user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
88				.await?
89		{
90			return Err!(Request(Forbidden("User cannot redact this event.")));
91		}
92	}
93
94	// MSC4284: ask the room's policy server (if any) to sign this event before
95	// federating it. Refusal aborts; fail-open on transport errors.
96	self.services
97		.event_handler
98		.sign_outgoing_pdu(&mut pdu_json, &pdu)
99		.boxed()
100		.await?;
101
102	// We append to state before appending the pdu, so we don't have a moment in
103	// time with the pdu without it's state. This is okay because append_pdu can't
104	// fail.
105	let statehashid = self.services.state.append_to_state(&pdu).await?;
106
107	let pdu_id = self
108		.append_pdu(
109			&pdu,
110			pdu_json,
111			// Since this PDU references all pdu_leaves we can update the leaves
112			// of the room
113			once(pdu.event_id()),
114			state_lock,
115		)
116		.boxed()
117		.await?;
118
119	// We set the room state after inserting the pdu, so that we never have a moment
120	// in time where events in the current room state do not exist
121	self.services
122		.state
123		.set_room_state(pdu.room_id(), statehashid, state_lock);
124
125	let mut servers: HashSet<OwnedServerName> = self
126		.services
127		.state_cache
128		.room_servers(pdu.room_id())
129		.map(ToOwned::to_owned)
130		.collect()
131		.await;
132
133	// In case we are kicking or banning a user, we need to inform their server of
134	// the change
135	if *pdu.kind() == TimelineEventType::RoomMember
136		&& let Some(state_key_uid) = &pdu
137			.state_key
138			.as_ref()
139			.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
140	{
141		servers.insert(state_key_uid.server_name().to_owned());
142	}
143
144	// Remove our server from the server list since it will be added to it by
145	// room_servers() and/or the if statement above
146	servers.remove(self.services.globals.server_name());
147
148	self.services
149		.sending
150		.send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id)
151		.await?;
152
153	Ok(pdu.event_id().to_owned())
154}
155
156#[implement(super::Service)]
157#[tracing::instrument(skip_all, level = "debug")]
158async fn sanitize_member_authorisation(
159	&self,
160	pdu_builder: &mut PduBuilder,
161	room_id: &RoomId,
162) -> Result {
163	let content: RoomMemberEventContent = pdu_builder.content.deserialize_as_unchecked()?;
164
165	let Some(authorising_user) = &content.join_authorized_via_users_server else {
166		return Ok(());
167	};
168
169	if content.membership != MembershipState::Join {
170		return Err!(Request(BadJson(
171			"join_authorised_via_users_server is only for member joins"
172		)));
173	}
174
175	// Already joined or invited: strip the inapplicable authorising user.
176	if let Some(target) = pdu_builder
177		.state_key
178		.as_deref()
179		.and_then(|key| UserId::parse(key).ok())
180		&& self
181			.services
182			.state_cache
183			.user_membership(&target, room_id)
184			.await
185			.is_some_and(|m| matches!(m, MembershipState::Join | MembershipState::Invite))
186	{
187		let mut object = pdu_builder.content.deserialize()?;
188		object.remove("join_authorised_via_users_server");
189		pdu_builder.content = to_raw_value(&object)?.into();
190
191		return Ok(());
192	}
193
194	if !self
195		.services
196		.globals
197		.user_is_local(authorising_user)
198	{
199		return Err!(Request(InvalidParam(
200			"Authorising user does not belong to this homeserver"
201		)));
202	}
203
204	Ok(())
205}
206
207#[implement(super::Service)]
208#[tracing::instrument(skip_all, level = "debug")]
209async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
210where
211	Pdu: Event,
212{
213	match pdu.kind() {
214		| TimelineEventType::RoomEncryption => {
215			return Err!(Request(Forbidden(error!("Encryption not supported in admins room."))));
216		},
217		| TimelineEventType::RoomMember => {
218			let target = pdu
219				.state_key()
220				.filter(|v| v.starts_with('@'))
221				.unwrap_or(sender.as_str());
222
223			let server_user = &self.services.globals.server_user.to_string();
224
225			let content: RoomMemberEventContent = pdu.get_content()?;
226			match content.membership {
227				| MembershipState::Leave => {
228					if target == server_user {
229						return Err!(Request(Forbidden(error!(
230							"Server user cannot leave the admins room."
231						))));
232					}
233
234					let count = self
235						.services
236						.state_cache
237						.room_members(pdu.room_id())
238						.ready_filter(|user| self.services.globals.user_is_local(user))
239						.ready_filter(|user| *user != target)
240						.count()
241						.boxed()
242						.await;
243
244					if count < 2 {
245						return Err!(Request(Forbidden(error!(
246							"Last admin cannot leave the admins room."
247						))));
248					}
249				},
250
251				| MembershipState::Ban if pdu.state_key().is_some() => {
252					if target == server_user {
253						return Err!(Request(Forbidden(error!(
254							"Server cannot be banned from admins room."
255						))));
256					}
257
258					let count = self
259						.services
260						.state_cache
261						.room_members(pdu.room_id())
262						.ready_filter(|user| self.services.globals.user_is_local(user))
263						.ready_filter(|user| *user != target)
264						.count()
265						.boxed()
266						.await;
267
268					if count < 2 {
269						return Err!(Request(Forbidden(error!(
270							"Last admin cannot be banned from admins room."
271						))));
272					}
273				},
274				| _ => {},
275			}
276		},
277		| _ => {},
278	}
279
280	Ok(())
281}
282
283/// MSC3823: reject PDUs from a suspended sender, except self-redaction of
284/// their own event or self-leave. Synapse only checks `membership == leave`
285/// and so lets suspended moderators kick others via /kick or a state PUT.
286#[implement(super::Service)]
287#[tracing::instrument(skip_all, level = "debug")]
288async fn check_pdu_for_suspended_sender<Pdu>(&self, pdu: &Pdu) -> Result
289where
290	Pdu: Event,
291{
292	if !self
293		.services
294		.users
295		.is_suspended(pdu.sender())
296		.await
297	{
298		return Ok(());
299	}
300
301	let allowed = match pdu.kind() {
302		| TimelineEventType::RoomRedaction => self.is_self_redaction(pdu).await?,
303
304		| TimelineEventType::RoomMember =>
305			pdu.get_content()
306				.map(|content: RoomMemberEventContent| {
307					content.membership == MembershipState::Leave
308						&& pdu.state_key() == Some(pdu.sender().as_str())
309				})?,
310
311		| _ => false,
312	};
313
314	if allowed {
315		return Ok(());
316	}
317
318	Err!(Request(UserSuspended("Account is suspended.")))
319}
320
321#[implement(super::Service)]
322async fn is_self_redaction<Pdu>(&self, pdu: &Pdu) -> Result<bool>
323where
324	Pdu: Event,
325{
326	let room_version = self
327		.services
328		.state
329		.get_room_version(pdu.room_id())
330		.await?;
331
332	let room_rules = room_version::rules(&room_version)?;
333
334	let Some(target_id) = pdu.redacts_id(&room_rules) else {
335		return Ok(false);
336	};
337
338	let is_self = self
339		.get_pdu(&target_id)
340		.await
341		.is_ok_and(|target| target.sender() == pdu.sender());
342
343	Ok(is_self)
344}