Skip to main content

tuwunel_service/rooms/typing/
mod.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use futures::{FutureExt, StreamExt, TryStreamExt, future::try_join};
4use ruma::{
5	OwnedRoomId, OwnedUserId, RoomId, UserId,
6	api::{
7		appservice::event::push_events::v1::EphemeralData,
8		federation::transactions::edu::{Edu, TypingContent},
9	},
10	events::{EphemeralRoomEvent, typing::TypingEventContent},
11};
12use tokio::sync::{RwLock, broadcast};
13use tuwunel_core::{
14	Result, Server, debug_info, trace,
15	utils::{self, BoolExt, IterStream},
16};
17
18use crate::sending::EduBuf;
19
/// State for the room typing-notification service.
pub struct Service {
	server: Arc<Server>,
	services: Arc<crate::services::OnceServices>,
	/// Per room, the users currently marked as typing. The u64 is the unix
	/// timestamp of the timeout; expiry compares it against
	/// `utils::millis_since_unix_epoch()`, so it is in milliseconds.
	pub typing: RwLock<BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, u64>>>,
	/// Per room, the value of `globals.next_count()` taken at the last change
	/// to the typing users (a counter value, not a wall-clock time).
	pub last_typing_update: RwLock<BTreeMap<OwnedRoomId, u64>>,
	/// Broadcasts the id of a room each time its typing state changes.
	pub typing_update_sender: broadcast::Sender<OwnedRoomId>,
}
29
30impl crate::Service for Service {
31	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
32		Ok(Arc::new(Self {
33			server: args.server.clone(),
34			services: args.services.clone(),
35			typing: RwLock::new(BTreeMap::new()),
36			last_typing_update: RwLock::new(BTreeMap::new()),
37			typing_update_sender: broadcast::channel(100).0,
38		}))
39	}
40
41	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
42}
43
44impl Service {
45	/// Sets a user as typing until the timeout timestamp is reached or
46	/// roomtyping_remove is called.
47	pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result {
48		debug_info!("typing started {user_id:?} in {room_id:?} timeout:{timeout:?}");
49
50		// update clients
51		self.typing
52			.write()
53			.await
54			.entry(room_id.to_owned())
55			.or_default()
56			.insert(user_id.to_owned(), timeout);
57
58		let count = self.services.globals.next_count();
59
60		self.last_typing_update
61			.write()
62			.await
63			.insert(room_id.to_owned(), *count);
64
65		drop(count);
66
67		if self
68			.typing_update_sender
69			.send(room_id.to_owned())
70			.is_err()
71		{
72			trace!("receiver found what it was looking for and is no longer interested");
73		}
74
75		// update appservices
76		let appservice_send = self.appservice_send(room_id);
77
78		// update federation
79		let federation_send = self
80			.services
81			.globals
82			.user_is_local(user_id)
83			.then_async(|| self.federation_send(room_id, user_id, true))
84			.map(Option::transpose);
85
86		try_join(appservice_send, federation_send)
87			.await
88			.map(|_| ())
89	}
90
91	/// Removes a user from typing before the timeout is reached.
92	pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result {
93		debug_info!("typing stopped {user_id:?} in {room_id:?}");
94
95		// update clients
96		self.typing
97			.write()
98			.await
99			.entry(room_id.to_owned())
100			.or_default()
101			.remove(user_id);
102
103		let count = self.services.globals.next_count();
104
105		self.last_typing_update
106			.write()
107			.await
108			.insert(room_id.to_owned(), *count);
109
110		drop(count);
111
112		if self
113			.typing_update_sender
114			.send(room_id.to_owned())
115			.is_err()
116		{
117			trace!("receiver found what it was looking for and is no longer interested");
118		}
119
120		// update appservices
121		let appservice_send = self.appservice_send(room_id);
122
123		// update federation
124		let federation_send = self
125			.services
126			.globals
127			.user_is_local(user_id)
128			.then_async(|| self.federation_send(room_id, user_id, false))
129			.map(Option::transpose);
130
131		try_join(appservice_send, federation_send)
132			.await
133			.map(|_| ())
134	}
135
136	pub async fn wait_for_update(&self, room_id: &RoomId) {
137		let mut receiver = self.typing_update_sender.subscribe();
138		while let Ok(next) = receiver.recv().await {
139			if next == room_id {
140				break;
141			}
142		}
143	}
144
145	/// Makes sure that typing events with old timestamps get removed.
146	async fn typings_maintain(&self, room_id: &RoomId) -> Result {
147		let current_timestamp = utils::millis_since_unix_epoch();
148		let mut removable = Vec::new();
149
150		let typing = self.typing.read().await;
151		let Some(room) = typing.get(room_id) else {
152			return Ok(());
153		};
154
155		for (user, timeout) in room {
156			if *timeout < current_timestamp {
157				removable.push(user.clone());
158			}
159		}
160
161		drop(typing);
162
163		if removable.is_empty() {
164			return Ok(());
165		}
166
167		let mut typing = self.typing.write().await;
168		let room = typing.entry(room_id.to_owned()).or_default();
169		for user in &removable {
170			debug_info!("typing timeout {user:?} in {room_id:?}");
171			room.remove(user);
172		}
173
174		drop(typing);
175
176		// update clients
177		let count = self.services.globals.next_count();
178		self.last_typing_update
179			.write()
180			.await
181			.insert(room_id.to_owned(), *count);
182
183		drop(count);
184
185		if self
186			.typing_update_sender
187			.send(room_id.to_owned())
188			.is_err()
189		{
190			trace!("receiver found what it was looking for and is no longer interested");
191		}
192
193		// update appservices
194		let appservice_send = self.appservice_send(room_id);
195
196		// update federation
197		let federation_sends = removable
198			.iter()
199			.filter(|user_id| self.services.globals.user_is_local(user_id))
200			.try_stream()
201			.try_for_each(|user_id| self.federation_send(room_id, user_id, false));
202
203		try_join(appservice_send, federation_sends)
204			.boxed()
205			.await
206			.map(|_| ())
207	}
208
209	/// Returns the count of the last typing update in this room.
210	pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
211		self.typings_maintain(room_id).await?;
212
213		self.last_typing_update
214			.read()
215			.await
216			.get(room_id)
217			.copied()
218			.map(Ok)
219			.unwrap_or(Ok(0))
220	}
221
222	/// Returns the typing content with all typing users in the room.
223	async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent {
224		let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
225
226		let Some(typing_indicators) = room_typing_indicators else {
227			return TypingEventContent { user_ids: Vec::new() };
228		};
229
230		TypingEventContent {
231			user_ids: typing_indicators.into_keys().collect(),
232		}
233	}
234
235	/// Sends a typing EDU to all appservices interested in the room.
236	async fn appservice_send(&self, room_id: &RoomId) -> Result {
237		let content = self.typings_content(room_id).await;
238
239		self.services
240			.sending
241			.send_edu_room_appservices(room_id, |buf| {
242				let edu = EphemeralData::Typing(EphemeralRoomEvent {
243					room_id: room_id.to_owned(),
244					content: content.clone(),
245				});
246
247				Ok(serde_json::to_writer(buf, &edu)?)
248			})
249			.await
250	}
251
252	/// Returns a new typing EDU.
253	pub async fn typing_users_for_user(
254		&self,
255		room_id: &RoomId,
256		sender_user: &UserId,
257	) -> Result<Vec<OwnedUserId>> {
258		let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
259
260		let Some(typing_indicators) = room_typing_indicators else {
261			return Ok(Vec::new());
262		};
263
264		let user_ids: Vec<_> = typing_indicators
265			.into_keys()
266			.stream()
267			.filter_map(async |typing_user_id| {
268				self.services
269					.users
270					.user_is_ignored(&typing_user_id, sender_user)
271					.await
272					.eq(&false)
273					.then_some(typing_user_id)
274			})
275			.collect()
276			.await;
277
278		Ok(user_ids)
279	}
280
281	async fn federation_send(&self, room_id: &RoomId, user_id: &UserId, typing: bool) -> Result {
282		debug_assert!(
283			self.services.globals.user_is_local(user_id),
284			"tried to broadcast typing status of remote user",
285		);
286
287		if !self.server.config.allow_outgoing_typing {
288			return Ok(());
289		}
290
291		let content = TypingContent::new(room_id.to_owned(), user_id.to_owned(), typing);
292		let edu = Edu::Typing(content);
293
294		let mut buf = EduBuf::new();
295		serde_json::to_writer(&mut buf, &edu).expect("Serialized Edu::Typing");
296
297		self.services
298			.sending
299			.send_edu_room(room_id, buf)
300			.await?;
301
302		Ok(())
303	}
304}