1use std::{cmp, collections::BTreeMap};
2
3use futures::{FutureExt, StreamExt, TryStreamExt};
4use ruma::{
5 Int, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedRoomId,
6 OwnedRoomOrAliasId, OwnedUserId, UserId,
7 events::{
8 RoomAccountDataEventType, StateEventType,
9 room::{
10 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent, UserPowerLevel},
11 redaction::RoomRedactionEventContent,
12 },
13 tag::{TagEvent, TagEventContent, TagInfo},
14 },
15 uint,
16};
17use tuwunel_core::{
18 Err, Result, debug_warn, info,
19 matrix::{Event, pdu::PduBuilder},
20 utils::{
21 self, ReadyExt,
22 stream::{BroadbandExt, IterStream},
23 },
24};
25use tuwunel_service::{Services, users::Register};
26
27use crate::{
28 admin_command, get_room_info,
29 utils::{parse_active_local_user_id, parse_local_user_id, parse_user_id},
30};
31
/// Length argument passed to `utils::random_string` when `create_user` or
/// `reset_password` is invoked without an explicit password.
const AUTO_GEN_PASSWORD_LENGTH: usize = 25;
/// Reason string handed to `membership.join` by the bulk force-join commands.
const BULK_JOIN_REASON: &str = "Bulk force joining this room as initiated by the server admin.";
34
/// Running tally of the invite rejections attempted by `reject_invites`.
#[derive(Default)]
struct RejectInvitesAcc {
	/// Number of invites that were rejected successfully.
	rejected: usize,
	/// Number of invites whose rejection returned an error.
	failed: usize,
}

