Skip to main content

tuwunel_admin/query/raw/
iter.rs

1use futures::{FutureExt, StreamExt, TryStreamExt};
2use tokio::time::Instant;
3use tuwunel_core::{Result, apply, utils::TryReadyExt};
4use tuwunel_database::KeyVal;
5
6use super::encode;
7use crate::admin_command;
8
9#[admin_command]
10pub(super) async fn raw_iter(
11	&self,
12	map: String,
13	prefix: Option<String>,
14	limit: Option<usize>,
15	from: Option<String>,
16	backwards: bool,
17) -> Result {
18	writeln!(self, "```").await?;
19
20	let map = self.services.db.get(&map)?;
21	let timer = Instant::now();
22	let stream = match from.as_ref().or(prefix.as_ref()) {
23		| Some(from) if !backwards => map.raw_stream_from(from).boxed(),
24		| Some(from) => map.rev_raw_stream_from(from).boxed(),
25		| None if !backwards => map.raw_stream().boxed(),
26		| None => map.rev_raw_stream().boxed(),
27	};
28
29	let prefix = prefix.as_ref().map(String::as_bytes);
30
31	stream
32		.ready_try_take_while(|(k, _): &KeyVal<'_>| {
33			Ok(prefix
34				.map(|prefix| k.starts_with(prefix))
35				.unwrap_or(true))
36		})
37		.take(limit.unwrap_or(usize::MAX))
38		.map_ok(apply!(2, encode))
39		.try_for_each(|(key, val)| writeln!(self, "{{{key} => {val}}}"))
40		.boxed()
41		.await?;
42
43	let query_time = timer.elapsed();
44	write!(self, "\n```\n\nQuery completed in {query_time:?}").await
45}