Skip to main content

tuwunel_admin/user/
commands.rs

1use std::{cmp, collections::BTreeMap};
2
3use futures::{FutureExt, StreamExt, TryStreamExt};
4use ruma::{
5	Int, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedRoomId,
6	OwnedRoomOrAliasId, OwnedUserId, UserId,
7	events::{
8		RoomAccountDataEventType, StateEventType,
9		room::{
10			power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent, UserPowerLevel},
11			redaction::RoomRedactionEventContent,
12		},
13		tag::{TagEvent, TagEventContent, TagInfo},
14	},
15	uint,
16};
17use tuwunel_core::{
18	Err, Result, debug_warn, info,
19	matrix::{Event, pdu::PduBuilder},
20	utils::{
21		self, ReadyExt,
22		stream::{BroadbandExt, IterStream},
23	},
24};
25use tuwunel_service::{Services, users::Register};
26
27use crate::{
28	admin_command, get_room_info,
29	utils::{parse_active_local_user_id, parse_local_user_id, parse_user_id},
30};
31
/// Length passed to `utils::random_string` when a command has to generate a
/// password because none was supplied.
const AUTO_GEN_PASSWORD_LENGTH: usize = 25;
/// Reason string passed along with every join issued by the bulk force-join
/// commands.
const BULK_JOIN_REASON: &str = "Bulk force joining this room as initiated by the server admin.";
34
/// Running tally of invite rejections, split into successes and failures.
#[derive(Default)]
struct RejectInvitesAcc {
	/// Number of invites that were rejected successfully.
	rejected: usize,
	/// Number of invites whose rejection returned an error.
	failed: usize,
}

