tuwunel_api/client/sync/v5/extensions/
receipts.rs1use futures::{FutureExt, StreamExt};
2use ruma::{
3 OwnedRoomId, RoomId,
4 api::client::sync::sync_events::v5::response,
5 events::{AnySyncEphemeralRoomEvent, receipt::SyncReceiptEvent},
6 serde::Raw,
7};
8use tuwunel_core::{
9 Result,
10 utils::{BoolExt, IterStream, stream::BroadbandExt},
11};
12use tuwunel_service::{
13 rooms::read_receipt::{PrivateReadEvents, pack_receipts},
14 sync::Room,
15};
16
17use super::{Connection, SyncInfo, Window, selector};
18
19#[tracing::instrument(name = "receipts", level = "trace", skip_all)]
20pub(super) async fn collect(
21 sync_info: SyncInfo<'_>,
22 conn: &Connection,
23 window: &Window,
24) -> Result<response::Receipts> {
25 let SyncInfo { .. } = sync_info;
26
27 let implicit = conn
28 .extensions
29 .receipts
30 .lists
31 .as_deref()
32 .map(<[_]>::iter);
33
34 let explicit = conn
35 .extensions
36 .receipts
37 .rooms
38 .as_deref()
39 .map(<[_]>::iter);
40
41 let rooms = selector(sync_info, conn, window, implicit, explicit)
42 .stream()
43 .broad_filter_map(|room_id| collect_room(sync_info, conn, window, room_id))
44 .collect()
45 .await;
46
47 Ok(response::Receipts { rooms })
48}
49
50#[tracing::instrument(level = "trace", skip_all, fields(room_id), ret)]
51async fn collect_room(
52 SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
53 conn: &Connection,
54 _window: &Window,
55 room_id: &RoomId,
56) -> Option<(OwnedRoomId, Raw<SyncReceiptEvent>)> {
57 let &Room { roomsince, .. } = conn.rooms.get(room_id)?;
58 let private_receipt = services
59 .read_receipt
60 .last_privateread_update(sender_user, room_id)
61 .then(async |last_private_update| {
62 if last_private_update <= roomsince || last_private_update > conn.next_batch {
63 return PrivateReadEvents::new();
64 }
65
66 services
67 .read_receipt
68 .private_read_get(room_id, sender_user)
69 .await
70 .unwrap_or_default()
71 })
72 .map(IterStream::stream)
73 .flatten_stream();
74
75 let receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
76 .read_receipt
77 .readreceipts_since(room_id, roomsince, Some(conn.next_batch))
78 .filter_map(async |(read_user, _ts, v)| {
79 services
80 .users
81 .user_is_ignored(read_user, sender_user)
82 .await
83 .or_some(v)
84 })
85 .chain(private_receipt)
86 .collect()
87 .boxed()
88 .await;
89
90 receipts
91 .is_empty()
92 .is_false()
93 .then(|| (room_id.to_owned(), pack_receipts(receipts.into_iter())))
94}