tuwunel_service/account_data/
mod.rs1mod direct;
2mod room_tags;
3
4use std::sync::Arc;
5
6use futures::{Stream, StreamExt, TryFutureExt, pin_mut};
7use ruma::{
8 RoomId, UserId,
9 events::{
10 AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
11 GlobalAccountDataEventType, RoomAccountDataEventType,
12 },
13 serde::Raw,
14};
15use serde::Deserialize;
16use serde_json::json;
17use tuwunel_core::{
18 Err, Result, at, err, implement,
19 utils::{ReadyExt, result::LogErr, stream::TryIgnore},
20};
21use tuwunel_database::{Deserialized, Handle, Ignore, Interfix, Json, Map};
22
23pub struct Service {
24 services: Arc<crate::services::OnceServices>,
25 db: Data,
26}
27
28struct Data {
29 roomuserdataid_accountdata: Arc<Map>,
30 roomusertype_roomuserdataid: Arc<Map>,
31}
32
33impl crate::Service for Service {
34 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
35 Ok(Arc::new(Self {
36 services: args.services.clone(),
37 db: Data {
38 roomuserdataid_accountdata: args.db["roomuserdataid_accountdata"].clone(),
39 roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(),
40 },
41 }))
42 }
43
44 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
45}
46
47#[implement(Service)]
50pub async fn update(
51 &self,
52 room_id: Option<&RoomId>,
53 user_id: &UserId,
54 event_type: RoomAccountDataEventType,
55 data: &serde_json::Value,
56) -> Result {
57 if data.get("type").is_none() || data.get("content").is_none() {
58 return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
59 }
60
61 let count = self.services.globals.next_count();
62 let roomuserdataid = (room_id, user_id, *count, &event_type);
63 self.db
64 .roomuserdataid_accountdata
65 .put(roomuserdataid, Json(data));
66
67 let key = (room_id, user_id, &event_type);
68 let prev = self
69 .db
70 .roomusertype_roomuserdataid
71 .qry(&key)
72 .await;
73
74 self.db
75 .roomusertype_roomuserdataid
76 .put(key, roomuserdataid);
77
78 if let Ok(prev) = prev {
80 self.db.roomuserdataid_accountdata.remove(&prev);
81 }
82
83 Ok(())
84}
85
86#[implement(Service)]
90pub async fn delete(
91 &self,
92 room_id: Option<&RoomId>,
93 user_id: &UserId,
94 event_type: RoomAccountDataEventType,
95) -> Result {
96 let tombstone = json!({
97 "type": event_type.to_string(),
98 "content": {},
99 });
100
101 self.update(room_id, user_id, event_type, &tombstone)
102 .await
103}
104
105#[implement(Service)]
107pub async fn get_global<T>(&self, user_id: &UserId, kind: GlobalAccountDataEventType) -> Result<T>
108where
109 T: for<'de> Deserialize<'de>,
110{
111 self.get_raw(None, user_id, &kind.to_string())
112 .await
113 .deserialized()
114}
115
116#[implement(Service)]
118pub async fn get_room<T>(
119 &self,
120 room_id: &RoomId,
121 user_id: &UserId,
122 kind: RoomAccountDataEventType,
123) -> Result<T>
124where
125 T: for<'de> Deserialize<'de>,
126{
127 self.get_raw(Some(room_id), user_id, &kind.to_string())
128 .await
129 .deserialized()
130}
131
132#[implement(Service)]
133pub async fn get_raw(
134 &self,
135 room_id: Option<&RoomId>,
136 user_id: &UserId,
137 kind: &str,
138) -> Result<Handle<'_>> {
139 let key = (room_id, user_id, kind.to_owned());
140 self.db
141 .roomusertype_roomuserdataid
142 .qry(&key)
143 .and_then(|roomuserdataid| {
144 self.db
145 .roomuserdataid_accountdata
146 .get(&roomuserdataid)
147 })
148 .await
149}
150
151#[implement(Service)]
153pub fn changes_since<'a>(
154 &'a self,
155 room_id: Option<&'a RoomId>,
156 user_id: &'a UserId,
157 since: u64,
158 to: Option<u64>,
159) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
160 type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
161
162 let first_possible = (room_id, user_id, since.saturating_add(1));
164
165 self.db
166 .roomuserdataid_accountdata
167 .stream_from(&first_possible)
168 .ignore_err()
169 .ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
170 room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
171 })
172 .map(move |(_, v)| {
173 match room_id {
174 | Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
175 .map(AnyRawAccountDataEvent::Room),
176 | None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v)
177 .map(AnyRawAccountDataEvent::Global),
178 }
179 .map_err(|e| err!(Database("Database contains invalid account data: {e}")))
180 .log_err()
181 })
182 .ignore_err()
183}
184
185#[implement(Service)]
190pub async fn erase_user(&self, user_id: &UserId, room_id: Option<&RoomId>) {
191 let prefix = (room_id, user_id, Interfix);
192
193 self.db
194 .roomuserdataid_accountdata
195 .keys_prefix_raw(&prefix)
196 .ignore_err()
197 .ready_for_each(|key| self.db.roomuserdataid_accountdata.remove(key))
198 .await;
199
200 self.db
201 .roomusertype_roomuserdataid
202 .keys_prefix_raw(&prefix)
203 .ignore_err()
204 .ready_for_each(|key| self.db.roomusertype_roomuserdataid.remove(key))
205 .await;
206}
207
208#[implement(Service)]
210pub async fn last_count<'a>(
211 &'a self,
212 room_id: Option<&'a RoomId>,
213 user_id: &'a UserId,
214 upper: Option<u64>,
215) -> Result<u64> {
216 type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
217
218 let upper = upper.unwrap_or(u64::MAX);
219 let key = (room_id, user_id, upper, Interfix);
220 let keys = self
221 .db
222 .roomuserdataid_accountdata
223 .rev_keys_from(&key)
224 .ignore_err()
225 .ready_take_while(move |(room_id_, user_id_, ..): &Key<'_>| {
226 room_id == *room_id_ && user_id == *user_id_
227 })
228 .map(at!(2));
229
230 pin_mut!(keys);
231 keys.next()
232 .await
233 .ok_or_else(|| err!(Request(NotFound("No account data found."))))
234}