Skip to main content

tuwunel_api/client/sync/v5/extensions/
receipts.rs

1use futures::{FutureExt, StreamExt};
2use ruma::{
3	OwnedRoomId, RoomId,
4	api::client::sync::sync_events::v5::response,
5	events::{AnySyncEphemeralRoomEvent, receipt::SyncReceiptEvent},
6	serde::Raw,
7};
8use tuwunel_core::{
9	Result,
10	utils::{BoolExt, IterStream, stream::BroadbandExt},
11};
12use tuwunel_service::{
13	rooms::read_receipt::{PrivateReadEvents, pack_receipts},
14	sync::Room,
15};
16
17use super::{Connection, SyncInfo, Window, selector};
18
19#[tracing::instrument(name = "receipts", level = "trace", skip_all)]
20pub(super) async fn collect(
21	sync_info: SyncInfo<'_>,
22	conn: &Connection,
23	window: &Window,
24) -> Result<response::Receipts> {
25	let SyncInfo { .. } = sync_info;
26
27	let implicit = conn
28		.extensions
29		.receipts
30		.lists
31		.as_deref()
32		.map(<[_]>::iter);
33
34	let explicit = conn
35		.extensions
36		.receipts
37		.rooms
38		.as_deref()
39		.map(<[_]>::iter);
40
41	let rooms = selector(sync_info, conn, window, implicit, explicit)
42		.stream()
43		.broad_filter_map(|room_id| collect_room(sync_info, conn, window, room_id))
44		.collect()
45		.await;
46
47	Ok(response::Receipts { rooms })
48}
49
50#[tracing::instrument(level = "trace", skip_all, fields(room_id), ret)]
51async fn collect_room(
52	SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
53	conn: &Connection,
54	_window: &Window,
55	room_id: &RoomId,
56) -> Option<(OwnedRoomId, Raw<SyncReceiptEvent>)> {
57	let &Room { roomsince, .. } = conn.rooms.get(room_id)?;
58	let private_receipt = services
59		.read_receipt
60		.last_privateread_update(sender_user, room_id)
61		.then(async |last_private_update| {
62			if last_private_update <= roomsince || last_private_update > conn.next_batch {
63				return PrivateReadEvents::new();
64			}
65
66			services
67				.read_receipt
68				.private_read_get(room_id, sender_user)
69				.await
70				.unwrap_or_default()
71		})
72		.map(IterStream::stream)
73		.flatten_stream();
74
75	let receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
76		.read_receipt
77		.readreceipts_since(room_id, roomsince, Some(conn.next_batch))
78		.filter_map(async |(read_user, _ts, v)| {
79			services
80				.users
81				.user_is_ignored(read_user, sender_user)
82				.await
83				.or_some(v)
84		})
85		.chain(private_receipt)
86		.collect()
87		.boxed()
88		.await;
89
90	receipts
91		.is_empty()
92		.is_false()
93		.then(|| (room_id.to_owned(), pack_receipts(receipts.into_iter())))
94}