tuwunel_database/map/
rev_keys.rs1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::Deserialize;
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use super::rev_stream::is_cached;
10use crate::{keyval, keyval::Key, stream};
11
12#[implement(super::Map)]
13pub fn rev_keys<'a, K>(self: &'a Arc<Self>) -> impl Stream<Item = Result<Key<'_, K>>> + Send
14where
15 K: Deserialize<'a> + Send,
16{
17 self.rev_raw_keys()
18 .map(keyval::result_deserialize_key::<K>)
19}
20
21#[implement(super::Map)]
22#[tracing::instrument(skip(self), fields(%self), level = "trace")]
23pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
24 use crate::pool::Seek;
25
26 let opts = super::iter_options_default(&self.engine);
27 let state = stream::State::new(self, opts);
28 if is_cached(self) {
29 let state = state.init_rev(None);
30 return Either::Left(
31 task::consume_budget()
32 .map(move |()| stream::KeysRev::<'_>::from(state))
33 .into_stream()
34 .flatten(),
35 );
36 }
37
38 let seek = Seek {
39 map: self.clone(),
40 dir: Direction::Reverse,
41 state: crate::pool::into_send_seek(state),
42 key: None,
43 res: None,
44 };
45
46 Either::Right(
47 self.engine
48 .pool
49 .execute_iter(seek)
50 .ok_into::<stream::KeysRev<'_>>()
51 .into_stream()
52 .try_flatten(),
53 )
54}