Skip to main content

tuwunel_admin/query/
raw.rs

1use std::{collections::BTreeMap, fmt::Write, sync::Arc};
2
3use base64::prelude::*;
4use clap::Subcommand;
5use futures::{FutureExt, StreamExt, TryStreamExt};
6use tokio::time::Instant;
7use tuwunel_core::{
8	Err, Result, apply, at, err, is_zero,
9	itertools::Itertools,
10	utils::{
11		TryReadyExt,
12		math::Expected,
13		stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt},
14		string::EMPTY,
15	},
16};
17use tuwunel_database::{KeyVal, Map};
18use tuwunel_service::Services;
19
20use crate::{admin_command, admin_command_dispatch};
21
22#[admin_command_dispatch(handler_prefix = "raw")]
23#[derive(Debug, Subcommand)]
24/// Query tables from database
25pub(crate) enum RawCommand {
26	/// - List database maps
27	Maps,
28
29	/// - Current rocksdb sequence number.
30	Sequence,
31
32	/// - Raw database query
33	Get {
34		/// Map name
35		map: String,
36
37		/// Key
38		key: String,
39
40		/// Encode as base64
41		#[arg(long, short)]
42		base64: bool,
43	},
44
45	/// - Raw database keys iteration
46	Keys {
47		/// Map name
48		map: String,
49
50		/// Key prefix
51		prefix: Option<String>,
52
53		/// Limit
54		#[arg(short, long)]
55		limit: Option<usize>,
56
57		/// Lower bound
58		#[arg(short, long)]
59		from: Option<String>,
60
61		/// Reverse iteration order
62		#[arg(short, long, default_value("false"))]
63		backwards: bool,
64	},
65
66	/// - Raw database items iteration
67	Iter {
68		/// Map name
69		map: String,
70
71		/// Key prefix
72		prefix: Option<String>,
73
74		/// Limit
75		#[arg(short, long)]
76		limit: Option<usize>,
77
78		/// Lower bound
79		#[arg(short, long)]
80		from: Option<String>,
81
82		/// Reverse iteration order
83		#[arg(short, long, default_value("false"))]
84		backwards: bool,
85	},
86
87	/// - Raw database key size breakdown
88	KeysSizes {
89		/// Map name
90		map: Option<String>,
91
92		/// Key prefix
93		prefix: Option<String>,
94	},
95
96	/// - Raw database keys total bytes
97	KeysTotal {
98		/// Map name
99		map: Option<String>,
100
101		/// Key prefix
102		prefix: Option<String>,
103	},
104
105	/// - Raw database values size breakdown
106	ValsSizes {
107		/// Map name
108		map: Option<String>,
109
110		/// Key prefix
111		prefix: Option<String>,
112	},
113
114	/// - Raw database values total bytes
115	ValsTotal {
116		/// Map name
117		map: Option<String>,
118
119		/// Key prefix
120		prefix: Option<String>,
121	},
122
123	/// - Raw database record count
124	Count {
125		/// Map name
126		map: Option<String>,
127
128		/// Key prefix
129		prefix: Option<String>,
130	},
131
132	/// - Raw database delete (for string keys) DANGER!!!
133	Del {
134		/// Map name
135		map: String,
136
137		/// Key
138		key: String,
139	},
140
141	/// - Clear database table DANGER!!!
142	Clear {
143		/// Map name
144		map: String,
145
146		/// Confirm
147		#[arg(long)]
148		confirm: bool,
149	},
150
151	/// - Compact database DANGER!!!
152	Compact {
153		#[arg(short, long, alias("column"))]
154		maps: Option<Vec<String>>,
155
156		#[arg(long)]
157		start: Option<String>,
158
159		#[arg(long)]
160		stop: Option<String>,
161
162		#[arg(long)]
163		from: Option<usize>,
164
165		#[arg(long)]
166		into: Option<usize>,
167
168		/// There is one compaction job per column; then this controls how many
169		/// columns are compacted in parallel. If zero, one compaction job is
170		/// still run at a time here, but in exclusive-mode blocking any other
171		/// automatic compaction jobs until complete.
172		#[arg(long)]
173		parallelism: Option<usize>,
174
175		#[arg(long, default_value("false"))]
176		exhaustive: bool,
177	},
178}
179
180#[admin_command]
181pub(super) async fn raw_compact(
182	&self,
183	maps: Option<Vec<String>>,
184	start: Option<String>,
185	stop: Option<String>,
186	from: Option<usize>,
187	into: Option<usize>,
188	parallelism: Option<usize>,
189	exhaustive: bool,
190) -> Result {
191	use tuwunel_database::compact::Options;
192
193	let maps = with_maps_or(maps.as_deref(), self.services)?;
194
195	let range = (
196		start
197			.as_ref()
198			.map(String::as_bytes)
199			.map(Into::into),
200		stop.as_ref()
201			.map(String::as_bytes)
202			.map(Into::into),
203	);
204
205	let options = Options {
206		range,
207		level: (from, into),
208		exclusive: parallelism.is_some_and(is_zero!()),
209		exhaustive,
210	};
211
212	let runtime = self.services.server.runtime().clone();
213	let parallelism = parallelism.unwrap_or(1);
214	let results = maps
215		.into_iter()
216		.try_stream()
217		.paralleln_and_then(runtime, parallelism, move |map| {
218			map.compact_blocking(options.clone())?;
219			Ok(map.name().to_owned())
220		})
221		.collect::<Vec<_>>();
222
223	let timer = Instant::now();
224	let results = results.await;
225	let query_time = timer.elapsed();
226	self.write_str(&format!("Jobs completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"))
227		.await
228}
229
230#[admin_command]
231pub(super) async fn raw_count(&self, map: Option<String>, prefix: Option<String>) -> Result {
232	let prefix = prefix.as_deref().unwrap_or(EMPTY);
233
234	let timer = Instant::now();
235	let count = with_map_or(map.as_deref(), self.services)?
236		.iter()
237		.stream()
238		.then(|map| map.raw_count_prefix(&prefix))
239		.ready_fold(0_usize, usize::saturating_add)
240		.await;
241
242	let query_time = timer.elapsed();
243	self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{count:#?}\n```"))
244		.await
245}
246
247#[admin_command]
248pub(super) async fn raw_keys(
249	&self,
250	map: String,
251	prefix: Option<String>,
252	limit: Option<usize>,
253	from: Option<String>,
254	backwards: bool,
255) -> Result {
256	writeln!(self, "```").boxed().await?;
257
258	let map = self.services.db.get(map.as_str())?;
259	let timer = Instant::now();
260	let stream = match from.as_ref().or(prefix.as_ref()) {
261		| Some(from) if !backwards => map.raw_keys_from(from).boxed(),
262		| Some(from) => map.rev_raw_keys_from(from).boxed(),
263		| None if !backwards => map.raw_keys().boxed(),
264		| None => map.rev_raw_keys().boxed(),
265	};
266
267	let prefix = prefix.as_ref().map(String::as_bytes);
268
269	stream
270		.ready_try_take_while(|k| {
271			Ok(prefix
272				.map(|prefix| k.starts_with(prefix))
273				.unwrap_or(true))
274		})
275		.take(limit.unwrap_or(usize::MAX))
276		.map_ok(encode)
277		.try_for_each(|str| writeln!(self, "{str}"))
278		.boxed()
279		.await?;
280
281	let query_time = timer.elapsed();
282	self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}"))
283		.await
284}
285
286#[admin_command]
287pub(super) async fn raw_keys_sizes(&self, map: Option<String>, prefix: Option<String>) -> Result {
288	let prefix = prefix.as_deref().unwrap_or(EMPTY);
289
290	let timer = Instant::now();
291	let result = with_map_or(map.as_deref(), self.services)?
292		.iter()
293		.stream()
294		.map(|map| map.raw_keys_prefix(&prefix))
295		.flatten()
296		.ignore_err()
297		.map(<[u8]>::len)
298		.ready_fold_default(|mut map: BTreeMap<_, usize>, len| {
299			let entry = map.entry(len).or_default();
300			*entry = entry.saturating_add(1);
301			map
302		})
303		.await;
304
305	let query_time = timer.elapsed();
306	self.write_str(&format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"))
307		.await
308}
309
310#[admin_command]
311pub(super) async fn raw_keys_total(&self, map: Option<String>, prefix: Option<String>) -> Result {
312	let prefix = prefix.as_deref().unwrap_or(EMPTY);
313
314	let timer = Instant::now();
315	let result = with_map_or(map.as_deref(), self.services)?
316		.iter()
317		.stream()
318		.map(|map| map.raw_keys_prefix(&prefix))
319		.flatten()
320		.ignore_err()
321		.map(<[u8]>::len)
322		.ready_fold_default(|acc: usize, len| acc.saturating_add(len))
323		.await;
324
325	let query_time = timer.elapsed();
326	self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}"))
327		.await
328}
329
330#[admin_command]
331pub(super) async fn raw_vals_sizes(&self, map: Option<String>, prefix: Option<String>) -> Result {
332	let prefix = prefix.as_deref().unwrap_or(EMPTY);
333
334	let timer = Instant::now();
335	let result = with_map_or(map.as_deref(), self.services)?
336		.iter()
337		.stream()
338		.map(|map| map.raw_stream_prefix(&prefix))
339		.flatten()
340		.ignore_err()
341		.map(at!(1))
342		.map(<[u8]>::len)
343		.ready_fold_default(|mut map: BTreeMap<_, usize>, len| {
344			let entry = map.entry(len).or_default();
345			*entry = entry.saturating_add(1);
346			map
347		})
348		.await;
349
350	let query_time = timer.elapsed();
351	self.write_str(&format!("```\n{result:#?}\n```\n\nQuery completed in {query_time:?}"))
352		.await
353}
354
355#[admin_command]
356pub(super) async fn raw_vals_total(&self, map: Option<String>, prefix: Option<String>) -> Result {
357	let prefix = prefix.as_deref().unwrap_or(EMPTY);
358
359	let timer = Instant::now();
360	let result = with_map_or(map.as_deref(), self.services)?
361		.iter()
362		.stream()
363		.map(|map| map.raw_stream_prefix(&prefix))
364		.flatten()
365		.ignore_err()
366		.map(at!(1))
367		.map(<[u8]>::len)
368		.ready_fold_default(|acc: usize, len| acc.saturating_add(len))
369		.await;
370
371	let query_time = timer.elapsed();
372	self.write_str(&format!("```\n{result:#?}\n\n```\n\nQuery completed in {query_time:?}"))
373		.await
374}
375
376#[admin_command]
377pub(super) async fn raw_iter(
378	&self,
379	map: String,
380	prefix: Option<String>,
381	limit: Option<usize>,
382	from: Option<String>,
383	backwards: bool,
384) -> Result {
385	writeln!(self, "```").await?;
386
387	let map = self.services.db.get(&map)?;
388	let timer = Instant::now();
389	let stream = match from.as_ref().or(prefix.as_ref()) {
390		| Some(from) if !backwards => map.raw_stream_from(from).boxed(),
391		| Some(from) => map.rev_raw_stream_from(from).boxed(),
392		| None if !backwards => map.raw_stream().boxed(),
393		| None => map.rev_raw_stream().boxed(),
394	};
395
396	let prefix = prefix.as_ref().map(String::as_bytes);
397
398	stream
399		.ready_try_take_while(|(k, _): &KeyVal<'_>| {
400			Ok(prefix
401				.map(|prefix| k.starts_with(prefix))
402				.unwrap_or(true))
403		})
404		.take(limit.unwrap_or(usize::MAX))
405		.map_ok(apply!(2, encode))
406		.try_for_each(|(key, val)| writeln!(self, "{{{key} => {val}}}"))
407		.boxed()
408		.await?;
409
410	let query_time = timer.elapsed();
411	self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}"))
412		.await
413}
414
415#[admin_command]
416pub(super) async fn raw_del(&self, map: String, key: String) -> Result {
417	let map = self.services.db.get(&map)?;
418	let timer = Instant::now();
419	map.remove(&key);
420
421	let query_time = timer.elapsed();
422	self.write_str(&format!("Operation completed in {query_time:?}"))
423		.await
424}
425
426#[admin_command]
427pub(super) async fn raw_clear(&self, map: String, confirm: bool) -> Result {
428	let map = self.services.db.get(&map)?;
429
430	if !confirm {
431		return Err!("Are you really sure you want to clear all data? Add the --confirm option.");
432	}
433
434	let timer = Instant::now();
435	let cork = self.services.db.cork();
436	let count = map
437		.raw_keys()
438		.ignore_err()
439		.map(|key| map.remove(&key))
440		.count()
441		.boxed()
442		.await;
443
444	drop(cork);
445	let query_time = timer.elapsed();
446	self.write_string(format!("Operation completed in {query_time:?}\n\nremoved {count} keys\n"))
447		.await
448}
449
450#[admin_command]
451pub(super) async fn raw_get(&self, map: String, key: String, base64: bool) -> Result {
452	let map = self.services.db.get(&map)?;
453	let timer = Instant::now();
454	let handle = map.get(&key).await?;
455
456	let query_time = timer.elapsed();
457
458	let result = if base64 {
459		BASE64_STANDARD.encode(&handle)
460	} else {
461		encode(&handle)
462	};
463
464	self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```"))
465		.await
466}
467
468#[admin_command]
469pub(super) async fn raw_sequence(&self) -> Result {
470	let sequence = self.services.db.engine.current_sequence();
471
472	self.write_str(&format!("{sequence:#?}")).await
473}
474
475#[admin_command]
476pub(super) async fn raw_maps(&self) -> Result {
477	let list: Vec<_> = self
478		.services
479		.db
480		.iter()
481		.map(at!(0))
482		.copied()
483		.collect();
484
485	self.write_str(&format!("{list:#?}")).await
486}
487
488fn with_map_or(map: Option<&str>, services: &Services) -> Result<Vec<Arc<Map>>> {
489	with_maps_or(
490		map.map(|map| [map])
491			.as_ref()
492			.map(<[&str; 1]>::as_slice),
493		services,
494	)
495}
496
497fn with_maps_or<S: AsRef<str>>(maps: Option<&[S]>, services: &Services) -> Result<Vec<Arc<Map>>> {
498	Ok(if let Some(maps) = maps {
499		maps.iter()
500			.map(|map| {
501				let map = map.as_ref();
502				services
503					.db
504					.get(map)
505					.cloned()
506					.map_err(|_| err!("map {map} not found"))
507			})
508			.try_collect()?
509	} else {
510		services.db.iter().map(|x| x.1.clone()).collect()
511	})
512}
513
514#[expect(clippy::as_conversions)]
515fn encode(data: &[u8]) -> String {
516	let mut res = String::with_capacity(data.len().expected_mul(4));
517
518	for byte in data {
519		if *byte < 0x20 || *byte > 0x7E {
520			let _ = write!(res, "\\x{byte:02x}");
521		} else {
522			res.push(*byte as char);
523		}
524	}
525
526	res
527}