tuwunel_service/presence/
mod.rs1mod aggregate;
2mod data;
3mod pipeline;
5
6use std::{collections::HashMap, sync::Arc, time::Duration};
7
8use async_trait::async_trait;
9use futures::{
10 Stream, StreamExt, TryFutureExt,
11 future::{AbortHandle, Abortable, join},
12 stream::FuturesUnordered,
13};
14use loole::{Receiver, Sender};
15use ruma::{
16 OwnedUserId, UInt, UserId,
17 events::presence::{PresenceEvent, PresenceEventContent},
18 presence::PresenceState,
19};
20use serde::{Deserialize, Serialize};
21use tokio::sync::RwLock;
22use tuwunel_core::{
23 Result, checked, debug, debug_warn, err,
24 result::LogErr,
25 trace,
26 utils::{self, TryFutureExtExt},
27};
28
29use self::{aggregate::PresenceAggregator, data::Data};
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
34pub(super) struct Presence {
35 pub(super) state: PresenceState,
36 pub(super) currently_active: bool,
37 pub(super) last_active_ts: u64,
38 pub(super) status_msg: Option<String>,
39}
40
41impl Presence {
42 pub(super) fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
43 serde_json::from_slice(bytes)
44 .map_err(|_| err!(Database(error!("Invalid presence data in database"))))
45 }
46}
47
48pub struct Service {
49 timer_channel: (Sender<TimerType>, Receiver<TimerType>),
50 timeout_remote_users: bool,
51 idle_timeout: u64,
52 offline_timeout: u64,
53 db: Data,
54 services: Arc<crate::services::OnceServices>,
55 last_sync_seen: RwLock<HashMap<OwnedUserId, u64>>,
56 device_presence: PresenceAggregator,
57}
58
59type TimerType = (OwnedUserId, Duration, u64);
60type TimerFired = (OwnedUserId, u64);
61
62#[async_trait]
63impl crate::Service for Service {
64 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
65 let config = &args.server.config;
66 let idle_timeout_s = config.presence_idle_timeout_s;
67 let offline_timeout_s = config.presence_offline_timeout_s;
68 Ok(Arc::new(Self {
69 timer_channel: loole::unbounded(),
70 timeout_remote_users: config.presence_timeout_remote_users,
71 idle_timeout: checked!(idle_timeout_s * 1_000)?,
72 offline_timeout: checked!(offline_timeout_s * 1_000)?,
73 db: Data::new(args),
74 services: args.services.clone(),
75 last_sync_seen: RwLock::new(HashMap::new()),
76 device_presence: PresenceAggregator::new(),
77 }))
78 }
79
80 async fn worker(self: Arc<Self>) -> Result {
81 self.unset_all_presence().await;
84 self.device_presence.clear().await;
85 _ = self
86 .maybe_ping_presence(
87 &self.services.globals.server_user,
88 None,
89 None,
90 &PresenceState::Online,
91 )
92 .await;
93
94 let receiver = self.timer_channel.1.clone();
95
96 let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new();
97 let mut timer_handles: HashMap<OwnedUserId, (u64, AbortHandle)> = HashMap::new();
98 while !receiver.is_closed() && self.services.server.is_running() {
99 tokio::select! {
100 Some(result) = presence_timers.next() => {
101 let Ok((user_id, count)) = result else {
102 continue;
103 };
104
105 if let Some((current_count, _)) = timer_handles.get(&user_id)
106 && *current_count != count {
107 trace!(?user_id, count, current_count, "Skipping stale presence timer");
108 continue;
109 }
110
111 timer_handles.remove(&user_id);
112 self.process_presence_timer(&user_id, count).await.log_err().ok();
113 },
114 event = receiver.recv_async() => match event {
115 Ok((user_id, timeout, count)) => {
116 debug!(
117 "Adding timer {}: {user_id} timeout:{timeout:?} count:{count}",
118 presence_timers.len()
119 );
120 if let Some((_, handle)) = timer_handles.remove(&user_id) {
121 handle.abort();
122 }
123
124 let (handle, reg) = AbortHandle::new_pair();
125 presence_timers.push(Abortable::new(
126 pipeline::presence_timer(user_id.clone(), timeout, count),
127 reg,
128 ));
129 timer_handles.insert(user_id, (count, handle));
130 },
131 _ => break,
132 },
133 }
134 }
135
136 _ = self
138 .maybe_ping_presence(
139 &self.services.globals.server_user,
140 None,
141 None,
142 &PresenceState::Offline,
143 )
144 .await;
145
146 Ok(())
147 }
148
149 async fn interrupt(&self) {
150 let (timer_sender, _) = &self.timer_channel;
151 if !timer_sender.is_closed() {
152 timer_sender.close();
153 }
154 }
155
156 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
157}
158
159impl Service {
160 pub async fn note_sync(&self, user_id: &UserId) {
163 if !self.services.config.suppress_push_when_active {
164 return;
165 }
166
167 let now = utils::millis_since_unix_epoch();
168 self.last_sync_seen
169 .write()
170 .await
171 .insert(user_id.to_owned(), now);
172 }
173
174 pub async fn last_sync_gap_ms(&self, user_id: &UserId) -> Option<u64> {
176 let now = utils::millis_since_unix_epoch();
177 self.last_sync_seen
178 .read()
179 .await
180 .get(user_id)
181 .map(|ts| now.saturating_sub(*ts))
182 }
183
184 pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
186 self.db
187 .get_presence(user_id)
188 .map_ok(|(_, presence)| presence)
189 .await
190 }
191
192 pub async fn remove_presence(&self, user_id: &UserId) {
196 self.db.remove_presence(user_id).await;
197 }
198
199 async fn unset_all_presence(&self) {
201 if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
202 return;
203 }
204
205 let _cork = self.services.db.cork();
206
207 for user_id in &self
208 .services
209 .users
210 .list_local_users()
211 .map(UserId::to_owned)
212 .collect::<Vec<_>>()
213 .await
214 {
215 let presence = self.db.get_presence(user_id).await;
216
217 let presence = match presence {
218 | Ok((_, ref presence)) => &presence.content,
219 | _ => continue,
220 };
221
222 if !matches!(
223 presence.presence,
224 PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
225 ) {
226 trace!(?user_id, ?presence, "Skipping user");
227 continue;
228 }
229
230 trace!(?user_id, ?presence, "Resetting presence to offline");
231
232 _ = self
233 .set_presence(
234 user_id,
235 &PresenceState::Offline,
236 Some(false),
237 presence.last_active_ago,
238 presence.status_msg.clone(),
239 )
240 .await
241 .inspect_err(|e| {
242 debug_warn!(
243 ?presence,
244 "{user_id} has invalid presence in database and failed to reset it to \
245 offline: {e}"
246 );
247 });
248 }
249 }
250
251 pub fn presence_since(
254 &self,
255 since: u64,
256 to: Option<u64>,
257 ) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
258 self.db.presence_since(since, to)
259 }
260
261 #[inline]
262 pub async fn from_json_bytes_to_event(
263 &self,
264 bytes: &[u8],
265 user_id: &UserId,
266 ) -> Result<PresenceEvent> {
267 let presence = Presence::from_json_bytes(bytes)?;
268 let event = self.to_presence_event(presence, user_id).await;
269
270 Ok(event)
271 }
272
273 async fn to_presence_event(&self, presence: Presence, user_id: &UserId) -> PresenceEvent {
275 let now = utils::millis_since_unix_epoch();
276 let last_active_ago = now.saturating_sub(presence.last_active_ts);
277
278 let avatar_url = self.services.profile.avatar_url(user_id).ok();
279 let displayname = self.services.profile.displayname(user_id).ok();
280 let (avatar_url, displayname) = join(avatar_url, displayname).await;
281
282 PresenceEvent {
283 sender: user_id.to_owned(),
284 content: PresenceEventContent {
285 presence: presence.state,
286 status_msg: presence.status_msg,
287 currently_active: Some(presence.currently_active),
288 last_active_ago: Some(UInt::new_saturating(last_active_ago)),
289 avatar_url,
290 displayname,
291 },
292 }
293 }
294}