tuwunel_admin/user/
last_active.rs1use std::cmp;
2
3use futures::{FutureExt, StreamExt, TryStreamExt};
4use ruma::{MilliSecondsSinceUnixEpoch, uint};
5use tuwunel_core::{
6 Result,
7 utils::{ReadyExt, stream::IterStream},
8};
9
10use crate::admin_command;
11
12#[admin_command]
13pub(super) async fn last_active(&self, limit: Option<usize>) -> Result {
14 self.services
15 .users
16 .list_local_users()
17 .map(ToOwned::to_owned)
18 .then(async |user_id| {
19 self.services
20 .users
21 .all_devices_metadata(&user_id)
22 .ready_filter_map(|device| {
23 device
24 .last_seen_ts
25 .map(|ts| (ts, device.last_seen_ip))
26 })
27 .ready_fold((MilliSecondsSinceUnixEpoch(uint!(0)), None), cmp::max)
28 .map(|(last_seen_ts, last_seen_ip)| (last_seen_ts, last_seen_ip, user_id.clone()))
29 .await
30 })
31 .ready_filter(|(ts, ..)| ts.get() > uint!(0))
32 .collect::<Vec<_>>()
33 .map(|mut vec| {
34 vec.sort_by_key(|k| cmp::Reverse(k.0));
35 vec
36 })
37 .map(Vec::into_iter)
38 .map(IterStream::try_stream)
39 .flatten_stream()
40 .take(limit.unwrap_or(48))
41 .try_for_each(async |(last_seen_ts, last_seen_ip, user_id)| {
42 let ago = last_seen_ts;
43 let user_id = user_id.localpart();
44 let ip = last_seen_ip.as_deref().unwrap_or_default();
45
46 write!(self, "{ago:?} {ip:<40} {user_id}\n").await
47 })
48 .boxed()
49 .await
50}