Skip to main content

tuwunel_service/presence/
aggregate.rs

1//! Presence aggregation across devices.
2//!
3//! This module keeps per-device presence snapshots and computes a single
4//! user-level presence view. Aggregation applies idle/offline thresholds,
5//! favors higher-ranked states, and prunes stale devices to cap memory.
6
7use std::collections::HashMap;
8
9use ruma::{OwnedDeviceId, OwnedUserId, UInt, UserId, presence::PresenceState};
10use tokio::sync::RwLock;
11use tuwunel_core::debug;
12
13#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub(crate) enum DeviceKey {
15	Device(OwnedDeviceId),
16	Remote,
17	UnknownLocal,
18}
19
/// Kinds of updates to the per-device `status_msg`.
///
/// `Unchanged` preserves what the device already has.
/// `Set` writes through, including `None` and `Some("")` to clear it.
///
/// Comparable (`PartialEq`/`Eq`) so callers and tests can assert on the
/// update they built, matching the other key/value types in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StatusMsg {
	/// Overwrite the stored status message with the given value.
	Set(Option<String>),
	/// Leave the stored status message untouched.
	Unchanged,
}
29
30#[derive(Debug, Clone)]
31struct DevicePresence {
32	state: PresenceState,
33	currently_active: bool,
34	last_active_ts: u64,
35	last_update_ts: u64,
36	status_msg: Option<String>,
37}
38
39#[derive(Debug, Clone)]
40pub(crate) struct AggregatedPresence {
41	pub(crate) state: PresenceState,
42	pub(crate) currently_active: bool,
43	pub(crate) last_active_ts: u64,
44	pub(crate) status_msg: Option<String>,
45	pub(crate) device_count: usize,
46}
47
48#[derive(Debug, Default)]
49pub(crate) struct PresenceAggregator {
50	inner: RwLock<HashMap<OwnedUserId, HashMap<DeviceKey, DevicePresence>>>,
51}
52
53impl PresenceAggregator {
54	/// Create a new, empty aggregator.
55	pub(crate) fn new() -> Self { Self::default() }
56
57	/// Clear all tracked device state.
58	pub(crate) async fn clear(&self) { self.inner.write().await.clear(); }
59
60	/// Update presence state for a single device.
61	#[expect(clippy::too_many_arguments)]
62	pub(crate) async fn update(
63		&self,
64		user_id: &UserId,
65		device_key: DeviceKey,
66		state: &PresenceState,
67		currently_active: Option<bool>,
68		last_active_ago: Option<UInt>,
69		status_msg: StatusMsg,
70		now_ms: u64,
71	) {
72		let mut guard = self.inner.write().await;
73		let devices = guard.entry(user_id.to_owned()).or_default();
74
75		let last_active_ts = match last_active_ago {
76			| None => now_ms,
77			| Some(ago) => now_ms.saturating_sub(ago.into()),
78		};
79
80		let initial_status = match &status_msg {
81			| StatusMsg::Set(msg) => msg.clone(),
82			| StatusMsg::Unchanged => None,
83		};
84
85		let entry = devices
86			.entry(device_key)
87			.or_insert_with(|| DevicePresence {
88				state: state.clone(),
89				currently_active: currently_active.unwrap_or(false),
90				last_active_ts,
91				last_update_ts: now_ms,
92				status_msg: initial_status,
93			});
94
95		entry.state = state.clone();
96		entry.currently_active = currently_active.unwrap_or(false);
97		entry.last_active_ts = last_active_ts;
98		entry.last_update_ts = now_ms;
99		if let StatusMsg::Set(msg) = status_msg {
100			entry.status_msg = msg;
101		}
102	}
103
104	/// Aggregate per-device state into a single presence snapshot.
105	///
106	/// Prunes devices that have not updated within the offline timeout to keep
107	/// the map bounded.
108	pub(crate) async fn aggregate(
109		&self,
110		user_id: &UserId,
111		now_ms: u64,
112		idle_timeout_ms: u64,
113		offline_timeout_ms: u64,
114	) -> AggregatedPresence {
115		let mut guard = self.inner.write().await;
116		let Some(devices) = guard.get_mut(user_id) else {
117			return AggregatedPresence {
118				state: PresenceState::Offline,
119				currently_active: false,
120				last_active_ts: now_ms,
121				status_msg: None,
122				device_count: 0,
123			};
124		};
125
126		let mut best_state = PresenceState::Offline;
127		let mut best_rank = state_rank(&best_state);
128		let mut any_currently_active = false;
129		let mut last_active_ts = 0_u64;
130		let mut latest_status: Option<(u64, String)> = None;
131
132		devices.retain(|_, device| {
133			let last_active_age = now_ms.saturating_sub(device.last_active_ts);
134			let last_update_age = now_ms.saturating_sub(device.last_update_ts);
135
136			let effective_state = effective_device_state(
137				&device.state,
138				last_active_age,
139				idle_timeout_ms,
140				offline_timeout_ms,
141			);
142
143			let rank = state_rank(&effective_state);
144			if rank > best_rank {
145				best_rank = rank;
146				best_state = effective_state.clone();
147			}
148
149			if (effective_state == PresenceState::Online
150				|| effective_state == PresenceState::Busy)
151				&& device.currently_active
152				&& last_active_age < idle_timeout_ms
153			{
154				any_currently_active = true;
155			}
156
157			if let Some(msg) = device
158				.status_msg
159				.as_ref()
160				.filter(|msg| !msg.is_empty())
161			{
162				match latest_status {
163					| None => {
164						latest_status = Some((device.last_update_ts, msg.clone()));
165					},
166					| Some((ts, _)) if device.last_update_ts > ts => {
167						latest_status = Some((device.last_update_ts, msg.clone()));
168					},
169					| _ => {},
170				}
171			}
172
173			if device.last_active_ts > last_active_ts {
174				last_active_ts = device.last_active_ts;
175			}
176
177			// Drop devices that haven't updated in a long time to keep the map small.
178			last_update_age < offline_timeout_ms
179		});
180
181		let device_count = devices.len();
182		let status_msg = latest_status.map(|(_, msg)| msg);
183
184		if device_count == 0 {
185			guard.remove(user_id);
186			return AggregatedPresence {
187				state: PresenceState::Offline,
188				currently_active: false,
189				last_active_ts: now_ms,
190				status_msg: None,
191				device_count: 0,
192			};
193		}
194
195		debug!(
196			?user_id,
197			device_count,
198			state = ?best_state,
199			currently_active = any_currently_active,
200			last_active_ts,
201			status_msg = status_msg.as_deref(),
202			"Aggregated presence"
203		);
204
205		AggregatedPresence {
206			state: best_state,
207			currently_active: any_currently_active,
208			last_active_ts: if last_active_ts == 0 { now_ms } else { last_active_ts },
209			status_msg,
210			device_count,
211		}
212	}
213}
214
215fn effective_device_state(
216	state: &PresenceState,
217	last_active_age: u64,
218	idle_timeout_ms: u64,
219	offline_timeout_ms: u64,
220) -> PresenceState {
221	match state {
222		| PresenceState::Busy | PresenceState::Online =>
223			if last_active_age >= idle_timeout_ms {
224				PresenceState::Unavailable
225			} else {
226				state.clone()
227			},
228		| PresenceState::Unavailable =>
229			if last_active_age >= offline_timeout_ms {
230				PresenceState::Offline
231			} else {
232				PresenceState::Unavailable
233			},
234		| PresenceState::Offline => PresenceState::Offline,
235		| _ => state.clone(),
236	}
237}
238
239fn state_rank(state: &PresenceState) -> u8 {
240	match state {
241		| PresenceState::Busy => 3,
242		| PresenceState::Online => 2,
243		| PresenceState::Unavailable => 1,
244		| _ => 0,
245	}
246}
247
#[cfg(test)]
mod tests {
	use ruma::{device_id, uint, user_id};

