Skip to main content

tuwunel_service/rooms/timeline/
build.rs

1use std::{collections::HashSet, iter::once};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5	OwnedEventId, OwnedServerName, RoomId, UserId,
6	events::{
7		TimelineEventType,
8		room::member::{MembershipState, RoomMemberEventContent},
9	},
10};
11use tuwunel_core::{
12	Err, Result, implement,
13	matrix::{event::Event, pdu::PduBuilder, room_version},
14	utils::{IterStream, ReadyExt},
15};
16
17use super::RoomMutexGuard;
18
19/// Creates a new persisted data unit and adds it to a room. This function
20/// takes a roomid_mutex_state, meaning that only this function is able to
21/// mutate the room state.
22#[implement(super::Service)]
23#[tracing::instrument(
24	name = "build_and_append"
25	level = "debug",
26	skip(self, state_lock),
27	ret,
28)]
29pub async fn build_and_append_pdu(
30	&self,
31	pdu_builder: PduBuilder,
32	sender: &UserId,
33	room_id: &RoomId,
34	state_lock: &RoomMutexGuard,
35) -> Result<OwnedEventId> {
36	let (pdu, mut pdu_json) = self
37		.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
38		.await?;
39
40	self.check_pdu_for_suspended_sender(&pdu)
41		.boxed()
42		.await?;
43
44	//TODO: Use proper room version here
45	if *pdu.kind() == TimelineEventType::RoomCreate && pdu.room_id().server_name().is_none() {
46		let _short_id = self
47			.services
48			.short
49			.get_or_create_shortroomid(pdu.room_id())
50			.await;
51	}
52
53	if self
54		.services
55		.admin
56		.is_admin_room(pdu.room_id())
57		.await
58	{
59		self.check_pdu_for_admin_room(&pdu, sender)
60			.boxed()
61			.await?;
62	}
63
64	// If redaction event is not authorized, do not append it to the timeline
65	if *pdu.kind() == TimelineEventType::RoomRedaction {
66		let room_version = self
67			.services
68			.state
69			.get_room_version(pdu.room_id())
70			.await?;
71
72		let room_rules = room_version::rules(&room_version)?;
73
74		let redacts_id = pdu.redacts_id(&room_rules);
75
76		if let Some(redacts_id) = &redacts_id
77			&& !self
78				.services
79				.state_accessor
80				.user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
81				.await?
82		{
83			return Err!(Request(Forbidden("User cannot redact this event.")));
84		}
85	}
86
87	if *pdu.kind() == TimelineEventType::RoomMember {
88		let content: RoomMemberEventContent = pdu.get_content()?;
89
90		if content.join_authorized_via_users_server.is_some()
91			&& content.membership != MembershipState::Join
92		{
93			return Err!(Request(BadJson(
94				"join_authorised_via_users_server is only for member joins"
95			)));
96		}
97
98		if content
99			.join_authorized_via_users_server
100			.as_ref()
101			.is_some_and(|authorising_user| {
102				!self
103					.services
104					.globals
105					.user_is_local(authorising_user)
106			}) {
107			return Err!(Request(InvalidParam(
108				"Authorising user does not belong to this homeserver"
109			)));
110		}
111	}
112
113	// MSC4284: ask the room's policy server (if any) to sign this event before
114	// federating it. Refusal aborts; fail-open on transport errors.
115	self.services
116		.event_handler
117		.sign_outgoing_pdu(&mut pdu_json, &pdu)
118		.boxed()
119		.await?;
120
121	// We append to state before appending the pdu, so we don't have a moment in
122	// time with the pdu without it's state. This is okay because append_pdu can't
123	// fail.
124	let statehashid = self.services.state.append_to_state(&pdu).await?;
125
126	let pdu_id = self
127		.append_pdu(
128			&pdu,
129			pdu_json,
130			// Since this PDU references all pdu_leaves we can update the leaves
131			// of the room
132			once(pdu.event_id()),
133			state_lock,
134		)
135		.boxed()
136		.await?;
137
138	// We set the room state after inserting the pdu, so that we never have a moment
139	// in time where events in the current room state do not exist
140	self.services
141		.state
142		.set_room_state(pdu.room_id(), statehashid, state_lock);
143
144	let mut servers: HashSet<OwnedServerName> = self
145		.services
146		.state_cache
147		.room_servers(pdu.room_id())
148		.map(ToOwned::to_owned)
149		.collect()
150		.await;
151
152	// In case we are kicking or banning a user, we need to inform their server of
153	// the change
154	if *pdu.kind() == TimelineEventType::RoomMember
155		&& let Some(state_key_uid) = &pdu
156			.state_key
157			.as_ref()
158			.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
159	{
160		servers.insert(state_key_uid.server_name().to_owned());
161	}
162
163	// Remove our server from the server list since it will be added to it by
164	// room_servers() and/or the if statement above
165	servers.remove(self.services.globals.server_name());
166
167	self.services
168		.sending
169		.send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id)
170		.await?;
171
172	Ok(pdu.event_id().to_owned())
173}
174
175#[implement(super::Service)]
176#[tracing::instrument(skip_all, level = "debug")]
177async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
178where
179	Pdu: Event,
180{
181	match pdu.kind() {
182		| TimelineEventType::RoomEncryption => {
183			return Err!(Request(Forbidden(error!("Encryption not supported in admins room."))));
184		},
185		| TimelineEventType::RoomMember => {
186			let target = pdu
187				.state_key()
188				.filter(|v| v.starts_with('@'))
189				.unwrap_or(sender.as_str());
190
191			let server_user = &self.services.globals.server_user.to_string();
192
193			let content: RoomMemberEventContent = pdu.get_content()?;
194			match content.membership {
195				| MembershipState::Leave => {
196					if target == server_user {
197						return Err!(Request(Forbidden(error!(
198							"Server user cannot leave the admins room."
199						))));
200					}
201
202					let count = self
203						.services
204						.state_cache
205						.room_members(pdu.room_id())
206						.ready_filter(|user| self.services.globals.user_is_local(user))
207						.ready_filter(|user| *user != target)
208						.count()
209						.boxed()
210						.await;
211
212					if count < 2 {
213						return Err!(Request(Forbidden(error!(
214							"Last admin cannot leave the admins room."
215						))));
216					}
217				},
218
219				| MembershipState::Ban if pdu.state_key().is_some() => {
220					if target == server_user {
221						return Err!(Request(Forbidden(error!(
222							"Server cannot be banned from admins room."
223						))));
224					}
225
226					let count = self
227						.services
228						.state_cache
229						.room_members(pdu.room_id())
230						.ready_filter(|user| self.services.globals.user_is_local(user))
231						.ready_filter(|user| *user != target)
232						.count()
233						.boxed()
234						.await;
235
236					if count < 2 {
237						return Err!(Request(Forbidden(error!(
238							"Last admin cannot be banned from admins room."
239						))));
240					}
241				},
242				| _ => {},
243			}
244		},
245		| _ => {},
246	}
247
248	Ok(())
249}
250
251/// MSC3823: reject PDUs from a suspended sender, except self-redaction of
252/// their own event or self-leave. Synapse only checks `membership == leave`
253/// and so lets suspended moderators kick others via /kick or a state PUT.
254#[implement(super::Service)]
255#[tracing::instrument(skip_all, level = "debug")]
256async fn check_pdu_for_suspended_sender<Pdu>(&self, pdu: &Pdu) -> Result
257where
258	Pdu: Event,
259{
260	if !self
261		.services
262		.users
263		.is_suspended(pdu.sender())
264		.await
265	{
266		return Ok(());
267	}
268
269	let allowed = match pdu.kind() {
270		| TimelineEventType::RoomRedaction => self.is_self_redaction(pdu).await?,
271
272		| TimelineEventType::RoomMember =>
273			pdu.get_content()
274				.map(|content: RoomMemberEventContent| {
275					content.membership == MembershipState::Leave
276						&& pdu.state_key() == Some(pdu.sender().as_str())
277				})?,
278
279		| _ => false,
280	};
281
282	if allowed {
283		return Ok(());
284	}
285
286	Err!(Request(UserSuspended("Account is suspended.")))
287}
288
289#[implement(super::Service)]
290async fn is_self_redaction<Pdu>(&self, pdu: &Pdu) -> Result<bool>
291where
292	Pdu: Event,
293{
294	let room_version = self
295		.services
296		.state
297		.get_room_version(pdu.room_id())
298		.await?;
299
300	let room_rules = room_version::rules(&room_version)?;
301
302	let Some(target_id) = pdu.redacts_id(&room_rules) else {
303		return Ok(false);
304	};
305
306	let is_self = self
307		.get_pdu(&target_id)
308		.await
309		.is_ok_and(|target| target.sender() == pdu.sender());
310
311	Ok(is_self)
312}