Skip to main content

tuwunel_service/account_data/
mod.rs

1mod direct;
2mod room_tags;
3
4use std::sync::Arc;
5
6use futures::{Stream, StreamExt, TryFutureExt, pin_mut};
7use ruma::{
8	RoomId, UserId,
9	events::{
10		AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
11		GlobalAccountDataEventType, RoomAccountDataEventType,
12	},
13	serde::Raw,
14};
15use serde::Deserialize;
16use serde_json::json;
17use tuwunel_core::{
18	Err, Result, at, err, implement,
19	utils::{ReadyExt, result::LogErr, stream::TryIgnore},
20};
21use tuwunel_database::{Deserialized, Handle, Ignore, Interfix, Json, Map};
22
/// Account data service.
///
/// Stores per-user account data, either global or scoped to a single room,
/// in the database maps held by `Data`.
pub struct Service {
	/// Shared handle to the other services; used in this module for
	/// `globals.next_count()`.
	services: Arc<crate::services::OnceServices>,
	/// Database maps backing this service.
	db: Data,
}
27
/// Database maps used by the account data service.
struct Data {
	/// Stored events as JSON, keyed by
	/// `(room_id, user_id, count, event_type)`.
	roomuserdataid_accountdata: Arc<Map>,
	/// Index from `(room_id, user_id, event_type)` to the key of the current
	/// entry in `roomuserdataid_accountdata`.
	roomusertype_roomuserdataid: Arc<Map>,
}
32
33impl crate::Service for Service {
34	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
35		Ok(Arc::new(Self {
36			services: args.services.clone(),
37			db: Data {
38				roomuserdataid_accountdata: args.db["roomuserdataid_accountdata"].clone(),
39				roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(),
40			},
41		}))
42	}
43
44	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
45}
46
47/// Places one event in the account data of the user and removes the
48/// previous entry.
49#[implement(Service)]
50pub async fn update(
51	&self,
52	room_id: Option<&RoomId>,
53	user_id: &UserId,
54	event_type: RoomAccountDataEventType,
55	data: &serde_json::Value,
56) -> Result {
57	if data.get("type").is_none() || data.get("content").is_none() {
58		return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
59	}
60
61	let count = self.services.globals.next_count();
62	let roomuserdataid = (room_id, user_id, *count, &event_type);
63	self.db
64		.roomuserdataid_accountdata
65		.put(roomuserdataid, Json(data));
66
67	let key = (room_id, user_id, &event_type);
68	let prev = self
69		.db
70		.roomusertype_roomuserdataid
71		.qry(&key)
72		.await;
73
74	self.db
75		.roomusertype_roomuserdataid
76		.put(key, roomuserdataid);
77
78	// Remove old entry
79	if let Ok(prev) = prev {
80		self.db.roomuserdataid_accountdata.remove(&prev);
81	}
82
83	Ok(())
84}
85
86/// MSC3391: replace the stored event with a tombstone whose content is
87/// `{}`. Delta sync surfaces the empty content so clients can apply the
88/// deletion; initial sync and GET treat the tombstone as not-present.
89#[implement(Service)]
90pub async fn delete(
91	&self,
92	room_id: Option<&RoomId>,
93	user_id: &UserId,
94	event_type: RoomAccountDataEventType,
95) -> Result {
96	let tombstone = json!({
97		"type": event_type.to_string(),
98		"content": {},
99	});
100
101	self.update(room_id, user_id, event_type, &tombstone)
102		.await
103}
104
105/// Searches the room account data for a specific kind.
106#[implement(Service)]
107pub async fn get_global<T>(&self, user_id: &UserId, kind: GlobalAccountDataEventType) -> Result<T>
108where
109	T: for<'de> Deserialize<'de>,
110{
111	self.get_raw(None, user_id, &kind.to_string())
112		.await
113		.deserialized()
114}
115
116/// Searches the global account data for a specific kind.
117#[implement(Service)]
118pub async fn get_room<T>(
119	&self,
120	room_id: &RoomId,
121	user_id: &UserId,
122	kind: RoomAccountDataEventType,
123) -> Result<T>
124where
125	T: for<'de> Deserialize<'de>,
126{
127	self.get_raw(Some(room_id), user_id, &kind.to_string())
128		.await
129		.deserialized()
130}
131
132#[implement(Service)]
133pub async fn get_raw(
134	&self,
135	room_id: Option<&RoomId>,
136	user_id: &UserId,
137	kind: &str,
138) -> Result<Handle<'_>> {
139	let key = (room_id, user_id, kind.to_owned());
140	self.db
141		.roomusertype_roomuserdataid
142		.qry(&key)
143		.and_then(|roomuserdataid| {
144			self.db
145				.roomuserdataid_accountdata
146				.get(&roomuserdataid)
147		})
148		.await
149}
150
151/// Returns all changes to the account data that happened after `since`.
152#[implement(Service)]
153pub fn changes_since<'a>(
154	&'a self,
155	room_id: Option<&'a RoomId>,
156	user_id: &'a UserId,
157	since: u64,
158	to: Option<u64>,
159) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
160	type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
161
162	// Skip the data that's exactly at since, because we sent that last time
163	let first_possible = (room_id, user_id, since.saturating_add(1));
164
165	self.db
166		.roomuserdataid_accountdata
167		.stream_from(&first_possible)
168		.ignore_err()
169		.ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
170			room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
171		})
172		.map(move |(_, v)| {
173			match room_id {
174				| Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
175					.map(AnyRawAccountDataEvent::Room),
176				| None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v)
177					.map(AnyRawAccountDataEvent::Global),
178			}
179			.map_err(|e| err!(Database("Database contains invalid account data: {e}")))
180			.log_err()
181		})
182		.ignore_err()
183}
184
185/// MSC4025: erase all account data for a user in the given namespace
186/// (global if `room_id` is `None`, otherwise a single room). Mirrors
187/// `threads::delete_all_rooms_threads`: prefix-scan the keys and
188/// remove each.
189#[implement(Service)]
190pub async fn erase_user(&self, user_id: &UserId, room_id: Option<&RoomId>) {
191	let prefix = (room_id, user_id, Interfix);
192
193	self.db
194		.roomuserdataid_accountdata
195		.keys_prefix_raw(&prefix)
196		.ignore_err()
197		.ready_for_each(|key| self.db.roomuserdataid_accountdata.remove(key))
198		.await;
199
200	self.db
201		.roomusertype_roomuserdataid
202		.keys_prefix_raw(&prefix)
203		.ignore_err()
204		.ready_for_each(|key| self.db.roomusertype_roomuserdataid.remove(key))
205		.await;
206}
207
208/// Returns all changes to the account data that happened after `since`.
209#[implement(Service)]
210pub async fn last_count<'a>(
211	&'a self,
212	room_id: Option<&'a RoomId>,
213	user_id: &'a UserId,
214	upper: Option<u64>,
215) -> Result<u64> {
216	type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
217
218	let upper = upper.unwrap_or(u64::MAX);
219	let key = (room_id, user_id, upper, Interfix);
220	let keys = self
221		.db
222		.roomuserdataid_accountdata
223		.rev_keys_from(&key)
224		.ignore_err()
225		.ready_take_while(move |(room_id_, user_id_, ..): &Key<'_>| {
226			room_id == *room_id_ && user_id == *user_id_
227		})
228		.map(at!(2));
229
230	pin_mut!(keys);
231	keys.next()
232		.await
233		.ok_or_else(|| err!(Request(NotFound("No account data found."))))
234}