tuwunel_service/rooms/typing/
mod.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use futures::{FutureExt, StreamExt, TryStreamExt, future::try_join};
4use ruma::{
5 OwnedRoomId, OwnedUserId, RoomId, UserId,
6 api::{
7 appservice::event::push_events::v1::EphemeralData,
8 federation::transactions::edu::{Edu, TypingContent},
9 },
10 events::{EphemeralRoomEvent, typing::TypingEventContent},
11};
12use tokio::sync::{RwLock, broadcast};
13use tuwunel_core::{
14 Result, Server, debug_info, trace,
15 utils::{self, BoolExt, IterStream},
16};
17
18use crate::sending::EduBuf;
19
20pub struct Service {
21 server: Arc<Server>,
22 services: Arc<crate::services::OnceServices>,
23 pub typing: RwLock<BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, u64>>>,
25 pub last_typing_update: RwLock<BTreeMap<OwnedRoomId, u64>>,
27 pub typing_update_sender: broadcast::Sender<OwnedRoomId>,
28}
29
30impl crate::Service for Service {
31 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
32 Ok(Arc::new(Self {
33 server: args.server.clone(),
34 services: args.services.clone(),
35 typing: RwLock::new(BTreeMap::new()),
36 last_typing_update: RwLock::new(BTreeMap::new()),
37 typing_update_sender: broadcast::channel(100).0,
38 }))
39 }
40
41 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
42}
43
44impl Service {
45 pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result {
48 debug_info!("typing started {user_id:?} in {room_id:?} timeout:{timeout:?}");
49
50 self.typing
52 .write()
53 .await
54 .entry(room_id.to_owned())
55 .or_default()
56 .insert(user_id.to_owned(), timeout);
57
58 let count = self.services.globals.next_count();
59
60 self.last_typing_update
61 .write()
62 .await
63 .insert(room_id.to_owned(), *count);
64
65 drop(count);
66
67 if self
68 .typing_update_sender
69 .send(room_id.to_owned())
70 .is_err()
71 {
72 trace!("receiver found what it was looking for and is no longer interested");
73 }
74
75 let appservice_send = self.appservice_send(room_id);
77
78 let federation_send = self
80 .services
81 .globals
82 .user_is_local(user_id)
83 .then_async(|| self.federation_send(room_id, user_id, true))
84 .map(Option::transpose);
85
86 try_join(appservice_send, federation_send)
87 .await
88 .map(|_| ())
89 }
90
91 pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result {
93 debug_info!("typing stopped {user_id:?} in {room_id:?}");
94
95 self.typing
97 .write()
98 .await
99 .entry(room_id.to_owned())
100 .or_default()
101 .remove(user_id);
102
103 let count = self.services.globals.next_count();
104
105 self.last_typing_update
106 .write()
107 .await
108 .insert(room_id.to_owned(), *count);
109
110 drop(count);
111
112 if self
113 .typing_update_sender
114 .send(room_id.to_owned())
115 .is_err()
116 {
117 trace!("receiver found what it was looking for and is no longer interested");
118 }
119
120 let appservice_send = self.appservice_send(room_id);
122
123 let federation_send = self
125 .services
126 .globals
127 .user_is_local(user_id)
128 .then_async(|| self.federation_send(room_id, user_id, false))
129 .map(Option::transpose);
130
131 try_join(appservice_send, federation_send)
132 .await
133 .map(|_| ())
134 }
135
136 pub async fn wait_for_update(&self, room_id: &RoomId) {
137 let mut receiver = self.typing_update_sender.subscribe();
138 while let Ok(next) = receiver.recv().await {
139 if next == room_id {
140 break;
141 }
142 }
143 }
144
145 async fn typings_maintain(&self, room_id: &RoomId) -> Result {
147 let current_timestamp = utils::millis_since_unix_epoch();
148 let mut removable = Vec::new();
149
150 let typing = self.typing.read().await;
151 let Some(room) = typing.get(room_id) else {
152 return Ok(());
153 };
154
155 for (user, timeout) in room {
156 if *timeout < current_timestamp {
157 removable.push(user.clone());
158 }
159 }
160
161 drop(typing);
162
163 if removable.is_empty() {
164 return Ok(());
165 }
166
167 let mut typing = self.typing.write().await;
168 let room = typing.entry(room_id.to_owned()).or_default();
169 for user in &removable {
170 debug_info!("typing timeout {user:?} in {room_id:?}");
171 room.remove(user);
172 }
173
174 drop(typing);
175
176 let count = self.services.globals.next_count();
178 self.last_typing_update
179 .write()
180 .await
181 .insert(room_id.to_owned(), *count);
182
183 drop(count);
184
185 if self
186 .typing_update_sender
187 .send(room_id.to_owned())
188 .is_err()
189 {
190 trace!("receiver found what it was looking for and is no longer interested");
191 }
192
193 let appservice_send = self.appservice_send(room_id);
195
196 let federation_sends = removable
198 .iter()
199 .filter(|user_id| self.services.globals.user_is_local(user_id))
200 .try_stream()
201 .try_for_each(|user_id| self.federation_send(room_id, user_id, false));
202
203 try_join(appservice_send, federation_sends)
204 .boxed()
205 .await
206 .map(|_| ())
207 }
208
209 pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
211 self.typings_maintain(room_id).await?;
212
213 self.last_typing_update
214 .read()
215 .await
216 .get(room_id)
217 .copied()
218 .map(Ok)
219 .unwrap_or(Ok(0))
220 }
221
222 async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent {
224 let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
225
226 let Some(typing_indicators) = room_typing_indicators else {
227 return TypingEventContent { user_ids: Vec::new() };
228 };
229
230 TypingEventContent {
231 user_ids: typing_indicators.into_keys().collect(),
232 }
233 }
234
235 async fn appservice_send(&self, room_id: &RoomId) -> Result {
237 let content = self.typings_content(room_id).await;
238
239 self.services
240 .sending
241 .send_edu_room_appservices(room_id, |buf| {
242 let edu = EphemeralData::Typing(EphemeralRoomEvent {
243 room_id: room_id.to_owned(),
244 content: content.clone(),
245 });
246
247 Ok(serde_json::to_writer(buf, &edu)?)
248 })
249 .await
250 }
251
252 pub async fn typing_users_for_user(
254 &self,
255 room_id: &RoomId,
256 sender_user: &UserId,
257 ) -> Result<Vec<OwnedUserId>> {
258 let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
259
260 let Some(typing_indicators) = room_typing_indicators else {
261 return Ok(Vec::new());
262 };
263
264 let user_ids: Vec<_> = typing_indicators
265 .into_keys()
266 .stream()
267 .filter_map(async |typing_user_id| {
268 self.services
269 .users
270 .user_is_ignored(&typing_user_id, sender_user)
271 .await
272 .eq(&false)
273 .then_some(typing_user_id)
274 })
275 .collect()
276 .await;
277
278 Ok(user_ids)
279 }
280
281 async fn federation_send(&self, room_id: &RoomId, user_id: &UserId, typing: bool) -> Result {
282 debug_assert!(
283 self.services.globals.user_is_local(user_id),
284 "tried to broadcast typing status of remote user",
285 );
286
287 if !self.server.config.allow_outgoing_typing {
288 return Ok(());
289 }
290
291 let content = TypingContent::new(room_id.to_owned(), user_id.to_owned(), typing);
292 let edu = Edu::Typing(content);
293
294 let mut buf = EduBuf::new();
295 serde_json::to_writer(&mut buf, &edu).expect("Serialized Edu::Typing");
296
297 self.services
298 .sending
299 .send_edu_room(room_id, buf)
300 .await?;
301
302 Ok(())
303 }
304}