Skip to main content

tuwunel_database/map/
stream_from.rs

1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::{Deserialize, Serialize};
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use crate::{
10	keyval::{KeyVal, result_deserialize, serialize_key},
11	stream,
12};
13
14/// Iterate key-value entries in the map starting from lower-bound.
15///
16/// - Query is serialized
17/// - Result is deserialized
18#[implement(super::Map)]
19pub fn stream_from<'a, K, V, P>(
20	self: &'a Arc<Self>,
21	from: &P,
22) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
23where
24	P: Serialize + ?Sized + Debug,
25	K: Deserialize<'a> + Send,
26	V: Deserialize<'a> + Send,
27{
28	self.stream_from_raw(from)
29		.map(result_deserialize::<K, V>)
30}
31
32/// Iterate key-value entries in the map starting from lower-bound.
33///
34/// - Query is serialized
35/// - Result is raw
36#[implement(super::Map)]
37#[tracing::instrument(skip(self), level = "trace")]
38pub fn stream_from_raw<P>(
39	self: &Arc<Self>,
40	from: &P,
41) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
42where
43	P: Serialize + ?Sized + Debug,
44{
45	let key = serialize_key(from).expect("failed to serialize query key");
46	self.raw_stream_from(&key)
47}
48
49/// Iterate key-value entries in the map starting from lower-bound.
50///
51/// - Query is raw
52/// - Result is deserialized
53#[implement(super::Map)]
54pub fn stream_raw_from<'a, K, V, P>(
55	self: &'a Arc<Self>,
56	from: &P,
57) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
58where
59	P: AsRef<[u8]> + ?Sized + Debug + Sync,
60	K: Deserialize<'a> + Send,
61	V: Deserialize<'a> + Send,
62{
63	self.raw_stream_from(from)
64		.map(result_deserialize::<K, V>)
65}
66
67/// Iterate key-value entries in the map starting from lower-bound.
68///
69/// - Query is raw
70/// - Result is raw
71#[implement(super::Map)]
72#[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
73pub fn raw_stream_from<P>(
74	self: &Arc<Self>,
75	from: &P,
76) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
77where
78	P: AsRef<[u8]> + ?Sized + Debug,
79{
80	use crate::pool::Seek;
81
82	let opts = super::iter_options_default(&self.engine);
83	let state = stream::State::new(self, opts);
84	if is_cached(self, from) {
85		let state = state.init_fwd(from.as_ref().into());
86		return Either::Left(
87			task::consume_budget()
88				.map(move |()| stream::Items::<'_>::from(state))
89				.into_stream()
90				.flatten(),
91		);
92	}
93
94	let seek = Seek {
95		map: self.clone(),
96		dir: Direction::Forward,
97		key: Some(from.as_ref().into()),
98		state: crate::pool::into_send_seek(state),
99		res: None,
100	};
101
102	Either::Right(
103		self.engine
104			.pool
105			.execute_iter(seek)
106			.ok_into::<stream::Items<'_>>()
107			.into_stream()
108			.try_flatten(),
109	)
110}
111
112#[tracing::instrument(
113    name = "cached",
114    level = "trace",
115    skip(map, from),
116    fields(%map),
117)]
118pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
119where
120	P: AsRef<[u8]> + ?Sized,
121{
122	let opts = super::cache_iter_options_default(&map.engine);
123	let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
124
125	!state.is_incomplete()
126}