	use super::*;

	/// The highest-ranked device decides the state and the most recently
	/// updated device supplies the status message.
	#[tokio::test]
	async fn aggregates_rank_and_status_msg() {
		let presence = PresenceAggregator::new();
		let alice = user_id!("@alice:example.com");
		let t0 = 1_000_u64;

		// Older, lower-ranked device.
		presence
			.update(
				alice,
				DeviceKey::Device(device_id!("DEVICE_A").to_owned()),
				&PresenceState::Unavailable,
				Some(false),
				Some(uint!(50)),
				StatusMsg::Set(Some("away".into())),
				t0,
			)
			.await;

		// Newer, higher-ranked device.
		presence
			.update(
				alice,
				DeviceKey::Device(device_id!("DEVICE_B").to_owned()),
				&PresenceState::Online,
				Some(true),
				Some(uint!(10)),
				StatusMsg::Set(Some("online".into())),
				t0 + 10,
			)
			.await;

		let snapshot = presence.aggregate(alice, t0 + 10, 100, 300).await;

		assert_eq!(snapshot.state, PresenceState::Online);
		assert!(snapshot.currently_active);
		assert_eq!(snapshot.status_msg.as_deref(), Some("online"));
		assert_eq!(snapshot.device_count, 2);
	}

	/// An online device whose last activity is older than the idle timeout is
	/// reported as unavailable.
	#[tokio::test]
	async fn degrades_online_to_unavailable_after_idle() {
		let presence = PresenceAggregator::new();
		let bob = user_id!("@bob:example.com");
		let t0 = 10_000_u64;

		presence
			.update(
				bob,
				DeviceKey::Device(device_id!("DEVICE_IDLE").to_owned()),
				&PresenceState::Online,
				Some(true),
				Some(uint!(500)),
				StatusMsg::Unchanged,
				t0,
			)
			.await;

		let snapshot = presence.aggregate(bob, t0 + 500, 100, 1_000).await;

		assert_eq!(snapshot.state, PresenceState::Unavailable);
	}

