tuwunel_service/account_data/
mod.rs1mod direct;
2mod push_rules;
3mod room_tags;
4
5use std::sync::Arc;
6
7use futures::{Stream, StreamExt, TryFutureExt, pin_mut};
8use ruma::{
9 RoomId, UserId,
10 events::{
11 AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
12 GlobalAccountDataEventType, RoomAccountDataEventType,
13 },
14 serde::Raw,
15};
16use serde::Deserialize;
17use serde_json::json;
18use tuwunel_core::{
19 Err, Result, at, err, implement,
20 utils::{ReadyExt, result::LogErr, stream::TryIgnore},
21};
22use tuwunel_database::{Deserialized, Handle, Ignore, Interfix, Json, Map};
23
24pub struct Service {
25 services: Arc<crate::services::OnceServices>,
26 db: Data,
27}
28
29struct Data {
30 roomuserdataid_accountdata: Arc<Map>,
31 roomusertype_roomuserdataid: Arc<Map>,
32}
33
34impl crate::Service for Service {
35 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
36 Ok(Arc::new(Self {
37 services: args.services.clone(),
38 db: Data {
39 roomuserdataid_accountdata: args.db["roomuserdataid_accountdata"].clone(),
40 roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(),
41 },
42 }))
43 }
44
45 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
46}
47
48#[implement(Service)]
51pub async fn update(
52 &self,
53 room_id: Option<&RoomId>,
54 user_id: &UserId,
55 event_type: RoomAccountDataEventType,
56 data: &serde_json::Value,
57) -> Result {
58 if data.get("type").is_none() || data.get("content").is_none() {
59 return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
60 }
61
62 let count = self.services.globals.next_count();
63 let roomuserdataid = (room_id, user_id, *count, &event_type);
64 self.db
65 .roomuserdataid_accountdata
66 .put(roomuserdataid, Json(data));
67
68 let key = (room_id, user_id, &event_type);
69 let prev = self
70 .db
71 .roomusertype_roomuserdataid
72 .qry(&key)
73 .await;
74
75 self.db
76 .roomusertype_roomuserdataid
77 .put(key, roomuserdataid);
78
79 if let Ok(prev) = prev {
81 self.db.roomuserdataid_accountdata.remove(&prev);
82 }
83
84 Ok(())
85}
86
87#[implement(Service)]
91pub async fn delete(
92 &self,
93 room_id: Option<&RoomId>,
94 user_id: &UserId,
95 event_type: RoomAccountDataEventType,
96) -> Result {
97 let tombstone = json!({
98 "type": event_type.to_string(),
99 "content": {},
100 });
101
102 self.update(room_id, user_id, event_type, &tombstone)
103 .await
104}
105
106#[implement(Service)]
108pub async fn get_global<T>(&self, user_id: &UserId, kind: GlobalAccountDataEventType) -> Result<T>
109where
110 T: for<'de> Deserialize<'de>,
111{
112 self.get_raw(None, user_id, &kind.to_string())
113 .await
114 .deserialized()
115}
116
117#[implement(Service)]
119pub async fn get_room<T>(
120 &self,
121 room_id: &RoomId,
122 user_id: &UserId,
123 kind: RoomAccountDataEventType,
124) -> Result<T>
125where
126 T: for<'de> Deserialize<'de>,
127{
128 self.get_raw(Some(room_id), user_id, &kind.to_string())
129 .await
130 .deserialized()
131}
132
133#[implement(Service)]
134pub async fn get_raw(
135 &self,
136 room_id: Option<&RoomId>,
137 user_id: &UserId,
138 kind: &str,
139) -> Result<Handle<'_>> {
140 let key = (room_id, user_id, kind.to_owned());
141 self.db
142 .roomusertype_roomuserdataid
143 .qry(&key)
144 .and_then(|roomuserdataid| {
145 self.db
146 .roomuserdataid_accountdata
147 .get(&roomuserdataid)
148 })
149 .await
150}
151
152#[implement(Service)]
154pub fn changes_since<'a>(
155 &'a self,
156 room_id: Option<&'a RoomId>,
157 user_id: &'a UserId,
158 since: u64,
159 to: Option<u64>,
160) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
161 type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
162
163 let first_possible = (room_id, user_id, since.saturating_add(1));
165
166 self.db
167 .roomuserdataid_accountdata
168 .stream_from(&first_possible)
169 .ignore_err()
170 .ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
171 room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
172 })
173 .map(move |(_, v)| {
174 match room_id {
175 | Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
176 .map(AnyRawAccountDataEvent::Room),
177 | None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v)
178 .map(AnyRawAccountDataEvent::Global),
179 }
180 .map_err(|e| err!(Database("Database contains invalid account data: {e}")))
181 .log_err()
182 })
183 .ignore_err()
184}
185
186#[implement(Service)]
191pub async fn erase_user(&self, user_id: &UserId, room_id: Option<&RoomId>) {
192 let prefix = (room_id, user_id, Interfix);
193
194 self.db
195 .roomuserdataid_accountdata
196 .keys_prefix_raw(&prefix)
197 .ignore_err()
198 .ready_for_each(|key| self.db.roomuserdataid_accountdata.remove(key))
199 .await;
200
201 self.db
202 .roomusertype_roomuserdataid
203 .keys_prefix_raw(&prefix)
204 .ignore_err()
205 .ready_for_each(|key| self.db.roomusertype_roomuserdataid.remove(key))
206 .await;
207}
208
209#[implement(Service)]
211pub async fn last_count<'a>(
212 &'a self,
213 room_id: Option<&'a RoomId>,
214 user_id: &'a UserId,
215 upper: Option<u64>,
216) -> Result<u64> {
217 type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
218
219 let upper = upper.unwrap_or(u64::MAX);
220 let key = (room_id, user_id, upper, Interfix);
221 let keys = self
222 .db
223 .roomuserdataid_accountdata
224 .rev_keys_from(&key)
225 .ignore_err()
226 .ready_take_while(move |(room_id_, user_id_, ..): &Key<'_>| {
227 room_id == *room_id_ && user_id == *user_id_
228 })
229 .map(at!(2));
230
231 pin_mut!(keys);
232 keys.next()
233 .await
234 .ok_or_else(|| err!(Request(NotFound("No account data found."))))
235}