Skip to main content

tuwunel_service/rooms/timeline/
append.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use ruma::{
4	CanonicalJsonObject, CanonicalJsonValue, EventId, UserId,
5	events::{
6		TimelineEventType,
7		receipt::ReceiptThread,
8		relation::RelationType,
9		room::{
10			encrypted::Relation,
11			member::{MembershipState, RoomMemberEventContent},
12		},
13	},
14};
15use tuwunel_core::{
16	Result, err, error, implement,
17	matrix::{
18		event::Event,
19		pdu::{PduCount, PduEvent, PduId, RawPduId},
20		room_version,
21	},
22	utils::{self, result::LogErr},
23};
24use tuwunel_database::Json;
25
26use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard, bias_count};
27use crate::rooms::{
28	short::ShortRoomId, state_accessor::plain_text_topic, state_compressor::CompressedState,
29};
30
31/// Append the incoming event setting the state snapshot to the state from
32/// the server that sent the event.
33#[implement(super::Service)]
34#[tracing::instrument(
35	name = "append_incoming",
36	level = "debug",
37	skip_all,
38	ret(Debug)
39)]
40pub(crate) async fn append_incoming_pdu<'a, Leafs>(
41	&'a self,
42	pdu: &'a PduEvent,
43	pdu_json: CanonicalJsonObject,
44	new_room_leafs: Leafs,
45	state_ids_compressed: Arc<CompressedState>,
46	soft_fail: bool,
47	state_lock: &'a RoomMutexGuard,
48) -> Result<Option<RawPduId>>
49where
50	Leafs: Iterator<Item = &'a EventId> + Send + 'a,
51{
52	// We append to state before appending the pdu, so we don't have a moment in
53	// time with the pdu without it's state. This is okay because append_pdu can't
54	// fail.
55	self.services
56		.state
57		.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
58		.await?;
59
60	if soft_fail {
61		self.services
62			.pdu_metadata
63			.mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
64
65		self.services
66			.state
67			.set_forward_extremities(&pdu.room_id, new_room_leafs, state_lock)
68			.await;
69
70		return Ok(None);
71	}
72
73	let pdu_id = self
74		.append_pdu(pdu, pdu_json, new_room_leafs, state_lock)
75		.await?;
76
77	Ok(Some(pdu_id))
78}
79
80/// Creates a new persisted data unit and adds it to a room.
81///
82/// By this point the incoming event should be fully authenticated, no auth
83/// happens in `append_pdu`.
84///
85/// Returns pdu id
86#[implement(super::Service)]
87#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))]
88pub async fn append_pdu<'a, Leafs>(
89	&'a self,
90	pdu: &'a PduEvent,
91	mut pdu_json: CanonicalJsonObject,
92	leafs: Leafs,
93	state_lock: &'a RoomMutexGuard,
94) -> Result<RawPduId>
95where
96	Leafs: Iterator<Item = &'a EventId> + Send + 'a,
97{
98	// Coalesce database writes for the remainder of this scope.
99	let _cork = self.db.db.cork_and_flush();
100
101	let shortroomid = self
102		.services
103		.short
104		.get_shortroomid(pdu.room_id())
105		.await
106		.map_err(|_| err!(Database("Room does not exist")))?;
107
108	// Make unsigned fields correct. This is not properly documented in the spec,
109	// but state events need to have previous content in the unsigned field, so
110	// clients can easily interpret things like membership changes
111	if let Some(state_key) = pdu.state_key() {
112		if let CanonicalJsonValue::Object(unsigned) = pdu_json
113			.entry("unsigned".into())
114			.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
115		{
116			if let Ok(shortstatehash) = self
117				.services
118				.state
119				.pdu_shortstatehash(pdu.event_id())
120				.await && let Ok(prev_state) = self
121				.services
122				.state_accessor
123				.state_get(shortstatehash, &pdu.kind().to_string().into(), state_key)
124				.await
125			{
126				unsigned.insert(
127					"prev_content".into(),
128					CanonicalJsonValue::Object(
129						utils::to_canonical_object(prev_state.get_content_as_value()).map_err(
130							|e| {
131								err!(Database(error!(
132									"Failed to convert prev_state to canonical JSON: {e}",
133								)))
134							},
135						)?,
136					),
137				);
138				unsigned.insert(
139					"prev_sender".into(),
140					CanonicalJsonValue::String(prev_state.sender().to_string()),
141				);
142				unsigned.insert(
143					"replaces_state".into(),
144					CanonicalJsonValue::String(prev_state.event_id().to_string()),
145				);
146			}
147		} else {
148			error!("Invalid unsigned type in pdu.");
149		}
150	}
151
152	// We must keep track of all events that have been referenced.
153	self.services
154		.pdu_metadata
155		.mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
156
157	self.services
158		.state
159		.set_forward_extremities(pdu.room_id(), leafs, state_lock)
160		.await;
161
162	let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
163	let next_count1 = self.services.globals.next_count();
164	let next_count2 = self.services.globals.next_count();
165
166	// Mark as read first so the sending client doesn't get a notification even if
167	// appending fails. Route through the dispatcher so per-thread counts are
168	// also cleared; the sender's own send subsumes any thread receipt.
169	self.services
170		.read_receipt
171		.private_read_set(pdu.room_id(), pdu.sender(), *next_count2, &ReceiptThread::Unthreaded)
172		.await;
173
174	self.services
175		.pusher
176		.reset_notification_counts_for_thread(
177			pdu.sender(),
178			pdu.room_id(),
179			&ReceiptThread::Unthreaded,
180		)
181		.await;
182
183	let count = PduCount::Normal(*next_count1);
184	let pdu_id: RawPduId = PduId { shortroomid, count }.into();
185
186	// Insert pdu
187	self.append_pdu_json(&pdu_id, pdu, &pdu_json);
188
189	drop(insert_lock);
190
191	self.services
192		.pusher
193		.append_pdu(pdu_id, pdu)
194		.await
195		.log_err()
196		.ok();
197
198	self.append_pdu_effects(pdu_id, pdu, shortroomid, count, state_lock)
199		.await?;
200
201	drop(next_count1);
202	drop(next_count2);
203
204	self.services
205		.appservice
206		.append_pdu(pdu_id, pdu)
207		.await
208		.log_err()
209		.ok();
210
211	Ok(pdu_id)
212}
213
214#[implement(super::Service)]
215async fn append_pdu_effects(
216	&self,
217	pdu_id: RawPduId,
218	pdu: &PduEvent,
219	shortroomid: ShortRoomId,
220	count: PduCount,
221	state_lock: &RoomMutexGuard,
222) -> Result {
223	match *pdu.kind() {
224		| TimelineEventType::RoomRedaction => {
225			let room_version = self
226				.services
227				.state
228				.get_room_version(pdu.room_id())
229				.await?;
230
231			let room_rules = room_version::rules(&room_version)?;
232
233			let redacts_id = pdu.redacts_id(&room_rules);
234
235			if let Some(redacts_id) = &redacts_id
236				&& self
237					.services
238					.state_accessor
239					.user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
240					.await?
241			{
242				self.redact_pdu(redacts_id, pdu, shortroomid, state_lock)
243					.await?;
244			}
245		},
246		| TimelineEventType::SpaceChild =>
247			if let Some(_state_key) = pdu.state_key() {
248				self.services.spaces.cache_evict(pdu.room_id());
249			},
250		| TimelineEventType::RoomMember => {
251			if let Some(state_key) = pdu.state_key() {
252				// if the state_key fails
253				let target_user_id =
254					UserId::parse(state_key).expect("This state_key was previously validated");
255
256				let content: RoomMemberEventContent = pdu.get_content()?;
257				let stripped_state = match content.membership {
258					| MembershipState::Invite | MembershipState::Knock => self
259						.services
260						.state
261						.summary_stripped(pdu)
262						.await
263						.into(),
264					| _ => None,
265				};
266
267				// Update our membership info, we do this here incase a user is invited or
268				// knocked and immediately leaves we need the DB to record the invite or
269				// knock event for auth
270				self.services
271					.state_cache
272					.update_membership(
273						pdu.room_id(),
274						&target_user_id,
275						content,
276						pdu.sender(),
277						stripped_state,
278						None,
279						true,
280						count,
281					)
282					.await?;
283			}
284		},
285		| TimelineEventType::RoomMessage => {
286			let content: ExtractBody = pdu.get_content()?;
287			if let Some(body) = content.body {
288				self.services
289					.search
290					.index_pdu(shortroomid, &pdu_id, &body);
291
292				if self
293					.services
294					.admin
295					.is_admin_command(pdu, &body)
296					.await
297				{
298					self.services
299						.admin
300						.command(body, Some((pdu.event_id()).into()))
301						.await?;
302				}
303			}
304		},
305		| TimelineEventType::RoomTopic =>
306			if let Some(topic) = pdu.get_content().ok().and_then(plain_text_topic) {
307				self.services
308					.search
309					.index_pdu(shortroomid, &pdu_id, &topic);
310			},
311		| _ => {},
312	}
313
314	if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>()
315		&& let Ok(related_pducount) = self
316			.get_pdu_count(&content.relates_to.event_id)
317			.await
318	{
319		self.services
320			.pdu_metadata
321			.add_relation(count, related_pducount);
322	}
323
324	if let Ok(content) = pdu.get_content::<ExtractRelatesTo>() {
325		match content.relates_to {
326			| Relation::Reply(ruma::events::relation::Reply { in_reply_to }) => {
327				// We need to do it again here, because replies don't have
328				// event_id as a top level field
329				if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
330					self.services
331						.pdu_metadata
332						.add_relation(count, related_pducount);
333				}
334			},
335			| Relation::Thread(thread) => {
336				self.services
337					.threads
338					.add_to_thread(&thread.event_id, pdu)
339					.await?;
340			},
341			| Relation::Replacement(replacement) => {
342				self.services
343					.pdu_metadata
344					.add_typed_relation(
345						shortroomid,
346						count,
347						&replacement.event_id,
348						pdu,
349						RelationType::Replacement,
350					)
351					.await;
352			},
353			| Relation::Reference(reference) => {
354				self.services
355					.pdu_metadata
356					.add_typed_relation(
357						shortroomid,
358						count,
359						&reference.event_id,
360						pdu,
361						RelationType::Reference,
362					)
363					.await;
364			},
365			| _ => {}, // TODO: Aggregate other types
366		}
367	}
368
369	Ok(())
370}
371
372#[implement(super::Service)]
373fn append_pdu_json(&self, pdu_id: &RawPduId, pdu: &PduEvent, json: &CanonicalJsonObject) {
374	debug_assert!(matches!(pdu_id.pdu_count(), PduCount::Normal(_)), "PduCount not Normal");
375
376	self.db.pduid_pdu.raw_put(pdu_id, Json(json));
377
378	self.db
379		.eventid_pduid
380		.insert(pdu.event_id.as_bytes(), pdu_id);
381
382	self.db
383		.eventid_outlierpdu
384		.remove(pdu.event_id.as_bytes());
385
386	let ts = u64::from(pdu.origin_server_ts);
387	let count_key = bias_count(pdu_id.count());
388
389	self.db
390		.roomid_tscount_pducount
391		.put_raw((pdu.room_id(), ts, count_key), pdu_id.count());
392}