Skip to main content

tuwunel_service/rooms/timeline/
append.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use ruma::{
4	CanonicalJsonObject, CanonicalJsonValue, EventId, UserId,
5	events::{
6		TimelineEventType,
7		receipt::ReceiptThread,
8		room::{
9			encrypted::Relation,
10			member::{MembershipState, RoomMemberEventContent},
11		},
12	},
13};
14use tuwunel_core::{
15	Result, err, error, implement,
16	matrix::{
17		event::Event,
18		pdu::{PduCount, PduEvent, PduId, RawPduId},
19		room_version,
20	},
21	utils::{self, result::LogErr},
22};
23use tuwunel_database::Json;
24
25use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard};
26use crate::rooms::{short::ShortRoomId, state_compressor::CompressedState};
27
28/// Append the incoming event setting the state snapshot to the state from
29/// the server that sent the event.
30#[implement(super::Service)]
31#[tracing::instrument(
32	name = "append_incoming",
33	level = "debug",
34	skip_all,
35	ret(Debug)
36)]
37pub(crate) async fn append_incoming_pdu<'a, Leafs>(
38	&'a self,
39	pdu: &'a PduEvent,
40	pdu_json: CanonicalJsonObject,
41	new_room_leafs: Leafs,
42	state_ids_compressed: Arc<CompressedState>,
43	soft_fail: bool,
44	state_lock: &'a RoomMutexGuard,
45) -> Result<Option<RawPduId>>
46where
47	Leafs: Iterator<Item = &'a EventId> + Send + 'a,
48{
49	// We append to state before appending the pdu, so we don't have a moment in
50	// time with the pdu without it's state. This is okay because append_pdu can't
51	// fail.
52	self.services
53		.state
54		.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)
55		.await?;
56
57	if soft_fail {
58		self.services
59			.pdu_metadata
60			.mark_as_referenced(&pdu.room_id, pdu.prev_events.iter().map(AsRef::as_ref));
61
62		self.services
63			.state
64			.set_forward_extremities(&pdu.room_id, new_room_leafs, state_lock)
65			.await;
66
67		return Ok(None);
68	}
69
70	let pdu_id = self
71		.append_pdu(pdu, pdu_json, new_room_leafs, state_lock)
72		.await?;
73
74	Ok(Some(pdu_id))
75}
76
77/// Creates a new persisted data unit and adds it to a room.
78///
79/// By this point the incoming event should be fully authenticated, no auth
80/// happens in `append_pdu`.
81///
82/// Returns pdu id
83#[implement(super::Service)]
84#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))]
85pub async fn append_pdu<'a, Leafs>(
86	&'a self,
87	pdu: &'a PduEvent,
88	mut pdu_json: CanonicalJsonObject,
89	leafs: Leafs,
90	state_lock: &'a RoomMutexGuard,
91) -> Result<RawPduId>
92where
93	Leafs: Iterator<Item = &'a EventId> + Send + 'a,
94{
95	// Coalesce database writes for the remainder of this scope.
96	let _cork = self.db.db.cork_and_flush();
97
98	let shortroomid = self
99		.services
100		.short
101		.get_shortroomid(pdu.room_id())
102		.await
103		.map_err(|_| err!(Database("Room does not exist")))?;
104
105	// Make unsigned fields correct. This is not properly documented in the spec,
106	// but state events need to have previous content in the unsigned field, so
107	// clients can easily interpret things like membership changes
108	if let Some(state_key) = pdu.state_key() {
109		if let CanonicalJsonValue::Object(unsigned) = pdu_json
110			.entry("unsigned".into())
111			.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
112		{
113			if let Ok(shortstatehash) = self
114				.services
115				.state
116				.pdu_shortstatehash(pdu.event_id())
117				.await && let Ok(prev_state) = self
118				.services
119				.state_accessor
120				.state_get(shortstatehash, &pdu.kind().to_string().into(), state_key)
121				.await
122			{
123				unsigned.insert(
124					"prev_content".into(),
125					CanonicalJsonValue::Object(
126						utils::to_canonical_object(prev_state.get_content_as_value()).map_err(
127							|e| {
128								err!(Database(error!(
129									"Failed to convert prev_state to canonical JSON: {e}",
130								)))
131							},
132						)?,
133					),
134				);
135				unsigned.insert(
136					"prev_sender".into(),
137					CanonicalJsonValue::String(prev_state.sender().to_string()),
138				);
139				unsigned.insert(
140					"replaces_state".into(),
141					CanonicalJsonValue::String(prev_state.event_id().to_string()),
142				);
143			}
144		} else {
145			error!("Invalid unsigned type in pdu.");
146		}
147	}
148
149	// We must keep track of all events that have been referenced.
150	self.services
151		.pdu_metadata
152		.mark_as_referenced(pdu.room_id(), pdu.prev_events().map(AsRef::as_ref));
153
154	self.services
155		.state
156		.set_forward_extremities(pdu.room_id(), leafs, state_lock)
157		.await;
158
159	let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
160	let next_count1 = self.services.globals.next_count();
161	let next_count2 = self.services.globals.next_count();
162
163	// Mark as read first so the sending client doesn't get a notification even if
164	// appending fails. Route through the dispatcher so per-thread counts are
165	// also cleared; the sender's own send subsumes any thread receipt.
166	self.services
167		.read_receipt
168		.private_read_set(pdu.room_id(), pdu.sender(), *next_count2, &ReceiptThread::Unthreaded)
169		.await;
170
171	self.services
172		.pusher
173		.reset_notification_counts_for_thread(
174			pdu.sender(),
175			pdu.room_id(),
176			&ReceiptThread::Unthreaded,
177		)
178		.await;
179
180	let count = PduCount::Normal(*next_count1);
181	let pdu_id: RawPduId = PduId { shortroomid, count }.into();
182
183	// Insert pdu
184	self.append_pdu_json(&pdu_id, pdu, &pdu_json);
185
186	drop(insert_lock);
187
188	self.services
189		.pusher
190		.append_pdu(pdu_id, pdu)
191		.await
192		.log_err()
193		.ok();
194
195	self.append_pdu_effects(pdu_id, pdu, shortroomid, count, state_lock)
196		.await?;
197
198	drop(next_count1);
199	drop(next_count2);
200
201	self.services
202		.appservice
203		.append_pdu(pdu_id, pdu)
204		.await
205		.log_err()
206		.ok();
207
208	Ok(pdu_id)
209}
210
211#[implement(super::Service)]
212async fn append_pdu_effects(
213	&self,
214	pdu_id: RawPduId,
215	pdu: &PduEvent,
216	shortroomid: ShortRoomId,
217	count: PduCount,
218	state_lock: &RoomMutexGuard,
219) -> Result {
220	match *pdu.kind() {
221		| TimelineEventType::RoomRedaction => {
222			let room_version = self
223				.services
224				.state
225				.get_room_version(pdu.room_id())
226				.await?;
227
228			let room_rules = room_version::rules(&room_version)?;
229
230			let redacts_id = pdu.redacts_id(&room_rules);
231
232			if let Some(redacts_id) = &redacts_id
233				&& self
234					.services
235					.state_accessor
236					.user_can_redact(redacts_id, pdu.sender(), pdu.room_id(), false)
237					.await?
238			{
239				self.redact_pdu(redacts_id, pdu, shortroomid, state_lock)
240					.await?;
241			}
242		},
243		| TimelineEventType::SpaceChild =>
244			if let Some(_state_key) = pdu.state_key() {
245				self.services.spaces.cache_evict(pdu.room_id());
246			},
247		| TimelineEventType::RoomMember => {
248			if let Some(state_key) = pdu.state_key() {
249				// if the state_key fails
250				let target_user_id =
251					UserId::parse(state_key).expect("This state_key was previously validated");
252
253				let content: RoomMemberEventContent = pdu.get_content()?;
254				let stripped_state = match content.membership {
255					| MembershipState::Invite | MembershipState::Knock => self
256						.services
257						.state
258						.summary_stripped(pdu)
259						.await
260						.into(),
261					| _ => None,
262				};
263
264				// Update our membership info, we do this here incase a user is invited or
265				// knocked and immediately leaves we need the DB to record the invite or
266				// knock event for auth
267				self.services
268					.state_cache
269					.update_membership(
270						pdu.room_id(),
271						&target_user_id,
272						content,
273						pdu.sender(),
274						stripped_state,
275						None,
276						true,
277						count,
278					)
279					.await?;
280			}
281		},
282		| TimelineEventType::RoomMessage => {
283			let content: ExtractBody = pdu.get_content()?;
284			if let Some(body) = content.body {
285				self.services
286					.search
287					.index_pdu(shortroomid, &pdu_id, &body);
288
289				if self
290					.services
291					.admin
292					.is_admin_command(pdu, &body)
293					.await
294				{
295					self.services
296						.admin
297						.command(body, Some((pdu.event_id()).into()))
298						.await?;
299				}
300			}
301		},
302		| _ => {},
303	}
304
305	if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>()
306		&& let Ok(related_pducount) = self
307			.get_pdu_count(&content.relates_to.event_id)
308			.await
309	{
310		self.services
311			.pdu_metadata
312			.add_relation(count, related_pducount);
313	}
314
315	if let Ok(content) = pdu.get_content::<ExtractRelatesTo>() {
316		match content.relates_to {
317			| Relation::Reply(ruma::events::relation::Reply { in_reply_to }) => {
318				// We need to do it again here, because replies don't have
319				// event_id as a top level field
320				if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
321					self.services
322						.pdu_metadata
323						.add_relation(count, related_pducount);
324				}
325			},
326			| Relation::Thread(thread) => {
327				self.services
328					.threads
329					.add_to_thread(&thread.event_id, pdu)
330					.await?;
331			},
332			| _ => {}, // TODO: Aggregate other types
333		}
334	}
335
336	Ok(())
337}
338
339#[implement(super::Service)]
340fn append_pdu_json(&self, pdu_id: &RawPduId, pdu: &PduEvent, json: &CanonicalJsonObject) {
341	debug_assert!(matches!(pdu_id.pdu_count(), PduCount::Normal(_)), "PduCount not Normal");
342
343	self.db.pduid_pdu.raw_put(pdu_id, Json(json));
344
345	self.db
346		.eventid_pduid
347		.insert(pdu.event_id.as_bytes(), pdu_id);
348
349	self.db
350		.eventid_outlierpdu
351		.remove(pdu.event_id.as_bytes());
352
353	let ts = u64::from(pdu.origin_server_ts);
354	self.db
355		.roomid_ts_pducount
356		.put_raw((pdu.room_id(), ts), pdu_id.count());
357}