Skip to main content

tuwunel_api/client/push/
notifications.rs

1use axum::extract::State;
2use futures::StreamExt;
3use ruma::{MilliSecondsSinceUnixEpoch, api::client::push::get_notifications, push::Action};
4use tuwunel_core::{
5	Result, at, err,
6	matrix::{Event, PduId},
7	utils::{
8		stream::{ReadyExt, WidebandExt},
9		string::to_small_string,
10	},
11};
12
13use crate::Ruma;
14
15/// # `GET /_matrix/client/r0/notifications/`
16///
17/// Paginate through the list of events the user has been, or would have been
18/// notified about.
19pub(crate) async fn get_notifications_route(
20	State(services): State<crate::State>,
21	body: Ruma<get_notifications::v3::Request>,
22) -> Result<get_notifications::v3::Response> {
23	use get_notifications::v3::Notification;
24
25	let sender_user = body.sender_user();
26
27	let from = body
28		.body
29		.from
30		.as_deref()
31		.map(str::parse)
32		.transpose()
33		.map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
34
35	let limit: usize = body
36		.body
37		.limit
38		.map(TryInto::try_into)
39		.transpose()?
40		.unwrap_or(50)
41		.clamp(1, 100);
42
43	let only_highlight = body
44		.body
45		.only
46		.as_deref()
47		.is_some_and(|only| only.contains("highlight"));
48
49	let mut next_token: Option<u64> = None;
50	let notifications = services
51		.pusher
52		.get_notifications(sender_user, from)
53		.ready_filter(|(_, notify)| {
54			if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
55				return false;
56			}
57
58			true
59		})
60		.wide_filter_map(async |(count, notify)| {
61			let pdu_id = PduId {
62				shortroomid: notify.sroomid,
63				count: count.into(),
64			};
65
66			let event = services
67				.timeline
68				.get_pdu_from_id(&pdu_id.into())
69				.await
70				.ok()
71				.filter(|event| !event.is_redacted())?;
72
73			let read = services
74				.pusher
75				.last_notification_read(sender_user, event.room_id())
76				.await
77				.is_ok_and(|last_read| last_read.ge(&count));
78
79			let ts = notify
80				.ts
81				.try_into()
82				.map(MilliSecondsSinceUnixEpoch)
83				.ok()?;
84
85			let notification = Notification {
86				room_id: event.room_id().into(),
87				event: event.into_format(),
88				ts,
89				read,
90				profile_tag: notify.tag,
91				actions: notify.actions,
92			};
93
94			Some((count, notification))
95		})
96		.take(limit)
97		.inspect(|(count, _)| {
98			next_token.replace(*count);
99		})
100		.map(at!(1))
101		.collect::<Vec<_>>()
102		.await;
103
104	Ok(get_notifications::v3::Response {
105		next_token: next_token.map(to_small_string),
106		notifications,
107	})
108}