1use std::{net::IpAddr, time::Duration};
8
9use futures::TryFutureExt;
10use ruma::{
11 DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState,
12};
13use tokio::time::sleep;
14use tuwunel_core::{
15 Error, Result, debug, error,
16 result::LogErr,
17 trace,
18 utils::{future::OptionFutureExt, option::OptionExt},
19};
20
21use super::{
22 Service, TimerFired,
23 aggregate::{self, StatusMsg},
24};
25
26impl Service {
27 fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
28 if is_remote {
29 return aggregate::DeviceKey::Remote;
30 }
31
32 match device_id {
33 | Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()),
34 | None => aggregate::DeviceKey::UnknownLocal,
35 }
36 }
37
38 fn schedule_presence_timer(
39 &self,
40 user_id: &UserId,
41 presence_state: &PresenceState,
42 count: u64,
43 ) -> Result {
44 if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id))
45 || user_id == self.services.globals.server_user
46 {
47 return Ok(());
48 }
49
50 let timeout = match presence_state {
51 | PresenceState::Online =>
52 self.services
53 .server
54 .config
55 .presence_idle_timeout_s,
56 | _ =>
57 self.services
58 .server
59 .config
60 .presence_offline_timeout_s,
61 };
62
63 self.timer_channel
64 .0
65 .send((user_id.to_owned(), Duration::from_secs(timeout), count))
66 .map_err(|e| {
67 error!("Failed to add presence timer: {}", e);
68 Error::bad_database("Failed to add presence timer")
69 })
70 }
71
72 fn refresh_skip_decision(
73 refresh_window_ms: Option<u64>,
74 last_event: Option<&PresenceEvent>,
75 last_count: Option<u64>,
76 ) -> Option<(u64, u64)> {
77 let (Some(refresh_ms), Some(event), Some(count)) =
78 (refresh_window_ms, last_event, last_count)
79 else {
80 return None;
81 };
82
83 let last_last_active_ago: u64 = event.content.last_active_ago?.into();
84
85 (last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago))
86 }
87
88 fn timer_is_stale(expected_count: u64, current_count: u64) -> bool {
89 expected_count != current_count
90 }
91
92 #[expect(clippy::too_many_arguments)]
93 async fn apply_device_presence_update(
94 &self,
95 user_id: &UserId,
96 device_key: aggregate::DeviceKey,
97 state: &PresenceState,
98 currently_active: Option<bool>,
99 last_active_ago: Option<UInt>,
100 status_msg: StatusMsg,
101 refresh_window_ms: Option<u64>,
102 ) -> Result {
103 let now = tuwunel_core::utils::millis_since_unix_epoch();
104 let preserve_status = matches!(status_msg, StatusMsg::Unchanged);
105 debug!(
107 ?user_id,
108 ?device_key,
109 ?state,
110 currently_active,
111 last_active_ago = last_active_ago.map(u64::from),
112 "Presence update received"
113 );
114 self.device_presence
115 .update(
116 user_id,
117 device_key,
118 state,
119 currently_active,
120 last_active_ago,
121 status_msg,
122 now,
123 )
124 .await;
125
126 let aggregated = self
128 .device_presence
129 .aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
130 .await;
131 debug!(
132 ?user_id,
133 agg_state = ?aggregated.state,
134 agg_currently_active = aggregated.currently_active,
135 agg_last_active_ts = aggregated.last_active_ts,
136 agg_device_count = aggregated.device_count,
137 "Presence aggregate computed"
138 );
139
140 let last_presence = self.db.get_presence(user_id).await;
142 let (last_count, last_event) = match last_presence {
143 | Ok((count, event)) => (Some(count), Some(event)),
144 | Err(_) => (None, None),
145 };
146
147 let last_state = last_event
148 .as_ref()
149 .map(|event| event.content.presence.clone());
150
151 let state_changed = match &last_event {
152 | Some(event) => event.content.presence != aggregated.state,
153 | None => true,
154 };
155
156 if !state_changed
158 && let Some((count, last_last_active_ago)) =
159 Self::refresh_skip_decision(refresh_window_ms, last_event.as_ref(), last_count)
160 {
161 let presence = last_event
162 .as_ref()
163 .map(|event| &event.content.presence)
164 .unwrap_or(state);
165
166 self.schedule_presence_timer(user_id, presence, count)
167 .log_err()
168 .ok();
169 debug!(
170 ?user_id,
171 ?state,
172 last_last_active_ago,
173 "Skipping presence update: refresh window (timer rescheduled)"
174 );
175 return Ok(());
176 }
177
178 if matches!(last_state, Some(PresenceState::Online))
180 && aggregated.state != PresenceState::Online
181 {
182 debug!(
183 ?user_id,
184 from = ?PresenceState::Online,
185 to = ?aggregated.state,
186 "Presence went inactive; flushing suppressed pushes"
187 );
188 self.services
189 .sending
190 .schedule_flush_suppressed_for_user(
191 user_id.to_owned(),
192 "presence->inactive (aggregate)",
193 );
194 }
195
196 let fallback_status = || {
198 last_event
199 .and_then(|event| event.content.status_msg)
200 .filter(|msg| !msg.is_empty())
201 };
202
203 let status_msg = aggregated
204 .status_msg
205 .or_else(|| preserve_status.then(fallback_status).flatten());
206
207 let last_active_ago =
208 Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
209
210 self.set_presence(
211 user_id,
212 &aggregated.state,
213 Some(aggregated.currently_active),
214 last_active_ago,
215 status_msg,
216 )
217 .await
218 }
219
220 pub async fn maybe_ping_presence(
223 &self,
224 user_id: &UserId,
225 device_id: Option<&DeviceId>,
226 client_ip: Option<IpAddr>,
227 new_state: &PresenceState,
228 ) -> Result {
229 const REFRESH_TIMEOUT: u64 = 30 * 1000;
230
231 if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
232 return Ok(());
233 }
234
235 let update_device_seen = device_id.map_async(|device_id| {
236 self.services
237 .users
238 .update_device_last_seen(user_id, device_id, client_ip, None)
239 });
240
241 let currently_active = *new_state == PresenceState::Online;
242 let set_presence = self.apply_device_presence_update(
243 user_id,
244 Self::device_key(device_id, false),
245 new_state,
246 Some(currently_active),
247 UInt::new(0),
248 StatusMsg::Unchanged,
249 Some(REFRESH_TIMEOUT),
250 );
251
252 debug!(?user_id, ?new_state, currently_active, "Presence ping accepted");
253
254 futures::future::try_join(set_presence, update_device_seen.unwrap_or(Ok(())))
255 .map_ok(|_| ())
256 .await
257 }
258
259 pub async fn set_presence_for_device(
261 &self,
262 user_id: &UserId,
263 device_id: Option<&DeviceId>,
264 state: &PresenceState,
265 status_msg: Option<String>,
266 ) -> Result {
267 let currently_active = *state == PresenceState::Online;
268 self.apply_device_presence_update(
269 user_id,
270 Self::device_key(device_id, false),
271 state,
272 Some(currently_active),
273 None,
274 StatusMsg::Set(status_msg),
275 None,
276 )
277 .await
278 }
279
280 pub async fn set_presence_from_federation(
282 &self,
283 user_id: &UserId,
284 state: &PresenceState,
285 currently_active: bool,
286 last_active_ago: UInt,
287 status_msg: Option<String>,
288 ) -> Result {
289 self.apply_device_presence_update(
290 user_id,
291 Self::device_key(None, true),
292 state,
293 Some(currently_active),
294 Some(last_active_ago),
295 StatusMsg::Set(status_msg),
296 None,
297 )
298 .await
299 }
300
301 pub async fn set_presence(
303 &self,
304 user_id: &UserId,
305 state: &PresenceState,
306 currently_active: Option<bool>,
307 last_active_ago: Option<UInt>,
308 status_msg: Option<String>,
309 ) -> Result {
310 let presence_state = match state.as_str() {
311 | "" => &PresenceState::Offline, | &_ => state,
313 };
314
315 let count = self
316 .db
317 .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
318 .await?;
319
320 if let Some(count) = count {
321 let is_local = self.services.globals.user_is_local(user_id);
322 let is_server_user = user_id == self.services.globals.server_user;
323 let allow_timeout = self.timeout_remote_users || is_local;
324
325 if allow_timeout && !is_server_user {
326 self.schedule_presence_timer(user_id, presence_state, count)?;
327 }
328 }
329
330 Ok(())
331 }
332
333 pub(super) async fn process_presence_timer(
334 &self,
335 user_id: &OwnedUserId,
336 expected_count: u64,
337 ) -> Result {
338 let Ok((current_count, presence)) = self.db.get_presence_raw(user_id).await else {
339 return Ok(());
340 };
341
342 if Self::timer_is_stale(expected_count, current_count) {
343 trace!(?user_id, expected_count, current_count, "Skipping stale presence timer");
344 return Ok(());
345 }
346
347 let presence_state = presence.state().clone();
348 let now = tuwunel_core::utils::millis_since_unix_epoch();
349 let aggregated = self
350 .device_presence
351 .aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
352 .await;
353
354 if aggregated.device_count == 0 {
355 let last_active_ago =
356 Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts())));
357 let status_msg = presence.status_msg();
358
359 let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
360 | (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
361 Some(PresenceState::Unavailable),
362 | (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
363 Some(PresenceState::Offline),
364 | _ => None,
365 };
366
367 debug!(
368 "Processed presence timer for user '{user_id}': Old state = {presence_state}, \
369 New state = {new_state:?}"
370 );
371
372 if let Some(new_state) = new_state {
373 if matches!(new_state, PresenceState::Unavailable | PresenceState::Offline) {
374 self.services
375 .sending
376 .schedule_flush_suppressed_for_user(
377 user_id.to_owned(),
378 "presence->inactive",
379 );
380 }
381 self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)
382 .await?;
383 }
384
385 return Ok(());
386 }
387
388 if aggregated.state == presence_state {
389 self.schedule_presence_timer(user_id, &presence_state, current_count)
390 .log_err()
391 .ok();
392 return Ok(());
393 }
394
395 if matches!(aggregated.state, PresenceState::Unavailable | PresenceState::Offline) {
396 self.services
397 .sending
398 .schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive");
399 }
400
401 let status_msg = aggregated
402 .status_msg
403 .or_else(|| presence.status_msg());
404 let last_active_ago =
405 Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
406
407 self.set_presence(
408 user_id,
409 &aggregated.state,
410 Some(aggregated.currently_active),
411 last_active_ago,
412 status_msg,
413 )
414 .await?;
415
416 Ok(())
417 }
418}
419
420pub(super) async fn presence_timer(
421 user_id: OwnedUserId,
422 timeout: Duration,
423 count: u64,
424) -> TimerFired {
425 sleep(timeout).await;
426
427 (user_id, count)
428}
429
430#[cfg(test)]
431mod tests {
432 use ruma::{presence::PresenceState, uint, user_id};
433
434 use super::*;
435
436 #[test]
437 fn refresh_window_skip_decision() {
438 let user_id = user_id!("@alice:example.com");
439 let event = PresenceEvent {
440 sender: user_id.to_owned(),
441 content: ruma::events::presence::PresenceEventContent {
442 presence: PresenceState::Online,
443 status_msg: None,
444 currently_active: Some(true),
445 last_active_ago: Some(uint!(10)),
446 avatar_url: None,
447 displayname: None,
448 },
449 };
450
451 let decision = Service::refresh_skip_decision(Some(20), Some(&event), Some(5));
452 assert_eq!(decision, Some((5, 10)));
453
454 let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5));
455 assert_eq!(decision, None);
456
457 let event_missing_ago = PresenceEvent {
458 sender: user_id.to_owned(),
459 content: ruma::events::presence::PresenceEventContent {
460 presence: PresenceState::Online,
461 status_msg: None,
462 currently_active: Some(true),
463 last_active_ago: None,
464 avatar_url: None,
465 displayname: None,
466 },
467 };
468
469 let decision =
470 Service::refresh_skip_decision(Some(20), Some(&event_missing_ago), Some(5));
471 assert_eq!(decision, None);
472
473 let decision = Service::refresh_skip_decision(Some(20), None, Some(5));
474 assert_eq!(decision, None);
475 }
476
477 #[test]
478 fn timer_stale_detection() {
479 assert!(Service::timer_is_stale(2, 3));
480 assert!(!Service::timer_is_stale(2, 2));
481 }
482}