Skip to main content

tuwunel_service/account_data/
mod.rs

1mod direct;
2mod push_rules;
3mod room_tags;
4
5use std::sync::Arc;
6
7use futures::{Stream, StreamExt, TryFutureExt, pin_mut};
8use ruma::{
9	RoomId, UserId,
10	events::{
11		AnyGlobalAccountDataEvent, AnyRawAccountDataEvent, AnyRoomAccountDataEvent,
12		GlobalAccountDataEventType, RoomAccountDataEventType,
13	},
14	serde::Raw,
15};
16use serde::Deserialize;
17use serde_json::json;
18use tuwunel_core::{
19	Err, Result, at, err, implement,
20	utils::{ReadyExt, result::LogErr, stream::TryIgnore},
21};
22use tuwunel_database::{Deserialized, Handle, Ignore, Interfix, Json, Map};
23
24pub struct Service {
25	services: Arc<crate::services::OnceServices>,
26	db: Data,
27}
28
29struct Data {
30	roomuserdataid_accountdata: Arc<Map>,
31	roomusertype_roomuserdataid: Arc<Map>,
32}
33
34impl crate::Service for Service {
35	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
36		Ok(Arc::new(Self {
37			services: args.services.clone(),
38			db: Data {
39				roomuserdataid_accountdata: args.db["roomuserdataid_accountdata"].clone(),
40				roomusertype_roomuserdataid: args.db["roomusertype_roomuserdataid"].clone(),
41			},
42		}))
43	}
44
45	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
46}
47
48/// Places one event in the account data of the user and removes the
49/// previous entry.
50#[implement(Service)]
51pub async fn update(
52	&self,
53	room_id: Option<&RoomId>,
54	user_id: &UserId,
55	event_type: RoomAccountDataEventType,
56	data: &serde_json::Value,
57) -> Result {
58	if data.get("type").is_none() || data.get("content").is_none() {
59		return Err!(Request(InvalidParam("Account data doesn't have all required fields.")));
60	}
61
62	let count = self.services.globals.next_count();
63	let roomuserdataid = (room_id, user_id, *count, &event_type);
64	self.db
65		.roomuserdataid_accountdata
66		.put(roomuserdataid, Json(data));
67
68	let key = (room_id, user_id, &event_type);
69	let prev = self
70		.db
71		.roomusertype_roomuserdataid
72		.qry(&key)
73		.await;
74
75	self.db
76		.roomusertype_roomuserdataid
77		.put(key, roomuserdataid);
78
79	// Remove old entry
80	if let Ok(prev) = prev {
81		self.db.roomuserdataid_accountdata.remove(&prev);
82	}
83
84	Ok(())
85}
86
87/// MSC3391: replace the stored event with a tombstone whose content is
88/// `{}`. Delta sync surfaces the empty content so clients can apply the
89/// deletion; initial sync and GET treat the tombstone as not-present.
90#[implement(Service)]
91pub async fn delete(
92	&self,
93	room_id: Option<&RoomId>,
94	user_id: &UserId,
95	event_type: RoomAccountDataEventType,
96) -> Result {
97	let tombstone = json!({
98		"type": event_type.to_string(),
99		"content": {},
100	});
101
102	self.update(room_id, user_id, event_type, &tombstone)
103		.await
104}
105
106/// Searches the room account data for a specific kind.
107#[implement(Service)]
108pub async fn get_global<T>(&self, user_id: &UserId, kind: GlobalAccountDataEventType) -> Result<T>
109where
110	T: for<'de> Deserialize<'de>,
111{
112	self.get_raw(None, user_id, &kind.to_string())
113		.await
114		.deserialized()
115}
116
117/// Searches the global account data for a specific kind.
118#[implement(Service)]
119pub async fn get_room<T>(
120	&self,
121	room_id: &RoomId,
122	user_id: &UserId,
123	kind: RoomAccountDataEventType,
124) -> Result<T>
125where
126	T: for<'de> Deserialize<'de>,
127{
128	self.get_raw(Some(room_id), user_id, &kind.to_string())
129		.await
130		.deserialized()
131}
132
133#[implement(Service)]
134pub async fn get_raw(
135	&self,
136	room_id: Option<&RoomId>,
137	user_id: &UserId,
138	kind: &str,
139) -> Result<Handle<'_>> {
140	let key = (room_id, user_id, kind.to_owned());
141	self.db
142		.roomusertype_roomuserdataid
143		.qry(&key)
144		.and_then(|roomuserdataid| {
145			self.db
146				.roomuserdataid_accountdata
147				.get(&roomuserdataid)
148		})
149		.await
150}
151
152/// Returns all changes to the account data that happened after `since`.
153#[implement(Service)]
154pub fn changes_since<'a>(
155	&'a self,
156	room_id: Option<&'a RoomId>,
157	user_id: &'a UserId,
158	since: u64,
159	to: Option<u64>,
160) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
161	type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
162
163	// Skip the data that's exactly at since, because we sent that last time
164	let first_possible = (room_id, user_id, since.saturating_add(1));
165
166	self.db
167		.roomuserdataid_accountdata
168		.stream_from(&first_possible)
169		.ignore_err()
170		.ready_take_while(move |((room_id_, user_id_, count, _), _): &(Key<'_>, _)| {
171			room_id == *room_id_ && user_id == *user_id_ && to.is_none_or(|to| *count <= to)
172		})
173		.map(move |(_, v)| {
174			match room_id {
175				| Some(_) => serde_json::from_slice::<Raw<AnyRoomAccountDataEvent>>(v)
176					.map(AnyRawAccountDataEvent::Room),
177				| None => serde_json::from_slice::<Raw<AnyGlobalAccountDataEvent>>(v)
178					.map(AnyRawAccountDataEvent::Global),
179			}
180			.map_err(|e| err!(Database("Database contains invalid account data: {e}")))
181			.log_err()
182		})
183		.ignore_err()
184}
185
186/// MSC4025: erase all account data for a user in the given namespace
187/// (global if `room_id` is `None`, otherwise a single room). Mirrors
188/// `threads::delete_all_rooms_threads`: prefix-scan the keys and
189/// remove each.
190#[implement(Service)]
191pub async fn erase_user(&self, user_id: &UserId, room_id: Option<&RoomId>) {
192	let prefix = (room_id, user_id, Interfix);
193
194	self.db
195		.roomuserdataid_accountdata
196		.keys_prefix_raw(&prefix)
197		.ignore_err()
198		.ready_for_each(|key| self.db.roomuserdataid_accountdata.remove(key))
199		.await;
200
201	self.db
202		.roomusertype_roomuserdataid
203		.keys_prefix_raw(&prefix)
204		.ignore_err()
205		.ready_for_each(|key| self.db.roomusertype_roomuserdataid.remove(key))
206		.await;
207}
208
209/// Returns all changes to the account data that happened after `since`.
210#[implement(Service)]
211pub async fn last_count<'a>(
212	&'a self,
213	room_id: Option<&'a RoomId>,
214	user_id: &'a UserId,
215	upper: Option<u64>,
216) -> Result<u64> {
217	type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);
218
219	let upper = upper.unwrap_or(u64::MAX);
220	let key = (room_id, user_id, upper, Interfix);
221	let keys = self
222		.db
223		.roomuserdataid_accountdata
224		.rev_keys_from(&key)
225		.ignore_err()
226		.ready_take_while(move |(room_id_, user_id_, ..): &Key<'_>| {
227			room_id == *room_id_ && user_id == *user_id_
228		})
229		.map(at!(2));
230
231	pin_mut!(keys);
232	keys.next()
233		.await
234		.ok_or_else(|| err!(Request(NotFound("No account data found."))))
235}