tuwunel_database/map/
stream_from.rs1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::{Deserialize, Serialize};
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use crate::{
10 keyval::{KeyVal, result_deserialize, serialize_key},
11 stream,
12};
13
14#[implement(super::Map)]
19pub fn stream_from<'a, K, V, P>(
20 self: &'a Arc<Self>,
21 from: &P,
22) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
23where
24 P: Serialize + ?Sized + Debug,
25 K: Deserialize<'a> + Send,
26 V: Deserialize<'a> + Send,
27{
28 self.stream_from_raw(from)
29 .map(result_deserialize::<K, V>)
30}
31
32#[implement(super::Map)]
37#[tracing::instrument(skip(self), level = "trace")]
38pub fn stream_from_raw<P>(
39 self: &Arc<Self>,
40 from: &P,
41) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
42where
43 P: Serialize + ?Sized + Debug,
44{
45 let key = serialize_key(from).expect("failed to serialize query key");
46 self.raw_stream_from(&key)
47}
48
49#[implement(super::Map)]
54pub fn stream_raw_from<'a, K, V, P>(
55 self: &'a Arc<Self>,
56 from: &P,
57) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
58where
59 P: AsRef<[u8]> + ?Sized + Debug + Sync,
60 K: Deserialize<'a> + Send,
61 V: Deserialize<'a> + Send,
62{
63 self.raw_stream_from(from)
64 .map(result_deserialize::<K, V>)
65}
66
67#[implement(super::Map)]
72#[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
73pub fn raw_stream_from<P>(
74 self: &Arc<Self>,
75 from: &P,
76) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
77where
78 P: AsRef<[u8]> + ?Sized + Debug,
79{
80 use crate::pool::Seek;
81
82 let opts = super::iter_options_default(&self.engine);
83 let state = stream::State::new(self, opts);
84 if is_cached(self, from) {
85 let state = state.init_fwd(from.as_ref().into());
86 return Either::Left(
87 task::consume_budget()
88 .map(move |()| stream::Items::<'_>::from(state))
89 .into_stream()
90 .flatten(),
91 );
92 }
93
94 let seek = Seek {
95 map: self.clone(),
96 dir: Direction::Forward,
97 key: Some(from.as_ref().into()),
98 state: crate::pool::into_send_seek(state),
99 res: None,
100 };
101
102 Either::Right(
103 self.engine
104 .pool
105 .execute_iter(seek)
106 .ok_into::<stream::Items<'_>>()
107 .into_stream()
108 .try_flatten(),
109 )
110}
111
112#[tracing::instrument(
113 name = "cached",
114 level = "trace",
115 skip(map, from),
116 fields(%map),
117)]
118pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
119where
120 P: AsRef<[u8]> + ?Sized,
121{
122 let opts = super::cache_iter_options_default(&map.engine);
123 let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
124
125 !state.is_incomplete()
126}