Skip to main content

tuwunel_database/map/
rev_keys.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::Deserialize;
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use super::rev_stream::is_cached;
10use crate::{keyval, keyval::Key, stream};
11
12#[implement(super::Map)]
13pub fn rev_keys<'a, K>(self: &'a Arc<Self>) -> impl Stream<Item = Result<Key<'_, K>>> + Send
14where
15	K: Deserialize<'a> + Send,
16{
17	self.rev_raw_keys()
18		.map(keyval::result_deserialize_key::<K>)
19}
20
21#[implement(super::Map)]
22#[tracing::instrument(skip(self), fields(%self), level = "trace")]
23pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
24	use crate::pool::Seek;
25
26	let opts = super::iter_options_default(&self.engine);
27	let state = stream::State::new(self, opts);
28	if is_cached(self) {
29		let state = state.init_rev(None);
30		return Either::Left(
31			task::consume_budget()
32				.map(move |()| stream::KeysRev::<'_>::from(state))
33				.into_stream()
34				.flatten(),
35		);
36	}
37
38	let seek = Seek {
39		map: self.clone(),
40		dir: Direction::Reverse,
41		state: crate::pool::into_send_seek(state),
42		key: None,
43		res: None,
44	};
45
46	Either::Right(
47		self.engine
48			.pool
49			.execute_iter(seek)
50			.ok_into::<stream::KeysRev<'_>>()
51			.into_stream()
52			.try_flatten(),
53	)
54}