// tuwunel_service/rooms/timeline/append.rs
use std::{collections::BTreeMap, sync::Arc};
2
3use ruma::{
4 CanonicalJsonObject, CanonicalJsonValue, EventId, UserId,
5 events::{
6 TimelineEventType,
7 receipt::ReceiptThread,
8 room::{
9 encrypted::Relation,
10 member::{MembershipState, RoomMemberEventContent},
11 },
12 },
13};
14use tuwunel_core::{
15 Result, err, error, implement,
16 matrix::{
17 event::Event,
18 pdu::{PduCount, PduEvent, PduId, RawPduId},
19 room_version,
20 },
21 utils::{self, result::LogErr},
22};
23use tuwunel_database::Json;
24
25use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard};
26use crate::rooms::{short::ShortRoomId, state_compressor::CompressedState};
27
28#[implement(super::Service)]
31#[tracing::instrument(
32 name = "append_incoming",
33 level = "debug",
34 skip_all,
35 ret(Debug)
36)]
37pub(crate) async fn append_incoming_pdu<'a, Leafs>(
38 &'a self,
39 pdu: &'a PduEvent,
40 pdu_json: CanonicalJsonObject,
41 new_room_leafs: Leafs,
42 state_ids_compressed: Arc<CompressedState>,
43 soft_fail: bool,
44 state_lock: &'a RoomMutexGuard,
45) -> Result<Option<RawPduId>>
46where
47 Leafs: Iterator<Item = &'a EventId> + Send + 'a,
48{
49 self.services
53 .state
54 .set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
55 .await?;
56
57 if soft_fail {
58 self.services
59 .pdu_metadata
60 .mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
61
62 self.services
63 .state
64 .set_forward_extremities(&pdu.room_id, new_room_leafs, state_lock)
65 .await;
66
67 return Ok(None);
68 }
69
70 let pdu_id = self
71 .append_pdu(pdu, pdu_json, new_room_leafs, state_lock)
72 .await?;
73
74 Ok(Some(pdu_id))
75}
76
77#[implement(super::Service)]
84#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))]
85pub async fn append_pdu<'a, Leafs>(
86 &'a self,
87 pdu: &'a PduEvent,
88 mut pdu_json: CanonicalJsonObject,
89 leafs: Leafs,
90 state_lock: &'a RoomMutexGuard,
91) -> Result<RawPduId>
92where
93 Leafs: Iterator<Item = &'a EventId> + Send + 'a,
94{
95 let _cork = self.db.db.cork_and_flush();
97
98 let shortroomid = self
99 .services
100 .short
101 .get_shortroomid(pdu.room_id())
102 .await
103 .map_err(|_| err!(Database("Room does not exist")))?;
104
105 if let Some(state_key) = pdu.state_key() {
109 if let CanonicalJsonValue::Object(unsigned) = pdu_json
110 .entry("unsigned".into())
111 .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
112 {
113 if let Ok(shortstatehash) = self
114 .services
115 .state
116 .pdu_shortstatehash(pdu.event_id())
117 .await && let Ok(prev_state) = self
118 .services
119 .state_accessor
120 .state_get(shortstatehash, &pdu.kind().to_string().into(), state_key)
121 .await
122 {
123 unsigned.insert(
124 "prev_content".into(),
125 CanonicalJsonValue::Object(
126 utils::to_canonical_object(prev_state.get_content_as_value()).map_err(
127 |e| {
128 err!(Database(error!(
129 "Failed to convert prev_state to canonical JSON: {e}",
130 )))
131 },
132 )?,
133 ),
134 );
135 unsigned.insert(
136 "prev_sender".into(),
137 CanonicalJsonValue::String(prev_state.sender().to_string()),
138 );
139 unsigned.insert(
140 "replaces_state".into(),
141 CanonicalJsonValue::String(prev_state.event_id().to_string()),
142 );
143 }
144 } else {
145 error!("Invalid unsigned type in pdu.");
146 }
147 }
148
149 self.services
151 .pdu_metadata
152 .mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
153
154 self.services
155 .state
156 .set_forward_extremities(pdu.room_id(), leafs, state_lock)
157 .await;
158
159 let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
160 let next_count1 = self.services.globals.next_count();
161 let next_count2 = self.services.globals.next_count();
162
163 self.services
167 .read_receipt
168 .private_read_set(pdu.room_id(), pdu.sender(), *next_count2, &ReceiptThread::Unthreaded)
169 .await;
170
171 self.services
172 .pusher
173 .reset_notification_counts_for_thread(
174 pdu.sender(),
175 pdu.room_id(),
176 &ReceiptThread::Unthreaded,
177 )
178 .await;
179
180 let count = PduCount::Normal(*next_count1);
181 let pdu_id: RawPduId = PduId { shortroomid, count }.into();
182
183 self.append_pdu_json(&pdu_id, pdu, &pdu_json);
185
186 drop(insert_lock);
187
188 self.services
189 .pusher
190 .append_pdu(pdu_id, pdu)
191 .await
192 .log_err()
193 .ok();
194
195 self.append_pdu_effects(pdu_id, pdu, shortroomid, count, state_lock)
196 .await?;
197
198 drop(next_count1);
199 drop(next_count2);
200
201 self.services
202 .appservice
203 .append_pdu(pdu_id, pdu)
204 .await
205 .log_err()
206 .ok();
207
208 Ok(pdu_id)
209}
210
211#[implement(super::Service)]
212async fn append_pdu_effects(
213 &self,
214 pdu_id: RawPduId,
215 pdu: &PduEvent,
216 shortroomid: ShortRoomId,
217 count: PduCount,
218 state_lock: &RoomMutexGuard,
219) -> Result {
220 match *pdu.kind() {
221 | TimelineEventType::RoomRedaction => {
222 let room_version = self
223 .services
224 .state
225 .get_room_version(pdu.room_id())
226 .await?;
227
228 let room_rules = room_version::rules(&room_version)?;
229
230 let redacts_id = pdu.redacts_id(&room_rules);
231
232 if let Some(redacts_id) = &redacts_id
233 && self
234 .services
235 .state_accessor
236 .user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
237 .await?
238 {
239 self.redact_pdu(redacts_id, pdu, shortroomid, state_lock)
240 .await?;
241 }
242 },
243 | TimelineEventType::SpaceChild =>
244 if let Some(_state_key) = pdu.state_key() {
245 self.services.spaces.cache_evict(pdu.room_id());
246 },
247 | TimelineEventType::RoomMember => {
248 if let Some(state_key) = pdu.state_key() {
249 let target_user_id =
251 UserId::parse(state_key).expect("This state_key was previously validated");
252
253 let content: RoomMemberEventContent = pdu.get_content()?;
254 let stripped_state = match content.membership {
255 | MembershipState::Invite | MembershipState::Knock => self
256 .services
257 .state
258 .summary_stripped(pdu)
259 .await
260 .into(),
261 | _ => None,
262 };
263
264 self.services
268 .state_cache
269 .update_membership(
270 pdu.room_id(),
271 &target_user_id,
272 content,
273 pdu.sender(),
274 stripped_state,
275 None,
276 true,
277 count,
278 )
279 .await?;
280 }
281 },
282 | TimelineEventType::RoomMessage => {
283 let content: ExtractBody = pdu.get_content()?;
284 if let Some(body) = content.body {
285 self.services
286 .search
287 .index_pdu(shortroomid, &pdu_id, &body);
288
289 if self
290 .services
291 .admin
292 .is_admin_command(pdu, &body)
293 .await
294 {
295 self.services
296 .admin
297 .command(body, Some((pdu.event_id()).into()))
298 .await?;
299 }
300 }
301 },
302 | _ => {},
303 }
304
305 if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>()
306 && let Ok(related_pducount) = self
307 .get_pdu_count(&content.relates_to.event_id)
308 .await
309 {
310 self.services
311 .pdu_metadata
312 .add_relation(count, related_pducount);
313 }
314
315 if let Ok(content) = pdu.get_content::<ExtractRelatesTo>() {
316 match content.relates_to {
317 | Relation::Reply(ruma::events::relation::Reply { in_reply_to }) => {
318 if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
321 self.services
322 .pdu_metadata
323 .add_relation(count, related_pducount);
324 }
325 },
326 | Relation::Thread(thread) => {
327 self.services
328 .threads
329 .add_to_thread(&thread.event_id, pdu)
330 .await?;
331 },
332 | _ => {}, }
334 }
335
336 Ok(())
337}
338
339#[implement(super::Service)]
340fn append_pdu_json(&self, pdu_id: &RawPduId, pdu: &PduEvent, json: &CanonicalJsonObject) {
341 debug_assert!(matches!(pdu_id.pdu_count(), PduCount::Normal(_)), "PduCount not Normal");
342
343 self.db.pduid_pdu.raw_put(pdu_id, Json(json));
344
345 self.db
346 .eventid_pduid
347 .insert(pdu.event_id.as_bytes(), pdu_id);
348
349 self.db
350 .eventid_outlierpdu
351 .remove(pdu.event_id.as_bytes());
352
353 let ts = u64::from(pdu.origin_server_ts);
354 self.db
355 .roomid_ts_pducount
356 .put_raw((pdu.room_id(), ts), pdu_id.count());
357}