tuwunel_database/map/
rev_keys_from.rs1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::{Deserialize, Serialize};
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use super::rev_stream_from::is_cached;
10use crate::{
11 keyval::{Key, result_deserialize_key, serialize_key},
12 stream,
13};
14
15#[implement(super::Map)]
16pub fn rev_keys_from<'a, K, P>(
17 self: &'a Arc<Self>,
18 from: &P,
19) -> impl Stream<Item = Result<Key<'_, K>>> + Send + use<'a, K, P>
20where
21 P: Serialize + ?Sized + Debug,
22 K: Deserialize<'a> + Send,
23{
24 self.rev_keys_from_raw(from)
25 .map(result_deserialize_key::<K>)
26}
27
28#[implement(super::Map)]
29#[tracing::instrument(skip(self), level = "trace")]
30pub fn rev_keys_from_raw<P>(
31 self: &Arc<Self>,
32 from: &P,
33) -> impl Stream<Item = Result<Key<'_>>> + Send + use<'_, P>
34where
35 P: Serialize + ?Sized + Debug,
36{
37 let key = serialize_key(from).expect("failed to serialize query key");
38 self.rev_raw_keys_from(&key)
39}
40
41#[implement(super::Map)]
42pub fn rev_keys_raw_from<'a, K, P>(
43 self: &'a Arc<Self>,
44 from: &P,
45) -> impl Stream<Item = Result<Key<'_, K>>> + Send + use<'a, K, P>
46where
47 P: AsRef<[u8]> + ?Sized + Debug + Sync,
48 K: Deserialize<'a> + Send,
49{
50 self.rev_raw_keys_from(from)
51 .map(result_deserialize_key::<K>)
52}
53
54#[implement(super::Map)]
55#[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
56pub fn rev_raw_keys_from<P>(
57 self: &Arc<Self>,
58 from: &P,
59) -> impl Stream<Item = Result<Key<'_>>> + Send + use<'_, P>
60where
61 P: AsRef<[u8]> + ?Sized + Debug,
62{
63 use crate::pool::Seek;
64
65 let opts = super::iter_options_default(&self.engine);
66 let state = stream::State::new(self, opts);
67 if is_cached(self, from) {
68 let state = state.init_rev(from.as_ref().into());
69 return Either::Left(
70 task::consume_budget()
71 .map(move |()| stream::KeysRev::<'_>::from(state))
72 .into_stream()
73 .flatten(),
74 );
75 }
76
77 let seek = Seek {
78 map: self.clone(),
79 dir: Direction::Reverse,
80 key: Some(from.as_ref().into()),
81 state: crate::pool::into_send_seek(state),
82 res: None,
83 };
84
85 Either::Right(
86 self.engine
87 .pool
88 .execute_iter(seek)
89 .ok_into::<stream::KeysRev<'_>>()
90 .into_stream()
91 .try_flatten(),
92 )
93}