Skip to main content

tuwunel_admin/query/raw/
compact.rs

1use futures::StreamExt;
2use tokio::time::Instant;
3use tuwunel_core::{
4	Result, is_zero,
5	utils::stream::{IterStream, TryParallelExt},
6};
7use tuwunel_database::compact::Options;
8
9use super::with_maps_or;
10use crate::admin_command;
11
12#[admin_command]
13pub(super) async fn raw_compact(
14	&self,
15	maps: Option<Vec<String>>,
16	start: Option<String>,
17	stop: Option<String>,
18	from: Option<usize>,
19	into: Option<usize>,
20	parallelism: Option<usize>,
21	exhaustive: bool,
22) -> Result {
23	let maps = with_maps_or(maps.as_deref(), self.services)?;
24
25	let range = (
26		start
27			.as_ref()
28			.map(String::as_bytes)
29			.map(Into::into),
30		stop.as_ref()
31			.map(String::as_bytes)
32			.map(Into::into),
33	);
34
35	let options = Options {
36		range,
37		level: (from, into),
38		exclusive: parallelism.is_some_and(is_zero!()),
39		exhaustive,
40	};
41
42	let runtime = self.services.server.runtime().clone();
43	let parallelism = parallelism.unwrap_or(1);
44	let results = maps
45		.into_iter()
46		.try_stream()
47		.paralleln_and_then(runtime, parallelism, move |map| {
48			map.compact_blocking(options.clone())?;
49			Ok(map.name().to_owned())
50		})
51		.collect::<Vec<_>>();
52
53	let timer = Instant::now();
54	let results = results.await;
55	let query_time = timer.elapsed();
56	write!(self, "Jobs completed in {query_time:?}:\n\n```rs\n{results:#?}\n```").await
57}