tuwunel_api/client/
send.rs1use std::collections::BTreeMap;
2
3use axum::extract::State;
4use futures::{FutureExt, future::try_join3};
5use ruma::{
6 DeviceId, RoomId, TransactionId, UserId,
7 api::client::message::send_message_event,
8 events::{
9 AnyMessageLikeEventContent, MessageLikeEventType, reaction::ReactionEventContent,
10 room::redaction::RoomRedactionEventContent,
11 },
12 serde::Raw,
13};
14use serde_json::from_str;
15use tuwunel_core::{
16 Err, Result, err,
17 matrix::pdu::PduBuilder,
18 utils::{self},
19 warn,
20};
21use tuwunel_service::Services;
22
23use crate::Ruma;
24
25pub(crate) async fn send_message_event_route(
35 State(services): State<crate::State>,
36 body: Ruma<send_message_event::v3::Request>,
37) -> Result<send_message_event::v3::Response> {
38 let sender_user = body.sender_user();
39 let sender_device = body.sender_device.as_deref();
40 let appservice_info = body.appservice_info.as_ref();
41
42 if body.event_type == MessageLikeEventType::RoomRedaction
43 && services.config.disable_local_redactions
44 && !services.admin.user_is_admin(sender_user).await
45 {
46 if let Some(event_id) = body
47 .body
48 .body
49 .deserialize_as_unchecked::<RoomRedactionEventContent>()
50 .ok()
51 .and_then(|content| content.redacts)
52 {
53 warn!(
54 %sender_user,
55 %event_id,
56 "Local redactions are disabled, non-admin user attempted to redact an event"
57 );
58 } else {
59 warn!(
60 %sender_user,
61 event = %body.body.body.json(),
62 "Local redactions are disabled, non-admin user attempted to redact an event \
63 with an invalid redaction event"
64 );
65 }
66
67 return Err!(Request(Forbidden("Redactions are disabled on this server.")));
68 }
69
70 if body.event_type == MessageLikeEventType::RoomEncrypted && !services.config.allow_encryption
72 {
73 return Err!(Request(Forbidden("Encryption has been disabled")));
74 }
75
76 let state_lock = services.state.mutex.lock(&body.room_id).await;
77
78 let (existing_txnid, ..) = try_join3(
79 check_existing_txnid(&services, sender_user, sender_device, &body.txn_id).map(Ok),
80 check_duplicate_reaction(&services, &body.event_type, sender_user, &body.body.body),
81 check_public_call_invite(&services, &body.event_type, &body.room_id),
82 )
83 .await?;
84
85 if let Some(existing_txnid) = existing_txnid {
86 return existing_txnid;
87 }
88
89 let mut unsigned = BTreeMap::new();
90 unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into());
91
92 let content = from_str(body.body.body.json().get())
93 .map_err(|e| err!(Request(BadJson("Invalid JSON body: {e}"))))?;
94
95 let redacts = body
100 .event_type
101 .eq(&MessageLikeEventType::RoomRedaction)
102 .then(|| {
103 body.body
104 .body
105 .deserialize_as_unchecked::<RoomRedactionEventContent>()
106 .ok()
107 })
108 .flatten()
109 .and_then(|content| content.redacts);
110
111 let event_id = services
112 .timeline
113 .build_and_append_pdu(
114 PduBuilder {
115 event_type: body.event_type.clone().into(),
116 content,
117 unsigned: Some(unsigned),
118 timestamp: appservice_info.and(body.timestamp),
119 redacts,
120 ..Default::default()
121 },
122 sender_user,
123 &body.room_id,
124 &state_lock,
125 )
126 .await?;
127
128 services.transaction_ids.add_txnid(
129 sender_user,
130 sender_device,
131 &body.txn_id,
132 event_id.as_bytes(),
133 );
134
135 drop(state_lock);
136
137 Ok(send_message_event::v3::Response { event_id })
138}
139
140async fn check_public_call_invite(
141 services: &Services,
142 event_type: &MessageLikeEventType,
143 room_id: &RoomId,
144) -> Result {
145 if *event_type != MessageLikeEventType::CallInvite {
146 return Ok(());
147 }
148
149 if !services.directory.is_public_room(room_id).await {
150 return Ok(());
151 }
152
153 Err!(Request(Forbidden("Room call invites are not allowed in public rooms")))
154}
155
156async fn check_duplicate_reaction(
158 services: &Services,
159 event_type: &MessageLikeEventType,
160 sender_user: &UserId,
161 body: &Raw<AnyMessageLikeEventContent>,
162) -> Result {
163 if *event_type != MessageLikeEventType::Reaction {
164 return Ok(());
165 }
166
167 let Ok(content) = body.deserialize_as_unchecked::<ReactionEventContent>() else {
168 return Ok(());
169 };
170
171 if !services
172 .pdu_metadata
173 .event_has_relation(
174 &content.relates_to.event_id,
175 Some(sender_user),
176 None,
177 Some(&content.relates_to.key),
178 )
179 .await
180 {
181 return Ok(());
182 }
183
184 Err!(Request(DuplicateAnnotation("Duplicate reactions are not allowed.")))
185}
186
187async fn check_existing_txnid(
191 services: &Services,
192 sender_user: &UserId,
193 sender_device: Option<&DeviceId>,
194 txn_id: &TransactionId,
195) -> Option<Result<send_message_event::v3::Response>> {
196 let Ok(response) = services
197 .transaction_ids
198 .existing_txnid(sender_user, sender_device, txn_id)
199 .await
200 else {
201 return None;
202 };
203
204 if response.is_empty() {
207 return Some(Err!(Request(InvalidParam(
208 "Tried to use txn_id already used for an incompatible endpoint."
209 ))));
210 }
211
212 let Ok(Ok(event_id)) = utils::string_from_bytes(&response).map(TryInto::try_into) else {
213 return Some(Err!(Database("Invalid event_id in txn_id data: {response:?}.")));
214 };
215
216 Some(Ok(send_message_event::v3::Response { event_id }))
217}