tuwunel_database/map/
rev_stream.rs1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::Deserialize;
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use crate::{keyval, keyval::KeyVal, stream};
10
11#[implement(super::Map)]
15pub fn rev_stream<'a, K, V>(
16 self: &'a Arc<Self>,
17) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
18where
19 K: Deserialize<'a> + Send,
20 V: Deserialize<'a> + Send,
21{
22 self.rev_raw_stream()
23 .map(keyval::result_deserialize::<K, V>)
24}
25
26#[implement(super::Map)]
30#[tracing::instrument(skip(self), fields(%self), level = "trace")]
31pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
32 use crate::pool::Seek;
33
34 let opts = super::iter_options_default(&self.engine);
35 let state = stream::State::new(self, opts);
36 if is_cached(self) {
37 let state = state.init_rev(None);
38 return Either::Left(
39 task::consume_budget()
40 .map(move |()| stream::ItemsRev::<'_>::from(state))
41 .into_stream()
42 .flatten(),
43 );
44 }
45
46 let seek = Seek {
47 map: self.clone(),
48 dir: Direction::Reverse,
49 state: crate::pool::into_send_seek(state),
50 key: None,
51 res: None,
52 };
53
54 Either::Right(
55 self.engine
56 .pool
57 .execute_iter(seek)
58 .ok_into::<stream::ItemsRev<'_>>()
59 .into_stream()
60 .try_flatten(),
61 )
62}
63
64#[tracing::instrument(
65 name = "cached",
66 level = "trace",
67 skip_all,
68 fields(%map),
69)]
70pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
71 let opts = super::cache_iter_options_default(&map.engine);
72 let state = stream::State::new(map, opts).init_rev(None);
73
74 !state.is_incomplete()
75}