Skip to main content

tuwunel_admin/user/
last_active.rs

1use std::cmp;
2
3use futures::{FutureExt, StreamExt, TryStreamExt};
4use ruma::{MilliSecondsSinceUnixEpoch, uint};
5use tuwunel_core::{
6	Result,
7	utils::{ReadyExt, stream::IterStream},
8};
9
10use crate::admin_command;
11
12#[admin_command]
13pub(super) async fn last_active(&self, limit: Option<usize>) -> Result {
14	self.services
15		.users
16		.list_local_users()
17		.map(ToOwned::to_owned)
18		.then(async |user_id| {
19			self.services
20				.users
21				.all_devices_metadata(&user_id)
22				.ready_filter_map(|device| {
23					device
24						.last_seen_ts
25						.map(|ts| (ts, device.last_seen_ip))
26				})
27				.ready_fold((MilliSecondsSinceUnixEpoch(uint!(0)), None), cmp::max)
28				.map(|(last_seen_ts, last_seen_ip)| (last_seen_ts, last_seen_ip, user_id.clone()))
29				.await
30		})
31		.ready_filter(|(ts, ..)| ts.get() > uint!(0))
32		.collect::<Vec<_>>()
33		.map(|mut vec| {
34			vec.sort_by_key(|k| cmp::Reverse(k.0));
35			vec
36		})
37		.map(Vec::into_iter)
38		.map(IterStream::try_stream)
39		.flatten_stream()
40		.take(limit.unwrap_or(48))
41		.try_for_each(async |(last_seen_ts, last_seen_ip, user_id)| {
42			let ago = last_seen_ts;
43			let user_id = user_id.localpart();
44			let ip = last_seen_ip.as_deref().unwrap_or_default();
45
46			write!(self, "{ago:?} {ip:<40} {user_id}\n").await
47		})
48		.boxed()
49		.await
50}