tuwunel_service/presence/
mod.rs1mod aggregate;
2mod data;
3mod pipeline;
5mod presence;
6
7use std::{collections::HashMap, sync::Arc, time::Duration};
8
9use async_trait::async_trait;
10use futures::{
11 Stream, StreamExt, TryFutureExt,
12 future::{AbortHandle, Abortable},
13 stream::FuturesUnordered,
14};
15use loole::{Receiver, Sender};
16use ruma::{OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState};
17use tokio::sync::RwLock;
18use tuwunel_core::{Result, checked, debug, debug_warn, result::LogErr, trace};
19
20use self::{aggregate::PresenceAggregator, data::Data, presence::Presence};
21
22pub struct Service {
23 timer_channel: (Sender<TimerType>, Receiver<TimerType>),
24 timeout_remote_users: bool,
25 idle_timeout: u64,
26 offline_timeout: u64,
27 db: Data,
28 services: Arc<crate::services::OnceServices>,
29 last_sync_seen: RwLock<HashMap<OwnedUserId, u64>>,
30 device_presence: PresenceAggregator,
31}
32
33type TimerType = (OwnedUserId, Duration, u64);
34type TimerFired = (OwnedUserId, u64);
35
36#[async_trait]
37impl crate::Service for Service {
38 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
39 let config = &args.server.config;
40 let idle_timeout_s = config.presence_idle_timeout_s;
41 let offline_timeout_s = config.presence_offline_timeout_s;
42 Ok(Arc::new(Self {
43 timer_channel: loole::unbounded(),
44 timeout_remote_users: config.presence_timeout_remote_users,
45 idle_timeout: checked!(idle_timeout_s * 1_000)?,
46 offline_timeout: checked!(offline_timeout_s * 1_000)?,
47 db: Data::new(args),
48 services: args.services.clone(),
49 last_sync_seen: RwLock::new(HashMap::new()),
50 device_presence: PresenceAggregator::new(),
51 }))
52 }
53
54 async fn worker(self: Arc<Self>) -> Result {
55 self.unset_all_presence().await;
58 self.device_presence.clear().await;
59 _ = self
60 .maybe_ping_presence(
61 &self.services.globals.server_user,
62 None,
63 None,
64 &PresenceState::Online,
65 )
66 .await;
67
68 let receiver = self.timer_channel.1.clone();
69
70 let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new();
71 let mut timer_handles: HashMap<OwnedUserId, (u64, AbortHandle)> = HashMap::new();
72 while !receiver.is_closed() && self.services.server.is_running() {
73 tokio::select! {
74 Some(result) = presence_timers.next() => {
75 let Ok((user_id, count)) = result else {
76 continue;
77 };
78
79 if let Some((current_count, _)) = timer_handles.get(&user_id)
80 && *current_count != count {
81 trace!(?user_id, count, current_count, "Skipping stale presence timer");
82 continue;
83 }
84
85 timer_handles.remove(&user_id);
86 self.process_presence_timer(&user_id, count).await.log_err().ok();
87 },
88 event = receiver.recv_async() => match event {
89 Ok((user_id, timeout, count)) => {
90 debug!(
91 "Adding timer {}: {user_id} timeout:{timeout:?} count:{count}",
92 presence_timers.len()
93 );
94 if let Some((_, handle)) = timer_handles.remove(&user_id) {
95 handle.abort();
96 }
97
98 let (handle, reg) = AbortHandle::new_pair();
99 presence_timers.push(Abortable::new(
100 pipeline::presence_timer(user_id.clone(), timeout, count),
101 reg,
102 ));
103 timer_handles.insert(user_id, (count, handle));
104 },
105 _ => break,
106 },
107 }
108 }
109
110 _ = self
112 .maybe_ping_presence(
113 &self.services.globals.server_user,
114 None,
115 None,
116 &PresenceState::Offline,
117 )
118 .await;
119
120 Ok(())
121 }
122
123 async fn interrupt(&self) {
124 let (timer_sender, _) = &self.timer_channel;
125 if !timer_sender.is_closed() {
126 timer_sender.close();
127 }
128 }
129
130 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
131}
132
133impl Service {
134 pub async fn note_sync(&self, user_id: &UserId) {
137 if !self.services.config.suppress_push_when_active {
138 return;
139 }
140
141 let now = tuwunel_core::utils::millis_since_unix_epoch();
142 self.last_sync_seen
143 .write()
144 .await
145 .insert(user_id.to_owned(), now);
146 }
147
148 pub async fn last_sync_gap_ms(&self, user_id: &UserId) -> Option<u64> {
150 let now = tuwunel_core::utils::millis_since_unix_epoch();
151 self.last_sync_seen
152 .read()
153 .await
154 .get(user_id)
155 .map(|ts| now.saturating_sub(*ts))
156 }
157
158 pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
160 self.db
161 .get_presence(user_id)
162 .map_ok(|(_, presence)| presence)
163 .await
164 }
165
166 pub async fn remove_presence(&self, user_id: &UserId) {
170 self.db.remove_presence(user_id).await;
171 }
172
173 async fn unset_all_presence(&self) {
175 if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
176 return;
177 }
178
179 let _cork = self.services.db.cork();
180
181 for user_id in &self
182 .services
183 .users
184 .list_local_users()
185 .map(UserId::to_owned)
186 .collect::<Vec<_>>()
187 .await
188 {
189 let presence = self.db.get_presence(user_id).await;
190
191 let presence = match presence {
192 | Ok((_, ref presence)) => &presence.content,
193 | _ => continue,
194 };
195
196 if !matches!(
197 presence.presence,
198 PresenceState::Unavailable | PresenceState::Online | PresenceState::Busy
199 ) {
200 trace!(?user_id, ?presence, "Skipping user");
201 continue;
202 }
203
204 trace!(?user_id, ?presence, "Resetting presence to offline");
205
206 _ = self
207 .set_presence(
208 user_id,
209 &PresenceState::Offline,
210 Some(false),
211 presence.last_active_ago,
212 presence.status_msg.clone(),
213 )
214 .await
215 .inspect_err(|e| {
216 debug_warn!(
217 ?presence,
218 "{user_id} has invalid presence in database and failed to reset it to \
219 offline: {e}"
220 );
221 });
222 }
223 }
224
225 pub fn presence_since(
228 &self,
229 since: u64,
230 to: Option<u64>,
231 ) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
232 self.db.presence_since(since, to)
233 }
234
235 #[inline]
236 pub async fn from_json_bytes_to_event(
237 &self,
238 bytes: &[u8],
239 user_id: &UserId,
240 ) -> Result<PresenceEvent> {
241 let presence = Presence::from_json_bytes(bytes)?;
242 let event = presence
243 .to_presence_event(user_id, &self.services.users)
244 .await;
245
246 Ok(event)
247 }
248}
249
250