tuwunel_admin/query/storage/
sync.rs1use std::collections::HashSet;
2
3use futures::{TryStreamExt, future::try_join};
4use tuwunel_core::{
5 Result,
6 utils::stream::{IterStream, TryBroadbandExt},
7};
8
9use crate::admin_command;
10
11#[admin_command]
12pub(super) async fn query_storage_sync(&self, src: String, dst: String) -> Result {
13 let src_p = self.services.storage.provider(&src)?;
14
15 let dst_p = self.services.storage.provider(&dst)?;
16
17 let src = src_p
18 .list(None)
19 .map_ok(|meta| meta.location)
20 .try_collect::<HashSet<_>>();
21
22 let dst = dst_p
23 .list(None)
24 .map_ok(|meta| meta.location)
25 .try_collect::<HashSet<_>>();
26
27 let (src, dst) = try_join(src, dst).await?;
28
29 src.difference(&dst)
30 .try_stream()
31 .broadn_and_then(2, async |item| {
32 let data = src_p.get(item.as_ref()).await?;
33 let put = dst_p.put_one(item.as_ref(), data).await?;
34
35 Ok((item, put))
36 })
37 .try_for_each(|(item, put)| {
38 writeln!(&self, "Moved {item} from {src:?} to {dst:?}; {put:?}")
39 })
40 .await
41}