Skip to main content

tuwunel_service/presence/
pipeline.rs

1//! Presence update pipeline.
2//!
3//! This module centralizes the write path for presence updates. It keeps the
4//! aggregation and timer logic in one place so the public `Service` surface
5//! remains small and the update flow is easy to review.
6
7use std::{net::IpAddr, time::Duration};
8
9use futures::TryFutureExt;
10use ruma::{
11	DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState,
12};
13use tokio::time::sleep;
14use tuwunel_core::{
15	Error, Result, debug, error,
16	result::LogErr,
17	trace,
18	utils::{future::OptionFutureExt, option::OptionExt},
19};
20
21use super::{
22	Service, TimerFired,
23	aggregate::{self, StatusMsg},
24};
25
26impl Service {
27	fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
28		if is_remote {
29			return aggregate::DeviceKey::Remote;
30		}
31
32		match device_id {
33			| Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()),
34			| None => aggregate::DeviceKey::UnknownLocal,
35		}
36	}
37
38	fn schedule_presence_timer(
39		&self,
40		user_id: &UserId,
41		presence_state: &PresenceState,
42		count: u64,
43	) -> Result {
44		if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id))
45			|| user_id == self.services.globals.server_user
46		{
47			return Ok(());
48		}
49
50		let timeout = match presence_state {
51			| PresenceState::Online =>
52				self.services
53					.server
54					.config
55					.presence_idle_timeout_s,
56			| _ =>
57				self.services
58					.server
59					.config
60					.presence_offline_timeout_s,
61		};
62
63		self.timer_channel
64			.0
65			.send((user_id.to_owned(), Duration::from_secs(timeout), count))
66			.map_err(|e| {
67				error!("Failed to add presence timer: {}", e);
68				Error::bad_database("Failed to add presence timer")
69			})
70	}
71
72	fn refresh_skip_decision(
73		refresh_window_ms: Option<u64>,
74		last_event: Option<&PresenceEvent>,
75		last_count: Option<u64>,
76	) -> Option<(u64, u64)> {
77		let (Some(refresh_ms), Some(event), Some(count)) =
78			(refresh_window_ms, last_event, last_count)
79		else {
80			return None;
81		};
82
83		let last_last_active_ago: u64 = event.content.last_active_ago?.into();
84
85		(last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago))
86	}
87
88	fn timer_is_stale(expected_count: u64, current_count: u64) -> bool {
89		expected_count != current_count
90	}
91
92	#[expect(clippy::too_many_arguments)]
93	async fn apply_device_presence_update(
94		&self,
95		user_id: &UserId,
96		device_key: aggregate::DeviceKey,
97		state: &PresenceState,
98		currently_active: Option<bool>,
99		last_active_ago: Option<UInt>,
100		status_msg: StatusMsg,
101		refresh_window_ms: Option<u64>,
102	) -> Result {
103		let now = tuwunel_core::utils::millis_since_unix_epoch();
104		let preserve_status = matches!(status_msg, StatusMsg::Unchanged);
105		// 1) Capture per-device presence snapshot for aggregation.
106		debug!(
107			?user_id,
108			?device_key,
109			?state,
110			currently_active,
111			last_active_ago = last_active_ago.map(u64::from),
112			"Presence update received"
113		);
114		self.device_presence
115			.update(
116				user_id,
117				device_key,
118				state,
119				currently_active,
120				last_active_ago,
121				status_msg,
122				now,
123			)
124			.await;
125
126		// 2) Compute the aggregated presence across all devices.
127		let aggregated = self
128			.device_presence
129			.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
130			.await;
131		debug!(
132			?user_id,
133			agg_state = ?aggregated.state,
134			agg_currently_active = aggregated.currently_active,
135			agg_last_active_ts = aggregated.last_active_ts,
136			agg_device_count = aggregated.device_count,
137			"Presence aggregate computed"
138		);
139
140		// 3) Load the last persisted presence to decide whether to skip or merge.
141		let last_presence = self.db.get_presence(user_id).await;
142		let (last_count, last_event) = match last_presence {
143			| Ok((count, event)) => (Some(count), Some(event)),
144			| Err(_) => (None, None),
145		};
146
147		let last_state = last_event
148			.as_ref()
149			.map(|event| event.content.presence.clone());
150
151		let state_changed = match &last_event {
152			| Some(event) => event.content.presence != aggregated.state,
153			| None => true,
154		};
155
156		// 4) For rapid pings with no state change, skip writes and reschedule.
157		if !state_changed
158			&& let Some((count, last_last_active_ago)) =
159				Self::refresh_skip_decision(refresh_window_ms, last_event.as_ref(), last_count)
160		{
161			let presence = last_event
162				.as_ref()
163				.map(|event| &event.content.presence)
164				.unwrap_or(state);
165
166			self.schedule_presence_timer(user_id, presence, count)
167				.log_err()
168				.ok();
169			debug!(
170				?user_id,
171				?state,
172				last_last_active_ago,
173				"Skipping presence update: refresh window (timer rescheduled)"
174			);
175			return Ok(());
176		}
177
178		// 5) If we just transitioned away from online, flush suppressed pushes.
179		if matches!(last_state, Some(PresenceState::Online))
180			&& aggregated.state != PresenceState::Online
181		{
182			debug!(
183				?user_id,
184				from = ?PresenceState::Online,
185				to = ?aggregated.state,
186				"Presence went inactive; flushing suppressed pushes"
187			);
188			self.services
189				.sending
190				.schedule_flush_suppressed_for_user(
191					user_id.to_owned(),
192					"presence->inactive (aggregate)",
193				);
194		}
195
196		// 6) Unchanged preserves the last non-empty status; explicit None clears it.
197		let fallback_status = || {
198			last_event
199				.and_then(|event| event.content.status_msg)
200				.filter(|msg| !msg.is_empty())
201		};
202
203		let status_msg = aggregated
204			.status_msg
205			.or_else(|| preserve_status.then(fallback_status).flatten());
206
207		let last_active_ago =
208			Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
209
210		self.set_presence(
211			user_id,
212			&aggregated.state,
213			Some(aggregated.currently_active),
214			last_active_ago,
215			status_msg,
216		)
217		.await
218	}
219
220	/// Pings the presence of the given user, setting the specified state. When
221	/// device_id is supplied.
222	pub async fn maybe_ping_presence(
223		&self,
224		user_id: &UserId,
225		device_id: Option<&DeviceId>,
226		client_ip: Option<IpAddr>,
227		new_state: &PresenceState,
228	) -> Result {
229		const REFRESH_TIMEOUT: u64 = 30 * 1000;
230
231		if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
232			return Ok(());
233		}
234
235		let update_device_seen = device_id.map_async(|device_id| {
236			self.services
237				.users
238				.update_device_last_seen(user_id, device_id, client_ip, None)
239		});
240
241		let currently_active = *new_state == PresenceState::Online;
242		let set_presence = self.apply_device_presence_update(
243			user_id,
244			Self::device_key(device_id, false),
245			new_state,
246			Some(currently_active),
247			UInt::new(0),
248			StatusMsg::Unchanged,
249			Some(REFRESH_TIMEOUT),
250		);
251
252		debug!(?user_id, ?new_state, currently_active, "Presence ping accepted");
253
254		futures::future::try_join(set_presence, update_device_seen.unwrap_or(Ok(())))
255			.map_ok(|_| ())
256			.await
257	}
258
259	/// Applies an explicit presence update for a local device.
260	pub async fn set_presence_for_device(
261		&self,
262		user_id: &UserId,
263		device_id: Option<&DeviceId>,
264		state: &PresenceState,
265		status_msg: Option<String>,
266	) -> Result {
267		let currently_active = *state == PresenceState::Online;
268		self.apply_device_presence_update(
269			user_id,
270			Self::device_key(device_id, false),
271			state,
272			Some(currently_active),
273			None,
274			StatusMsg::Set(status_msg),
275			None,
276		)
277		.await
278	}
279
280	/// Applies a presence update received over federation.
281	pub async fn set_presence_from_federation(
282		&self,
283		user_id: &UserId,
284		state: &PresenceState,
285		currently_active: bool,
286		last_active_ago: UInt,
287		status_msg: Option<String>,
288	) -> Result {
289		self.apply_device_presence_update(
290			user_id,
291			Self::device_key(None, true),
292			state,
293			Some(currently_active),
294			Some(last_active_ago),
295			StatusMsg::Set(status_msg),
296			None,
297		)
298		.await
299	}
300
301	/// Adds a presence event which will be saved until a new event replaces it.
302	pub async fn set_presence(
303		&self,
304		user_id: &UserId,
305		state: &PresenceState,
306		currently_active: Option<bool>,
307		last_active_ago: Option<UInt>,
308		status_msg: Option<String>,
309	) -> Result {
310		let presence_state = match state.as_str() {
311			| "" => &PresenceState::Offline, // default an empty string to 'offline'
312			| &_ => state,
313		};
314
315		let count = self
316			.db
317			.set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
318			.await?;
319
320		if let Some(count) = count {
321			let is_local = self.services.globals.user_is_local(user_id);
322			let is_server_user = user_id == self.services.globals.server_user;
323			let allow_timeout = self.timeout_remote_users || is_local;
324
325			if allow_timeout && !is_server_user {
326				self.schedule_presence_timer(user_id, presence_state, count)?;
327			}
328		}
329
330		Ok(())
331	}
332
333	pub(super) async fn process_presence_timer(
334		&self,
335		user_id: &OwnedUserId,
336		expected_count: u64,
337	) -> Result {
338		let Ok((current_count, presence)) = self.db.get_presence_raw(user_id).await else {
339			return Ok(());
340		};
341
342		if Self::timer_is_stale(expected_count, current_count) {
343			trace!(?user_id, expected_count, current_count, "Skipping stale presence timer");
344			return Ok(());
345		}
346
347		let presence_state = presence.state().clone();
348		let now = tuwunel_core::utils::millis_since_unix_epoch();
349		let aggregated = self
350			.device_presence
351			.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
352			.await;
353
354		if aggregated.device_count == 0 {
355			let last_active_ago =
356				Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts())));
357			let status_msg = presence.status_msg();
358
359			let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
360				| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
361					Some(PresenceState::Unavailable),
362				| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
363					Some(PresenceState::Offline),
364				| _ => None,
365			};
366
367			debug!(
368				"Processed presence timer for user '{user_id}': Old state = {presence_state}, \
369				 New state = {new_state:?}"
370			);
371
372			if let Some(new_state) = new_state {
373				if matches!(new_state, PresenceState::Unavailable | PresenceState::Offline) {
374					self.services
375						.sending
376						.schedule_flush_suppressed_for_user(
377							user_id.to_owned(),
378							"presence->inactive",
379						);
380				}
381				self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)
382					.await?;
383			}
384
385			return Ok(());
386		}
387
388		if aggregated.state == presence_state {
389			self.schedule_presence_timer(user_id, &presence_state, current_count)
390				.log_err()
391				.ok();
392			return Ok(());
393		}
394
395		if matches!(aggregated.state, PresenceState::Unavailable | PresenceState::Offline) {
396			self.services
397				.sending
398				.schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive");
399		}
400
401		let status_msg = aggregated
402			.status_msg
403			.or_else(|| presence.status_msg());
404		let last_active_ago =
405			Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
406
407		self.set_presence(
408			user_id,
409			&aggregated.state,
410			Some(aggregated.currently_active),
411			last_active_ago,
412			status_msg,
413		)
414		.await?;
415
416		Ok(())
417	}
418}
419
420pub(super) async fn presence_timer(
421	user_id: OwnedUserId,
422	timeout: Duration,
423	count: u64,
424) -> TimerFired {
425	sleep(timeout).await;
426
427	(user_id, count)
428}
429
#[cfg(test)]
mod tests {
	use ruma::{presence::PresenceState, uint, user_id};

