Skip to main content

tuwunel_service/presence/
mod.rs

1mod aggregate;
2mod data;
3// Write/update pipeline lives in pipeline.rs.
4mod pipeline;
5
6use std::{collections::HashMap, sync::Arc, time::Duration};
7
8use async_trait::async_trait;
9use futures::{
10	Stream, StreamExt, TryFutureExt,
11	future::{AbortHandle, Abortable, join},
12	stream::FuturesUnordered,
13};
14use loole::{Receiver, Sender};
15use ruma::{
16	OwnedUserId, UInt, UserId,
17	events::presence::{PresenceEvent, PresenceEventContent},
18	presence::PresenceState,
19};
20use serde::{Deserialize, Serialize};
21use tokio::sync::RwLock;
22use tuwunel_core::{
23	Result, checked, debug, debug_warn, err,
24	result::LogErr,
25	trace,
26	utils::{self, TryFutureExtExt},
27};
28
29use self::{aggregate::PresenceAggregator, data::Data};
30
31/// Represents data required to be kept in order to implement the presence
32/// specification.
33#[derive(Serialize, Deserialize, Debug, Clone)]
34pub(super) struct Presence {
35	pub(super) state: PresenceState,
36	pub(super) currently_active: bool,
37	pub(super) last_active_ts: u64,
38	pub(super) status_msg: Option<String>,
39}
40
41impl Presence {
42	pub(super) fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
43		serde_json::from_slice(bytes)
44			.map_err(|_| err!(Database(error!("Invalid presence data in database"))))
45	}
46}
47
48pub struct Service {
49	timer_channel: (Sender<TimerType>, Receiver<TimerType>),
50	timeout_remote_users: bool,
51	idle_timeout: u64,
52	offline_timeout: u64,
53	db: Data,
54	services: Arc<crate::services::OnceServices>,
55	last_sync_seen: RwLock<HashMap<OwnedUserId, u64>>,
56	device_presence: PresenceAggregator,
57}
58
59type TimerType = (OwnedUserId, Duration, u64);
60type TimerFired = (OwnedUserId, u64);
61
62#[async_trait]
63impl crate::Service for Service {
64	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
65		let config = &args.server.config;
66		let idle_timeout_s = config.presence_idle_timeout_s;
67		let offline_timeout_s = config.presence_offline_timeout_s;
68		Ok(Arc::new(Self {
69			timer_channel: loole::unbounded(),
70			timeout_remote_users: config.presence_timeout_remote_users,
71			idle_timeout: checked!(idle_timeout_s * 1_000)?,
72			offline_timeout: checked!(offline_timeout_s * 1_000)?,
73			db: Data::new(args),
74			services: args.services.clone(),
75			last_sync_seen: RwLock::new(HashMap::new()),
76			device_presence: PresenceAggregator::new(),
77		}))
78	}
79
80	async fn worker(self: Arc<Self>) -> Result {
81		// reset dormant online/away statuses to offline, and set the server user as
82		// online
83		self.unset_all_presence().await;
84		self.device_presence.clear().await;
85		_ = self
86			.maybe_ping_presence(
87				&self.services.globals.server_user,
88				None,
89				None,
90				&PresenceState::Online,
91			)
92			.await;
93
94		let receiver = self.timer_channel.1.clone();
95
96		let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new();
97		let mut timer_handles: HashMap<OwnedUserId, (u64, AbortHandle)> = HashMap::new();
98		while !receiver.is_closed() && self.services.server.is_running() {
99			tokio::select! {
100				Some(result) = presence_timers.next() => {
101					let Ok((user_id, count)) = result else {
102						continue;
103					};
104
105					if let Some((current_count, _)) = timer_handles.get(&user_id)
106						&& *current_count != count {
107						trace!(?user_id, count, current_count, "Skipping stale presence timer");
108						continue;
109					}
110
111					timer_handles.remove(&user_id);
112					self.process_presence_timer(&user_id, count).await.log_err().ok();
113				},
114				event = receiver.recv_async() => match event {
115					Ok((user_id, timeout, count)) => {
116						debug!(
117							"Adding timer {}: {user_id} timeout:{timeout:?} count:{count}",
118							presence_timers.len()
119						);
120						if let Some((_, handle)) = timer_handles.remove(&user_id) {
121							handle.abort();
122						}
123
124						let (handle, reg) = AbortHandle::new_pair();
125						presence_timers.push(Abortable::new(
126							pipeline::presence_timer(user_id.clone(), timeout, count),
127							reg,
128						));
129						timer_handles.insert(user_id, (count, handle));
130					},
131					_ => break,
132				},
133			}
134		}
135
136		// set the server user as offline
137		_ = self
138			.maybe_ping_presence(
139				&self.services.globals.server_user,
140				None,
141				None,
142				&PresenceState::Offline,
143			)
144			.await;
145
146		Ok(())
147	}
148
149	async fn interrupt(&self) {
150		let (timer_sender, _) = &self.timer_channel;
151		if !timer_sender.is_closed() {
152			timer_sender.close();
153		}
154	}
155
156	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
157}
158
159impl Service {
160	/// record that a user has just successfully completed a /sync (or
161	/// equivalent activity)
162	pub async fn note_sync(&self, user_id: &UserId) {
163		if !self.services.config.suppress_push_when_active {
164			return;
165		}
166
167		let now = utils::millis_since_unix_epoch();
168		self.last_sync_seen
169			.write()
170			.await
171			.insert(user_id.to_owned(), now);
172	}
173
174	/// Returns milliseconds since last observed sync for user (if any)
175	pub async fn last_sync_gap_ms(&self, user_id: &UserId) -> Option<u64> {
176		let now = utils::millis_since_unix_epoch();
177		self.last_sync_seen
178			.read()
179			.await
180			.get(user_id)
181			.map(|ts| now.saturating_sub(*ts))
182	}
183
184	/// Returns the latest presence event for the given user.
185	pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
186		self.db
187			.get_presence(user_id)
188			.map_ok(|(_, presence)| presence)
189			.await
190	}
191
192	/// Removes the presence record for the given user from the database.
193	///
194	/// TODO: Why is this not used?
195	pub async fn remove_presence(&self, user_id: &UserId) {
196		self.db.remove_presence(user_id).await;
197	}
198
199	// Unset online/unavailable presence to offline on startup
200	async fn unset_all_presence(&self) {
201		if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
202			return;
203		}
204
205		let _cork = self.services.db.cork();
206
207		for user_id in &self
208			.services
209			.users
210			.list_local_users()
211			.map(UserId::to_owned)
212			.collect::<Vec<_>>()
213			.await
214		{
215			let presence = self.db.get_presence(user_id).await;
216
217			let presence = match presence {
218				| Ok((_, ref presence)) => &presence.content,
219				| _ => continue,
220			};
221
222			if !matches!(
223				presence.presence,
224				PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
225			) {
226				trace!(?user_id, ?presence, "Skipping user");
227				continue;
228			}
229
230			trace!(?user_id, ?presence, "Resetting presence to offline");
231
232			_ = self
233				.set_presence(
234					user_id,
235					&PresenceState::Offline,
236					Some(false),
237					presence.last_active_ago,
238					presence.status_msg.clone(),
239				)
240				.await
241				.inspect_err(|e| {
242					debug_warn!(
243						?presence,
244						"{user_id} has invalid presence in database and failed to reset it to \
245						 offline: {e}"
246					);
247				});
248		}
249	}
250
251	/// Returns the most recent presence updates that happened after the event
252	/// with id `since`.
253	pub fn presence_since(
254		&self,
255		since: u64,
256		to: Option<u64>,
257	) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
258		self.db.presence_since(since, to)
259	}
260
261	#[inline]
262	pub async fn from_json_bytes_to_event(
263		&self,
264		bytes: &[u8],
265		user_id: &UserId,
266	) -> Result<PresenceEvent> {
267		let presence = Presence::from_json_bytes(bytes)?;
268		let event = self.to_presence_event(presence, user_id).await;
269
270		Ok(event)
271	}
272
273	/// Creates a PresenceEvent from available data.
274	async fn to_presence_event(&self, presence: Presence, user_id: &UserId) -> PresenceEvent {
275		let now = utils::millis_since_unix_epoch();
276		let last_active_ago = now.saturating_sub(presence.last_active_ts);
277
278		let avatar_url = self.services.profile.avatar_url(user_id).ok();
279		let displayname = self.services.profile.displayname(user_id).ok();
280		let (avatar_url, displayname) = join(avatar_url, displayname).await;
281
282		PresenceEvent {
283			sender: user_id.to_owned(),
284			content: PresenceEventContent {
285				presence: presence.state,
286				status_msg: presence.status_msg,
287				currently_active: Some(presence.currently_active),
288				last_active_ago: Some(UInt::new_saturating(last_active_ago)),
289				avatar_url,
290				displayname,
291			},
292		}
293	}
294}