Skip to main content

tuwunel_service/rooms/timeline/
create.rs

1use std::cmp;
2
3use futures::{StreamExt, TryStreamExt};
4use ruma::{
5	CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId,
6	OwnedRoomId, RoomId, UserId,
7	events::{StateEventType, TimelineEventType, room::create::RoomCreateEventContent},
8	room_version_rules::RoomIdFormatVersion,
9	uint,
10};
11use serde_json::value::to_raw_value;
12use tuwunel_core::{
13	Error, Result, err, implement,
14	matrix::{
15		event::{Event, StateKey, TypeExt},
16		pdu::{EventHash, PduBuilder, PduEvent, PrevEvents, check_rules},
17		room_version,
18	},
19	utils::{
20		IterStream, ReadyExt, TryReadyExt, millis_since_unix_epoch, stream::TryIgnore,
21		to_canonical_object,
22	},
23};
24
25use super::RoomMutexGuard;
26use crate::rooms::state_res;
27
28#[implement(super::Service)]
29pub async fn create_hash_and_sign_event(
30	&self,
31	pdu_builder: PduBuilder,
32	sender: &UserId,
33	room_id: &RoomId,
34	// Take mutex guard to make sure users get the room state mutex
35	_mutex_lock: &RoomMutexGuard,
36) -> Result<(PduEvent, CanonicalJsonObject)> {
37	let PduBuilder {
38		event_type,
39		content,
40		unsigned,
41		state_key,
42		redacts,
43		timestamp,
44	} = pdu_builder;
45
46	let prev_events: PrevEvents = self
47		.services
48		.state
49		.get_forward_extremities(room_id)
50		.take(20)
51		.map(Into::into)
52		.collect()
53		.await;
54
55	// If there was no create event yet, assume we are creating a room
56	let (room_version, version_rules) = self
57		.services
58		.state
59		.get_room_version(room_id)
60		.await
61		.or_else(|_| {
62			if event_type == TimelineEventType::RoomCreate {
63				let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
64				Ok(content.room_version)
65			} else {
66				Err(Error::InconsistentRoomState(
67					"non-create event for room of unknown version",
68					room_id.to_owned(),
69				))
70			}
71		})
72		.and_then(|room_version| {
73			Ok((room_version.clone(), room_version::rules(&room_version)?))
74		})?;
75
76	let auth_events = self
77		.services
78		.state
79		.get_auth_events(
80			room_id,
81			&event_type,
82			sender,
83			state_key.as_deref(),
84			&content,
85			&version_rules.authorization,
86			true,
87		)
88		.await?;
89
90	// Our depth is the maximum depth of prev_events + 1
91	let depth = prev_events
92		.iter()
93		.stream()
94		.map(Ok)
95		.and_then(|event_id| self.get_pdu(event_id))
96		.ready_and_then(|pdu| Ok(pdu.depth))
97		.ignore_err()
98		.ready_fold(uint!(0), cmp::max)
99		.await
100		.saturating_add(uint!(1));
101
102	let mut unsigned = unsigned.unwrap_or_default();
103	if let Some(state_key) = &state_key
104		&& let Ok(prev_pdu) = self
105			.services
106			.state_accessor
107			.room_state_get(room_id, &event_type.to_string().into(), state_key)
108			.await
109	{
110		unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value());
111		unsigned.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?);
112		unsigned.insert("replaces_state".to_owned(), serde_json::to_value(prev_pdu.event_id())?);
113	}
114
115	let unsigned = unsigned
116		.is_empty()
117		.eq(&false)
118		.then_some(to_raw_value(&unsigned)?);
119
120	let origin_server_ts = timestamp
121		.as_ref()
122		.map(MilliSecondsSinceUnixEpoch::get)
123		.unwrap_or_else(|| {
124			millis_since_unix_epoch()
125				.try_into()
126				.expect("u64 to UInt")
127		});
128
129	let mut pdu = PduEvent {
130		event_id: ruma::event_id!("$thiswillbereplaced").into(),
131		room_id: room_id.to_owned(),
132		sender: sender.to_owned(),
133		origin: Some(self.services.globals.server_name().to_owned()),
134		content: content.into(),
135		origin_server_ts,
136		kind: event_type,
137		state_key,
138		depth,
139		redacts,
140		unsigned,
141		hashes: EventHash::default(),
142		signatures: None,
143		prev_events,
144		auth_events: auth_events
145			.values()
146			.filter(|pdu| {
147				version_rules
148					.event_format
149					.allow_room_create_in_auth_events
150					|| *pdu.kind() != TimelineEventType::RoomCreate
151			})
152			.map(|pdu| pdu.event_id.clone())
153			.collect(),
154	};
155
156	let auth_fetch = async |k: StateEventType, s: StateKey| {
157		auth_events
158			.get(&k.with_state_key(s.as_str()))
159			.map(ToOwned::to_owned)
160			.ok_or_else(|| err!(Request(NotFound("Missing auth events"))))
161	};
162
163	state_res::auth_check(
164		&version_rules,
165		&pdu,
166		&async |event_id: OwnedEventId| self.get_pdu(&event_id).await,
167		&auth_fetch,
168	)
169	.await?;
170
171	// Hash and sign
172	let mut pdu_json = to_canonical_object(&pdu).map_err(|e| {
173		err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}"))))
174	})?;
175
176	// room v12 and above removed the placeholder "room_id" field from m.room.create
177	if !version_rules
178		.event_format
179		.require_room_create_room_id
180		&& pdu.kind == TimelineEventType::RoomCreate
181	{
182		pdu_json.remove("room_id");
183	}
184
185	pdu.event_id = self
186		.services
187		.server_keys
188		.gen_id_hash_and_sign_event(&mut pdu_json, &room_version)?;
189
190	// Room id is event id for V12+
191	if matches!(version_rules.room_id_format, RoomIdFormatVersion::V2)
192		&& pdu.kind == TimelineEventType::RoomCreate
193	{
194		pdu.room_id = OwnedRoomId::from_parts('!', pdu.event_id.localpart(), None)?;
195		pdu_json.insert("room_id".into(), CanonicalJsonValue::String(pdu.room_id.clone().into()));
196	}
197
198	check_rules(&pdu_json, &version_rules.event_format)?;
199
200	// Generate short event id
201	let _shorteventid = self
202		.services
203		.short
204		.get_or_create_shorteventid(&pdu.event_id)
205		.await;
206
207	Ok((pdu, pdu_json))
208}