tuwunel_service/presence/
data.rs1use std::sync::Arc;
2
3use futures::Stream;
4use ruma::{UInt, UserId, events::presence::PresenceEvent, presence::PresenceState};
5use tuwunel_core::{
6 Result, debug_warn, utils,
7 utils::{ReadyExt, stream::TryIgnore},
8};
9use tuwunel_database::{Deserialized, Json, Map};
10
11use crate::presence::Presence;
12
13pub(crate) struct Data {
14 presenceid_presence: Arc<Map>,
15 userid_presenceid: Arc<Map>,
16 services: Arc<crate::services::OnceServices>,
17}
18
19impl Data {
20 pub(super) fn new(args: &crate::Args<'_>) -> Self {
21 let db = &args.db;
22 Self {
23 presenceid_presence: db["presenceid_presence"].clone(),
24 userid_presenceid: db["userid_presenceid"].clone(),
25 services: args.services.clone(),
26 }
27 }
28
29 #[inline]
30 pub(super) async fn get_presence(&self, user_id: &UserId) -> Result<(u64, PresenceEvent)> {
31 let count = self
32 .userid_presenceid
33 .get(user_id)
34 .await
35 .deserialized::<u64>()?;
36
37 let key = presenceid_key(count, user_id);
38 let bytes = self.presenceid_presence.get(&key).await?;
39 let event = self
40 .services
41 .presence
42 .from_json_bytes_to_event(&bytes, user_id)
43 .await?;
44
45 Ok((count, event))
46 }
47
48 pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> {
49 let count = self
50 .userid_presenceid
51 .get(user_id)
52 .await
53 .deserialized::<u64>()?;
54
55 let key = presenceid_key(count, user_id);
56 let bytes = self.presenceid_presence.get(&key).await?;
57 let presence = Presence::from_json_bytes(&bytes)?;
58
59 Ok((count, presence))
60 }
61
62 pub(super) async fn set_presence(
63 &self,
64 user_id: &UserId,
65 presence_state: &PresenceState,
66 currently_active: Option<bool>,
67 last_active_ago: Option<UInt>,
68 status_msg: Option<String>,
69 ) -> Result<Option<u64>> {
70 let last_presence = self.get_presence(user_id).await;
71 let state_changed = match last_presence {
72 | Err(_) => true,
73 | Ok(ref presence) => presence.1.content.presence != *presence_state,
74 };
75
76 let status_msg_changed = match last_presence {
77 | Err(_) => true,
78 | Ok(ref last_presence) => {
79 let old_msg = last_presence
80 .1
81 .content
82 .status_msg
83 .clone()
84 .unwrap_or_default();
85
86 let new_msg = status_msg.clone().unwrap_or_default();
87
88 new_msg != old_msg
89 },
90 };
91
92 let now = utils::millis_since_unix_epoch();
93 let last_last_active_ts = match last_presence {
94 | Err(_) => 0,
95 | Ok((_, ref presence)) => now.saturating_sub(
96 presence
97 .content
98 .last_active_ago
99 .unwrap_or_default()
100 .into(),
101 ),
102 };
103
104 let last_active_ts = match last_active_ago {
105 | None => now,
106 | Some(last_active_ago) => now.saturating_sub(last_active_ago.into()),
107 };
108
109 if !status_msg_changed && !state_changed && last_active_ts < last_last_active_ts {
111 debug_warn!(
112 "presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \
113 {last_last_active_ts:?}",
114 );
115 return Ok(None);
116 }
117
118 let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) {
119 None
120 } else {
121 status_msg
122 };
123
124 let presence = Presence {
125 state: presence_state.to_owned(),
126 currently_active: currently_active.unwrap_or(false),
127 last_active_ts,
128 status_msg,
129 };
130
131 let count = self.services.globals.next_count();
132 let key = presenceid_key(*count, user_id);
133
134 self.userid_presenceid.raw_put(user_id, *count);
135 self.presenceid_presence
136 .raw_put(key, Json(presence));
137
138 if let Ok((last_count, _)) = last_presence {
139 let key = presenceid_key(last_count, user_id);
140 self.presenceid_presence.remove(&key);
141 }
142
143 Ok(Some(*count))
144 }
145
146 #[inline]
147 pub(super) async fn remove_presence(&self, user_id: &UserId) {
148 let Ok(count) = self
149 .userid_presenceid
150 .get(user_id)
151 .await
152 .deserialized::<u64>()
153 else {
154 return;
155 };
156
157 let key = presenceid_key(count, user_id);
158 self.presenceid_presence.remove(&key);
159 self.userid_presenceid.remove(user_id);
160 }
161
162 #[inline]
163 pub(super) fn presence_since(
164 &self,
165 since: u64,
166 to: Option<u64>,
167 ) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
168 self.presenceid_presence
169 .raw_stream()
170 .ignore_err()
171 .ready_filter_map(move |(key, presence)| {
172 let (count, user_id) = presenceid_parse(key).ok()?;
173 (count > since && to.is_none_or(|to| count <= to))
174 .then_some((user_id, count, presence))
175 })
176 }
177}
178
179#[inline]
180fn presenceid_key(count: u64, user_id: &UserId) -> Vec<u8> {
181 let cap = size_of::<u64>().saturating_add(user_id.as_bytes().len());
182 let mut key = Vec::with_capacity(cap);
183 key.extend_from_slice(&count.to_be_bytes());
184 key.extend_from_slice(user_id.as_bytes());
185 key
186}
187
188#[inline]
189fn presenceid_parse(key: &[u8]) -> Result<(u64, &UserId)> {
190 let (count, user_id) = key.split_at(8);
191 let user_id = user_id_from_bytes(user_id)?;
192 let count = utils::u64_from_u8(count);
193
194 Ok((count, user_id))
195}
196
197fn user_id_from_bytes(bytes: &[u8]) -> Result<&UserId> {
199 let str: &str = utils::str_from_bytes(bytes)?;
200 let user_id: &UserId = str.try_into()?;
201
202 Ok(user_id)
203}