	/// `StatusMsg::Set` with an empty string or `None` removes the message.
	#[tokio::test]
	async fn explicit_set_clears_status_msg() {
		let presence = PresenceAggregator::new();
		let alice = user_id!("@alice:example.com");
		let key = DeviceKey::Device(device_id!("DEVICE_A").to_owned());
		let t0 = 1_000_u64;

		// Establish a message first.
		presence
			.update(
				alice,
				key.clone(),
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Set(Some("busy".to_owned())),
				t0,
			)
			.await;

		let snapshot = presence.aggregate(alice, t0, 100, 300).await;
		assert_eq!(snapshot.status_msg.as_deref(), Some("busy"));

		// Clearing with an empty string.
		presence
			.update(
				alice,
				key.clone(),
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Set(Some(String::new())),
				t0 + 1,
			)
			.await;

		let snapshot = presence.aggregate(alice, t0 + 1, 100, 300).await;

		assert!(snapshot.status_msg.is_none());

		// Set it again, then clear with `None`.
		presence
			.update(
				alice,
				key.clone(),
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Set(Some("back".to_owned())),
				t0 + 2,
			)
			.await;

		presence
			.update(
				alice,
				key,
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Set(None),
				t0 + 3,
			)
			.await;

		let snapshot = presence.aggregate(alice, t0 + 3, 100, 300).await;

		assert!(snapshot.status_msg.is_none());
	}

	/// `StatusMsg::Unchanged` keeps the message stored by an earlier update.
	#[tokio::test]
	async fn unchanged_preserves_status_msg() {
		let presence = PresenceAggregator::new();
		let alice = user_id!("@alice:example.com");
		let key = DeviceKey::Device(device_id!("DEVICE_A").to_owned());
		let t0 = 1_000_u64;

		presence
			.update(
				alice,
				key.clone(),
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Set(Some("away".to_owned())),
				t0,
			)
			.await;

		presence
			.update(
				alice,
				key,
				&PresenceState::Online,
				Some(true),
				Some(uint!(0)),
				StatusMsg::Unchanged,
				t0 + 1,
			)
			.await;

		let snapshot = presence.aggregate(alice, t0 + 1, 100, 300).await;

		assert_eq!(snapshot.status_msg.as_deref(), Some("away"));
	}

	/// A device that has not updated within the offline timeout is removed
	/// and the user is reported offline.
	#[tokio::test]
	async fn drops_stale_devices_on_aggregate() {
		let presence = PresenceAggregator::new();
		let carol = user_id!("@carol:example.com");

		presence
			.update(
				carol,
				DeviceKey::Device(device_id!("DEVICE_STALE").to_owned()),
				&PresenceState::Online,
				Some(true),
				Some(uint!(10)),
				StatusMsg::Unchanged,
				0,
			)
			.await;

		let snapshot = presence.aggregate(carol, 1_000, 100, 100).await;

		assert_eq!(snapshot.device_count, 0);
		assert_eq!(snapshot.state, PresenceState::Offline);
	}
}