Skip to main content

tuwunel_service/presence/
mod.rs

1mod aggregate;
2mod data;
3// Write/update pipeline lives in pipeline.rs.
4mod pipeline;
5mod presence;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use async_trait::async_trait;
10use futures::{
11	Stream, StreamExt, TryFutureExt,
12	future::{AbortHandle, Abortable},
13	stream::FuturesUnordered,
14};
15use loole::{Receiver, Sender};
16use ruma::{OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState};
17use tokio::sync::RwLock;
18use tuwunel_core::{Result, checked, debug, debug_warn, result::LogErr, trace};
19
20use self::{aggregate::PresenceAggregator, data::Data, presence::Presence};
21
/// State of the presence service: timeout configuration, the presence
/// database handle, and the channel that feeds the worker's per-user timers.
pub struct Service {
	/// Both halves of the timer-request channel, created with
	/// `loole::unbounded()` in `build`. `worker` drains a clone of the
	/// receiver and `interrupt` closes the sender.
	timer_channel: (Sender<TimerType>, Receiver<TimerType>),
	/// Copied from `config.presence_timeout_remote_users` in `build`.
	timeout_remote_users: bool,
	/// `config.presence_idle_timeout_s` multiplied by 1000 (presumably
	/// milliseconds; confirm against the users of this field).
	idle_timeout: u64,
	/// `config.presence_offline_timeout_s` multiplied by 1000 (presumably
	/// milliseconds; confirm against the users of this field).
	offline_timeout: u64,
	/// Presence database accessor.
	db: Data,
	/// Handle to the other services (users, globals, config, server, db).
	services: Arc<crate::services::OnceServices>,
	/// Per-user timestamp (milliseconds since the unix epoch) written by
	/// `note_sync` and read by `last_sync_gap_ms`.
	last_sync_seen: RwLock<HashMap<OwnedUserId, u64>>,
	/// Presence aggregator; cleared when the worker starts.
	device_presence: PresenceAggregator,
}
32
/// Timer request carried over `Service::timer_channel`: the user, how long to
/// wait, and the count the timer is scheduled for (the worker destructures it
/// as `(user_id, timeout, count)`).
type TimerType = (OwnedUserId, Duration, u64);
/// A `(user, count)` pair, the same shape the worker receives from a completed
/// timer. NOTE(review): presumably the output of `pipeline::presence_timer`;
/// confirm in pipeline.rs.
type TimerFired = (OwnedUserId, u64);
35
36#[async_trait]
37impl crate::Service for Service {
38	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
39		let config = &args.server.config;
40		let idle_timeout_s = config.presence_idle_timeout_s;
41		let offline_timeout_s = config.presence_offline_timeout_s;
42		Ok(Arc::new(Self {
43			timer_channel: loole::unbounded(),
44			timeout_remote_users: config.presence_timeout_remote_users,
45			idle_timeout: checked!(idle_timeout_s * 1_000)?,
46			offline_timeout: checked!(offline_timeout_s * 1_000)?,
47			db: Data::new(args),
48			services: args.services.clone(),
49			last_sync_seen: RwLock::new(HashMap::new()),
50			device_presence: PresenceAggregator::new(),
51		}))
52	}
53
54	async fn worker(self: Arc<Self>) -> Result {
55		// reset dormant online/away statuses to offline, and set the server user as
56		// online
57		self.unset_all_presence().await;
58		self.device_presence.clear().await;
59		_ = self
60			.maybe_ping_presence(
61				&self.services.globals.server_user,
62				None,
63				None,
64				&PresenceState::Online,
65			)
66			.await;
67
68		let receiver = self.timer_channel.1.clone();
69
70		let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new();
71		let mut timer_handles: HashMap<OwnedUserId, (u64, AbortHandle)> = HashMap::new();
72		while !receiver.is_closed() && self.services.server.is_running() {
73			tokio::select! {
74				Some(result) = presence_timers.next() => {
75					let Ok((user_id, count)) = result else {
76						continue;
77					};
78
79					if let Some((current_count, _)) = timer_handles.get(&user_id)
80						&& *current_count != count {
81						trace!(?user_id, count, current_count, "Skipping stale presence timer");
82						continue;
83					}
84
85					timer_handles.remove(&user_id);
86					self.process_presence_timer(&user_id, count).await.log_err().ok();
87				},
88				event = receiver.recv_async() => match event {
89					Ok((user_id, timeout, count)) => {
90						debug!(
91							"Adding timer {}: {user_id} timeout:{timeout:?} count:{count}",
92							presence_timers.len()
93						);
94						if let Some((_, handle)) = timer_handles.remove(&user_id) {
95							handle.abort();
96						}
97
98						let (handle, reg) = AbortHandle::new_pair();
99						presence_timers.push(Abortable::new(
100							pipeline::presence_timer(user_id.clone(), timeout, count),
101							reg,
102						));
103						timer_handles.insert(user_id, (count, handle));
104					},
105					_ => break,
106				},
107			}
108		}
109
110		// set the server user as offline
111		_ = self
112			.maybe_ping_presence(
113				&self.services.globals.server_user,
114				None,
115				None,
116				&PresenceState::Offline,
117			)
118			.await;
119
120		Ok(())
121	}
122
123	async fn interrupt(&self) {
124		let (timer_sender, _) = &self.timer_channel;
125		if !timer_sender.is_closed() {
126			timer_sender.close();
127		}
128	}
129
130	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
131}
132
133impl Service {
134	/// record that a user has just successfully completed a /sync (or
135	/// equivalent activity)
136	pub async fn note_sync(&self, user_id: &UserId) {
137		if !self.services.config.suppress_push_when_active {
138			return;
139		}
140
141		let now = tuwunel_core::utils::millis_since_unix_epoch();
142		self.last_sync_seen
143			.write()
144			.await
145			.insert(user_id.to_owned(), now);
146	}
147
148	/// Returns milliseconds since last observed sync for user (if any)
149	pub async fn last_sync_gap_ms(&self, user_id: &UserId) -> Option<u64> {
150		let now = tuwunel_core::utils::millis_since_unix_epoch();
151		self.last_sync_seen
152			.read()
153			.await
154			.get(user_id)
155			.map(|ts| now.saturating_sub(*ts))
156	}
157
158	/// Returns the latest presence event for the given user.
159	pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
160		self.db
161			.get_presence(user_id)
162			.map_ok(|(_, presence)| presence)
163			.await
164	}
165
166	/// Removes the presence record for the given user from the database.
167	///
168	/// TODO: Why is this not used?
169	pub async fn remove_presence(&self, user_id: &UserId) {
170		self.db.remove_presence(user_id).await;
171	}
172
173	// Unset online/unavailable presence to offline on startup
174	async fn unset_all_presence(&self) {
175		if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
176			return;
177		}
178
179		let _cork = self.services.db.cork();
180
181		for user_id in &self
182			.services
183			.users
184			.list_local_users()
185			.map(UserId::to_owned)
186			.collect::<Vec<_>>()
187			.await
188		{
189			let presence = self.db.get_presence(user_id).await;
190
191			let presence = match presence {
192				| Ok((_, ref presence)) => &presence.content,
193				| _ => continue,
194			};
195
196			if !matches!(
197				presence.presence,
198				PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
199			) {
200				trace!(?user_id, ?presence, "Skipping user");
201				continue;
202			}
203
204			trace!(?user_id, ?presence, "Resetting presence to offline");
205
206			_ = self
207				.set_presence(
208					user_id,
209					&PresenceState::Offline,
210					Some(false),
211					presence.last_active_ago,
212					presence.status_msg.clone(),
213				)
214				.await
215				.inspect_err(|e| {
216					debug_warn!(
217						?presence,
218						"{user_id} has invalid presence in database and failed to reset it to \
219						 offline: {e}"
220					);
221				});
222		}
223	}
224
225	/// Returns the most recent presence updates that happened after the event
226	/// with id `since`.
227	pub fn presence_since(
228		&self,
229		since: u64,
230		to: Option<u64>,
231	) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
232		self.db.presence_since(since, to)
233	}
234
235	#[inline]
236	pub async fn from_json_bytes_to_event(
237		&self,
238		bytes: &[u8],
239		user_id: &UserId,
240	) -> Result<PresenceEvent> {
241		let presence = Presence::from_json_bytes(bytes)?;
242		let event = presence
243			.to_presence_event(user_id, &self.services.users)
244			.await;
245
246		Ok(event)
247	}
248}
249
250// presence_timer lives in pipeline.rs alongside the timer handling logic.