	use super::*;

	/// Builds an online presence event for a fixed test user with the given
	/// `last_active_ago`.
	fn online_event(last_active_ago: Option<UInt>) -> PresenceEvent {
		PresenceEvent {
			sender: user_id!("@alice:example.com").to_owned(),
			content: ruma::events::presence::PresenceEventContent {
				presence: PresenceState::Online,
				status_msg: None,
				currently_active: Some(true),
				last_active_ago,
				avatar_url: None,
				displayname: None,
			},
		}
	}

	#[test]
	fn refresh_window_skip_decision() {
		let event = online_event(Some(uint!(10)));

		// Inside the window: skip, reporting the stored count and age.
		let decision = Service::refresh_skip_decision(Some(20), Some(&event), Some(5));
		assert_eq!(decision, Some((5, 10)));

		// Older than the window: do not skip.
		let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5));
		assert_eq!(decision, None);

		// The window is exclusive: an age equal to it does not skip.
		let decision = Service::refresh_skip_decision(Some(10), Some(&event), Some(5));
		assert_eq!(decision, None);

		// An event without `last_active_ago` never skips.
		let event_missing_ago = online_event(None);
		let decision =
			Service::refresh_skip_decision(Some(20), Some(&event_missing_ago), Some(5));
		assert_eq!(decision, None);

		// Any missing input disables the skip.
		let decision = Service::refresh_skip_decision(Some(20), None, Some(5));
		assert_eq!(decision, None);

		let decision = Service::refresh_skip_decision(None, Some(&event), Some(5));
		assert_eq!(decision, None);

		let decision = Service::refresh_skip_decision(Some(20), Some(&event), None);
		assert_eq!(decision, None);
	}

	#[test]
	fn timer_stale_detection() {
		// Staleness is plain inequality, so the direction does not matter.
		assert!(Service::timer_is_stale(2, 3));
		assert!(Service::timer_is_stale(3, 2));
		assert!(!Service::timer_is_stale(2, 2));
	}
}
482}