tuwunel_service/presence/
data.rs1use std::sync::Arc;
2
3use futures::Stream;
4use ruma::{UInt, UserId, events::presence::PresenceEvent, presence::PresenceState};
5use tuwunel_core::{
6 Result, debug_warn, utils,
7 utils::{ReadyExt, stream::TryIgnore},
8};
9use tuwunel_database::{Deserialized, Json, Map};
10
11use super::Presence;
12
13pub(crate) struct Data {
14 presenceid_presence: Arc<Map>,
15 userid_presenceid: Arc<Map>,
16 services: Arc<crate::services::OnceServices>,
17}
18
19impl Data {
20 pub(super) fn new(args: &crate::Args<'_>) -> Self {
21 let db = &args.db;
22 Self {
23 presenceid_presence: db["presenceid_presence"].clone(),
24 userid_presenceid: db["userid_presenceid"].clone(),
25 services: args.services.clone(),
26 }
27 }
28
29 #[inline]
30 pub(super) async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> {
31 let count = self
32 .userid_presenceid
33 .get(user_id)
34 .await
35 .deserialized::<u64>()?;
36
37 let key = presenceid_key(count, user_id);
38 let bytes = self.presenceid_presence.get(&key).await?;
39 let event = Presence::from_json_bytes(&bytes)?
40 .to_presence_event(user_id, &self.services.users)
41 .await;
42
43 Ok((count, event))
44 }
45
46 pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> {
47 let count = self
48 .userid_presenceid
49 .get(user_id)
50 .await
51 .deserialized::<u64>()?;
52
53 let key = presenceid_key(count, user_id);
54 let bytes = self.presenceid_presence.get(&key).await?;
55 let presence = Presence::from_json_bytes(&bytes)?;
56
57 Ok((count, presence))
58 }
59
60 pub(super) async fn set_presence(
61 &self,
62 user_id: &UserId,
63 presence_state: &PresenceState,
64 currently_active: Option<bool>,
65 last_active_ago: Option<UInt>,
66 status_msg: Option<String>,
67 ) -> Result<Option<u64>> {
68 let last_presence = self.get_presence(user_id).await;
69 let state_changed = match last_presence {
70 | Err(_) => true,
71 | Ok(ref presence) => presence.1.content.presence != *presence_state,
72 };
73
74 let status_msg_changed = match last_presence {
75 | Err(_) => true,
76 | Ok(ref last_presence) => {
77 let old_msg = last_presence
78 .1
79 .content
80 .status_msg
81 .clone()
82 .unwrap_or_default();
83
84 let new_msg = status_msg.clone().unwrap_or_default();
85
86 new_msg != old_msg
87 },
88 };
89
90 let now = utils::millis_since_unix_epoch();
91 let last_last_active_ts = match last_presence {
92 | Err(_) => 0,
93 | Ok((_, ref presence)) => now.saturating_sub(
94 presence
95 .content
96 .last_active_ago
97 .unwrap_or_default()
98 .into(),
99 ),
100 };
101
102 let last_active_ts = match last_active_ago {
103 | None => now,
104 | Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
105 };
106
107 if !status_msg_changed && !state_changed && last_active_ts < last_last_active_ts {
109 debug_warn!(
110 "presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \
111 {last_last_active_ts:?}",
112 );
113 return Ok(None);
114 }
115
116 let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) {
117 None
118 } else {
119 status_msg
120 };
121
122 let presence = Presence::new(
123 presence_state.to_owned(),
124 currently_active.unwrap_or(false),
125 last_active_ts,
126 status_msg,
127 );
128
129 let count = self.services.globals.next_count();
130 let key = presenceid_key(*count, user_id);
131
132 self.userid_presenceid.raw_put(user_id, *count);
133 self.presenceid_presence
134 .raw_put(key, Json(presence));
135
136 if let Ok((last_count, _)) = last_presence {
137 let key = presenceid_key(last_count, user_id);
138 self.presenceid_presence.remove(&key);
139 }
140
141 Ok(Some(*count))
142 }
143
144 #[inline]
145 pub(super) async fn remove_presence(&self, user_id: &UserId) {
146 let Ok(count) = self
147 .userid_presenceid
148 .get(user_id)
149 .await
150 .deserialized::<u64>()
151 else {
152 return;
153 };
154
155 let key = presenceid_key(count, user_id);
156 self.presenceid_presence.remove(&key);
157 self.userid_presenceid.remove(user_id);
158 }
159
160 #[inline]
161 pub(super) fn presence_since(
162 &self,
163 since: u64,
164 to: Option<u64>,
165 ) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
166 self.presenceid_presence
167 .raw_stream()
168 .ignore_err()
169 .ready_filter_map(move |(key, presence)| {
170 let (count, user_id) = presenceid_parse(key).ok()?;
171 (count > since && to.is_none_or(|to| count <= to))
172 .then_some((user_id, count, presence))
173 })
174 }
175}
176
177#[inline]
178fn presenceid_key(count: u64, user_id: &UserId) -> Vec<u8> {
179 let cap = size_of::<u64>().saturating_add(user_id.as_bytes().len());
180 let mut key = Vec::with_capacity(cap);
181 key.extend_from_slice(&count.to_be_bytes());
182 key.extend_from_slice(user_id.as_bytes());
183 key
184}
185
186#[inline]
187fn presenceid_parse(key: &[u8]) -> Result<(u64, &UserId)> {
188 let (count, user_id) = key.split_at(8);
189 let user_id = user_id_from_bytes(user_id)?;
190 let count = utils::u64_from_u8(count);
191
192 Ok((count, user_id))
193}
194
195fn user_id_from_bytes(bytes: &[u8]) -> Result<&UserId> {
197 let str: &str = utils::str_from_bytes(bytes)?;
198 let user_id: &UserId = str.try_into()?;
199
200 Ok(user_id)
201}