tuwunel_api/client/
dehydrated_device.rs1use axum::extract::State;
2use futures::StreamExt;
3use ruma::api::client::dehydrated_device::{
4 delete_dehydrated_device::unstable as delete_dehydrated_device,
5 get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events,
6 put_dehydrated_device::unstable as put_dehydrated_device,
7};
8use tuwunel_core::{Err, Result, at, utils::result::IsErrOr};
9
10use crate::{ClientIp, Ruma};
11
12const MAX_BATCH_EVENTS: usize = 50;
13
14#[tracing::instrument(skip_all, fields(%client))]
18pub(crate) async fn put_dehydrated_device_route(
19 State(services): State<crate::State>,
20 ClientIp(client): ClientIp,
21 body: Ruma<put_dehydrated_device::Request>,
22) -> Result<put_dehydrated_device::Response> {
23 let sender_user = body
24 .sender_user
25 .as_deref()
26 .expect("AccessToken authentication required");
27
28 let device_id = body.body.device_id.clone();
29
30 services
31 .users
32 .set_dehydrated_device(sender_user, body.body)
33 .await?;
34
35 Ok(put_dehydrated_device::Response { device_id })
36}
37
38#[tracing::instrument(skip_all, fields(%client))]
42pub(crate) async fn delete_dehydrated_device_route(
43 State(services): State<crate::State>,
44 ClientIp(client): ClientIp,
45 body: Ruma<delete_dehydrated_device::Request>,
46) -> Result<delete_dehydrated_device::Response> {
47 let sender_user = body.sender_user();
48
49 let device_id = services
50 .users
51 .get_dehydrated_device_id(sender_user)
52 .await?;
53
54 services
55 .users
56 .remove_device(sender_user, &device_id)
57 .await;
58
59 Ok(delete_dehydrated_device::Response { device_id })
60}
61
62#[tracing::instrument(skip_all, fields(%client))]
66pub(crate) async fn get_dehydrated_device_route(
67 State(services): State<crate::State>,
68 ClientIp(client): ClientIp,
69 body: Ruma<get_dehydrated_device::Request>,
70) -> Result<get_dehydrated_device::Response> {
71 let sender_user = body.sender_user();
72
73 let device = services
74 .users
75 .get_dehydrated_device(sender_user)
76 .await?;
77
78 Ok(get_dehydrated_device::Response {
79 device_id: device.device_id,
80 device_data: device.device_data,
81 })
82}
83
84#[tracing::instrument(skip_all, fields(%client))]
88pub(crate) async fn get_dehydrated_events_route(
89 State(services): State<crate::State>,
90 ClientIp(client): ClientIp,
91 body: Ruma<get_events::Request>,
92) -> Result<get_events::Response> {
93 let sender_user = body.sender_user();
94
95 let device_id = &body.body.device_id;
96 let existing_id = services
97 .users
98 .get_dehydrated_device_id(sender_user)
99 .await;
100
101 if existing_id
102 .as_ref()
103 .is_err_or(|existing_id| existing_id != device_id)
104 {
105 return Err!(Request(Forbidden("Not the dehydrated device_id.")));
106 }
107
108 let since: Option<u64> = body
109 .body
110 .next_batch
111 .as_deref()
112 .map(str::parse)
113 .transpose()?;
114
115 let mut next_batch: Option<u64> = None;
116 let events = services
117 .users
118 .get_to_device_events(sender_user, device_id, since, None)
119 .take(MAX_BATCH_EVENTS)
120 .inspect(|&(count, _)| {
121 next_batch.replace(count);
122 })
123 .map(at!(1))
124 .collect()
125 .await;
126
127 Ok(get_events::Response {
128 events,
129 next_batch: next_batch.as_ref().map(ToString::to_string),
130 })
131}