Skip to main content

tuwunel_admin/query/
storage.rs

1use std::collections::HashSet;
2
3use futures::{FutureExt, StreamExt, TryStreamExt, future::try_join};
4use tuwunel_core::{
5	Result,
6	utils::{
7		result::LogErr,
8		stream::{IterStream, TryBroadbandExt},
9		string::SplitInfallible,
10	},
11};
12use tuwunel_service::storage::CopyMode;
13
14use crate::{admin_command, admin_command_dispatch};
15
16#[admin_command_dispatch(handler_prefix = "query_storage")]
17#[derive(Debug, clap::Subcommand)]
18pub(crate) enum StorageCommand {
19	/// List provider configurations.
20	Configs,
21
22	/// List provider instances.
23	Providers,
24
25	Debug {
26		/// Use configured provider by name.
27		provider: String,
28	},
29
30	/// Show metadata for an object.
31	Show {
32		/// Use configured provider by name.
33		#[arg(short, long)]
34		provider: Option<String>,
35
36		/// Path to the object.
37		src: String,
38	},
39
40	/// List metadata for all objects.
41	List {
42		/// Use configured provider by name.
43		#[arg(short, long)]
44		provider: Option<String>,
45
46		/// Optionally filter by matching prefix.
47		prefix: Option<String>,
48	},
49
50	/// List objects duplicated between two providers
51	Duplicates {
52		/// The first provider name.
53		src: String,
54
55		/// The second provider name.
56		dst: String,
57	},
58
59	/// List objects duplicated between two providers
60	Differences {
61		/// The first provider name.
62		src: String,
63
64		/// The second provider name.
65		dst: String,
66	},
67
68	/// Copy an object from a source to a destination.
69	Copy {
70		/// Use configured provider by name.
71		#[arg(short, long)]
72		provider: Option<String>,
73
74		/// Overwrite existing destination.
75		#[arg(short, long)]
76		force: bool,
77
78		/// Path to the source.
79		src: String,
80
81		/// Path to the destination.
82		dst: String,
83	},
84
85	/// Move an object from a source to a destination.
86	Move {
87		/// Use configured provider by name.
88		#[arg(short, long)]
89		provider: Option<String>,
90
91		/// Overwrite existing destination.
92		#[arg(short, long)]
93		force: bool,
94
95		/// Path to the source.
96		src: String,
97
98		/// Path to the destination.
99		dst: String,
100	},
101
102	/// Delete an object at the specified location.
103	Delete {
104		/// Use configured provider by name.
105		#[arg(short, long)]
106		provider: Option<String>,
107
108		/// Path to the location to delete. Multiple arguments allowed.
109		src: Vec<String>,
110
111		/// Report successful results in addition to failures.
112		#[arg(short, long)]
113		verbose: bool,
114	},
115
116	/// Transfer objects from a source provider which do not exist on a
117	/// destination provider.
118	Sync {
119		/// Use source configured provider by name.
120		src: String,
121
122		/// Use destination configured provider by name.
123		dst: String,
124	},
125}
126
127#[admin_command]
128async fn query_storage_configs(&self) -> Result {
129	self.services
130		.storage
131		.configs(None)
132		.try_stream()
133		.try_for_each(|(id, conf)| writeln!(&self, "\n`{id:?}`\n{conf:#?}"))
134		.await
135}
136
137#[admin_command]
138async fn query_storage_providers(&self) -> Result {
139	self.services
140		.storage
141		.providers()
142		.try_stream()
143		.try_for_each(|conf| writeln!(&self, "\n`{:?}`\n{conf:#?}", conf.name))
144		.await
145}
146
147#[admin_command]
148async fn query_storage_debug(&self, provider: String) -> Result {
149	let provider = self.services.storage.provider(&provider)?;
150
151	self.write_string(format!("{provider:#?}\n"))
152		.await
153}
154
155#[admin_command]
156async fn query_storage_show(&self, provider: Option<String>, src: String) -> Result {
157	let (prefix, src) = src.as_str().split_once_infallible("//");
158	let id = provider.as_deref().unwrap_or(prefix);
159
160	let provider = self.services.storage.provider(id)?;
161	let meta = provider.head(src).await?;
162
163	self.write_string(format!("{meta:#?}\n")).await
164}
165
166#[admin_command]
167async fn query_storage_list(&self, provider: Option<String>, prefix: Option<String>) -> Result {
168	let id = provider.as_deref().unwrap_or_default();
169	let provider = self.services.storage.provider(id)?;
170
171	provider
172		.list(prefix.as_deref())
173		.try_for_each(|meta| writeln!(&self, "{meta:?}"))
174		.boxed()
175		.await
176}
177
178#[admin_command]
179async fn query_storage_duplicates(&self, provider_a: String, provider_b: String) -> Result {
180	let a = self
181		.services
182		.storage
183		.provider(&provider_a)?
184		.list(None)
185		.map_ok(|meta| meta.location)
186		.try_collect::<HashSet<_>>();
187
188	let b = self
189		.services
190		.storage
191		.provider(&provider_b)?
192		.list(None)
193		.map_ok(|meta| meta.location)
194		.try_collect::<HashSet<_>>();
195
196	let (a, b) = try_join(a, b).await?;
197	a.intersection(&b)
198		.try_stream()
199		.try_for_each(|item| writeln!(&self, "{item}"))
200		.await
201}
202
203#[admin_command]
204async fn query_storage_differences(&self, provider_a: String, provider_b: String) -> Result {
205	let a = self
206		.services
207		.storage
208		.provider(&provider_a)?
209		.list(None)
210		.map_ok(|meta| meta.location)
211		.try_collect::<HashSet<_>>();
212
213	let b = self
214		.services
215		.storage
216		.provider(&provider_b)?
217		.list(None)
218		.map_ok(|meta| meta.location)
219		.try_collect::<HashSet<_>>();
220
221	let (a, b) = try_join(a, b).await?;
222	a.difference(&b)
223		.try_stream()
224		.try_for_each(|item| writeln!(&self, "{item}"))
225		.await
226}
227
228#[admin_command]
229async fn query_storage_copy(
230	&self,
231	provider: Option<String>,
232	force: bool,
233	src: String,
234	dst: String,
235) -> Result {
236	let id = provider.as_deref().unwrap_or_default();
237	let provider = self.services.storage.provider(id)?;
238	let overwrite = force
239		.then_some(CopyMode::Overwrite)
240		.unwrap_or(CopyMode::Create);
241
242	let result = provider.copy(&src, &dst, overwrite).await;
243
244	self.write_string(format!("{result:#?}\n")).await
245}
246
247#[admin_command]
248async fn query_storage_move(
249	&self,
250	provider: Option<String>,
251	force: bool,
252	src: String,
253	dst: String,
254) -> Result {
255	let id = provider.as_deref().unwrap_or_default();
256	let provider = self.services.storage.provider(id)?;
257	let overwrite = force
258		.then_some(CopyMode::Overwrite)
259		.unwrap_or(CopyMode::Create);
260
261	let result = provider.rename(&src, &dst, overwrite).await;
262
263	self.write_string(format!("{result:#?}\n")).await
264}
265
266#[admin_command]
267async fn query_storage_delete(
268	&self,
269	provider: Option<String>,
270	src: Vec<String>,
271	verbose: bool,
272) -> Result {
273	let id = provider.as_deref().unwrap_or_default();
274	let provider = self.services.storage.provider(id)?;
275
276	provider
277		.delete(src.into_iter().stream())
278		.for_each(async |result| {
279			match result {
280				| Ok(_) if !verbose => Ok(()),
281				| Ok(path) =>
282					self.write_string(format!("deleted {path}\n"))
283						.await,
284				| Err(e) =>
285					self.write_string(format!("failed: {e:?}\n"))
286						.await,
287			}
288			.log_err()
289			.ok();
290		})
291		.map(Ok)
292		.await
293}
294
295#[admin_command]
296async fn query_storage_sync(&self, src: String, dst: String) -> Result {
297	let src_p = self.services.storage.provider(&src)?;
298
299	let dst_p = self.services.storage.provider(&dst)?;
300
301	let src = src_p
302		.list(None)
303		.map_ok(|meta| meta.location)
304		.try_collect::<HashSet<_>>();
305
306	let dst = dst_p
307		.list(None)
308		.map_ok(|meta| meta.location)
309		.try_collect::<HashSet<_>>();
310
311	let (src, dst) = try_join(src, dst).await?;
312
313	src.difference(&dst)
314		.try_stream()
315		.broadn_and_then(2, async |item| {
316			let data = src_p.get(item.as_ref()).await?;
317			let put = dst_p.put_one(item.as_ref(), data).await?;
318
319			Ok((item, put))
320		})
321		.try_for_each(|(item, put)| {
322			writeln!(&self, "Moved {item} from {src:?} to {dst:?}; {put:?}")
323		})
324		.await
325}