tuwunel_service/rooms/timeline/
build.rs1use std::{collections::HashSet, iter::once};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5 OwnedEventId, OwnedServerName, RoomId, UserId,
6 events::{
7 TimelineEventType,
8 room::member::{MembershipState, RoomMemberEventContent},
9 },
10};
11use tuwunel_core::{
12 Err, Result, implement,
13 matrix::{event::Event, pdu::PduBuilder, room_version},
14 utils::{IterStream, ReadyExt},
15};
16
17use super::RoomMutexGuard;
18
19#[implement(super::Service)]
23#[tracing::instrument(
24 name = "build_and_append"
25 level = "debug",
26 skip(self, state_lock),
27 ret,
28)]
29pub async fn build_and_append_pdu(
30 &self,
31 pdu_builder: PduBuilder,
32 sender: &UserId,
33 room_id: &RoomId,
34 state_lock: &RoomMutexGuard,
35) -> Result<OwnedEventId> {
36 let (pdu, mut pdu_json) = self
37 .create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
38 .await?;
39
40 self.check_pdu_for_suspended_sender(&pdu)
41 .boxed()
42 .await?;
43
44 if *pdu.kind() == TimelineEventType::RoomCreate && pdu.room_id().server_name().is_none() {
46 let _short_id = self
47 .services
48 .short
49 .get_or_create_shortroomid(pdu.room_id())
50 .await;
51 }
52
53 if self
54 .services
55 .admin
56 .is_admin_room(pdu.room_id())
57 .await
58 {
59 self.check_pdu_for_admin_room(&pdu, sender)
60 .boxed()
61 .await?;
62 }
63
64 if *pdu.kind() == TimelineEventType::RoomRedaction {
66 let room_version = self
67 .services
68 .state
69 .get_room_version(pdu.room_id())
70 .await?;
71
72 let room_rules = room_version::rules(&room_version)?;
73
74 let redacts_id = pdu.redacts_id(&room_rules);
75
76 if let Some(redacts_id) = &redacts_id
77 && !self
78 .services
79 .state_accessor
80 .user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
81 .await?
82 {
83 return Err!(Request(Forbidden("User cannot redact this event.")));
84 }
85 }
86
87 if *pdu.kind() == TimelineEventType::RoomMember {
88 let content: RoomMemberEventContent = pdu.get_content()?;
89
90 if content.join_authorized_via_users_server.is_some()
91 && content.membership != MembershipState::Join
92 {
93 return Err!(Request(BadJson(
94 "join_authorised_via_users_server is only for member joins"
95 )));
96 }
97
98 if content
99 .join_authorized_via_users_server
100 .as_ref()
101 .is_some_and(|authorising_user| {
102 !self
103 .services
104 .globals
105 .user_is_local(authorising_user)
106 }) {
107 return Err!(Request(InvalidParam(
108 "Authorising user does not belong to this homeserver"
109 )));
110 }
111 }
112
113 self.services
116 .event_handler
117 .sign_outgoing_pdu(&mut pdu_json, &pdu)
118 .boxed()
119 .await?;
120
121 let statehashid = self.services.state.append_to_state(&pdu).await?;
125
126 let pdu_id = self
127 .append_pdu(
128 &pdu,
129 pdu_json,
130 once(pdu.event_id()),
133 state_lock,
134 )
135 .boxed()
136 .await?;
137
138 self.services
141 .state
142 .set_room_state(pdu.room_id(), statehashid, state_lock);
143
144 let mut servers: HashSet<OwnedServerName> = self
145 .services
146 .state_cache
147 .room_servers(pdu.room_id())
148 .map(ToOwned::to_owned)
149 .collect()
150 .await;
151
152 if *pdu.kind() == TimelineEventType::RoomMember
155 && let Some(state_key_uid) = &pdu
156 .state_key
157 .as_ref()
158 .and_then(|state_key| UserId::parse(state_key.as_str()).ok())
159 {
160 servers.insert(state_key_uid.server_name().to_owned());
161 }
162
163 servers.remove(self.services.globals.server_name());
166
167 self.services
168 .sending
169 .send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id)
170 .await?;
171
172 Ok(pdu.event_id().to_owned())
173}
174
175#[implement(super::Service)]
176#[tracing::instrument(skip_all, level = "debug")]
177async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
178where
179 Pdu: Event,
180{
181 match pdu.kind() {
182 | TimelineEventType::RoomEncryption => {
183 return Err!(Request(Forbidden(error!("Encryption not supported in admins room."))));
184 },
185 | TimelineEventType::RoomMember => {
186 let target = pdu
187 .state_key()
188 .filter(|v| v.starts_with('@'))
189 .unwrap_or(sender.as_str());
190
191 let server_user = &self.services.globals.server_user.to_string();
192
193 let content: RoomMemberEventContent = pdu.get_content()?;
194 match content.membership {
195 | MembershipState::Leave => {
196 if target == server_user {
197 return Err!(Request(Forbidden(error!(
198 "Server user cannot leave the admins room."
199 ))));
200 }
201
202 let count = self
203 .services
204 .state_cache
205 .room_members(pdu.room_id())
206 .ready_filter(|user| self.services.globals.user_is_local(user))
207 .ready_filter(|user| *user != target)
208 .count()
209 .boxed()
210 .await;
211
212 if count < 2 {
213 return Err!(Request(Forbidden(error!(
214 "Last admin cannot leave the admins room."
215 ))));
216 }
217 },
218
219 | MembershipState::Ban if pdu.state_key().is_some() => {
220 if target == server_user {
221 return Err!(Request(Forbidden(error!(
222 "Server cannot be banned from admins room."
223 ))));
224 }
225
226 let count = self
227 .services
228 .state_cache
229 .room_members(pdu.room_id())
230 .ready_filter(|user| self.services.globals.user_is_local(user))
231 .ready_filter(|user| *user != target)
232 .count()
233 .boxed()
234 .await;
235
236 if count < 2 {
237 return Err!(Request(Forbidden(error!(
238 "Last admin cannot be banned from admins room."
239 ))));
240 }
241 },
242 | _ => {},
243 }
244 },
245 | _ => {},
246 }
247
248 Ok(())
249}
250
251#[implement(super::Service)]
255#[tracing::instrument(skip_all, level = "debug")]
256async fn check_pdu_for_suspended_sender<Pdu>(&self, pdu: &Pdu) -> Result
257where
258 Pdu: Event,
259{
260 if !self
261 .services
262 .users
263 .is_suspended(pdu.sender())
264 .await
265 {
266 return Ok(());
267 }
268
269 let allowed = match pdu.kind() {
270 | TimelineEventType::RoomRedaction => self.is_self_redaction(pdu).await?,
271
272 | TimelineEventType::RoomMember =>
273 pdu.get_content()
274 .map(|content: RoomMemberEventContent| {
275 content.membership == MembershipState::Leave
276 && pdu.state_key() == Some(pdu.sender().as_str())
277 })?,
278
279 | _ => false,
280 };
281
282 if allowed {
283 return Ok(());
284 }
285
286 Err!(Request(UserSuspended("Account is suspended.")))
287}
288
289#[implement(super::Service)]
290async fn is_self_redaction<Pdu>(&self, pdu: &Pdu) -> Result<bool>
291where
292 Pdu: Event,
293{
294 let room_version = self
295 .services
296 .state
297 .get_room_version(pdu.room_id())
298 .await?;
299
300 let room_rules = room_version::rules(&room_version)?;
301
302 let Some(target_id) = pdu.redacts_id(&room_rules) else {
303 return Ok(false);
304 };
305
306 let is_self = self
307 .get_pdu(&target_id)
308 .await
309 .is_ok_and(|target| target.sender() == pdu.sender());
310
311 Ok(is_self)
312}