tuwunel_api/client/push/
notifications.rs1use axum::extract::State;
2use futures::StreamExt;
3use ruma::{MilliSecondsSinceUnixEpoch, api::client::push::get_notifications, push::Action};
4use tuwunel_core::{
5 Result, at, err,
6 matrix::{Event, PduId},
7 utils::{
8 stream::{ReadyExt, WidebandExt},
9 string::to_small_string,
10 },
11};
12
13use crate::Ruma;
14
15pub(crate) async fn get_notifications_route(
20 State(services): State<crate::State>,
21 body: Ruma<get_notifications::v3::Request>,
22) -> Result<get_notifications::v3::Response> {
23 use get_notifications::v3::Notification;
24
25 let sender_user = body.sender_user();
26
27 let from = body
28 .body
29 .from
30 .as_deref()
31 .map(str::parse)
32 .transpose()
33 .map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
34
35 let limit: usize = body
36 .body
37 .limit
38 .map(TryInto::try_into)
39 .transpose()?
40 .unwrap_or(50)
41 .clamp(1, 100);
42
43 let only_highlight = body
44 .body
45 .only
46 .as_deref()
47 .is_some_and(|only| only.contains("highlight"));
48
49 let mut next_token: Option<u64> = None;
50 let notifications = services
51 .pusher
52 .get_notifications(sender_user, from)
53 .ready_filter(|(_, notify)| {
54 if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
55 return false;
56 }
57
58 true
59 })
60 .wide_filter_map(async |(count, notify)| {
61 let pdu_id = PduId {
62 shortroomid: notify.sroomid,
63 count: count.into(),
64 };
65
66 let event = services
67 .timeline
68 .get_pdu_from_id(&pdu_id.into())
69 .await
70 .ok()
71 .filter(|event| !event.is_redacted())?;
72
73 let read = services
74 .pusher
75 .last_notification_read(sender_user, event.room_id())
76 .await
77 .is_ok_and(|last_read| last_read.ge(&count));
78
79 let ts = notify
80 .ts
81 .try_into()
82 .map(MilliSecondsSinceUnixEpoch)
83 .ok()?;
84
85 let notification = Notification {
86 room_id: event.room_id().into(),
87 event: event.into_format(),
88 ts,
89 read,
90 profile_tag: notify.tag,
91 actions: notify.actions,
92 };
93
94 Some((count, notification))
95 })
96 .take(limit)
97 .inspect(|(count, _)| {
98 next_token.replace(*count);
99 })
100 .map(at!(1))
101 .collect::<Vec<_>>()
102 .await;
103
104 Ok(get_notifications::v3::Response {
105 next_token: next_token.map(to_small_string),
106 notifications,
107 })
108}