Skip to main content

tuwunel_api/client/
push.rs

1use axum::extract::State;
2use futures::StreamExt;
3use ruma::{
4	CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch,
5	api::client::push::{
6		delete_pushrule, get_notifications, get_pushers, get_pushrule, get_pushrule_actions,
7		get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher,
8		set_pushrule, set_pushrule_actions, set_pushrule_enabled,
9	},
10	events::{
11		GlobalAccountDataEventType,
12		push_rules::{PushRulesEvent, PushRulesEventContent},
13	},
14	push::{
15		Action, InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
16		RemovePushRuleError, Ruleset,
17	},
18};
19use tuwunel_core::{
20	Err, Result, at, err,
21	matrix::{Event, PduId},
22	utils::{
23		stream::{ReadyExt, WidebandExt},
24		string::to_small_string,
25	},
26};
27use tuwunel_service::Services;
28
29use crate::Ruma;
30
31/// # `GET /_matrix/client/r0/notifications/`
32///
33/// Paginate through the list of events the user has been, or would have been
34/// notified about.
35pub(crate) async fn get_notifications_route(
36	State(services): State<crate::State>,
37	body: Ruma<get_notifications::v3::Request>,
38) -> Result<get_notifications::v3::Response> {
39	use get_notifications::v3::Notification;
40
41	let sender_user = body.sender_user();
42
43	let from = body
44		.body
45		.from
46		.as_deref()
47		.map(str::parse)
48		.transpose()
49		.map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
50
51	let limit: usize = body
52		.body
53		.limit
54		.map(TryInto::try_into)
55		.transpose()?
56		.unwrap_or(50)
57		.clamp(1, 100);
58
59	let only_highlight = body
60		.body
61		.only
62		.as_deref()
63		.is_some_and(|only| only.contains("highlight"));
64
65	let mut next_token: Option<u64> = None;
66	let notifications = services
67		.pusher
68		.get_notifications(sender_user, from)
69		.ready_filter(|(_, notify)| {
70			if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
71				return false;
72			}
73
74			true
75		})
76		.wide_filter_map(async |(count, notify)| {
77			let pdu_id = PduId {
78				shortroomid: notify.sroomid,
79				count: count.into(),
80			};
81
82			let event = services
83				.timeline
84				.get_pdu_from_id(&pdu_id.into())
85				.await
86				.ok()
87				.filter(|event| !event.is_redacted())?;
88
89			let read = services
90				.pusher
91				.last_notification_read(sender_user, event.room_id())
92				.await
93				.is_ok_and(|last_read| last_read.ge(&count));
94
95			let ts = notify
96				.ts
97				.try_into()
98				.map(MilliSecondsSinceUnixEpoch)
99				.ok()?;
100
101			let notification = Notification {
102				room_id: event.room_id().into(),
103				event: event.into_format(),
104				ts,
105				read,
106				profile_tag: notify.tag,
107				actions: notify.actions,
108			};
109
110			Some((count, notification))
111		})
112		.take(limit)
113		.inspect(|(count, _)| {
114			next_token.replace(*count);
115		})
116		.map(at!(1))
117		.collect::<Vec<_>>()
118		.await;
119
120	Ok(get_notifications::v3::Response {
121		next_token: next_token.map(to_small_string),
122		notifications,
123	})
124}
125
126/// # `GET /_matrix/client/r0/pushrules/`
127///
128/// Retrieves the push rules event for this user.
129pub(crate) async fn get_pushrules_all_route(
130	State(services): State<crate::State>,
131	body: Ruma<get_pushrules_all::v3::Request>,
132) -> Result<get_pushrules_all::v3::Response> {
133	let sender_user = body.sender_user();
134
135	let Some(content_value) = services
136		.account_data
137		.get_global::<CanonicalJsonObject>(sender_user, GlobalAccountDataEventType::PushRules)
138		.await
139		.ok()
140		.and_then(|event| event.get("content").cloned())
141		.filter(CanonicalJsonValue::is_object)
142	else {
143		// user somehow has non-existent push rule event. recreate it and return server
144		// default silently
145		return recreate_push_rules_and_return(&services, sender_user).await;
146	};
147
148	let account_data_content =
149		serde_json::from_value::<PushRulesEventContent>(content_value.into()).map_err(|e| {
150			err!(Database(warn!("Invalid push rules account data event in database: {e}")))
151		})?;
152
153	let mut global_ruleset = account_data_content.global;
154
155	// remove old deprecated mentions push rules as per MSC4210
156	// and update the stored server default push rules
157	#[expect(deprecated)]
158	{
159		use ruma::push::RuleKind::*;
160		if global_ruleset
161			.get(Override, PredefinedOverrideRuleId::ContainsDisplayName.as_str())
162			.is_some()
163			|| global_ruleset
164				.get(Override, PredefinedOverrideRuleId::RoomNotif.as_str())
165				.is_some()
166			|| global_ruleset
167				.get(Content, PredefinedContentRuleId::ContainsUserName.as_str())
168				.is_some()
169		{
170			global_ruleset
171				.remove(Override, PredefinedOverrideRuleId::ContainsDisplayName)
172				.ok();
173			global_ruleset
174				.remove(Override, PredefinedOverrideRuleId::RoomNotif)
175				.ok();
176			global_ruleset
177				.remove(Content, PredefinedContentRuleId::ContainsUserName)
178				.ok();
179
180			global_ruleset.update_with_server_default(Ruleset::server_default(sender_user));
181
182			let ty = GlobalAccountDataEventType::PushRules;
183			let event = PushRulesEvent {
184				content: PushRulesEventContent { global: global_ruleset.clone() },
185			};
186
187			services
188				.account_data
189				.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
190				.await?;
191		}
192	};
193
194	Ok(get_pushrules_all::v3::Response { global: global_ruleset })
195}
196
197/// # `GET /_matrix/client/r0/pushrules/global/`
198///
199/// Retrieves the push rules event for this user.
200///
201/// This appears to be the exact same as `GET /_matrix/client/r0/pushrules/`.
202pub(crate) async fn get_pushrules_global_route(
203	State(services): State<crate::State>,
204	body: Ruma<get_pushrules_global_scope::v3::Request>,
205) -> Result<get_pushrules_global_scope::v3::Response> {
206	let sender_user = body.sender_user();
207
208	let Some(content_value) = services
209		.account_data
210		.get_global::<CanonicalJsonObject>(sender_user, GlobalAccountDataEventType::PushRules)
211		.await
212		.ok()
213		.and_then(|event| event.get("content").cloned())
214		.filter(CanonicalJsonValue::is_object)
215	else {
216		// user somehow has non-existent push rule event. recreate it and return server
217		// default silently
218
219		let ty = GlobalAccountDataEventType::PushRules;
220		let event = PushRulesEvent {
221			content: PushRulesEventContent {
222				global: Ruleset::server_default(sender_user),
223			},
224		};
225
226		services
227			.account_data
228			.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
229			.await?;
230
231		return Ok(get_pushrules_global_scope::v3::Response {
232			global: Ruleset::server_default(sender_user),
233		});
234	};
235
236	let account_data_content =
237		serde_json::from_value::<PushRulesEventContent>(content_value.into()).map_err(|e| {
238			err!(Database(warn!("Invalid push rules account data event in database: {e}")))
239		})?;
240
241	let mut global_ruleset = account_data_content.global;
242
243	// remove old deprecated mentions push rules as per MSC4210
244	// and update the stored server default push rules
245	#[expect(deprecated)]
246	{
247		use ruma::push::RuleKind::*;
248		if global_ruleset
249			.get(Override, PredefinedOverrideRuleId::ContainsDisplayName.as_str())
250			.is_some()
251			|| global_ruleset
252				.get(Override, PredefinedOverrideRuleId::RoomNotif.as_str())
253				.is_some()
254			|| global_ruleset
255				.get(Content, PredefinedContentRuleId::ContainsUserName.as_str())
256				.is_some()
257		{
258			global_ruleset
259				.remove(Override, PredefinedOverrideRuleId::ContainsDisplayName)
260				.ok();
261			global_ruleset
262				.remove(Override, PredefinedOverrideRuleId::RoomNotif)
263				.ok();
264			global_ruleset
265				.remove(Content, PredefinedContentRuleId::ContainsUserName)
266				.ok();
267
268			global_ruleset.update_with_server_default(Ruleset::server_default(sender_user));
269
270			services
271				.account_data
272				.update(
273					None,
274					sender_user,
275					GlobalAccountDataEventType::PushRules
276						.to_string()
277						.into(),
278					&serde_json::to_value(PushRulesEvent {
279						content: PushRulesEventContent { global: global_ruleset.clone() },
280					})
281					.expect("to json always works"),
282				)
283				.await?;
284		}
285	};
286
287	Ok(get_pushrules_global_scope::v3::Response { global: global_ruleset })
288}
289
290/// # `GET /_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}`
291///
292/// Retrieves a single specified push rule for this user.
293pub(crate) async fn get_pushrule_route(
294	State(services): State<crate::State>,
295	body: Ruma<get_pushrule::v3::Request>,
296) -> Result<get_pushrule::v3::Response> {
297	let sender_user = body
298		.sender_user
299		.as_ref()
300		.expect("user is authenticated");
301
302	// remove old deprecated mentions push rules as per MSC4210
303	#[expect(deprecated)]
304	if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
305		|| body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
306		|| body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
307	{
308		return Err!(Request(NotFound("Push rule not found.")));
309	}
310
311	let event: PushRulesEvent = services
312		.account_data
313		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
314		.await
315		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
316
317	let rule = event
318		.content
319		.global
320		.get(body.kind.clone(), &body.rule_id)
321		.map(Into::into);
322
323	if let Some(rule) = rule {
324		Ok(get_pushrule::v3::Response { rule })
325	} else {
326		Err!(Request(NotFound("Push rule not found.")))
327	}
328}
329
330/// # `PUT /_matrix/client/r0/pushrules/global/{kind}/{ruleId}`
331///
332/// Creates a single specified push rule for this user.
333pub(crate) async fn set_pushrule_route(
334	State(services): State<crate::State>,
335	body: Ruma<set_pushrule::v3::Request>,
336) -> Result<set_pushrule::v3::Response> {
337	let sender_user = body.sender_user();
338	let mut account_data: PushRulesEvent = services
339		.account_data
340		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
341		.await
342		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
343
344	if let Err(error) = account_data.content.global.insert(
345		body.rule.clone(),
346		body.after.as_deref(),
347		body.before.as_deref(),
348	) {
349		use InsertPushRuleError::*;
350
351		return match error {
352			| ServerDefaultRuleId => Err!(Request(InvalidParam(
353				"Rule IDs starting with a dot are reserved for server-default rules."
354			))),
355			| RelativeToServerDefaultRule => Err!(Request(InvalidParam(
356				"Can't place a push rule relatively to a server-default rule."
357			))),
358			| BeforeHigherThanAfter => Err!(Request(InvalidParam(
359				"The before rule has a higher priority than the after rule."
360			))),
361			| InvalidRuleId =>
362				Err!(Request(InvalidParam("Rule ID containing invalid characters."))),
363
364			| UnknownRuleId =>
365				Err!(Request(NotFound("The before or after rule could not be found."))),
366
367			| _ => Err!(Request(InvalidParam("Invalid data."))),
368		};
369	}
370
371	let ty = GlobalAccountDataEventType::PushRules;
372	services
373		.account_data
374		.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
375		.await?;
376
377	Ok(set_pushrule::v3::Response {})
378}
379
380/// # `GET /_matrix/client/r0/pushrules/global/{kind}/{ruleId}/actions`
381///
382/// Gets the actions of a single specified push rule for this user.
383pub(crate) async fn get_pushrule_actions_route(
384	State(services): State<crate::State>,
385	body: Ruma<get_pushrule_actions::v3::Request>,
386) -> Result<get_pushrule_actions::v3::Response> {
387	let sender_user = body.sender_user();
388
389	// remove old deprecated mentions push rules as per MSC4210
390	#[expect(deprecated)]
391	if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
392		|| body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
393		|| body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
394	{
395		return Err!(Request(NotFound("Push rule not found.")));
396	}
397
398	let event: PushRulesEvent = services
399		.account_data
400		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
401		.await
402		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
403
404	let actions = event
405		.content
406		.global
407		.get(body.kind.clone(), &body.rule_id)
408		.map(|rule| rule.actions().to_owned())
409		.ok_or_else(|| err!(Request(NotFound("Push rule not found."))))?;
410
411	Ok(get_pushrule_actions::v3::Response { actions })
412}
413
414/// # `PUT /_matrix/client/r0/pushrules/global/{kind}/{ruleId}/actions`
415///
416/// Sets the actions of a single specified push rule for this user.
417pub(crate) async fn set_pushrule_actions_route(
418	State(services): State<crate::State>,
419	body: Ruma<set_pushrule_actions::v3::Request>,
420) -> Result<set_pushrule_actions::v3::Response> {
421	let sender_user = body.sender_user();
422
423	let mut account_data: PushRulesEvent = services
424		.account_data
425		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
426		.await
427		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
428
429	if account_data
430		.content
431		.global
432		.set_actions(body.kind.clone(), &body.rule_id, body.actions.clone().into())
433		.is_err()
434	{
435		return Err!(Request(NotFound("Push rule not found.")));
436	}
437
438	let ty = GlobalAccountDataEventType::PushRules;
439	services
440		.account_data
441		.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
442		.await?;
443
444	Ok(set_pushrule_actions::v3::Response {})
445}
446
447/// # `GET /_matrix/client/r0/pushrules/global/{kind}/{ruleId}/enabled`
448///
449/// Gets the enabled status of a single specified push rule for this user.
450pub(crate) async fn get_pushrule_enabled_route(
451	State(services): State<crate::State>,
452	body: Ruma<get_pushrule_enabled::v3::Request>,
453) -> Result<get_pushrule_enabled::v3::Response> {
454	let sender_user = body.sender_user();
455
456	// remove old deprecated mentions push rules as per MSC4210
457	#[expect(deprecated)]
458	if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
459		|| body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
460		|| body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
461	{
462		return Ok(get_pushrule_enabled::v3::Response { enabled: false });
463	}
464
465	let event: PushRulesEvent = services
466		.account_data
467		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
468		.await
469		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
470
471	let enabled = event
472		.content
473		.global
474		.get(body.kind.clone(), &body.rule_id)
475		.map(ruma::push::AnyPushRuleRef::enabled)
476		.ok_or_else(|| err!(Request(NotFound("Push rule not found."))))?;
477
478	Ok(get_pushrule_enabled::v3::Response { enabled })
479}
480
481/// # `PUT /_matrix/client/r0/pushrules/global/{kind}/{ruleId}/enabled`
482///
483/// Sets the enabled status of a single specified push rule for this user.
484pub(crate) async fn set_pushrule_enabled_route(
485	State(services): State<crate::State>,
486	body: Ruma<set_pushrule_enabled::v3::Request>,
487) -> Result<set_pushrule_enabled::v3::Response> {
488	let sender_user = body.sender_user();
489
490	let mut account_data: PushRulesEvent = services
491		.account_data
492		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
493		.await
494		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
495
496	if account_data
497		.content
498		.global
499		.set_enabled(body.kind.clone(), &body.rule_id, body.enabled)
500		.is_err()
501	{
502		return Err!(Request(NotFound("Push rule not found.")));
503	}
504
505	let ty = GlobalAccountDataEventType::PushRules;
506	services
507		.account_data
508		.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
509		.await?;
510
511	Ok(set_pushrule_enabled::v3::Response {})
512}
513
514/// # `DELETE /_matrix/client/r0/pushrules/global/{kind}/{ruleId}`
515///
516/// Deletes a single specified push rule for this user.
517pub(crate) async fn delete_pushrule_route(
518	State(services): State<crate::State>,
519	body: Ruma<delete_pushrule::v3::Request>,
520) -> Result<delete_pushrule::v3::Response> {
521	let sender_user = body.sender_user();
522
523	let mut account_data: PushRulesEvent = services
524		.account_data
525		.get_global(sender_user, GlobalAccountDataEventType::PushRules)
526		.await
527		.map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
528
529	if let Err(error) = account_data
530		.content
531		.global
532		.remove(body.kind.clone(), &body.rule_id)
533	{
534		return match error {
535			| RemovePushRuleError::ServerDefault =>
536				Err!(Request(InvalidParam("Cannot delete a server-default pushrule."))),
537
538			| RemovePushRuleError::NotFound => Err!(Request(NotFound("Push rule not found."))),
539
540			| _ => Err!(Request(InvalidParam("Invalid data."))),
541		};
542	}
543
544	let ty = GlobalAccountDataEventType::PushRules;
545	services
546		.account_data
547		.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
548		.await?;
549
550	Ok(delete_pushrule::v3::Response {})
551}
552
553/// # `GET /_matrix/client/r0/pushers`
554///
555/// Gets all currently active pushers for the sender user.
556pub(crate) async fn get_pushers_route(
557	State(services): State<crate::State>,
558	body: Ruma<get_pushers::v3::Request>,
559) -> Result<get_pushers::v3::Response> {
560	let sender_user = body.sender_user();
561
562	Ok(get_pushers::v3::Response {
563		pushers: services.pusher.get_pushers(sender_user).await,
564	})
565}
566
567/// # `POST /_matrix/client/r0/pushers/set`
568///
569/// Adds a pusher for the sender user.
570///
571/// - TODO: Handle `append`
572pub(crate) async fn set_pushers_route(
573	State(services): State<crate::State>,
574	body: Ruma<set_pusher::v3::Request>,
575) -> Result<set_pusher::v3::Response> {
576	let sender_user = body.sender_user();
577
578	services
579		.pusher
580		.set_pusher(sender_user, body.sender_device()?, &body.action)
581		.await?;
582
583	Ok(set_pusher::v3::Response::new())
584}
585
586/// user somehow has bad push rules, these must always exist per spec.
587/// so recreate it and return server default silently
588async fn recreate_push_rules_and_return(
589	services: &Services,
590	sender_user: &ruma::UserId,
591) -> Result<get_pushrules_all::v3::Response> {
592	let ty = GlobalAccountDataEventType::PushRules;
593	let event = PushRulesEvent {
594		content: PushRulesEventContent {
595			global: Ruleset::server_default(sender_user),
596		},
597	};
598
599	services
600		.account_data
601		.update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
602		.await?;
603
604	Ok(get_pushrules_all::v3::Response {
605		global: Ruleset::server_default(sender_user),
606	})
607}