1use std::collections::HashMap;
8
9use ruma::{OwnedDeviceId, OwnedUserId, UInt, UserId, presence::PresenceState};
10use tokio::sync::RwLock;
11use tuwunel_core::debug;
12
13#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub(crate) enum DeviceKey {
15 Device(OwnedDeviceId),
16 Remote,
17 UnknownLocal,
18}
19
20#[derive(Debug, Clone)]
25pub(crate) enum StatusMsg {
26 Set(Option<String>),
27 Unchanged,
28}
29
30#[derive(Debug, Clone)]
31struct DevicePresence {
32 state: PresenceState,
33 currently_active: bool,
34 last_active_ts: u64,
35 last_update_ts: u64,
36 status_msg: Option<String>,
37}
38
39#[derive(Debug, Clone)]
40pub(crate) struct AggregatedPresence {
41 pub(crate) state: PresenceState,
42 pub(crate) currently_active: bool,
43 pub(crate) last_active_ts: u64,
44 pub(crate) status_msg: Option<String>,
45 pub(crate) device_count: usize,
46}
47
48#[derive(Debug, Default)]
49pub(crate) struct PresenceAggregator {
50 inner: RwLock<HashMap<OwnedUserId, HashMap<DeviceKey, DevicePresence>>>,
51}
52
53impl PresenceAggregator {
54 pub(crate) fn new() -> Self { Self::default() }
56
57 pub(crate) async fn clear(&self) { self.inner.write().await.clear(); }
59
60 #[expect(clippy::too_many_arguments)]
62 pub(crate) async fn update(
63 &self,
64 user_id: &UserId,
65 device_key: DeviceKey,
66 state: &PresenceState,
67 currently_active: Option<bool>,
68 last_active_ago: Option<UInt>,
69 status_msg: StatusMsg,
70 now_ms: u64,
71 ) {
72 let mut guard = self.inner.write().await;
73 let devices = guard.entry(user_id.to_owned()).or_default();
74
75 let last_active_ts = match last_active_ago {
76 | None => now_ms,
77 | Some(ago) => now_ms.saturating_sub(ago.into()),
78 };
79
80 let initial_status = match &status_msg {
81 | StatusMsg::Set(msg) => msg.clone(),
82 | StatusMsg::Unchanged => None,
83 };
84
85 let entry = devices
86 .entry(device_key)
87 .or_insert_with(|| DevicePresence {
88 state: state.clone(),
89 currently_active: currently_active.unwrap_or(false),
90 last_active_ts,
91 last_update_ts: now_ms,
92 status_msg: initial_status,
93 });
94
95 entry.state = state.clone();
96 entry.currently_active = currently_active.unwrap_or(false);
97 entry.last_active_ts = last_active_ts;
98 entry.last_update_ts = now_ms;
99 if let StatusMsg::Set(msg) = status_msg {
100 entry.status_msg = msg;
101 }
102 }
103
104 pub(crate) async fn aggregate(
109 &self,
110 user_id: &UserId,
111 now_ms: u64,
112 idle_timeout_ms: u64,
113 offline_timeout_ms: u64,
114 ) -> AggregatedPresence {
115 let mut guard = self.inner.write().await;
116 let Some(devices) = guard.get_mut(user_id) else {
117 return AggregatedPresence {
118 state: PresenceState::Offline,
119 currently_active: false,
120 last_active_ts: now_ms,
121 status_msg: None,
122 device_count: 0,
123 };
124 };
125
126 let mut best_state = PresenceState::Offline;
127 let mut best_rank = state_rank(&best_state);
128 let mut any_currently_active = false;
129 let mut last_active_ts = 0_u64;
130 let mut latest_status: Option<(u64, String)> = None;
131
132 devices.retain(|_, device| {
133 let last_active_age = now_ms.saturating_sub(device.last_active_ts);
134 let last_update_age = now_ms.saturating_sub(device.last_update_ts);
135
136 let effective_state = effective_device_state(
137 &device.state,
138 last_active_age,
139 idle_timeout_ms,
140 offline_timeout_ms,
141 );
142
143 let rank = state_rank(&effective_state);
144 if rank > best_rank {
145 best_rank = rank;
146 best_state = effective_state.clone();
147 }
148
149 if (effective_state == PresenceState::Online
150 || effective_state == PresenceState::Busy)
151 && device.currently_active
152 && last_active_age < idle_timeout_ms
153 {
154 any_currently_active = true;
155 }
156
157 if let Some(msg) = device
158 .status_msg
159 .as_ref()
160 .filter(|msg| !msg.is_empty())
161 {
162 match latest_status {
163 | None => {
164 latest_status = Some((device.last_update_ts, msg.clone()));
165 },
166 | Some((ts, _)) if device.last_update_ts > ts => {
167 latest_status = Some((device.last_update_ts, msg.clone()));
168 },
169 | _ => {},
170 }
171 }
172
173 if device.last_active_ts > last_active_ts {
174 last_active_ts = device.last_active_ts;
175 }
176
177 last_update_age < offline_timeout_ms
179 });
180
181 let device_count = devices.len();
182 let status_msg = latest_status.map(|(_, msg)| msg);
183
184 if device_count == 0 {
185 guard.remove(user_id);
186 return AggregatedPresence {
187 state: PresenceState::Offline,
188 currently_active: false,
189 last_active_ts: now_ms,
190 status_msg: None,
191 device_count: 0,
192 };
193 }
194
195 debug!(
196 ?user_id,
197 device_count,
198 state = ?best_state,
199 currently_active = any_currently_active,
200 last_active_ts,
201 status_msg = status_msg.as_deref(),
202 "Aggregated presence"
203 );
204
205 AggregatedPresence {
206 state: best_state,
207 currently_active: any_currently_active,
208 last_active_ts: if last_active_ts == 0 { now_ms } else { last_active_ts },
209 status_msg,
210 device_count,
211 }
212 }
213}
214
215fn effective_device_state(
216 state: &PresenceState,
217 last_active_age: u64,
218 idle_timeout_ms: u64,
219 offline_timeout_ms: u64,
220) -> PresenceState {
221 match state {
222 | PresenceState::Busy | PresenceState::Online =>
223 if last_active_age >= idle_timeout_ms {
224 PresenceState::Unavailable
225 } else {
226 state.clone()
227 },
228 | PresenceState::Unavailable =>
229 if last_active_age >= offline_timeout_ms {
230 PresenceState::Offline
231 } else {
232 PresenceState::Unavailable
233 },
234 | PresenceState::Offline => PresenceState::Offline,
235 | _ => state.clone(),
236 }
237}
238
239fn state_rank(state: &PresenceState) -> u8 {
240 match state {
241 | PresenceState::Busy => 3,
242 | PresenceState::Online => 2,
243 | PresenceState::Unavailable => 1,
244 | _ => 0,
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use ruma::{device_id, uint, user_id};
251
252 use super::*;
253
254 #[tokio::test]
255 async fn aggregates_rank_and_status_msg() {
256 let aggregator = PresenceAggregator::new();
257 let user = user_id!("@alice:example.com");
258 let now = 1_000_u64;
259
260 aggregator
261 .update(
262 user,
263 DeviceKey::Device(device_id!("DEVICE_A").to_owned()),
264 &PresenceState::Unavailable,
265 Some(false),
266 Some(uint!(50)),
267 StatusMsg::Set(Some("away".into())),
268 now,
269 )
270 .await;
271
272 aggregator
273 .update(
274 user,
275 DeviceKey::Device(device_id!("DEVICE_B").to_owned()),
276 &PresenceState::Online,
277 Some(true),
278 Some(uint!(10)),
279 StatusMsg::Set(Some("online".into())),
280 now + 10,
281 )
282 .await;
283
284 let aggregated = aggregator
285 .aggregate(user, now + 10, 100, 300)
286 .await;
287
288 assert_eq!(aggregated.state, PresenceState::Online);
289 assert!(aggregated.currently_active);
290 assert_eq!(aggregated.status_msg.as_deref(), Some("online"));
291 assert_eq!(aggregated.device_count, 2);
292 }
293
294 #[tokio::test]
295 async fn degrades_online_to_unavailable_after_idle() {
296 let aggregator = PresenceAggregator::new();
297 let user = user_id!("@bob:example.com");
298 let now = 10_000_u64;
299
300 aggregator
301 .update(
302 user,
303 DeviceKey::Device(device_id!("DEVICE_IDLE").to_owned()),
304 &PresenceState::Online,
305 Some(true),
306 Some(uint!(500)),
307 StatusMsg::Unchanged,
308 now,
309 )
310 .await;
311
312 let aggregated = aggregator
313 .aggregate(user, now + 500, 100, 1_000)
314 .await;
315
316 assert_eq!(aggregated.state, PresenceState::Unavailable);
317 }
318
319 #[tokio::test]
320 async fn explicit_set_clears_status_msg() {
321 let aggregator = PresenceAggregator::new();
322 let user = user_id!("@alice:example.com");
323 let device = DeviceKey::Device(device_id!("DEVICE_A").to_owned());
324 let now = 1_000_u64;
325
326 aggregator
327 .update(
328 user,
329 device.clone(),
330 &PresenceState::Online,
331 Some(true),
332 Some(uint!(0)),
333 StatusMsg::Set(Some("busy".to_owned())),
334 now,
335 )
336 .await;
337
338 let aggregated = aggregator.aggregate(user, now, 100, 300).await;
339 assert_eq!(aggregated.status_msg.as_deref(), Some("busy"));
340
341 aggregator
342 .update(
343 user,
344 device.clone(),
345 &PresenceState::Online,
346 Some(true),
347 Some(uint!(0)),
348 StatusMsg::Set(Some(String::new())),
349 now + 1,
350 )
351 .await;
352
353 let aggregated = aggregator
354 .aggregate(user, now + 1, 100, 300)
355 .await;
356
357 assert!(aggregated.status_msg.is_none());
358
359 aggregator
360 .update(
361 user,
362 device.clone(),
363 &PresenceState::Online,
364 Some(true),
365 Some(uint!(0)),
366 StatusMsg::Set(Some("back".to_owned())),
367 now + 2,
368 )
369 .await;
370
371 aggregator
372 .update(
373 user,
374 device,
375 &PresenceState::Online,
376 Some(true),
377 Some(uint!(0)),
378 StatusMsg::Set(None),
379 now + 3,
380 )
381 .await;
382
383 let aggregated = aggregator
384 .aggregate(user, now + 3, 100, 300)
385 .await;
386
387 assert!(aggregated.status_msg.is_none());
388 }
389
390 #[tokio::test]
391 async fn unchanged_preserves_status_msg() {
392 let aggregator = PresenceAggregator::new();
393 let user = user_id!("@alice:example.com");
394 let device = DeviceKey::Device(device_id!("DEVICE_A").to_owned());
395 let now = 1_000_u64;
396
397 aggregator
398 .update(
399 user,
400 device.clone(),
401 &PresenceState::Online,
402 Some(true),
403 Some(uint!(0)),
404 StatusMsg::Set(Some("away".to_owned())),
405 now,
406 )
407 .await;
408
409 aggregator
410 .update(
411 user,
412 device,
413 &PresenceState::Online,
414 Some(true),
415 Some(uint!(0)),
416 StatusMsg::Unchanged,
417 now + 1,
418 )
419 .await;
420
421 let aggregated = aggregator
422 .aggregate(user, now + 1, 100, 300)
423 .await;
424
425 assert_eq!(aggregated.status_msg.as_deref(), Some("away"));
426 }
427
428 #[tokio::test]
429 async fn drops_stale_devices_on_aggregate() {
430 let aggregator = PresenceAggregator::new();
431 let user = user_id!("@carol:example.com");
432
433 aggregator
434 .update(
435 user,
436 DeviceKey::Device(device_id!("DEVICE_STALE").to_owned()),
437 &PresenceState::Online,
438 Some(true),
439 Some(uint!(10)),
440 StatusMsg::Unchanged,
441 0,
442 )
443 .await;
444
445 let aggregated = aggregator.aggregate(user, 1_000, 100, 100).await;
446
447 assert_eq!(aggregated.device_count, 0);
448 assert_eq!(aggregated.state, PresenceState::Offline);
449 }
450}