tuwunel_service/rooms/timeline/
append.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use ruma::{
4 CanonicalJsonObject, CanonicalJsonValue, EventId, UserId,
5 events::{
6 TimelineEventType,
7 receipt::ReceiptThread,
8 relation::RelationType,
9 room::{
10 encrypted::Relation,
11 member::{MembershipState, RoomMemberEventContent},
12 },
13 },
14};
15use tuwunel_core::{
16 Result, err, error, implement,
17 matrix::{
18 event::Event,
19 pdu::{PduCount, PduEvent, PduId, RawPduId},
20 room_version,
21 },
22 utils::{self, result::LogErr},
23};
24use tuwunel_database::Json;
25
26use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard, bias_count};
27use crate::rooms::{
28 short::ShortRoomId, state_accessor::plain_text_topic, state_compressor::CompressedState,
29};
30
31#[implement(super::Service)]
34#[tracing::instrument(
35 name = "append_incoming",
36 level = "debug",
37 skip_all,
38 ret(Debug)
39)]
40pub(crate) async fn append_incoming_pdu<'a, Leafs>(
41 &'a self,
42 pdu: &'a PduEvent,
43 pdu_json: CanonicalJsonObject,
44 new_room_leafs: Leafs,
45 state_ids_compressed: Arc<CompressedState>,
46 soft_fail: bool,
47 state_lock: &'a RoomMutexGuard,
48) -> Result<Option<RawPduId>>
49where
50 Leafs: Iterator<Item = &'a EventId> + Send + 'a,
51{
52 self.services
56 .state
57 .set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
58 .await?;
59
60 if soft_fail {
61 self.services
62 .pdu_metadata
63 .mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
64
65 self.services
66 .state
67 .set_forward_extremities(&pdu.room_id, new_room_leafs, state_lock)
68 .await;
69
70 return Ok(None);
71 }
72
73 let pdu_id = self
74 .append_pdu(pdu, pdu_json, new_room_leafs, state_lock)
75 .await?;
76
77 Ok(Some(pdu_id))
78}
79
80#[implement(super::Service)]
87#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))]
88pub async fn append_pdu<'a, Leafs>(
89 &'a self,
90 pdu: &'a PduEvent,
91 mut pdu_json: CanonicalJsonObject,
92 leafs: Leafs,
93 state_lock: &'a RoomMutexGuard,
94) -> Result<RawPduId>
95where
96 Leafs: Iterator<Item = &'a EventId> + Send + 'a,
97{
98 let _cork = self.db.db.cork_and_flush();
100
101 let shortroomid = self
102 .services
103 .short
104 .get_shortroomid(pdu.room_id())
105 .await
106 .map_err(|_| err!(Database("Room does not exist")))?;
107
108 if let Some(state_key) = pdu.state_key() {
112 if let CanonicalJsonValue::Object(unsigned) = pdu_json
113 .entry("unsigned".into())
114 .or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
115 {
116 if let Ok(shortstatehash) = self
117 .services
118 .state
119 .pdu_shortstatehash(pdu.event_id())
120 .await && let Ok(prev_state) = self
121 .services
122 .state_accessor
123 .state_get(shortstatehash, &pdu.kind().to_string().into(), state_key)
124 .await
125 {
126 unsigned.insert(
127 "prev_content".into(),
128 CanonicalJsonValue::Object(
129 utils::to_canonical_object(prev_state.get_content_as_value()).map_err(
130 |e| {
131 err!(Database(error!(
132 "Failed to convert prev_state to canonical JSON: {e}",
133 )))
134 },
135 )?,
136 ),
137 );
138 unsigned.insert(
139 "prev_sender".into(),
140 CanonicalJsonValue::String(prev_state.sender().to_string()),
141 );
142 unsigned.insert(
143 "replaces_state".into(),
144 CanonicalJsonValue::String(prev_state.event_id().to_string()),
145 );
146 }
147 } else {
148 error!("Invalid unsigned type in pdu.");
149 }
150 }
151
152 self.services
154 .pdu_metadata
155 .mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
156
157 self.services
158 .state
159 .set_forward_extremities(pdu.room_id(), leafs, state_lock)
160 .await;
161
162 let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
163 let next_count1 = self.services.globals.next_count();
164 let next_count2 = self.services.globals.next_count();
165
166 self.services
170 .read_receipt
171 .private_read_set(pdu.room_id(), pdu.sender(), *next_count2, &ReceiptThread::Unthreaded)
172 .await;
173
174 self.services
175 .pusher
176 .reset_notification_counts_for_thread(
177 pdu.sender(),
178 pdu.room_id(),
179 &ReceiptThread::Unthreaded,
180 )
181 .await;
182
183 let count = PduCount::Normal(*next_count1);
184 let pdu_id: RawPduId = PduId { shortroomid, count }.into();
185
186 self.append_pdu_json(&pdu_id, pdu, &pdu_json);
188
189 drop(insert_lock);
190
191 self.services
192 .pusher
193 .append_pdu(pdu_id, pdu)
194 .await
195 .log_err()
196 .ok();
197
198 self.append_pdu_effects(pdu_id, pdu, shortroomid, count, state_lock)
199 .await?;
200
201 drop(next_count1);
202 drop(next_count2);
203
204 self.services
205 .appservice
206 .append_pdu(pdu_id, pdu)
207 .await
208 .log_err()
209 .ok();
210
211 Ok(pdu_id)
212}
213
214#[implement(super::Service)]
215async fn append_pdu_effects(
216 &self,
217 pdu_id: RawPduId,
218 pdu: &PduEvent,
219 shortroomid: ShortRoomId,
220 count: PduCount,
221 state_lock: &RoomMutexGuard,
222) -> Result {
223 match *pdu.kind() {
224 | TimelineEventType::RoomRedaction => {
225 let room_version = self
226 .services
227 .state
228 .get_room_version(pdu.room_id())
229 .await?;
230
231 let room_rules = room_version::rules(&room_version)?;
232
233 let redacts_id = pdu.redacts_id(&room_rules);
234
235 if let Some(redacts_id) = &redacts_id
236 && self
237 .services
238 .state_accessor
239 .user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
240 .await?
241 {
242 self.redact_pdu(redacts_id, pdu, shortroomid, state_lock)
243 .await?;
244 }
245 },
246 | TimelineEventType::SpaceChild =>
247 if let Some(_state_key) = pdu.state_key() {
248 self.services.spaces.cache_evict(pdu.room_id());
249 },
250 | TimelineEventType::RoomMember => {
251 if let Some(state_key) = pdu.state_key() {
252 let target_user_id =
254 UserId::parse(state_key).expect("This state_key was previously validated");
255
256 let content: RoomMemberEventContent = pdu.get_content()?;
257 let stripped_state = match content.membership {
258 | MembershipState::Invite | MembershipState::Knock => self
259 .services
260 .state
261 .summary_stripped(pdu)
262 .await
263 .into(),
264 | _ => None,
265 };
266
267 self.services
271 .state_cache
272 .update_membership(
273 pdu.room_id(),
274 &target_user_id,
275 content,
276 pdu.sender(),
277 stripped_state,
278 None,
279 true,
280 count,
281 )
282 .await?;
283 }
284 },
285 | TimelineEventType::RoomMessage => {
286 let content: ExtractBody = pdu.get_content()?;
287 if let Some(body) = content.body {
288 self.services
289 .search
290 .index_pdu(shortroomid, &pdu_id, &body);
291
292 if self
293 .services
294 .admin
295 .is_admin_command(pdu, &body)
296 .await
297 {
298 self.services
299 .admin
300 .command(body, Some((pdu.event_id()).into()))
301 .await?;
302 }
303 }
304 },
305 | TimelineEventType::RoomTopic =>
306 if let Some(topic) = pdu.get_content().ok().and_then(plain_text_topic) {
307 self.services
308 .search
309 .index_pdu(shortroomid, &pdu_id, &topic);
310 },
311 | _ => {},
312 }
313
314 if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>()
315 && let Ok(related_pducount) = self
316 .get_pdu_count(&content.relates_to.event_id)
317 .await
318 {
319 self.services
320 .pdu_metadata
321 .add_relation(count, related_pducount);
322 }
323
324 if let Ok(content) = pdu.get_content::<ExtractRelatesTo>() {
325 match content.relates_to {
326 | Relation::Reply(ruma::events::relation::Reply { in_reply_to }) => {
327 if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
330 self.services
331 .pdu_metadata
332 .add_relation(count, related_pducount);
333 }
334 },
335 | Relation::Thread(thread) => {
336 self.services
337 .threads
338 .add_to_thread(&thread.event_id, pdu)
339 .await?;
340 },
341 | Relation::Replacement(replacement) => {
342 self.services
343 .pdu_metadata
344 .add_typed_relation(
345 shortroomid,
346 count,
347 &replacement.event_id,
348 pdu,
349 RelationType::Replacement,
350 )
351 .await;
352 },
353 | Relation::Reference(reference) => {
354 self.services
355 .pdu_metadata
356 .add_typed_relation(
357 shortroomid,
358 count,
359 &reference.event_id,
360 pdu,
361 RelationType::Reference,
362 )
363 .await;
364 },
365 | _ => {}, }
367 }
368
369 Ok(())
370}
371
372#[implement(super::Service)]
373fn append_pdu_json(&self, pdu_id: &RawPduId, pdu: &PduEvent, json: &CanonicalJsonObject) {
374 debug_assert!(matches!(pdu_id.pdu_count(), PduCount::Normal(_)), "PduCount not Normal");
375
376 self.db.pduid_pdu.raw_put(pdu_id, Json(json));
377
378 self.db
379 .eventid_pduid
380 .insert(pdu.event_id.as_bytes(), pdu_id);
381
382 self.db
383 .eventid_outlierpdu
384 .remove(pdu.event_id.as_bytes());
385
386 let ts = u64::from(pdu.origin_server_ts);
387 let count_key = bias_count(pdu_id.count());
388
389 self.db
390 .roomid_tscount_pducount
391 .put_raw((pdu.room_id(), ts, count_key), pdu_id.count());
392}