1use axum::extract::State;
2use futures::StreamExt;
3use ruma::{
4 CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch,
5 api::client::push::{
6 delete_pushrule, get_notifications, get_pushers, get_pushrule, get_pushrule_actions,
7 get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher,
8 set_pushrule, set_pushrule_actions, set_pushrule_enabled,
9 },
10 events::{
11 GlobalAccountDataEventType,
12 push_rules::{PushRulesEvent, PushRulesEventContent},
13 },
14 push::{
15 Action, InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
16 RemovePushRuleError, Ruleset,
17 },
18};
19use tuwunel_core::{
20 Err, Result, at, err,
21 matrix::{Event, PduId},
22 utils::{
23 stream::{ReadyExt, WidebandExt},
24 string::to_small_string,
25 },
26};
27use tuwunel_service::Services;
28
29use crate::Ruma;
30
31pub(crate) async fn get_notifications_route(
36 State(services): State<crate::State>,
37 body: Ruma<get_notifications::v3::Request>,
38) -> Result<get_notifications::v3::Response> {
39 use get_notifications::v3::Notification;
40
41 let sender_user = body.sender_user();
42
43 let from = body
44 .body
45 .from
46 .as_deref()
47 .map(str::parse)
48 .transpose()
49 .map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
50
51 let limit: usize = body
52 .body
53 .limit
54 .map(TryInto::try_into)
55 .transpose()?
56 .unwrap_or(50)
57 .clamp(1, 100);
58
59 let only_highlight = body
60 .body
61 .only
62 .as_deref()
63 .is_some_and(|only| only.contains("highlight"));
64
65 let mut next_token: Option<u64> = None;
66 let notifications = services
67 .pusher
68 .get_notifications(sender_user, from)
69 .ready_filter(|(_, notify)| {
70 if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
71 return false;
72 }
73
74 true
75 })
76 .wide_filter_map(async |(count, notify)| {
77 let pdu_id = PduId {
78 shortroomid: notify.sroomid,
79 count: count.into(),
80 };
81
82 let event = services
83 .timeline
84 .get_pdu_from_id(&pdu_id.into())
85 .await
86 .ok()
87 .filter(|event| !event.is_redacted())?;
88
89 let read = services
90 .pusher
91 .last_notification_read(sender_user, event.room_id())
92 .await
93 .is_ok_and(|last_read| last_read.ge(&count));
94
95 let ts = notify
96 .ts
97 .try_into()
98 .map(MilliSecondsSinceUnixEpoch)
99 .ok()?;
100
101 let notification = Notification {
102 room_id: event.room_id().into(),
103 event: event.into_format(),
104 ts,
105 read,
106 profile_tag: notify.tag,
107 actions: notify.actions,
108 };
109
110 Some((count, notification))
111 })
112 .take(limit)
113 .inspect(|(count, _)| {
114 next_token.replace(*count);
115 })
116 .map(at!(1))
117 .collect::<Vec<_>>()
118 .await;
119
120 Ok(get_notifications::v3::Response {
121 next_token: next_token.map(to_small_string),
122 notifications,
123 })
124}
125
126pub(crate) async fn get_pushrules_all_route(
130 State(services): State<crate::State>,
131 body: Ruma<get_pushrules_all::v3::Request>,
132) -> Result<get_pushrules_all::v3::Response> {
133 let sender_user = body.sender_user();
134
135 let Some(content_value) = services
136 .account_data
137 .get_global::<CanonicalJsonObject>(sender_user, GlobalAccountDataEventType::PushRules)
138 .await
139 .ok()
140 .and_then(|event| event.get("content").cloned())
141 .filter(CanonicalJsonValue::is_object)
142 else {
143 return recreate_push_rules_and_return(&services, sender_user).await;
146 };
147
148 let account_data_content =
149 serde_json::from_value::<PushRulesEventContent>(content_value.into()).map_err(|e| {
150 err!(Database(warn!("Invalid push rules account data event in database: {e}")))
151 })?;
152
153 let mut global_ruleset = account_data_content.global;
154
155 #[expect(deprecated)]
158 {
159 use ruma::push::RuleKind::*;
160 if global_ruleset
161 .get(Override, PredefinedOverrideRuleId::ContainsDisplayName.as_str())
162 .is_some()
163 || global_ruleset
164 .get(Override, PredefinedOverrideRuleId::RoomNotif.as_str())
165 .is_some()
166 || global_ruleset
167 .get(Content, PredefinedContentRuleId::ContainsUserName.as_str())
168 .is_some()
169 {
170 global_ruleset
171 .remove(Override, PredefinedOverrideRuleId::ContainsDisplayName)
172 .ok();
173 global_ruleset
174 .remove(Override, PredefinedOverrideRuleId::RoomNotif)
175 .ok();
176 global_ruleset
177 .remove(Content, PredefinedContentRuleId::ContainsUserName)
178 .ok();
179
180 global_ruleset.update_with_server_default(Ruleset::server_default(sender_user));
181
182 let ty = GlobalAccountDataEventType::PushRules;
183 let event = PushRulesEvent {
184 content: PushRulesEventContent { global: global_ruleset.clone() },
185 };
186
187 services
188 .account_data
189 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
190 .await?;
191 }
192 };
193
194 Ok(get_pushrules_all::v3::Response { global: global_ruleset })
195}
196
197pub(crate) async fn get_pushrules_global_route(
203 State(services): State<crate::State>,
204 body: Ruma<get_pushrules_global_scope::v3::Request>,
205) -> Result<get_pushrules_global_scope::v3::Response> {
206 let sender_user = body.sender_user();
207
208 let Some(content_value) = services
209 .account_data
210 .get_global::<CanonicalJsonObject>(sender_user, GlobalAccountDataEventType::PushRules)
211 .await
212 .ok()
213 .and_then(|event| event.get("content").cloned())
214 .filter(CanonicalJsonValue::is_object)
215 else {
216 let ty = GlobalAccountDataEventType::PushRules;
220 let event = PushRulesEvent {
221 content: PushRulesEventContent {
222 global: Ruleset::server_default(sender_user),
223 },
224 };
225
226 services
227 .account_data
228 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
229 .await?;
230
231 return Ok(get_pushrules_global_scope::v3::Response {
232 global: Ruleset::server_default(sender_user),
233 });
234 };
235
236 let account_data_content =
237 serde_json::from_value::<PushRulesEventContent>(content_value.into()).map_err(|e| {
238 err!(Database(warn!("Invalid push rules account data event in database: {e}")))
239 })?;
240
241 let mut global_ruleset = account_data_content.global;
242
243 #[expect(deprecated)]
246 {
247 use ruma::push::RuleKind::*;
248 if global_ruleset
249 .get(Override, PredefinedOverrideRuleId::ContainsDisplayName.as_str())
250 .is_some()
251 || global_ruleset
252 .get(Override, PredefinedOverrideRuleId::RoomNotif.as_str())
253 .is_some()
254 || global_ruleset
255 .get(Content, PredefinedContentRuleId::ContainsUserName.as_str())
256 .is_some()
257 {
258 global_ruleset
259 .remove(Override, PredefinedOverrideRuleId::ContainsDisplayName)
260 .ok();
261 global_ruleset
262 .remove(Override, PredefinedOverrideRuleId::RoomNotif)
263 .ok();
264 global_ruleset
265 .remove(Content, PredefinedContentRuleId::ContainsUserName)
266 .ok();
267
268 global_ruleset.update_with_server_default(Ruleset::server_default(sender_user));
269
270 services
271 .account_data
272 .update(
273 None,
274 sender_user,
275 GlobalAccountDataEventType::PushRules
276 .to_string()
277 .into(),
278 &serde_json::to_value(PushRulesEvent {
279 content: PushRulesEventContent { global: global_ruleset.clone() },
280 })
281 .expect("to json always works"),
282 )
283 .await?;
284 }
285 };
286
287 Ok(get_pushrules_global_scope::v3::Response { global: global_ruleset })
288}
289
290pub(crate) async fn get_pushrule_route(
294 State(services): State<crate::State>,
295 body: Ruma<get_pushrule::v3::Request>,
296) -> Result<get_pushrule::v3::Response> {
297 let sender_user = body
298 .sender_user
299 .as_ref()
300 .expect("user is authenticated");
301
302 #[expect(deprecated)]
304 if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
305 || body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
306 || body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
307 {
308 return Err!(Request(NotFound("Push rule not found.")));
309 }
310
311 let event: PushRulesEvent = services
312 .account_data
313 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
314 .await
315 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
316
317 let rule = event
318 .content
319 .global
320 .get(body.kind.clone(), &body.rule_id)
321 .map(Into::into);
322
323 if let Some(rule) = rule {
324 Ok(get_pushrule::v3::Response { rule })
325 } else {
326 Err!(Request(NotFound("Push rule not found.")))
327 }
328}
329
330pub(crate) async fn set_pushrule_route(
334 State(services): State<crate::State>,
335 body: Ruma<set_pushrule::v3::Request>,
336) -> Result<set_pushrule::v3::Response> {
337 let sender_user = body.sender_user();
338 let mut account_data: PushRulesEvent = services
339 .account_data
340 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
341 .await
342 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
343
344 if let Err(error) = account_data.content.global.insert(
345 body.rule.clone(),
346 body.after.as_deref(),
347 body.before.as_deref(),
348 ) {
349 use InsertPushRuleError::*;
350
351 return match error {
352 | ServerDefaultRuleId => Err!(Request(InvalidParam(
353 "Rule IDs starting with a dot are reserved for server-default rules."
354 ))),
355 | RelativeToServerDefaultRule => Err!(Request(InvalidParam(
356 "Can't place a push rule relatively to a server-default rule."
357 ))),
358 | BeforeHigherThanAfter => Err!(Request(InvalidParam(
359 "The before rule has a higher priority than the after rule."
360 ))),
361 | InvalidRuleId =>
362 Err!(Request(InvalidParam("Rule ID containing invalid characters."))),
363
364 | UnknownRuleId =>
365 Err!(Request(NotFound("The before or after rule could not be found."))),
366
367 | _ => Err!(Request(InvalidParam("Invalid data."))),
368 };
369 }
370
371 let ty = GlobalAccountDataEventType::PushRules;
372 services
373 .account_data
374 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
375 .await?;
376
377 Ok(set_pushrule::v3::Response {})
378}
379
380pub(crate) async fn get_pushrule_actions_route(
384 State(services): State<crate::State>,
385 body: Ruma<get_pushrule_actions::v3::Request>,
386) -> Result<get_pushrule_actions::v3::Response> {
387 let sender_user = body.sender_user();
388
389 #[expect(deprecated)]
391 if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
392 || body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
393 || body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
394 {
395 return Err!(Request(NotFound("Push rule not found.")));
396 }
397
398 let event: PushRulesEvent = services
399 .account_data
400 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
401 .await
402 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
403
404 let actions = event
405 .content
406 .global
407 .get(body.kind.clone(), &body.rule_id)
408 .map(|rule| rule.actions().to_owned())
409 .ok_or_else(|| err!(Request(NotFound("Push rule not found."))))?;
410
411 Ok(get_pushrule_actions::v3::Response { actions })
412}
413
414pub(crate) async fn set_pushrule_actions_route(
418 State(services): State<crate::State>,
419 body: Ruma<set_pushrule_actions::v3::Request>,
420) -> Result<set_pushrule_actions::v3::Response> {
421 let sender_user = body.sender_user();
422
423 let mut account_data: PushRulesEvent = services
424 .account_data
425 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
426 .await
427 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
428
429 if account_data
430 .content
431 .global
432 .set_actions(body.kind.clone(), &body.rule_id, body.actions.clone().into())
433 .is_err()
434 {
435 return Err!(Request(NotFound("Push rule not found.")));
436 }
437
438 let ty = GlobalAccountDataEventType::PushRules;
439 services
440 .account_data
441 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
442 .await?;
443
444 Ok(set_pushrule_actions::v3::Response {})
445}
446
447pub(crate) async fn get_pushrule_enabled_route(
451 State(services): State<crate::State>,
452 body: Ruma<get_pushrule_enabled::v3::Request>,
453) -> Result<get_pushrule_enabled::v3::Response> {
454 let sender_user = body.sender_user();
455
456 #[expect(deprecated)]
458 if body.rule_id.as_str() == PredefinedContentRuleId::ContainsUserName.as_str()
459 || body.rule_id.as_str() == PredefinedOverrideRuleId::ContainsDisplayName.as_str()
460 || body.rule_id.as_str() == PredefinedOverrideRuleId::RoomNotif.as_str()
461 {
462 return Ok(get_pushrule_enabled::v3::Response { enabled: false });
463 }
464
465 let event: PushRulesEvent = services
466 .account_data
467 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
468 .await
469 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
470
471 let enabled = event
472 .content
473 .global
474 .get(body.kind.clone(), &body.rule_id)
475 .map(ruma::push::AnyPushRuleRef::enabled)
476 .ok_or_else(|| err!(Request(NotFound("Push rule not found."))))?;
477
478 Ok(get_pushrule_enabled::v3::Response { enabled })
479}
480
481pub(crate) async fn set_pushrule_enabled_route(
485 State(services): State<crate::State>,
486 body: Ruma<set_pushrule_enabled::v3::Request>,
487) -> Result<set_pushrule_enabled::v3::Response> {
488 let sender_user = body.sender_user();
489
490 let mut account_data: PushRulesEvent = services
491 .account_data
492 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
493 .await
494 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
495
496 if account_data
497 .content
498 .global
499 .set_enabled(body.kind.clone(), &body.rule_id, body.enabled)
500 .is_err()
501 {
502 return Err!(Request(NotFound("Push rule not found.")));
503 }
504
505 let ty = GlobalAccountDataEventType::PushRules;
506 services
507 .account_data
508 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
509 .await?;
510
511 Ok(set_pushrule_enabled::v3::Response {})
512}
513
514pub(crate) async fn delete_pushrule_route(
518 State(services): State<crate::State>,
519 body: Ruma<delete_pushrule::v3::Request>,
520) -> Result<delete_pushrule::v3::Response> {
521 let sender_user = body.sender_user();
522
523 let mut account_data: PushRulesEvent = services
524 .account_data
525 .get_global(sender_user, GlobalAccountDataEventType::PushRules)
526 .await
527 .map_err(|_| err!(Request(NotFound("PushRules event not found."))))?;
528
529 if let Err(error) = account_data
530 .content
531 .global
532 .remove(body.kind.clone(), &body.rule_id)
533 {
534 return match error {
535 | RemovePushRuleError::ServerDefault =>
536 Err!(Request(InvalidParam("Cannot delete a server-default pushrule."))),
537
538 | RemovePushRuleError::NotFound => Err!(Request(NotFound("Push rule not found."))),
539
540 | _ => Err!(Request(InvalidParam("Invalid data."))),
541 };
542 }
543
544 let ty = GlobalAccountDataEventType::PushRules;
545 services
546 .account_data
547 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(account_data)?)
548 .await?;
549
550 Ok(delete_pushrule::v3::Response {})
551}
552
553pub(crate) async fn get_pushers_route(
557 State(services): State<crate::State>,
558 body: Ruma<get_pushers::v3::Request>,
559) -> Result<get_pushers::v3::Response> {
560 let sender_user = body.sender_user();
561
562 Ok(get_pushers::v3::Response {
563 pushers: services.pusher.get_pushers(sender_user).await,
564 })
565}
566
567pub(crate) async fn set_pushers_route(
573 State(services): State<crate::State>,
574 body: Ruma<set_pusher::v3::Request>,
575) -> Result<set_pusher::v3::Response> {
576 let sender_user = body.sender_user();
577
578 services
579 .pusher
580 .set_pusher(sender_user, body.sender_device()?, &body.action)
581 .await?;
582
583 Ok(set_pusher::v3::Response::new())
584}
585
586async fn recreate_push_rules_and_return(
589 services: &Services,
590 sender_user: &ruma::UserId,
591) -> Result<get_pushrules_all::v3::Response> {
592 let ty = GlobalAccountDataEventType::PushRules;
593 let event = PushRulesEvent {
594 content: PushRulesEventContent {
595 global: Ruleset::server_default(sender_user),
596 },
597 };
598
599 services
600 .account_data
601 .update(None, sender_user, ty.to_string().into(), &serde_json::to_value(event)?)
602 .await?;
603
604 Ok(get_pushrules_all::v3::Response {
605 global: Ruleset::server_default(sender_user),
606 })
607}