Skip to main content

tuwunel_service/presence/
data.rs

1use std::sync::Arc;
2
3use futures::Stream;
4use ruma::{UInt, UserId, events::presence::PresenceEvent, presence::PresenceState};
5use tuwunel_core::{
6	Result, debug_warn, utils,
7	utils::{ReadyExt, stream::TryIgnore},
8};
9use tuwunel_database::{Deserialized, Json, Map};
10
11use super::Presence;
12
/// Database accessor backing the presence service.
pub(crate) struct Data {
	/// Presence records. Keys are produced by `presenceid_key` (the count's
	/// big-endian bytes followed by the user id bytes); values are written as
	/// JSON-serialized `Presence`.
	presenceid_presence: Arc<Map>,
	/// Index from a user id to the count of that user's current presence
	/// record in `presenceid_presence`.
	userid_presenceid: Arc<Map>,
	/// Shared service handles; this type reads `users` and `globals` from it.
	services: Arc<crate::services::OnceServices>,
}
18
19impl Data {
20	pub(super) fn new(args: &crate::Args<'_>) -> Self {
21		let db = &args.db;
22		Self {
23			presenceid_presence: db["presenceid_presence"].clone(),
24			userid_presenceid: db["userid_presenceid"].clone(),
25			services: args.services.clone(),
26		}
27	}
28
29	#[inline]
30	pub(super) async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> {
31		let count = self
32			.userid_presenceid
33			.get(user_id)
34			.await
35			.deserialized::<u64>()?;
36
37		let key = presenceid_key(count, user_id);
38		let bytes = self.presenceid_presence.get(&key).await?;
39		let event = Presence::from_json_bytes(&bytes)?
40			.to_presence_event(user_id, &self.services.users)
41			.await;
42
43		Ok((count, event))
44	}
45
46	pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> {
47		let count = self
48			.userid_presenceid
49			.get(user_id)
50			.await
51			.deserialized::<u64>()?;
52
53		let key = presenceid_key(count, user_id);
54		let bytes = self.presenceid_presence.get(&key).await?;
55		let presence = Presence::from_json_bytes(&bytes)?;
56
57		Ok((count, presence))
58	}
59
60	pub(super) async fn set_presence(
61		&self,
62		user_id: &UserId,
63		presence_state: &PresenceState,
64		currently_active: Option<bool>,
65		last_active_ago: Option<UInt>,
66		status_msg: Option<String>,
67	) -> Result<Option<u64>> {
68		let last_presence = self.get_presence(user_id).await;
69		let state_changed = match last_presence {
70			| Err(_) => true,
71			| Ok(ref presence) => presence.1.content.presence != *presence_state,
72		};
73
74		let status_msg_changed = match last_presence {
75			| Err(_) => true,
76			| Ok(ref last_presence) => {
77				let old_msg = last_presence
78					.1
79					.content
80					.status_msg
81					.clone()
82					.unwrap_or_default();
83
84				let new_msg = status_msg.clone().unwrap_or_default();
85
86				new_msg != old_msg
87			},
88		};
89
90		let now = utils::millis_since_unix_epoch();
91		let last_last_active_ts = match last_presence {
92			| Err(_) => 0,
93			| Ok((_, ref presence)) => now.saturating_sub(
94				presence
95					.content
96					.last_active_ago
97					.unwrap_or_default()
98					.into(),
99			),
100		};
101
102		let last_active_ts = match last_active_ago {
103			| None => now,
104			| Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
105		};
106
107		// TODO: tighten for state flicker?
108		if !status_msg_changed && !state_changed && last_active_ts < last_last_active_ts {
109			debug_warn!(
110				"presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \
111				 {last_last_active_ts:?}",
112			);
113			return Ok(None);
114		}
115
116		let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) {
117			None
118		} else {
119			status_msg
120		};
121
122		let presence = Presence::new(
123			presence_state.to_owned(),
124			currently_active.unwrap_or(false),
125			last_active_ts,
126			status_msg,
127		);
128
129		let count = self.services.globals.next_count();
130		let key = presenceid_key(*count, user_id);
131
132		self.userid_presenceid.raw_put(user_id, *count);
133		self.presenceid_presence
134			.raw_put(key, Json(presence));
135
136		if let Ok((last_count, _)) = last_presence {
137			let key = presenceid_key(last_count, user_id);
138			self.presenceid_presence.remove(&key);
139		}
140
141		Ok(Some(*count))
142	}
143
144	#[inline]
145	pub(super) async fn remove_presence(&self, user_id: &UserId) {
146		let Ok(count) = self
147			.userid_presenceid
148			.get(user_id)
149			.await
150			.deserialized::<u64>()
151		else {
152			return;
153		};
154
155		let key = presenceid_key(count, user_id);
156		self.presenceid_presence.remove(&key);
157		self.userid_presenceid.remove(user_id);
158	}
159
160	#[inline]
161	pub(super) fn presence_since(
162		&self,
163		since: u64,
164		to: Option<u64>,
165	) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
166		self.presenceid_presence
167			.raw_stream()
168			.ignore_err()
169			.ready_filter_map(move |(key, presence)| {
170				let (count, user_id) = presenceid_parse(key).ok()?;
171				(count > since && to.is_none_or(|to| count <= to))
172					.then_some((user_id, count, presence))
173			})
174	}
175}
176
177#[inline]
178fn presenceid_key(count: u64, user_id: &UserId) -> Vec<u8> {
179	let cap = size_of::<u64>().saturating_add(user_id.as_bytes().len());
180	let mut key = Vec::with_capacity(cap);
181	key.extend_from_slice(&count.to_be_bytes());
182	key.extend_from_slice(user_id.as_bytes());
183	key
184}
185
186#[inline]
187fn presenceid_parse(key: &[u8]) -> Result<(u64, &UserId)> {
188	let (count, user_id) = key.split_at(8);
189	let user_id = user_id_from_bytes(user_id)?;
190	let count = utils::u64_from_u8(count);
191
192	Ok((count, user_id))
193}
194
195/// Parses a `UserId` from bytes.
196fn user_id_from_bytes(bytes: &[u8]) -> Result<&UserId> {
197	let str: &str = utils::str_from_bytes(bytes)?;
198	let user_id: &UserId = str.try_into()?;
199
200	Ok(user_id)
201}