tuwunel_api/client/sync/v5/extensions/
typing.rs1use std::collections::BTreeMap;
2
3use futures::{FutureExt, StreamExt, TryFutureExt};
4use ruma::{
5 api::client::sync::sync_events::v5::response,
6 events::typing::{SyncTypingEvent, TypingEventContent},
7 serde::Raw,
8};
9use tuwunel_core::{
10 Result, debug_error,
11 utils::{IterStream, ReadyExt},
12};
13
14use super::{Connection, SyncInfo, Window, selector};
15
16#[tracing::instrument(name = "typing", level = "trace", skip_all, ret)]
17pub(super) async fn collect(
18 sync_info: SyncInfo<'_>,
19 conn: &Connection,
20 window: &Window,
21) -> Result<response::Typing> {
22 use response::Typing;
23
24 let SyncInfo { services, sender_user, .. } = sync_info;
25
26 let implicit = conn
27 .extensions
28 .typing
29 .lists
30 .as_deref()
31 .map(<[_]>::iter);
32
33 let explicit = conn
34 .extensions
35 .typing
36 .rooms
37 .as_deref()
38 .map(<[_]>::iter);
39
40 selector(sync_info, conn, window, implicit, explicit)
41 .stream()
42 .filter_map(async |room_id| {
43 services
44 .typing
45 .typing_users_for_user(room_id, sender_user)
46 .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}"))
47 .await
48 .ok()
49 .filter(|users| !users.is_empty())
50 .map(|users| (room_id, users))
51 })
52 .ready_filter_map(|(room_id, users)| {
53 let content = TypingEventContent::new(users);
54 let event = SyncTypingEvent { content };
55 let event = Raw::new(&event);
56
57 Some((room_id.to_owned(), event.ok()?))
58 })
59 .collect::<BTreeMap<_, _>>()
60 .map(|rooms| Typing { rooms })
61 .map(Ok)
62 .await
63}