1use std::collections::HashSet;
2
3use futures::{FutureExt, StreamExt, TryStreamExt, future::try_join};
4use tuwunel_core::{
5 Result,
6 utils::{
7 result::LogErr,
8 stream::{IterStream, TryBroadbandExt},
9 string::SplitInfallible,
10 },
11};
12use tuwunel_service::storage::CopyMode;
13
14use crate::{admin_command, admin_command_dispatch};
15
/// Admin subcommands for inspecting and manipulating storage providers.
///
/// Each variant is dispatched to the handler named `query_storage_<variant>`.
#[admin_command_dispatch(handler_prefix = "query_storage")]
#[derive(Debug, clap::Subcommand)]
pub(crate) enum StorageCommand {
	/// Print every storage configuration together with its identifier
	Configs,

	/// Print every storage provider together with its name
	Providers,

	/// Print the debug representation of a single provider
	Debug {
		/// Identifier of the provider to print
		provider: String,
	},

	/// Print the metadata of a single object
	Show {
		/// Provider to query; when omitted, the part of `src` before `//` is
		/// used as the provider identifier
		#[arg(short, long)]
		provider: Option<String>,

		/// Object to inspect; split on `//` into a provider part and a path
		/// part
		src: String,
	},

	/// Print the metadata of the objects a provider lists
	List {
		/// Provider to list; the empty identifier is used when omitted
		#[arg(short, long)]
		provider: Option<String>,

		/// Prefix handed to the provider's listing
		prefix: Option<String>,
	},

	/// Print the locations listed by both of two providers
	Duplicates {
		/// Identifier of the first provider
		src: String,

		/// Identifier of the second provider
		dst: String,
	},

	/// Print the locations listed by the first provider but not the second
	Differences {
		/// Identifier of the first provider
		src: String,

		/// Identifier of the second provider
		dst: String,
	},

	/// Copy an object within a provider and print the result
	Copy {
		/// Provider to operate on; the empty identifier is used when omitted
		#[arg(short, long)]
		provider: Option<String>,

		/// Use overwrite mode instead of create mode
		#[arg(short, long)]
		force: bool,

		/// Source location
		src: String,

		/// Destination location
		dst: String,
	},

	/// Rename an object within a provider and print the result
	Move {
		/// Provider to operate on; the empty identifier is used when omitted
		#[arg(short, long)]
		provider: Option<String>,

		/// Use overwrite mode instead of create mode
		#[arg(short, long)]
		force: bool,

		/// Source location
		src: String,

		/// Destination location
		dst: String,
	},

	/// Delete one or more objects from a provider
	Delete {
		/// Provider to operate on; the empty identifier is used when omitted
		#[arg(short, long)]
		provider: Option<String>,

		/// Locations to delete
		src: Vec<String>,

		/// Also report each successful deletion; failures are always reported
		#[arg(short, long)]
		verbose: bool,
	},