impl RejectInvitesAcc {
	/// Combines two tallies field by field, saturating instead of overflowing.
	fn merge(self, other: Self) -> Self {
		Self {
			rejected: self.rejected.saturating_add(other.rejected),
			failed: self.failed.saturating_add(other.failed),
		}
	}
}
48
49#[admin_command]
50pub(super) async fn list_users(&self) -> Result {
51 let users: Vec<_> = self
52 .services
53 .users
54 .list_local_users()
55 .map(ToString::to_string)
56 .collect()
57 .await;
58
59 let mut plain_msg = format!("Found {} local user account(s):\n```\n", users.len());
60 plain_msg += users.join("\n").as_str();
61 plain_msg += "\n```";
62
63 self.write_str(&plain_msg).await
64}
65
66#[admin_command]
67pub(super) async fn create_user(&self, username: String, password: Option<String>) -> Result {
68 let user_id = parse_local_user_id(self.services, &username)?;
70
71 if let Err(e) = user_id.validate_strict()
72 && self.services.config.emergency_password.is_none()
73 {
74 return Err!("Username {user_id} contains disallowed characters or spaces: {e}");
75 }
76
77 if self.services.users.exists(&user_id).await {
78 return Err!("User {user_id} already exists");
79 }
80
81 let password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
82
83 self.services
84 .users
85 .full_register(Register {
86 user_id: Some(&user_id),
87 password: Some(&password),
88 grant_first_user_admin: true,
89 ..Default::default()
90 })
91 .await?;
92
93 self.write_str(&format!("Created user with user_id: {user_id} and password: `{password}`"))
94 .await
95}
96
97#[admin_command]
98pub(super) async fn deactivate(&self, no_leave_rooms: bool, user_id: String) -> Result {
99 let user_id = parse_local_user_id(self.services, &user_id)?;
101
102 if user_id == self.services.globals.server_user {
104 return Err!("Not allowed to deactivate the server service account.",);
105 }
106
107 deactivate_user(self.services, &user_id, no_leave_rooms).await?;
108
109 self.write_str(&format!("User {user_id} has been deactivated"))
110 .await
111}
112
113#[admin_command]
114pub(super) async fn delete_device(
115 &self,
116 user_id: OwnedUserId,
117 device_id: OwnedDeviceId,
118) -> Result {
119 if !self.services.globals.user_is_local(&user_id) {
120 return Err!("Cannot delete device of remote user");
121 }
122
123 self.services
124 .users
125 .remove_device(&user_id, &device_id)
126 .await;
127
128 self.write_str(&format!("User {user_id}'s device {device_id} removed."))
129 .await
130}
131
132#[admin_command]
133pub(super) async fn reset_password(&self, username: String, password: Option<String>) -> Result {
134 let user_id = parse_local_user_id(self.services, &username)?;
135
136 if user_id == self.services.globals.server_user {
137 return Err!(
138 "Not allowed to set the password for the server account. Please use the emergency \
139 password config option.",
140 );
141 }
142
143 let new_password = password.unwrap_or_else(|| utils::random_string(AUTO_GEN_PASSWORD_LENGTH));
144
145 match self
146 .services
147 .users
148 .set_password(&user_id, Some(new_password.as_str()))
149 .await
150 {
151 | Err(e) => return Err!("Couldn't reset the password for user {user_id}: {e}"),
152 | Ok(()) => {
153 write!(self, "Successfully reset the password for user {user_id}: `{new_password}`")
154 },
155 }
156 .await
157}
158
159#[admin_command]
160pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) -> Result {
161 if self.body.len() < 2
162 || !self.body[0].trim().starts_with("```")
163 || self.body.last().unwrap_or(&"").trim() != "```"
164 {
165 return Err!("Expected code block in command body. Add --help for details.",);
166 }
167
168 let usernames = self
169 .body
170 .to_vec()
171 .drain(1..self.body.len().saturating_sub(1))
172 .collect::<Vec<_>>();
173
174 let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
175 let mut admins = Vec::new();
176
177 for username in usernames {
178 match parse_active_local_user_id(self.services, username).await {
179 | Err(e) => {
180 self.services
181 .admin
182 .send_text(&format!("{username} is not a valid username, skipping over: {e}"))
183 .await;
184
185 continue;
186 },
187 | Ok(user_id) => {
188 if self.services.admin.user_is_admin(&user_id).await && !force {
189 self.services
190 .admin
191 .send_text(&format!(
192 "{username} is an admin and --force is not set, skipping over"
193 ))
194 .await;
195
196 admins.push(username);
197 continue;
198 }
199
200 if user_id == self.services.globals.server_user {
202 self.services
203 .admin
204 .send_text(&format!(
205 "{username} is the server service account, skipping over"
206 ))
207 .await;
208
209 continue;
210 }
211
212 user_ids.push(user_id);
213 },
214 }
215 }
216
217 let mut deactivation_count: usize = 0;
218
219 for user_id in user_ids {
220 match deactivate_user(self.services, &user_id, no_leave_rooms).await {
221 | Ok(()) => {
222 deactivation_count = deactivation_count.saturating_add(1);
223 },
224 | Err(e) => {
225 self.services
226 .admin
227 .send_text(&format!("Failed deactivating user: {e}"))
228 .await;
229 },
230 }
231 }
232
233 if admins.is_empty() {
234 write!(self, "Deactivated {deactivation_count} accounts.")
235 } else {
236 write!(
237 self,
238 "Deactivated {deactivation_count} accounts.\nSkipped admin accounts: {}. Use \
239 --force to deactivate admin accounts",
240 admins.join(", ")
241 )
242 }
243 .await
244}
245
246async fn deactivate_user(services: &Services, user_id: &UserId, no_leave_rooms: bool) -> Result {
247 if !no_leave_rooms {
248 services
249 .deactivate
250 .full_deactivate(user_id, false)
251 .boxed()
252 .await?;
253 } else {
254 services.users.deactivate_account(user_id).await?;
255 }
256
257 Ok(())
258}
259
260#[admin_command]
261pub(super) async fn list_joined_rooms(&self, user_id: String) -> Result {
262 let user_id = parse_local_user_id(self.services, &user_id)?;
264
265 let mut rooms: Vec<(OwnedRoomId, u64, String)> = self
266 .services
267 .state_cache
268 .rooms_joined(&user_id)
269 .then(|room_id| get_room_info(self.services, room_id))
270 .collect()
271 .await;
272
273 if rooms.is_empty() {
274 return Err!("User is not in any rooms.");
275 }
276
277 rooms.sort_by_key(|r| r.1);
278 rooms.reverse();
279
280 let body = rooms
281 .iter()
282 .map(|(id, members, name)| format!("{id}\tMembers: {members}\tName: {name}"))
283 .collect::<Vec<_>>()
284 .join("\n");
285
286 self.write_str(&format!("Rooms {user_id} Joined ({}):\n```\n{body}\n```", rooms.len()))
287 .await
288}
289
290#[admin_command]
291pub(super) async fn force_join_list_of_local_users(
292 &self,
293 room: OwnedRoomOrAliasId,
294 yes_i_want_to_do_this: bool,
295) -> Result {
296 if self.body.len() < 2
297 || !self.body[0].trim().starts_with("```")
298 || self.body.last().unwrap_or(&"").trim() != "```"
299 {
300 return Err!("Expected code block in command body. Add --help for details.",);
301 }
302
303 if !yes_i_want_to_do_this {
304 return Err!(
305 "You must pass the --yes-i-want-to-do-this flag to ensure you really want to force \
306 bulk join all specified local users.",
307 );
308 }
309
310 let (room_id, servers) = self
311 .services
312 .alias
313 .maybe_resolve_with_servers(&room, None)
314 .await?;
315
316 if !self
317 .services
318 .state_cache
319 .server_in_room(self.services.globals.server_name(), &room_id)
320 .await
321 {
322 return Err!("We are not joined in this room.");
323 }
324
325 let usernames = self
326 .body
327 .to_vec()
328 .drain(1..self.body.len().saturating_sub(1))
329 .collect::<Vec<_>>();
330
331 let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
332
333 for username in usernames {
334 match parse_active_local_user_id(self.services, username).await {
335 | Ok(user_id) => {
336 if user_id == self.services.globals.server_user {
338 self.services
339 .admin
340 .send_text(&format!(
341 "{username} is the server service account, skipping over"
342 ))
343 .await;
344
345 continue;
346 }
347
348 user_ids.push(user_id);
349 },
350 | Err(e) => {
351 self.services
352 .admin
353 .send_text(&format!("{username} is not a valid username, skipping over: {e}"))
354 .await;
355
356 continue;
357 },
358 }
359 }
360
361 let mut failed_joins: usize = 0;
362 let mut successful_joins: usize = 0;
363
364 for user_id in user_ids {
365 match self
366 .services
367 .membership
368 .join(
369 &user_id,
370 &room_id,
371 Some(&room),
372 Some(String::from(BULK_JOIN_REASON)),
373 &servers,
374 false,
375 )
376 .await
377 {
378 | Ok(_res) => {
379 successful_joins = successful_joins.saturating_add(1);
380 },
381 | Err(e) => {
382 debug_warn!("Failed force joining {user_id} to {room_id} during bulk join: {e}");
383 failed_joins = failed_joins.saturating_add(1);
384 },
385 }
386 }
387
388 self.write_str(&format!(
389 "{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \
390 failed.",
391 ))
392 .await
393}
394
395#[admin_command]
396pub(super) async fn force_join_all_local_users(
397 &self,
398 room: OwnedRoomOrAliasId,
399 yes_i_want_to_do_this: bool,
400) -> Result {
401 if !yes_i_want_to_do_this {
402 return Err!(
403 "You must pass the --yes-i-want-to-do-this-flag to ensure you really want to force \
404 bulk join all local users.",
405 );
406 }
407
408 let (room_id, servers) = self
409 .services
410 .alias
411 .maybe_resolve_with_servers(&room, None)
412 .await?;
413
414 if !self
415 .services
416 .state_cache
417 .server_in_room(self.services.globals.server_name(), &room_id)
418 .await
419 {
420 return Err!("We are not joined in this room.");
421 }
422
423 let mut failed_joins: usize = 0;
424 let mut successful_joins: usize = 0;
425
426 for user_id in &self
427 .services
428 .users
429 .list_local_users()
430 .map(UserId::to_owned)
431 .collect::<Vec<_>>()
432 .await
433 {
434 if user_id == &self.services.globals.server_user {
435 continue;
436 }
437
438 match self
439 .services
440 .membership
441 .join(
442 user_id,
443 &room_id,
444 Some(&room),
445 Some(String::from(BULK_JOIN_REASON)),
446 &servers,
447 false,
448 )
449 .await
450 {
451 | Ok(_res) => {
452 successful_joins = successful_joins.saturating_add(1);
453 },
454 | Err(e) => {
455 debug_warn!("Failed force joining {user_id} to {room_id} during bulk join: {e}");
456 failed_joins = failed_joins.saturating_add(1);
457 },
458 }
459 }
460
461 self.write_str(&format!(
462 "{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \
463 failed.",
464 ))
465 .await
466}
467
468#[admin_command]
469pub(super) async fn force_join_room(&self, user_id: String, room: OwnedRoomOrAliasId) -> Result {
470 let user_id = parse_local_user_id(self.services, &user_id)?;
471 let (room_id, servers) = self
472 .services
473 .alias
474 .maybe_resolve_with_servers(&room, None)
475 .await?;
476
477 assert!(
478 self.services.globals.user_is_local(&user_id),
479 "Parsed user_id must be a local user"
480 );
481
482 self.services
483 .membership
484 .join(&user_id, &room_id, Some(&room), None, &servers, false)
485 .await?;
486
487 self.write_str(&format!("{user_id} has been joined to {room_id}."))
488 .await
489}
490
491#[admin_command]
492pub(super) async fn force_leave_room(
493 &self,
494 user_id: String,
495 room_id: OwnedRoomOrAliasId,
496) -> Result {
497 let user_id = parse_local_user_id(self.services, &user_id)?;
498 let room_id = self
499 .services
500 .alias
501 .maybe_resolve(&room_id)
502 .await?;
503
504 assert!(
505 self.services.globals.user_is_local(&user_id),
506 "Parsed user_id must be a local user"
507 );
508
509 if !self
510 .services
511 .state_cache
512 .is_joined(&user_id, &room_id)
513 .await
514 {
515 return Err!("{user_id} is not joined in the room");
516 }
517
518 let state_lock = self.services.state.mutex.lock(&room_id).await;
519
520 self.services
521 .membership
522 .leave(&user_id, &room_id, None, false, &state_lock)
523 .boxed()
524 .await?;
525
526 drop(state_lock);
527
528 self.write_str(&format!("{user_id} has left {room_id}."))
529 .await
530}
531
/// Calls `membership.leave` for every room yielded by
/// `state_cache.rooms_invited` for a local user and reports how many calls
/// succeeded and how many failed.
///
/// * `user_id` - parsed with `parse_local_user_id`.
/// * `reason` - optional reason forwarded to each `leave` call.
///
/// Returns an error when both counters end up at zero, i.e. no room was
/// processed at all.
#[admin_command]
pub(super) async fn reject_invites(&self, user_id: String, reason: Option<String>) -> Result {
	let user_id = parse_local_user_id(self.services, &user_id)?;
	let reason = reason.as_deref();

	// Handles one room: takes that room's state lock, attempts the leave and
	// returns a tally with exactly one of the two counters set to 1.
	let reject = async |room_id: OwnedRoomId| {
		let state_lock = self.services.state.mutex.lock(&room_id).await;

		match self
			.services
			.membership
			.leave(&user_id, &room_id, reason.map(str::to_owned), false, &state_lock)
			.boxed()
			.await
		{
			| Ok(()) => RejectInvitesAcc { rejected: 1, ..Default::default() },
			| Err(e) => {
				// Failures are logged and counted; they do not abort the run.
				debug_warn!(%user_id, %room_id, "Failed to reject invite: {e}");
				RejectInvitesAcc { failed: 1, ..Default::default() }
			},
		}
	};

	// NOTE(review): `broad_then` presumably runs `reject` for several rooms
	// concurrently - confirm against `BroadbandExt`.
	let RejectInvitesAcc { rejected, failed } = self
		.services
		.state_cache
		.rooms_invited(&user_id)
		.map(ToOwned::to_owned)
		.broad_then(reject)
		.ready_fold(RejectInvitesAcc::default(), RejectInvitesAcc::merge)
		.await;

	if rejected == 0 && failed == 0 {
		return Err!("{user_id} has no pending invites.");
	}

	self.write_string(format!("Rejected {rejected} invite(s) for {user_id}. {failed} failed."))
		.await
}
571
572#[admin_command]
573pub(super) async fn force_demote(&self, user_id: String, room_id: OwnedRoomOrAliasId) -> Result {
574 let user_id = parse_local_user_id(self.services, &user_id)?;
575 let room_id = self
576 .services
577 .alias
578 .maybe_resolve(&room_id)
579 .await?;
580
581 assert!(
582 self.services.globals.user_is_local(&user_id),
583 "Parsed user_id must be a local user"
584 );
585
586 let state_lock = self.services.state.mutex.lock(&room_id).await;
587
588 let room_power_levels: Option<RoomPowerLevels> = self
589 .services
590 .state_accessor
591 .get_power_levels(&room_id)
592 .await
593 .ok();
594
595 let user_can_change_self = room_power_levels
596 .as_ref()
597 .is_some_and(|power_levels| {
598 power_levels.user_can_change_user_power_level(&user_id, &user_id)
599 });
600
601 let user_can_demote_self = user_can_change_self
602 || self
603 .services
604 .state_accessor
605 .room_state_get(&room_id, &StateEventType::RoomCreate, "")
606 .await
607 .is_ok_and(|event| event.sender() == user_id);
608
609 if !user_can_demote_self {
610 return Err!("User is not allowed to modify their own power levels in the room.");
611 }
612
613 let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels
614 .map(TryInto::try_into)
615 .transpose()?
616 .unwrap_or_default();
617
618 power_levels_content.users.remove(&user_id);
619
620 let event_id = self
621 .services
622 .timeline
623 .build_and_append_pdu(
624 PduBuilder::state(String::new(), &power_levels_content),
625 &user_id,
626 &room_id,
627 &state_lock,
628 )
629 .await?;
630
631 self.write_str(&format!(
632 "User {user_id} demoted themselves to the room default power level in {room_id} - \
633 {event_id}"
634 ))
635 .await
636}
637
638#[admin_command]
639pub(super) async fn force_promote(
640 &self,
641 target_id: String,
642 room_id: OwnedRoomOrAliasId,
643) -> Result {
644 let target_id = parse_user_id(self.services, &target_id)?;
645 let room_id = self
646 .services
647 .alias
648 .maybe_resolve(&room_id)
649 .await?;
650
651 let state_lock = self.services.state.mutex.lock(&room_id).await;
652
653 let room_power_levels = self
654 .services
655 .state_accessor
656 .get_power_levels(&room_id)
657 .await?;
658
659 let privileged_member = self
660 .services
661 .state_cache
662 .room_members(&room_id)
663 .ready_filter(|member_id| {
664 self.services.globals.user_is_local(member_id)
665 && room_power_levels.user_can_change_user_power_level(member_id, &target_id)
666 })
667 .map(ToOwned::to_owned)
668 .ready_fold_default(|selected_user, member_id| match selected_user {
669 | None => Some(member_id),
670 | Some(selected_user) => Some(
671 if room_power_levels.for_user(&selected_user)
672 > room_power_levels.for_user(&member_id)
673 {
674 selected_user
675 } else {
676 member_id
677 },
678 ),
679 })
680 .await;
681
682 let Some(privileged_member) = privileged_member else {
683 return Err!("No privileged user exists in room, cannot promote.");
684 };
685
686 info!("Selected privileged member {privileged_member}");
687
688 let power_level: Int = match room_power_levels.for_user(&privileged_member) {
689 | UserPowerLevel::Infinite => Int::MAX,
690 | UserPowerLevel::Int(x) => x,
691 };
692
693 let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels.try_into()?;
694
695 power_levels_content
696 .users
697 .insert(target_id.clone(), power_level);
698
699 let event_id = self
700 .services
701 .timeline
702 .build_and_append_pdu(
703 PduBuilder::state(String::new(), &power_levels_content),
704 &privileged_member,
705 &room_id,
706 &state_lock,
707 )
708 .await?;
709
710 drop(state_lock);
711
712 self.write_str(&format!(
713 "User {privileged_member} promoted {target_id} to {power_level} power level in \
714 {room_id} - {event_id}"
715 ))
716 .await?;
717
718 Ok(())
719}
720
721#[admin_command]
722pub(super) async fn make_user_admin(&self, user_id: String) -> Result {
723 let user_id = parse_local_user_id(self.services, &user_id)?;
724 assert!(
725 self.services.globals.user_is_local(&user_id),
726 "Parsed user_id must be a local user"
727 );
728
729 self.services
730 .admin
731 .make_user_admin(&user_id)
732 .boxed()
733 .await?;
734
735 self.write_str(&format!("{user_id} has been granted admin privileges."))
736 .await
737}
738
739#[admin_command]
740pub(super) async fn put_room_tag(
741 &self,
742 user_id: String,
743 room_id: OwnedRoomId,
744 tag: String,
745) -> Result {
746 let user_id = parse_active_local_user_id(self.services, &user_id).await?;
747
748 let mut tags_event = self
749 .services
750 .account_data
751 .get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
752 .await
753 .unwrap_or(TagEvent {
754 content: TagEventContent { tags: BTreeMap::new() },
755 });
756
757 tags_event
758 .content
759 .tags
760 .insert(tag.clone().into(), TagInfo::new());
761
762 self.services
763 .account_data
764 .update(
765 Some(&room_id),
766 &user_id,
767 RoomAccountDataEventType::Tag,
768 &serde_json::to_value(tags_event).expect("to json value always works"),
769 )
770 .await?;
771
772 self.write_str(&format!(
773 "Successfully updated room account data for {user_id} and room {room_id} with tag {tag}"
774 ))
775 .await
776}
777
778#[admin_command]
779pub(super) async fn delete_room_tag(
780 &self,
781 user_id: String,
782 room_id: OwnedRoomId,
783 tag: String,
784) -> Result {
785 let user_id = parse_active_local_user_id(self.services, &user_id).await?;
786
787 let mut tags_event = self
788 .services
789 .account_data
790 .get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
791 .await
792 .unwrap_or(TagEvent {
793 content: TagEventContent { tags: BTreeMap::new() },
794 });
795
796 tags_event
797 .content
798 .tags
799 .remove(&tag.clone().into());
800
801 self.services
802 .account_data
803 .update(
804 Some(&room_id),
805 &user_id,
806 RoomAccountDataEventType::Tag,
807 &serde_json::to_value(tags_event).expect("to json value always works"),
808 )
809 .await?;
810
811 self.write_str(&format!(
812 "Successfully updated room account data for {user_id} and room {room_id}, deleting room \
813 tag {tag}"
814 ))
815 .await
816}
817
818#[admin_command]
819pub(super) async fn get_room_tags(&self, user_id: String, room_id: OwnedRoomId) -> Result {
820 let user_id = parse_active_local_user_id(self.services, &user_id).await?;
821
822 let tags_event = self
823 .services
824 .account_data
825 .get_room(&room_id, &user_id, RoomAccountDataEventType::Tag)
826 .await
827 .unwrap_or(TagEvent {
828 content: TagEventContent { tags: BTreeMap::new() },
829 });
830
831 self.write_str(&format!("```\n{:#?}\n```", tags_event.content.tags))
832 .await
833}
834
835#[admin_command]
836pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result {
837 let Ok(event) = self
838 .services
839 .timeline
840 .get_non_outlier_pdu(&event_id)
841 .await
842 else {
843 return Err!("Event does not exist in our database.");
844 };
845
846 if event.is_redacted() {
847 return Err!("Event is already redacted.");
848 }
849
850 if !self
851 .services
852 .globals
853 .user_is_local(event.sender())
854 {
855 return Err!("This command only works on local users.");
856 }
857
858 let reason = format!(
859 "The administrator(s) of {} has redacted this user's message.",
860 self.services.globals.server_name()
861 );
862
863 let redaction_event_id = {
864 let state_lock = self
865 .services
866 .state
867 .mutex
868 .lock(event.room_id())
869 .await;
870
871 self.services
872 .timeline
873 .build_and_append_pdu(
874 PduBuilder {
875 redacts: Some(event.event_id().to_owned()),
876 ..PduBuilder::timeline(&RoomRedactionEventContent {
877 redacts: Some(event.event_id().to_owned()),
878 reason: Some(reason),
879 })
880 },
881 event.sender(),
882 event.room_id(),
883 &state_lock,
884 )
885 .await?
886 };
887
888 self.write_str(&format!(
889 "Successfully redacted event. Redaction event ID: {redaction_event_id}"
890 ))
891 .await
892}
893
/// Writes one line per local user with the greatest `last_seen_ts` found among
/// that user's devices, newest first.
///
/// * `limit` - maximum number of users to print; defaults to 48 when omitted.
///
/// Users without any device carrying a `last_seen_ts` are left out.
#[admin_command]
pub(super) async fn last_active(&self, limit: Option<usize>) -> Result {
	self.services
		.users
		.list_local_users()
		.map(ToOwned::to_owned)
		.then(async |user_id| {
			self.services
				.users
				.all_devices_metadata(&user_id)
				// Devices without a last-seen timestamp are ignored.
				.ready_filter_map(|device| {
					device
						.last_seen_ts
						.map(|ts| (ts, device.last_seen_ip))
				})
				// `cmp::max` on the (timestamp, ip) tuple keeps the greatest
				// pair; the zero timestamp is the seed and doubles as the
				// "nothing seen" marker filtered out below.
				.ready_fold((MilliSecondsSinceUnixEpoch(uint!(0)), None), cmp::max)
				.map(|(last_seen_ts, last_seen_ip)| (last_seen_ts, last_seen_ip, user_id.clone()))
				.await
		})
		// Drop users whose fold never moved past the zero seed.
		.ready_filter(|(ts, ..)| ts.get() > uint!(0))
		.collect::<Vec<_>>()
		// Sort by timestamp, descending.
		.map(|mut vec| {
			vec.sort_by_key(|k| cmp::Reverse(k.0));
			vec
		})
		// Turn the sorted vector back into a stream so it can be truncated
		// and written out entry by entry.
		.map(Vec::into_iter)
		.map(IterStream::try_stream)
		.flatten_stream()
		.take(limit.unwrap_or(48))
		.try_for_each(async |(last_seen_ts, last_seen_ip, user_id)| {
			// Despite the name this is the raw timestamp, printed with `Debug`.
			let ago = last_seen_ts;
			let user_id = user_id.localpart();
			// A missing IP is printed as an empty, padded column.
			let ip = last_seen_ip.as_deref().unwrap_or_default();

			self.write_string(format!("{ago:?} {ip:<40} {user_id}\n"))
				.await
		})
		.boxed()
		.await
}