1use std::{collections::BTreeMap, fmt::Write, sync::Arc};
2
3use base64::prelude::*;
4use clap::Subcommand;
5use futures::{FutureExt, StreamExt, TryStreamExt};
6use tokio::time::Instant;
7use tuwunel_core::{
8 Err, Result, apply, at, err, is_zero,
9 itertools::Itertools,
10 utils::{
11 TryReadyExt,
12 math::Expected,
13 stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt},
14 string::EMPTY,
15 },
16};
17use tuwunel_database::{KeyVal, Map};
18use tuwunel_service::Services;
19
20use crate::{admin_command, admin_command_dispatch};
21
22#[admin_command_dispatch(handler_prefix = "raw")]
23#[derive(Debug, Subcommand)]
24pub(crate) enum RawCommand {
26 Maps,
28
29 Sequence,
31
32 Get {
34 map: String,
36
37 key: String,
39
40 #[arg(long, short)]
42 base64: bool,
43 },
44
45 Keys {
47 map: String,
49
50 prefix: Option<String>,
52
53 #[arg(short, long)]
55 limit: Option<usize>,
56
57 #[arg(short, long)]
59 from: Option<String>,
60
61 #[arg(short, long, default_value("false"))]
63 backwards: bool,
64 },
65
66 Iter {
68 map: String,
70
71 prefix: Option<String>,
73
74 #[arg(short, long)]
76 limit: Option<usize>,
77
78 #[arg(short, long)]
80 from: Option<String>,
81
82 #[arg(short, long, default_value("false"))]
84 backwards: bool,
85 },
86
87 KeysSizes {
89 map: Option<String>,
91
92 prefix: Option<String>,
94 },
95
96 KeysTotal {
98 map: Option<String>,
100
101 prefix: Option<String>,
103 },
104
105 ValsSizes {
107 map: Option<String>,
109
110 prefix: Option<String>,
112 },
113
114 ValsTotal {
116 map: Option<String>,
118
119 prefix: Option<String>,
121 },
122
123 Count {
125 map: Option<String>,
127
128 prefix: Option<String>,
130 },
131
132 Del {
134 map: String,
136
137 key: String,
139 },
140
141 Clear {
143 map: String,
145
146 #[arg(long)]
148 confirm: bool,
149 },
150
151 Compact {
153 #[arg(short, long, alias("column"))]
154 maps: Option<Vec<String>>,
155
156 #[arg(long)]
157 start: Option<String>,
158
159 #[arg(long)]
160 stop: Option<String>,
161
162 #[arg(long)]
163 from: Option<usize>,
164
165 #[arg(long)]
166 into: Option<usize>,
167
168 #[arg(long)]
173 parallelism: Option<usize>,
174
175 #[arg(long, default_value("false"))]
176 exhaustive: bool,
177 },
178}
179
180#[admin_command]
181pub(super) async fn raw_compact(
182 &self,
183 maps: Option<Vec<String>>,
184 start: Option<String>,
185 stop: Option<String>,
186 from: Option<usize>,
187 into: Option<usize>,
188 parallelism: Option<usize>,
189 exhaustive: bool,
190) -> Result {
191 use tuwunel_database::compact::Options;
192
193 let maps = with_maps_or(maps.as_deref(), self.services)?;
194
195 let range = (
196 start
197 .as_ref()
198 .map(String::as_bytes)
199 .map(Into::into),
200 stop.as_ref()
201 .map(String::as_bytes)
202 .map(Into::into),
203 );
204
205 let options = Options {
206 range,
207 level: (from, into),
208 exclusive: parallelism.is_some_and(is_zero!()),
209 exhaustive,
210 };
211
212 let runtime = self.services.server.runtime().clone();
213 let parallelism = parallelism.unwrap_or(1);
214 let results = maps
215 .into_iter()
216 .try_stream()
217 .paralleln_and_then(runtime, parallelism, move |map| {
218 map.compact_blocking(options.clone())?;
219 Ok(map.name().to_owned())
220 })
221 .collect::<Vec<_>>();
222
223 let timer = Instant::now();
224 let results = results.await;
225 let query_time = timer.elapsed();
226 self.write_str(&format!("Jobs completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"))
227 .await
228}
229
230#[admin_command]
231pub(super) async fn raw_count(&self, map: Option<String>, prefix: Option<String>) -> Result {
232 let prefix = prefix.as_deref().unwrap_or(EMPTY);
233
234 let timer = Instant::now();
235 let count = with_map_or(map.as_deref(), self.services)?
236 .iter()
237 .stream()
238 .then(|map| map.raw_count_prefix(&prefix))
239 .ready_fold(0_usize, usize::saturating_add)
240 .await;
241
242 let query_time = timer.elapsed();
243 self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{count:#?}\n```"))
244 .await
245}
246
247#[admin_command]
248pub(super) async fn raw_keys(
249 &self,
250 map: String,
251 prefix: Option<String>,
252 limit: Option<usize>,
253 from: Option<String>,
254 backwards: bool,
255) -> Result {
256 writeln!(self, "```").boxed().await?;
257
258 let map = self.services.db.get(map.as_str())?;
259 let timer = Instant::now();
260 let stream = match from.as_ref().or(prefix.as_ref()) {
261 | Some(from) if !backwards => map.raw_keys_from(from).boxed(),
262 | Some(from) => map.rev_raw_keys_from(from).boxed(),
263 | None if !backwards => map.raw_keys().boxed(),
264 | None => map.rev_raw_keys().boxed(),
265 };
266
267 let prefix = prefix.as_ref().map(String::as_bytes);
268
269 stream
270 .ready_try_take_while(|k| {
271 Ok(prefix
272 .map(|prefix| k.starts_with(prefix))
273 .unwrap_or(true))
274 })
275 .take(limit.unwrap_or(usize::MAX))
276 .map_ok(encode)
277 .try_for_each(|str| writeln!(self, "{str}"))
278 .boxed()
279 .await?;
280
281 let query_time = timer.elapsed();
282 self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}"))
283 .await
284}
285
286#[admin_command]
287pub(super) async fn raw_keys_sizes(&self, map: Option<String>, prefix: Option<String>) -> Result {
288 let prefix = prefix.as_deref().unwrap_or(EMPTY);
289
290 let timer = Instant::now();
291 let result = with_map_or(map.as_deref(), self.services)?
292 .iter()
293 .stream()
294 .map(|map| map.raw_keys_prefix(&prefix))
295 .flatten()
296 .ignore_err()
297 .map(<[u8]>::len)
298 .ready_fold_default(|mut map: BTreeMap<_, usize>, len| {
299 let entry = map.entry(len).or_default();
300 *entry = entry.saturating_add(1);
301 map
302 })
303 .await;
304
305 let query_time = timer.elapsed();
306 self.write_str(&format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"))
307 .await
308}
309
310#[admin_command]
311pub(super) async fn raw_keys_total(&self, map: Option<String>, prefix: Option<String>) -> Result {
312 let prefix = prefix.as_deref().unwrap_or(EMPTY);
313
314 let timer = Instant::now();
315 let result = with_map_or(map.as_deref(), self.services)?
316 .iter()
317 .stream()
318 .map(|map| map.raw_keys_prefix(&prefix))
319 .flatten()
320 .ignore_err()
321 .map(<[u8]>::len)
322 .ready_fold_default(|acc: usize, len| acc.saturating_add(len))
323 .await;
324
325 let query_time = timer.elapsed();
326 self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}"))
327 .await
328}
329
330#[admin_command]
331pub(super) async fn raw_vals_sizes(&self, map: Option<String>, prefix: Option<String>) -> Result {
332 let prefix = prefix.as_deref().unwrap_or(EMPTY);
333
334 let timer = Instant::now();
335 let result = with_map_or(map.as_deref(), self.services)?
336 .iter()
337 .stream()
338 .map(|map| map.raw_stream_prefix(&prefix))
339 .flatten()
340 .ignore_err()
341 .map(at!(1))
342 .map(<[u8]>::len)
343 .ready_fold_default(|mut map: BTreeMap<_, usize>, len| {
344 let entry = map.entry(len).or_default();
345 *entry = entry.saturating_add(1);
346 map
347 })
348 .await;
349
350 let query_time = timer.elapsed();
351 self.write_str(&format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"))
352 .await
353}
354
355#[admin_command]
356pub(super) async fn raw_vals_total(&self, map: Option<String>, prefix: Option<String>) -> Result {
357 let prefix = prefix.as_deref().unwrap_or(EMPTY);
358
359 let timer = Instant::now();
360 let result = with_map_or(map.as_deref(), self.services)?
361 .iter()
362 .stream()
363 .map(|map| map.raw_stream_prefix(&prefix))
364 .flatten()
365 .ignore_err()
366 .map(at!(1))
367 .map(<[u8]>::len)
368 .ready_fold_default(|acc: usize, len| acc.saturating_add(len))
369 .await;
370
371 let query_time = timer.elapsed();
372 self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}"))
373 .await
374}
375
376#[admin_command]
377pub(super) async fn raw_iter(
378 &self,
379 map: String,
380 prefix: Option<String>,
381 limit: Option<usize>,
382 from: Option<String>,
383 backwards: bool,
384) -> Result {
385 writeln!(self, "```").await?;
386
387 let map = self.services.db.get(&map)?;
388 let timer = Instant::now();
389 let stream = match from.as_ref().or(prefix.as_ref()) {
390 | Some(from) if !backwards => map.raw_stream_from(from).boxed(),
391 | Some(from) => map.rev_raw_stream_from(from).boxed(),
392 | None if !backwards => map.raw_stream().boxed(),
393 | None => map.rev_raw_stream().boxed(),
394 };
395
396 let prefix = prefix.as_ref().map(String::as_bytes);
397
398 stream
399 .ready_try_take_while(|(k, _): &KeyVal<'_>| {
400 Ok(prefix
401 .map(|prefix| k.starts_with(prefix))
402 .unwrap_or(true))
403 })
404 .take(limit.unwrap_or(usize::MAX))
405 .map_ok(apply!(2, encode))
406 .try_for_each(|(key, val)| writeln!(self, "{{{key} => {val}}}"))
407 .boxed()
408 .await?;
409
410 let query_time = timer.elapsed();
411 self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}"))
412 .await
413}
414
415#[admin_command]
416pub(super) async fn raw_del(&self, map: String, key: String) -> Result {
417 let map = self.services.db.get(&map)?;
418 let timer = Instant::now();
419 map.remove(&key);
420
421 let query_time = timer.elapsed();
422 self.write_str(&format!("Operation completed in {query_time:?}"))
423 .await
424}
425
426#[admin_command]
427pub(super) async fn raw_clear(&self, map: String, confirm: bool) -> Result {
428 let map = self.services.db.get(&map)?;
429
430 if !confirm {
431 return Err!("Are you really sure you want to clear all data? Add the --confirm option.");
432 }
433
434 let timer = Instant::now();
435 let cork = self.services.db.cork();
436 let count = map
437 .raw_keys()
438 .ignore_err()
439 .map(|key| map.remove(&key))
440 .count()
441 .boxed()
442 .await;
443
444 drop(cork);
445 let query_time = timer.elapsed();
446 self.write_string(format!("Operation completed in {query_time:?}\n\nremoved {count} keys\n"))
447 .await
448}
449
450#[admin_command]
451pub(super) async fn raw_get(&self, map: String, key: String, base64: bool) -> Result {
452 let map = self.services.db.get(&map)?;
453 let timer = Instant::now();
454 let handle = map.get(&key).await?;
455
456 let query_time = timer.elapsed();
457
458 let result = if base64 {
459 BASE64_STANDARD.encode(&handle)
460 } else {
461 encode(&handle)
462 };
463
464 self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```"))
465 .await
466}
467
468#[admin_command]
469pub(super) async fn raw_sequence(&self) -> Result {
470 let sequence = self.services.db.engine.current_sequence();
471
472 self.write_str(&format!("{sequence:#?}")).await
473}
474
475#[admin_command]
476pub(super) async fn raw_maps(&self) -> Result {
477 let list: Vec<_> = self
478 .services
479 .db
480 .iter()
481 .map(at!(0))
482 .copied()
483 .collect();
484
485 self.write_str(&format!("{list:#?}")).await
486}
487
488fn with_map_or(map: Option<&str>, services: &Services) -> Result<Vec<Arc<Map>>> {
489 with_maps_or(
490 map.map(|map| [map])
491 .as_ref()
492 .map(<[&str; 1]>::as_slice),
493 services,
494 )
495}
496
497fn with_maps_or<S: AsRef<str>>(maps: Option<&[S]>, services: &Services) -> Result<Vec<Arc<Map>>> {
498 Ok(if let Some(maps) = maps {
499 maps.iter()
500 .map(|map| {
501 let map = map.as_ref();
502 services
503 .db
504 .get(map)
505 .cloned()
506 .map_err(|_| err!("map {map} not found"))
507 })
508 .try_collect()?
509 } else {
510 services.db.iter().map(|x| x.1.clone()).collect()
511 })
512}
513
514#[expect(clippy::as_conversions)]
515fn encode(data: &[u8]) -> String {
516 let mut res = String::with_capacity(data.len().expected_mul(4));
517
518 for byte in data {
519 if *byte < 0x20 || *byte > 0x7E {
520 let _ = write!(res, "\\x{byte:02x}");
521 } else {
522 res.push(*byte as char);
523 }
524 }
525
526 res
527}