	/// Transfer the objects listed by `src` but not by `dst` into `dst`
	Sync {
		/// Identifier of the provider to read from
		src: String,

		/// Identifier of the provider to write to
		dst: String,
	},
}
126
127#[admin_command]
128async fn query_storage_configs(&self) -> Result {
129 self.services
130 .storage
131 .configs(None)
132 .try_stream()
133 .try_for_each(|(id, conf)| writeln!(&self, "\n`{id:?}`\n{conf:#?}"))
134 .await
135}
136
137#[admin_command]
138async fn query_storage_providers(&self) -> Result {
139 self.services
140 .storage
141 .providers()
142 .try_stream()
143 .try_for_each(|conf| writeln!(&self, "\n`{:?}`\n{conf:#?}", conf.name))
144 .await
145}
146
147#[admin_command]
148async fn query_storage_debug(&self, provider: String) -> Result {
149 let provider = self.services.storage.provider(&provider)?;
150
151 self.write_string(format!("{provider:#?}\n"))
152 .await
153}
154
155#[admin_command]
156async fn query_storage_show(&self, provider: Option<String>, src: String) -> Result {
157 let (prefix, src) = src.as_str().split_once_infallible("//");
158 let id = provider.as_deref().unwrap_or(prefix);
159
160 let provider = self.services.storage.provider(id)?;
161 let meta = provider.head(src).await?;
162
163 self.write_string(format!("{meta:#?}\n")).await
164}
165
166#[admin_command]
167async fn query_storage_list(&self, provider: Option<String>, prefix: Option<String>) -> Result {
168 let id = provider.as_deref().unwrap_or_default();
169 let provider = self.services.storage.provider(id)?;
170
171 provider
172 .list(prefix.as_deref())
173 .try_for_each(|meta| writeln!(&self, "{meta:?}"))
174 .boxed()
175 .await
176}
177
178#[admin_command]
179async fn query_storage_duplicates(&self, provider_a: String, provider_b: String) -> Result {
180 let a = self
181 .services
182 .storage
183 .provider(&provider_a)?
184 .list(None)
185 .map_ok(|meta| meta.location)
186 .try_collect::<HashSet<_>>();
187
188 let b = self
189 .services
190 .storage
191 .provider(&provider_b)?
192 .list(None)
193 .map_ok(|meta| meta.location)
194 .try_collect::<HashSet<_>>();
195
196 let (a, b) = try_join(a, b).await?;
197 a.intersection(&b)
198 .try_stream()
199 .try_for_each(|item| writeln!(&self, "{item}"))
200 .await
201}
202
203#[admin_command]
204async fn query_storage_differences(&self, provider_a: String, provider_b: String) -> Result {
205 let a = self
206 .services
207 .storage
208 .provider(&provider_a)?
209 .list(None)
210 .map_ok(|meta| meta.location)
211 .try_collect::<HashSet<_>>();
212
213 let b = self
214 .services
215 .storage
216 .provider(&provider_b)?
217 .list(None)
218 .map_ok(|meta| meta.location)
219 .try_collect::<HashSet<_>>();
220
221 let (a, b) = try_join(a, b).await?;
222 a.difference(&b)
223 .try_stream()
224 .try_for_each(|item| writeln!(&self, "{item}"))
225 .await
226}
227
228#[admin_command]
229async fn query_storage_copy(
230 &self,
231 provider: Option<String>,
232 force: bool,
233 src: String,
234 dst: String,
235) -> Result {
236 let id = provider.as_deref().unwrap_or_default();
237 let provider = self.services.storage.provider(id)?;
238 let overwrite = force
239 .then_some(CopyMode::Overwrite)
240 .unwrap_or(CopyMode::Create);
241
242 let result = provider.copy(&src, &dst, overwrite).await;
243
244 self.write_string(format!("{result:#?}\n")).await
245}
246
247#[admin_command]
248async fn query_storage_move(
249 &self,
250 provider: Option<String>,
251 force: bool,
252 src: String,
253 dst: String,
254) -> Result {
255 let id = provider.as_deref().unwrap_or_default();
256 let provider = self.services.storage.provider(id)?;
257 let overwrite = force
258 .then_some(CopyMode::Overwrite)
259 .unwrap_or(CopyMode::Create);
260
261 let result = provider.rename(&src, &dst, overwrite).await;
262
263 self.write_string(format!("{result:#?}\n")).await
264}
265
266#[admin_command]
267async fn query_storage_delete(
268 &self,
269 provider: Option<String>,
270 src: Vec<String>,
271 verbose: bool,
272) -> Result {
273 let id = provider.as_deref().unwrap_or_default();
274 let provider = self.services.storage.provider(id)?;
275
276 provider
277 .delete(src.into_iter().stream())
278 .for_each(async |result| {
279 match result {
280 | Ok(_) if !verbose => Ok(()),
281 | Ok(path) =>
282 self.write_string(format!("deleted {path}\n"))
283 .await,
284 | Err(e) =>
285 self.write_string(format!("failed: {e:?}\n"))
286 .await,
287 }
288 .log_err()
289 .ok();
290 })
291 .map(Ok)
292 .await
293}
294
295#[admin_command]
296async fn query_storage_sync(&self, src: String, dst: String) -> Result {
297 let src_p = self.services.storage.provider(&src)?;
298
299 let dst_p = self.services.storage.provider(&dst)?;
300
301 let src = src_p
302 .list(None)
303 .map_ok(|meta| meta.location)
304 .try_collect::<HashSet<_>>();
305
306 let dst = dst_p
307 .list(None)
308 .map_ok(|meta| meta.location)
309 .try_collect::<HashSet<_>>();
310
311 let (src, dst) = try_join(src, dst).await?;
312
313 src.difference(&dst)
314 .try_stream()
315 .broadn_and_then(2, async |item| {
316 let data = src_p.get(item.as_ref()).await?;
317 let put = dst_p.put_one(item.as_ref(), data).await?;
318
319 Ok((item, put))
320 })
321 .try_for_each(|(item, put)| {
322 writeln!(&self, "Moved {item} from {src:?} to {dst:?}; {put:?}")
323 })
324 .await
325}