Skip to main content

tuwunel_api/client/sync/v5/extensions/
typing.rs

1use std::collections::BTreeMap;
2
3use futures::{FutureExt, StreamExt, TryFutureExt};
4use ruma::{
5	api::client::sync::sync_events::v5::response,
6	events::typing::{SyncTypingEvent, TypingEventContent},
7	serde::Raw,
8};
9use tuwunel_core::{
10	Result, debug_error,
11	utils::{IterStream, ReadyExt},
12};
13
14use super::{Connection, SyncInfo, Window, selector};
15
16#[tracing::instrument(name = "typing", level = "trace", skip_all, ret)]
17pub(super) async fn collect(
18	sync_info: SyncInfo<'_>,
19	conn: &Connection,
20	window: &Window,
21) -> Result<response::Typing> {
22	use response::Typing;
23
24	let SyncInfo { services, sender_user, .. } = sync_info;
25
26	let implicit = conn
27		.extensions
28		.typing
29		.lists
30		.as_deref()
31		.map(<[_]>::iter);
32
33	let explicit = conn
34		.extensions
35		.typing
36		.rooms
37		.as_deref()
38		.map(<[_]>::iter);
39
40	selector(sync_info, conn, window, implicit, explicit)
41		.stream()
42		.filter_map(async |room_id| {
43			services
44				.typing
45				.typing_users_for_user(room_id, sender_user)
46				.inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}"))
47				.await
48				.ok()
49				.filter(|users| !users.is_empty())
50				.map(|users| (room_id, users))
51		})
52		.ready_filter_map(|(room_id, users)| {
53			let content = TypingEventContent::new(users);
54			let event = SyncTypingEvent { content };
55			let event = Raw::new(&event);
56
57			Some((room_id.to_owned(), event.ok()?))
58		})
59		.collect::<BTreeMap<_, _>>()
60		.map(|rooms| Typing { rooms })
61		.map(Ok)
62		.await
63}