tuwunel_service/rooms/timeline/
create.rs1use std::cmp;
2
3use futures::{StreamExt, TryStreamExt};
4use ruma::{
5 CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId,
6 OwnedRoomId, RoomId, UserId,
7 events::{StateEventType, TimelineEventType, room::create::RoomCreateEventContent},
8 room_version_rules::RoomIdFormatVersion,
9 uint,
10};
11use serde_json::value::to_raw_value;
12use tuwunel_core::{
13 Error, Result, err, implement,
14 matrix::{
15 event::{Event, StateKey, TypeExt},
16 pdu::{EventHash, PduBuilder, PduEvent, PrevEvents, check_rules},
17 room_version,
18 },
19 utils::{
20 IterStream, ReadyExt, TryReadyExt, millis_since_unix_epoch, stream::TryIgnore,
21 to_canonical_object,
22 },
23};
24
25use super::RoomMutexGuard;
26use crate::rooms::state_res;
27
28#[implement(super::Service)]
29pub async fn create_hash_and_sign_event(
30 &self,
31 pdu_builder: PduBuilder,
32 sender: &UserId,
33 room_id: &RoomId,
34 _mutex_lock: &RoomMutexGuard,
36) -> Result<(PduEvent, CanonicalJsonObject)> {
37 let PduBuilder {
38 event_type,
39 content,
40 unsigned,
41 state_key,
42 redacts,
43 timestamp,
44 } = pdu_builder;
45
46 let prev_events: PrevEvents = self
47 .services
48 .state
49 .get_forward_extremities(room_id)
50 .take(20)
51 .map(Into::into)
52 .collect()
53 .await;
54
55 let (room_version, version_rules) = self
57 .services
58 .state
59 .get_room_version(room_id)
60 .await
61 .or_else(|_| {
62 if event_type == TimelineEventType::RoomCreate {
63 let content: RoomCreateEventContent = serde_json::from_str(content.get())?;
64 Ok(content.room_version)
65 } else {
66 Err(Error::InconsistentRoomState(
67 "non-create event for room of unknown version",
68 room_id.to_owned(),
69 ))
70 }
71 })
72 .and_then(|room_version| {
73 Ok((room_version.clone(), room_version::rules(&room_version)?))
74 })?;
75
76 let auth_events = self
77 .services
78 .state
79 .get_auth_events(
80 room_id,
81 &event_type,
82 sender,
83 state_key.as_deref(),
84 &content,
85 &version_rules.authorization,
86 true,
87 )
88 .await?;
89
90 let depth = prev_events
92 .iter()
93 .stream()
94 .map(Ok)
95 .and_then(|event_id| self.get_pdu(event_id))
96 .ready_and_then(|pdu| Ok(pdu.depth))
97 .ignore_err()
98 .ready_fold(uint!(0), cmp::max)
99 .await
100 .saturating_add(uint!(1));
101
102 let mut unsigned = unsigned.unwrap_or_default();
103 if let Some(state_key) = &state_key
104 && let Ok(prev_pdu) = self
105 .services
106 .state_accessor
107 .room_state_get(room_id, &event_type.to_string().into(), state_key)
108 .await
109 {
110 unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value());
111 unsigned.insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?);
112 unsigned.insert("replaces_state".to_owned(), serde_json::to_value(prev_pdu.event_id())?);
113 }
114
115 let unsigned = unsigned
116 .is_empty()
117 .eq(&false)
118 .then_some(to_raw_value(&unsigned)?);
119
120 let origin_server_ts = timestamp
121 .as_ref()
122 .map(MilliSecondsSinceUnixEpoch::get)
123 .unwrap_or_else(|| {
124 millis_since_unix_epoch()
125 .try_into()
126 .expect("u64 to UInt")
127 });
128
129 let mut pdu = PduEvent {
130 event_id: ruma::event_id!("$thiswillbereplaced").into(),
131 room_id: room_id.to_owned(),
132 sender: sender.to_owned(),
133 origin: Some(self.services.globals.server_name().to_owned()),
134 content: content.into(),
135 origin_server_ts,
136 kind: event_type,
137 state_key,
138 depth,
139 redacts,
140 unsigned,
141 hashes: EventHash::default(),
142 signatures: None,
143 prev_events,
144 auth_events: auth_events
145 .values()
146 .filter(|pdu| {
147 version_rules
148 .event_format
149 .allow_room_create_in_auth_events
150 || *pdu.kind() != TimelineEventType::RoomCreate
151 })
152 .map(|pdu| pdu.event_id.clone())
153 .collect(),
154 };
155
156 let auth_fetch = async |k: StateEventType, s: StateKey| {
157 auth_events
158 .get(&k.with_state_key(s.as_str()))
159 .map(ToOwned::to_owned)
160 .ok_or_else(|| err!(Request(NotFound("Missing auth events"))))
161 };
162
163 state_res::auth_check(
164 &version_rules,
165 &pdu,
166 &async |event_id: OwnedEventId| self.get_pdu(&event_id).await,
167 &auth_fetch,
168 )
169 .await?;
170
171 let mut pdu_json = to_canonical_object(&pdu).map_err(|e| {
173 err!(Request(BadJson(warn!("Failed to convert PDU to canonical JSON: {e}"))))
174 })?;
175
176 if !version_rules
178 .event_format
179 .require_room_create_room_id
180 && pdu.kind == TimelineEventType::RoomCreate
181 {
182 pdu_json.remove("room_id");
183 }
184
185 pdu.event_id = self
186 .services
187 .server_keys
188 .gen_id_hash_and_sign_event(&mut pdu_json, &room_version)?;
189
190 if matches!(version_rules.room_id_format, RoomIdFormatVersion::V2)
192 && pdu.kind == TimelineEventType::RoomCreate
193 {
194 pdu.room_id = OwnedRoomId::from_parts('!', pdu.event_id.localpart(), None)?;
195 pdu_json.insert("room_id".into(), CanonicalJsonValue::String(pdu.room_id.clone().into()));
196 }
197
198 check_rules(&pdu_json, &version_rules.event_format)?;
199
200 let _shorteventid = self
202 .services
203 .short
204 .get_or_create_shorteventid(&pdu.event_id)
205 .await;
206
207 Ok((pdu, pdu_json))
208}