tuwunel_service/rooms/timeline/
build.rs1use std::{collections::HashSet, iter::once};
2
3use futures::{FutureExt, StreamExt};
4use ruma::{
5 OwnedEventId, OwnedServerName, RoomId, UserId,
6 events::{
7 TimelineEventType,
8 room::member::{MembershipState, RoomMemberEventContent},
9 },
10};
11use serde_json::value::to_raw_value;
12use tuwunel_core::{
13 Err, Result, implement,
14 matrix::{event::Event, pdu::PduBuilder, room_version},
15 utils::{IterStream, ReadyExt},
16};
17
18use super::RoomMutexGuard;
19
20#[implement(super::Service)]
24#[tracing::instrument(
25 name = "build_and_append"
26 level = "debug",
27 skip(self, state_lock),
28 ret,
29)]
30pub async fn build_and_append_pdu(
31 &self,
32 mut pdu_builder: PduBuilder,
33 sender: &UserId,
34 room_id: &RoomId,
35 state_lock: &RoomMutexGuard,
36) -> Result<OwnedEventId> {
37 if pdu_builder.event_type == TimelineEventType::RoomMember {
38 self.sanitize_member_authorisation(&mut pdu_builder, room_id)
39 .boxed()
40 .await?;
41 }
42
43 let (pdu, mut pdu_json) = self
44 .create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)
45 .await?;
46
47 self.check_pdu_for_suspended_sender(&pdu)
48 .boxed()
49 .await?;
50
51 if *pdu.kind() == TimelineEventType::RoomCreate && pdu.room_id().server_name().is_none() {
53 let _short_id = self
54 .services
55 .short
56 .get_or_create_shortroomid(pdu.room_id())
57 .await;
58 }
59
60 if self
61 .services
62 .admin
63 .is_admin_room(pdu.room_id())
64 .await
65 {
66 self.check_pdu_for_admin_room(&pdu, sender)
67 .boxed()
68 .await?;
69 }
70
71 if *pdu.kind() == TimelineEventType::RoomRedaction {
73 let room_version = self
74 .services
75 .state
76 .get_room_version(pdu.room_id())
77 .await?;
78
79 let room_rules = room_version::rules(&room_version)?;
80
81 let redacts_id = pdu.redacts_id(&room_rules);
82
83 if let Some(redacts_id) = &redacts_id
84 && !self
85 .services
86 .state_accessor
87 .user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
88 .await?
89 {
90 return Err!(Request(Forbidden("User cannot redact this event.")));
91 }
92 }
93
94 self.services
97 .event_handler
98 .sign_outgoing_pdu(&mut pdu_json, &pdu)
99 .boxed()
100 .await?;
101
102 let statehashid = self.services.state.append_to_state(&pdu).await?;
106
107 let pdu_id = self
108 .append_pdu(
109 &pdu,
110 pdu_json,
111 once(pdu.event_id()),
114 state_lock,
115 )
116 .boxed()
117 .await?;
118
119 self.services
122 .state
123 .set_room_state(pdu.room_id(), statehashid, state_lock);
124
125 let mut servers: HashSet<OwnedServerName> = self
126 .services
127 .state_cache
128 .room_servers(pdu.room_id())
129 .map(ToOwned::to_owned)
130 .collect()
131 .await;
132
133 if *pdu.kind() == TimelineEventType::RoomMember
136 && let Some(state_key_uid) = &pdu
137 .state_key
138 .as_ref()
139 .and_then(|state_key| UserId::parse(state_key.as_str()).ok())
140 {
141 servers.insert(state_key_uid.server_name().to_owned());
142 }
143
144 servers.remove(self.services.globals.server_name());
147
148 self.services
149 .sending
150 .send_pdu_servers(servers.iter().map(AsRef::as_ref).stream(), &pdu_id)
151 .await?;
152
153 Ok(pdu.event_id().to_owned())
154}
155
156#[implement(super::Service)]
157#[tracing::instrument(skip_all, level = "debug")]
158async fn sanitize_member_authorisation(
159 &self,
160 pdu_builder: &mut PduBuilder,
161 room_id: &RoomId,
162) -> Result {
163 let content: RoomMemberEventContent = pdu_builder.content.deserialize_as_unchecked()?;
164
165 let Some(authorising_user) = &content.join_authorized_via_users_server else {
166 return Ok(());
167 };
168
169 if content.membership != MembershipState::Join {
170 return Err!(Request(BadJson(
171 "join_authorised_via_users_server is only for member joins"
172 )));
173 }
174
175 if let Some(target) = pdu_builder
177 .state_key
178 .as_deref()
179 .and_then(|key| UserId::parse(key).ok())
180 && self
181 .services
182 .state_cache
183 .user_membership(&target, room_id)
184 .await
185 .is_some_and(|m| matches!(m, MembershipState::Join | MembershipState::Invite))
186 {
187 let mut object = pdu_builder.content.deserialize()?;
188 object.remove("join_authorised_via_users_server");
189 pdu_builder.content = to_raw_value(&object)?.into();
190
191 return Ok(());
192 }
193
194 if !self
195 .services
196 .globals
197 .user_is_local(authorising_user)
198 {
199 return Err!(Request(InvalidParam(
200 "Authorising user does not belong to this homeserver"
201 )));
202 }
203
204 Ok(())
205}
206
207#[implement(super::Service)]
208#[tracing::instrument(skip_all, level = "debug")]
209async fn check_pdu_for_admin_room<Pdu>(&self, pdu: &Pdu, sender: &UserId) -> Result
210where
211 Pdu: Event,
212{
213 match pdu.kind() {
214 | TimelineEventType::RoomEncryption => {
215 return Err!(Request(Forbidden(error!("Encryption not supported in admins room."))));
216 },
217 | TimelineEventType::RoomMember => {
218 let target = pdu
219 .state_key()
220 .filter(|v| v.starts_with('@'))
221 .unwrap_or(sender.as_str());
222
223 let server_user = &self.services.globals.server_user.to_string();
224
225 let content: RoomMemberEventContent = pdu.get_content()?;
226 match content.membership {
227 | MembershipState::Leave => {
228 if target == server_user {
229 return Err!(Request(Forbidden(error!(
230 "Server user cannot leave the admins room."
231 ))));
232 }
233
234 let count = self
235 .services
236 .state_cache
237 .room_members(pdu.room_id())
238 .ready_filter(|user| self.services.globals.user_is_local(user))
239 .ready_filter(|user| *user != target)
240 .count()
241 .boxed()
242 .await;
243
244 if count < 2 {
245 return Err!(Request(Forbidden(error!(
246 "Last admin cannot leave the admins room."
247 ))));
248 }
249 },
250
251 | MembershipState::Ban if pdu.state_key().is_some() => {
252 if target == server_user {
253 return Err!(Request(Forbidden(error!(
254 "Server cannot be banned from admins room."
255 ))));
256 }
257
258 let count = self
259 .services
260 .state_cache
261 .room_members(pdu.room_id())
262 .ready_filter(|user| self.services.globals.user_is_local(user))
263 .ready_filter(|user| *user != target)
264 .count()
265 .boxed()
266 .await;
267
268 if count < 2 {
269 return Err!(Request(Forbidden(error!(
270 "Last admin cannot be banned from admins room."
271 ))));
272 }
273 },
274 | _ => {},
275 }
276 },
277 | _ => {},
278 }
279
280 Ok(())
281}
282
283#[implement(super::Service)]
287#[tracing::instrument(skip_all, level = "debug")]
288async fn check_pdu_for_suspended_sender<Pdu>(&self, pdu: &Pdu) -> Result
289where
290 Pdu: Event,
291{
292 if !self
293 .services
294 .users
295 .is_suspended(pdu.sender())
296 .await
297 {
298 return Ok(());
299 }
300
301 let allowed = match pdu.kind() {
302 | TimelineEventType::RoomRedaction => self.is_self_redaction(pdu).await?,
303
304 | TimelineEventType::RoomMember =>
305 pdu.get_content()
306 .map(|content: RoomMemberEventContent| {
307 content.membership == MembershipState::Leave
308 && pdu.state_key() == Some(pdu.sender().as_str())
309 })?,
310
311 | _ => false,
312 };
313
314 if allowed {
315 return Ok(());
316 }
317
318 Err!(Request(UserSuspended("Account is suspended.")))
319}
320
321#[implement(super::Service)]
322async fn is_self_redaction<Pdu>(&self, pdu: &Pdu) -> Result<bool>
323where
324 Pdu: Event,
325{
326 let room_version = self
327 .services
328 .state
329 .get_room_version(pdu.room_id())
330 .await?;
331
332 let room_rules = room_version::rules(&room_version)?;
333
334 let Some(target_id) = pdu.redacts_id(&room_rules) else {
335 return Ok(false);
336 };
337
338 let is_self = self
339 .get_pdu(&target_id)
340 .await
341 .is_ok_and(|target| target.sender() == pdu.sender());
342
343 Ok(is_self)
344}