impl RejectInvitesAcc {
	/// Combines two tallies field by field. The sums saturate at `usize::MAX`
	/// rather than overflowing.
	fn merge(self, other: Self) -> Self {
		Self {
			rejected: self.rejected.saturating_add(other.rejected),
			failed: self.failed.saturating_add(other.failed),
		}
	}
}
48
49#[admin_command]
50pub(super) async fn list_users(&self) -> Result {
51	let users: Vec<_> = self
52		.services
53		.users
54		.list_local_users()
55		.map(ToString::to_string)
56		.collect()
57		.await;
58
59	let mut plain_msg = format!("Found {} local user account(s):\n```\n", users.len());
60	plain_msg += users.join("\n").as_str();
61	plain_msg += "\n```";
62
63	self.write_str(&plain_msg).await
64}
65
66#[admin_command]
67pub(super) async fn create_user(&self, username: String, password: Option<String>) -> Result {
68	// Validate user id
69	let user_id = parse_local_user_id(self.services, &username)?;
70
71	if let Err(e) = user_id.validate_strict()
72		&& self.services.config.emergency_password.is_none()
73	{
74		return Err!("Username {user_id} contains disallowed characters or spaces: {e}");
75	}
76
77	if self.services.users.exists(&user_id).await {
78		return Err!("User {user_id} already exists");
79	}
80
81	let password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
82
83	self.services
84		.users
85		.full_register(Register {
86			user_id: Some(&user_id),
87			password: Some(&password),
88			grant_first_user_admin: true,
89			..Default::default()
90		})
91		.await?;
92
93	self.write_str(&format!("Created user with user_id: {user_id} and password: `{password}`"))
94		.await
95}
96
97#[admin_command]
98pub(super) async fn deactivate(&self, no_leave_rooms: bool, user_id: String) -> Result {
99	// Validate user id
100	let user_id = parse_local_user_id(self.services, &user_id)?;
101
102	// don't deactivate the server service account
103	if user_id == self.services.globals.server_user {
104		return Err!("Not allowed to deactivate the server service account.",);
105	}
106
107	deactivate_user(self.services, &user_id, no_leave_rooms).await?;
108
109	self.write_str(&format!("User {user_id} has been deactivated"))
110		.await
111}
112
113#[admin_command]
114pub(super) async fn delete_device(
115	&self,
116	user_id: OwnedUserId,
117	device_id: OwnedDeviceId,
118) -> Result {
119	if !self.services.globals.user_is_local(&user_id) {
120		return Err!("Cannot delete device of remote user");
121	}
122
123	self.services
124		.users
125		.remove_device(&user_id, &device_id)
126		.await;
127
128	self.write_str(&format!("User {user_id}'s device {device_id} removed."))
129		.await
130}
131
132#[admin_command]
133pub(super) async fn reset_password(&self, username: String, password: Option<String>) -> Result {
134	let user_id = parse_local_user_id(self.services, &username)?;
135
136	if user_id == self.services.globals.server_user {
137		return Err!(
138			"Not allowed to set the password for the server account. Please use the emergency \
139			 password config option.",
140		);
141	}
142
143	let new_password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
144
145	match self
146		.services
147		.users
148		.set_password(&user_id, Some(new_password.as_str()))
149		.await
150	{
151		| Err(e) => return Err!("Couldn't reset the password for user {user_id}: {e}"),
152		| Ok(()) => {
153			write!(self, "Successfully reset the password for user {user_id}: `{new_password}`")
154		},
155	}
156	.await
157}
158
159#[admin_command]
160pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) -> Result {
161	if self.body.len() < 2
162		|| !self.body[0].trim().starts_with("```")
163		|| self.body.last().unwrap_or(&"").trim() != "```"
164	{
165		return Err!("Expected code block in command body. Add --help for details.",);
166	}
167
168	let usernames = self
169		.body
170		.to_vec()
171		.drain(1..self.body.len().saturating_sub(1))
172		.collect::<Vec<_>>();
173
174	let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
175	let mut admins = Vec::new();
176
177	for username in usernames {
178		match parse_active_local_user_id(self.services, username).await {
179			| Err(e) => {
180				self.services
181					.admin
182					.send_text(&format!("{username} is not a valid username, skipping over: {e}"))
183					.await;
184
185				continue;
186			},
187			| Ok(user_id) => {
188				if self.services.admin.user_is_admin(&user_id).await && !force {
189					self.services
190						.admin
191						.send_text(&format!(
192							"{username} is an admin and --force is not set, skipping over"
193						))
194						.await;
195
196					admins.push(username);
197					continue;
198				}
199
200				// don't deactivate the server service account
201				if user_id == self.services.globals.server_user {
202					self.services
203						.admin
204						.send_text(&format!(
205							"{username} is the server service account, skipping over"
206						))
207						.await;
208
209					continue;
210				}
211
212				user_ids.push(user_id);
213			},
214		}
215	}
216
217	let mut deactivation_count: usize = 0;
218
219	for user_id in user_ids {
220		match deactivate_user(self.services, &user_id, no_leave_rooms).await {
221			| Ok(()) => {
222				deactivation_count = deactivation_count.saturating_add(1);
223			},
224			| Err(e) => {
225				self.services
226					.admin
227					.send_text(&format!("Failed deactivating user: {e}"))
228					.await;
229			},
230		}
231	}
232
233	if admins.is_empty() {
234		write!(self, "Deactivated {deactivation_count} accounts.")
235	} else {
236		write!(
237			self,
238			"Deactivated {deactivation_count} accounts.\nSkipped admin accounts: {}. Use \
239			 --force to deactivate admin accounts",
240			admins.join(", ")
241		)
242	}
243	.await
244}
245
246async fn deactivate_user(services: &Services, user_id: &UserId, no_leave_rooms: bool) -> Result {
247	if !no_leave_rooms {
248		services
249			.deactivate
250			.full_deactivate(user_id, false)
251			.boxed()
252			.await?;
253	} else {
254		services.users.deactivate_account(user_id).await?;
255	}
256
257	Ok(())
258}
259
260#[admin_command]
261pub(super) async fn list_joined_rooms(&self, user_id: String) -> Result {
262	// Validate user id
263	let user_id = parse_local_user_id(self.services, &user_id)?;
264
265	let mut rooms: Vec<(OwnedRoomId, u64, String)> = self
266		.services
267		.state_cache
268		.rooms_joined(&user_id)
269		.then(|room_id| get_room_info(self.services, room_id))
270		.collect()
271		.await;
272
273	if rooms.is_empty() {
274		return Err!("User is not in any rooms.");
275	}
276
277	rooms.sort_by_key(|r| r.1);
278	rooms.reverse();
279
280	let body = rooms
281		.iter()
282		.map(|(id, members, name)| format!("{id}\tMembers: {members}\tName: {name}"))
283		.collect::<Vec<_>>()
284		.join("\n");
285
286	self.write_str(&format!("Rooms {user_id} Joined ({}):\n```\n{body}\n```", rooms.len()))
287		.await
288}
289
290#[admin_command]
291pub(super) async fn force_join_list_of_local_users(
292	&self,
293	room: OwnedRoomOrAliasId,
294	yes_i_want_to_do_this: bool,
295) -> Result {
296	if self.body.len() < 2
297		|| !self.body[0].trim().starts_with("```")
298		|| self.body.last().unwrap_or(&"").trim() != "```"
299	{
300		return Err!("Expected code block in command body. Add --help for details.",);
301	}
302
303	if !yes_i_want_to_do_this {
304		return Err!(
305			"You must pass the --yes-i-want-to-do-this flag to ensure you really want to force \
306			 bulk join all specified local users.",
307		);
308	}
309
310	let (room_id, servers) = self
311		.services
312		.alias
313		.maybe_resolve_with_servers(&room, None)
314		.await?;
315
316	if !self
317		.services
318		.state_cache
319		.server_in_room(self.services.globals.server_name(), &room_id)
320		.await
321	{
322		return Err!("We are not joined in this room.");
323	}
324
325	let usernames = self
326		.body
327		.to_vec()
328		.drain(1..self.body.len().saturating_sub(1))
329		.collect::<Vec<_>>();
330
331	let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
332
333	for username in usernames {
334		match parse_active_local_user_id(self.services, username).await {
335			| Ok(user_id) => {
336				// don't make the server service account join
337				if user_id == self.services.globals.server_user {
338					self.services
339						.admin
340						.send_text(&format!(
341							"{username} is the server service account, skipping over"
342						))
343						.await;
344
345					continue;
346				}
347
348				user_ids.push(user_id);
349			},
350			| Err(e) => {
351				self.services
352					.admin
353					.send_text(&format!("{username} is not a valid username, skipping over: {e}"))
354					.await;
355
356				continue;
357			},
358		}
359	}
360
361	let mut failed_joins: usize = 0;
362	let mut successful_joins: usize = 0;
363
364	for user_id in user_ids {
365		match self
366			.services
367			.membership
368			.join(
369				&user_id,
370				&room_id,
371				Some(&room),
372				Some(String::from(BULK_JOIN_REASON)),
373				&servers,
374				false,
375			)
376			.await
377		{
378			| Ok(_res) => {
379				successful_joins = successful_joins.saturating_add(1);
380			},
381			| Err(e) => {
382				debug_warn!("Failed force joining {user_id} to {room_id} during bulk join: {e}");
383				failed_joins = failed_joins.saturating_add(1);
384			},
385		}
386	}
387
388	self.write_str(&format!(
389		"{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \
390		 failed.",
391	))
392	.await
393}
394
395#[admin_command]
396pub(super) async fn force_join_all_local_users(
397	&self,
398	room: OwnedRoomOrAliasId,
399	yes_i_want_to_do_this: bool,
400) -> Result {
401	if !yes_i_want_to_do_this {
402		return Err!(
403			"You must pass the --yes-i-want-to-do-this-flag to ensure you really want to force \
404			 bulk join all local users.",
405		);
406	}
407
408	let (room_id, servers) = self
409		.services
410		.alias
411		.maybe_resolve_with_servers(&room, None)
412		.await?;
413
414	if !self
415		.services
416		.state_cache
417		.server_in_room(self.services.globals.server_name(), &room_id)
418		.await
419	{
420		return Err!("We are not joined in this room.");
421	}
422
423	let mut failed_joins: usize = 0;
424	let mut successful_joins: usize = 0;
425
426	for user_id in &self
427		.services
428		.users
429		.list_local_users()
430		.map(UserId::to_owned)
431		.collect::<Vec<_>>()
432		.await
433	{
434		if user_id == &self.services.globals.server_user {
435			continue;
436		}
437
438		match self
439			.services
440			.membership
441			.join(
442				user_id,
443				&room_id,
444				Some(&room),
445				Some(String::from(BULK_JOIN_REASON)),
446				&servers,
447				false,
448			)
449			.await
450		{
451			| Ok(_res) => {
452				successful_joins = successful_joins.saturating_add(1);
453			},
454			| Err(e) => {
455				debug_warn!("Failed force joining {user_id} to {room_id} during bulk join: {e}");
456				failed_joins = failed_joins.saturating_add(1);
457			},
458		}
459	}
460
461	self.write_str(&format!(
462		"{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \
463		 failed.",
464	))
465	.await
466}
467
468#[admin_command]
469pub(super) async fn force_join_room(&self, user_id: String, room: OwnedRoomOrAliasId) -> Result {
470	let user_id = parse_local_user_id(self.services, &user_id)?;
471	let (room_id, servers) = self
472		.services
473		.alias
474		.maybe_resolve_with_servers(&room, None)
475		.await?;
476
477	assert!(
478		self.services.globals.user_is_local(&user_id),
479		"Parsed user_id must be a local user"
480	);
481
482	self.services
483		.membership
484		.join(&user_id, &room_id, Some(&room), None, &servers, false)
485		.await?;
486
487	self.write_str(&format!("{user_id} has been joined to {room_id}."))
488		.await
489}
490
491#[admin_command]
492pub(super) async fn force_leave_room(
493	&self,
494	user_id: String,
495	room_id: OwnedRoomOrAliasId,
496) -> Result {
497	let user_id = parse_local_user_id(self.services, &user_id)?;
498	let room_id = self
499		.services
500		.alias
501		.maybe_resolve(&room_id)
502		.await?;
503
504	assert!(
505		self.services.globals.user_is_local(&user_id),
506		"Parsed user_id must be a local user"
507	);
508
509	if !self
510		.services
511		.state_cache
512		.is_joined(&user_id, &room_id)
513		.await
514	{
515		return Err!("{user_id} is not joined in the room");
516	}
517
518	let state_lock = self.services.state.mutex.lock(&room_id).await;
519
520	self.services
521		.membership
522		.leave(&user_id, &room_id, None, false, &state_lock)
523		.boxed()
524		.await?;
525
526	drop(state_lock);
527
528	self.write_str(&format!("{user_id} has left {room_id}."))
529		.await
530}
531
532#[admin_command]
533pub(super) async fn reject_invites(&self, user_id: String, reason: Option<String>) -> Result {
534	let user_id = parse_local_user_id(self.services, &user_id)?;
535	let reason = reason.as_deref();
536
537	let reject = async |room_id: OwnedRoomId| {
538		let state_lock = self.services.state.mutex.lock(&room_id).await;
539
540		match self
541			.services
542			.membership
543			.leave(&user_id, &room_id, reason.map(str::to_owned), false, &state_lock)
544			.boxed()
545			.await
546		{
547			| Ok(()) => RejectInvitesAcc { rejected: 1, ..Default::default() },
548			| Err(e) => {
549				debug_warn!(%user_id, %room_id, "Failed to reject invite: {e}");
550				RejectInvitesAcc { failed: 1, ..Default::default() }
551			},
552		}
553	};
554
555	let RejectInvitesAcc { rejected, failed } = self
556		.services
557		.state_cache
558		.rooms_invited(&user_id)
559		.map(ToOwned::to_owned)
560		.broad_then(reject)
561		.ready_fold(RejectInvitesAcc::default(), RejectInvitesAcc::merge)
562		.await;
563
564	if rejected == 0 && failed == 0 {
565		return Err!("{user_id} has no pending invites.");
566	}
567
568	self.write_string(format!("Rejected {rejected} invite(s) for {user_id}. {failed} failed."))
569		.await
570}
571
572#[admin_command]
573pub(super) async fn force_demote(&self, user_id: String, room_id: OwnedRoomOrAliasId) -> Result {
574	let user_id = parse_local_user_id(self.services, &user_id)?;
575	let room_id = self
576		.services
577		.alias
578		.maybe_resolve(&room_id)
579		.await?;
580
581	assert!(
582		self.services.globals.user_is_local(&user_id),
583		"Parsed user_id must be a local user"
584	);
585
586	let state_lock = self.services.state.mutex.lock(&room_id).await;
587
588	let room_power_levels: Option<RoomPowerLevels> = self
589		.services
590		.state_accessor
591		.get_power_levels(&room_id)
592		.await
593		.ok();
594
595	let user_can_change_self = room_power_levels
596		.as_ref()
597		.is_some_and(|power_levels| {
598			power_levels.user_can_change_user_power_level(&user_id, &user_id)
599		});
600
601	let user_can_demote_self = user_can_change_self
602		|| self
603			.services
604			.state_accessor
605			.room_state_get(&room_id, &StateEventType::RoomCreate, "")
606			.await
607			.is_ok_and(|event| event.sender() == user_id);
608
609	if !user_can_demote_self {
610		return Err!("User is not allowed to modify their own power levels in the room.");
611	}
612
613	let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels
614		.map(TryInto::try_into)
615		.transpose()?
616		.unwrap_or_default();
617
618	power_levels_content.users.remove(&user_id);
619
620	let event_id = self
621		.services
622		.timeline
623		.build_and_append_pdu(
624			PduBuilder::state(String::new(), &power_levels_content),
625			&user_id,
626			&room_id,
627			&state_lock,
628		)
629		.await?;
630
631	self.write_str(&format!(
632		"User {user_id} demoted themselves to the room default power level in {room_id} - \
633		 {event_id}"
634	))
635	.await
636}
637
638#[admin_command]
639pub(super) async fn force_promote(
640	&self,
641	target_id: String,
642	room_id: OwnedRoomOrAliasId,
643) -> Result {
644	let target_id = parse_user_id(self.services, &target_id)?;
645	let room_id = self
646		.services
647		.alias
648		.maybe_resolve(&room_id)
649		.await?;
650
651	let state_lock = self.services.state.mutex.lock(&room_id).await;
652
653	let room_power_levels = self
654		.services
655		.state_accessor
656		.get_power_levels(&room_id)
657		.await?;
658
659	let privileged_member = self
660		.services
661		.state_cache
662		.room_members(&room_id)
663		.ready_filter(|member_id| {
664			self.services.globals.user_is_local(member_id)
665				&& room_power_levels.user_can_change_user_power_level(member_id, &target_id)
666		})
667		.map(ToOwned::to_owned)
668		.ready_fold_default(|selected_user, member_id| match selected_user {
669			| None => Some(member_id),
670			| Some(selected_user) => Some(
671				if room_power_levels.for_user(&selected_user)
672					> room_power_levels.for_user(&member_id)
673				{
674					selected_user
675				} else {
676					member_id
677				},
678			),
679		})
680		.await;
681
682	let Some(privileged_member) = privileged_member else {
683		return Err!("No privileged user exists in room, cannot promote.");
684	};
685
686	info!("Selected privileged member {privileged_member}");
687
688	let power_level: Int = match room_power_levels.for_user(&privileged_member) {
689		| UserPowerLevel::Infinite => Int::MAX,
690		| UserPowerLevel::Int(x) => x,
691	};
692
693	let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels.try_into()?;
694
695	power_levels_content
696		.users
697		.insert(target_id.clone(), power_level);
698
699	let event_id = self
700		.services
701		.timeline
702		.build_and_append_pdu(
703			PduBuilder::state(String::new(), &power_levels_content),
704			&privileged_member,
705			&room_id,
706			&state_lock,
707		)
708		.await?;
709
710	drop(state_lock);
711
712	self.write_str(&format!(
713		"User {privileged_member} promoted {target_id} to {power_level} power level in \
714		 {room_id} - {event_id}"
715	))
716	.await?;
717
718	Ok(())
719}
720
721#[admin_command]
722pub(super) async fn make_user_admin(&self, user_id: String) -> Result {
723	let user_id = parse_local_user_id(self.services, &user_id)?;
724	assert!(
725		self.services.globals.user_is_local(&user_id),
726		"Parsed user_id must be a local user"
727	);
728
729	self.services
730		.admin
731		.make_user_admin(&user_id)
732		.boxed()
733		.await?;
734
735	self.write_str(&format!("{user_id} has been granted admin privileges."))
736		.await
737}
738
739#[admin_command]
740pub(super) async fn put_room_tag(
741	&self,
742	user_id: String,
743	room_id: OwnedRoomId,
744	tag: String,
745) -> Result {
746	let user_id = parse_active_local_user_id(self.services, &user_id).await?;
747
748	let mut tags_event = self
749		.services
750		.account_data
751		.get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
752		.await
753		.unwrap_or(TagEvent {
754			content: TagEventContent { tags: BTreeMap::new() },
755		});
756
757	tags_event
758		.content
759		.tags
760		.insert(tag.clone().into(), TagInfo::new());
761
762	self.services
763		.account_data
764		.update(
765			Some(&room_id),
766			&user_id,
767			RoomAccountDataEventType::Tag,
768			&serde_json::to_value(tags_event).expect("to json value always works"),
769		)
770		.await?;
771
772	self.write_str(&format!(
773		"Successfully updated room account data for {user_id} and room {room_id} with tag {tag}"
774	))
775	.await
776}
777
778#[admin_command]
779pub(super) async fn delete_room_tag(
780	&self,
781	user_id: String,
782	room_id: OwnedRoomId,
783	tag: String,
784) -> Result {
785	let user_id = parse_active_local_user_id(self.services, &user_id).await?;
786
787	let mut tags_event = self
788		.services
789		.account_data
790		.get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
791		.await
792		.unwrap_or(TagEvent {
793			content: TagEventContent { tags: BTreeMap::new() },
794		});
795
796	tags_event
797		.content
798		.tags
799		.remove(&tag.clone().into());
800
801	self.services
802		.account_data
803		.update(
804			Some(&room_id),
805			&user_id,
806			RoomAccountDataEventType::Tag,
807			&serde_json::to_value(tags_event).expect("to json value always works"),
808		)
809		.await?;
810
811	self.write_str(&format!(
812		"Successfully updated room account data for {user_id} and room {room_id}, deleting room \
813		 tag {tag}"
814	))
815	.await
816}
817
818#[admin_command]
819pub(super) async fn get_room_tags(&self, user_id: String, room_id: OwnedRoomId) -> Result {
820	let user_id = parse_active_local_user_id(self.services, &user_id).await?;
821
822	let tags_event = self
823		.services
824		.account_data
825		.get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
826		.await
827		.unwrap_or(TagEvent {
828			content: TagEventContent { tags: BTreeMap::new() },
829		});
830
831	self.write_str(&format!("```\n{:#?}\n```", tags_event.content.tags))
832		.await
833}
834
835#[admin_command]
836pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result {
837	let Ok(event) = self
838		.services
839		.timeline
840		.get_non_outlier_pdu(&event_id)
841		.await
842	else {
843		return Err!("Event does not exist in our database.");
844	};
845
846	if event.is_redacted() {
847		return Err!("Event is already redacted.");
848	}
849
850	if !self
851		.services
852		.globals
853		.user_is_local(event.sender())
854	{
855		return Err!("This command only works on local users.");
856	}
857
858	let reason = format!(
859		"The administrator(s) of {} has redacted this user's message.",
860		self.services.globals.server_name()
861	);
862
863	let redaction_event_id = {
864		let state_lock = self
865			.services
866			.state
867			.mutex
868			.lock(event.room_id())
869			.await;
870
871		self.services
872			.timeline
873			.build_and_append_pdu(
874				PduBuilder {
875					redacts: Some(event.event_id().to_owned()),
876					..PduBuilder::timeline(&RoomRedactionEventContent {
877						redacts: Some(event.event_id().to_owned()),
878						reason: Some(reason),
879					})
880				},
881				event.sender(),
882				event.room_id(),
883				&state_lock,
884			)
885			.await?
886	};
887
888	self.write_str(&format!(
889		"Successfully redacted event. Redaction event ID: {redaction_event_id}"
890	))
891	.await
892}
893
/// Writes one line per local user with a recorded last-seen timestamp, most
/// recent first, limited to `limit` entries (48 when `limit` is `None`).
///
/// Each line holds the debug rendering of the timestamp, the last-seen IP
/// left-aligned in a 40-character column (empty when unknown), and the user's
/// localpart.
#[admin_command]
pub(super) async fn last_active(&self, limit: Option<usize>) -> Result {
	self.services
		.users
		.list_local_users()
		.map(ToOwned::to_owned)
		.then(async |user_id| {
			// Reduce the user's devices to the greatest (timestamp, ip) pair; devices
			// without a last-seen timestamp are ignored.
			self.services
				.users
				.all_devices_metadata(&user_id)
				.ready_filter_map(|device| {
					device
						.last_seen_ts
						.map(|ts| (ts, device.last_seen_ip))
				})
				// The fold starts from timestamp 0, so a user with no qualifying
				// device keeps that value.
				.ready_fold((MilliSecondsSinceUnixEpoch(uint!(0)), None), cmp::max)
				.map(|(last_seen_ts, last_seen_ip)| (last_seen_ts, last_seen_ip, user_id.clone()))
				.await
		})
		// Drop users whose timestamp is still the fold's initial 0.
		.ready_filter(|(ts, ..)| ts.get() > uint!(0))
		.collect::<Vec<_>>()
		.map(|mut vec| {
			// Newest timestamp first.
			vec.sort_by_key(|k| cmp::Reverse(k.0));
			vec
		})
		// Turn the sorted vector back into a stream for the fallible write loop.
		.map(Vec::into_iter)
		.map(IterStream::try_stream)
		.flatten_stream()
		.take(limit.unwrap_or(48))
		.try_for_each(async |(last_seen_ts, last_seen_ip, user_id)| {
			// NOTE(review): `ago` holds the raw timestamp, not an elapsed duration.
			let ago = last_seen_ts;
			let user_id = user_id.localpart();
			let ip = last_seen_ip.as_deref().unwrap_or_default();

			self.write_string(format!("{ago:?} {ip:<40} {user_id}\n"))
				.await
		})
		.boxed()
		.await
}
933}