Skip to main content

tuwunel_service/presence/
data.rs

1use std::sync::Arc;
2
3use futures::Stream;
4use ruma::{UInt, UserId, events::presence::PresenceEvent, presence::PresenceState};
5use tuwunel_core::{
6	Result, debug_warn, utils,
7	utils::{ReadyExt, stream::TryIgnore},
8};
9use tuwunel_database::{Deserialized, Json, Map};
10
11use crate::presence::Presence;
12
/// Database-backed storage for user presence.
pub(crate) struct Data {
	/// Presence records, keyed by the bytes produced by `presenceid_key`
	/// (count followed by the user ID) and stored as JSON.
	presenceid_presence: Arc<Map>,
	/// Maps a user ID to the count of that user's current presence record.
	userid_presenceid: Arc<Map>,
	/// Handle to the other services; this type reaches `presence` and
	/// `globals` through it.
	services: Arc<crate::services::OnceServices>,
}
18
19impl Data {
20	pub(super) fn new(args: &crate::Args<'_>) -> Self {
21		let db = &args.db;
22		Self {
23			presenceid_presence: db["presenceid_presence"].clone(),
24			userid_presenceid: db["userid_presenceid"].clone(),
25			services: args.services.clone(),
26		}
27	}
28
29	#[inline]
30	pub(super) async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> {
31		let count = self
32			.userid_presenceid
33			.get(user_id)
34			.await
35			.deserialized::<u64>()?;
36
37		let key = presenceid_key(count, user_id);
38		let bytes = self.presenceid_presence.get(&key).await?;
39		let event = self
40			.services
41			.presence
42			.from_json_bytes_to_event(&bytes, user_id)
43			.await?;
44
45		Ok((count, event))
46	}
47
48	pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> {
49		let count = self
50			.userid_presenceid
51			.get(user_id)
52			.await
53			.deserialized::<u64>()?;
54
55		let key = presenceid_key(count, user_id);
56		let bytes = self.presenceid_presence.get(&key).await?;
57		let presence = Presence::from_json_bytes(&bytes)?;
58
59		Ok((count, presence))
60	}
61
62	pub(super) async fn set_presence(
63		&self,
64		user_id: &UserId,
65		presence_state: &PresenceState,
66		currently_active: Option<bool>,
67		last_active_ago: Option<UInt>,
68		status_msg: Option<String>,
69	) -> Result<Option<u64>> {
70		let last_presence = self.get_presence(user_id).await;
71		let state_changed = match last_presence {
72			| Err(_) => true,
73			| Ok(ref presence) => presence.1.content.presence != *presence_state,
74		};
75
76		let status_msg_changed = match last_presence {
77			| Err(_) => true,
78			| Ok(ref last_presence) => {
79				let old_msg = last_presence
80					.1
81					.content
82					.status_msg
83					.clone()
84					.unwrap_or_default();
85
86				let new_msg = status_msg.clone().unwrap_or_default();
87
88				new_msg != old_msg
89			},
90		};
91
92		let now = utils::millis_since_unix_epoch();
93		let last_last_active_ts = match last_presence {
94			| Err(_) => 0,
95			| Ok((_, ref presence)) => now.saturating_sub(
96				presence
97					.content
98					.last_active_ago
99					.unwrap_or_default()
100					.into(),
101			),
102		};
103
104		let last_active_ts = match last_active_ago {
105			| None => now,
106			| Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
107		};
108
109		// TODO: tighten for state flicker?
110		if !status_msg_changed && !state_changed && last_active_ts < last_last_active_ts {
111			debug_warn!(
112				"presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \
113				 {last_last_active_ts:?}",
114			);
115			return Ok(None);
116		}
117
118		let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) {
119			None
120		} else {
121			status_msg
122		};
123
124		let presence = Presence {
125			state: presence_state.to_owned(),
126			currently_active: currently_active.unwrap_or(false),
127			last_active_ts,
128			status_msg,
129		};
130
131		let count = self.services.globals.next_count();
132		let key = presenceid_key(*count, user_id);
133
134		self.userid_presenceid.raw_put(user_id, *count);
135		self.presenceid_presence
136			.raw_put(key, Json(presence));
137
138		if let Ok((last_count, _)) = last_presence {
139			let key = presenceid_key(last_count, user_id);
140			self.presenceid_presence.remove(&key);
141		}
142
143		Ok(Some(*count))
144	}
145
146	#[inline]
147	pub(super) async fn remove_presence(&self, user_id: &UserId) {
148		let Ok(count) = self
149			.userid_presenceid
150			.get(user_id)
151			.await
152			.deserialized::<u64>()
153		else {
154			return;
155		};
156
157		let key = presenceid_key(count, user_id);
158		self.presenceid_presence.remove(&key);
159		self.userid_presenceid.remove(user_id);
160	}
161
162	#[inline]
163	pub(super) fn presence_since(
164		&self,
165		since: u64,
166		to: Option<u64>,
167	) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
168		self.presenceid_presence
169			.raw_stream()
170			.ignore_err()
171			.ready_filter_map(move |(key, presence)| {
172				let (count, user_id) = presenceid_parse(key).ok()?;
173				(count > since && to.is_none_or(|to| count <= to))
174					.then_some((user_id, count, presence))
175			})
176	}
177}
178
179#[inline]
180fn presenceid_key(count: u64, user_id: &UserId) -> Vec<u8> {
181	let cap = size_of::<u64>().saturating_add(user_id.as_bytes().len());
182	let mut key = Vec::with_capacity(cap);
183	key.extend_from_slice(&count.to_be_bytes());
184	key.extend_from_slice(user_id.as_bytes());
185	key
186}
187
188#[inline]
189fn presenceid_parse(key: &[u8]) -> Result<(u64, &UserId)> {
190	let (count, user_id) = key.split_at(8);
191	let user_id = user_id_from_bytes(user_id)?;
192	let count = utils::u64_from_u8(count);
193
194	Ok((count, user_id))
195}
196
197/// Parses a `UserId` from bytes.
198fn user_id_from_bytes(bytes: &[u8]) -> Result<&UserId> {
199	let str: &str = utils::str_from_bytes(bytes)?;
200	let user_id: &UserId = str.try_into()?;
201
202	Ok(user_id)
203}