Skip to main content

tuwunel_database/map/
keys_from.rs

1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::{Deserialize, Serialize};
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use super::stream_from::is_cached;
10use crate::{
11	keyval::{Key, result_deserialize_key, serialize_key},
12	stream,
13};
14
15#[implement(super::Map)]
16pub fn keys_from<'a, K, P>(
17	self: &'a Arc<Self>,
18	from: &P,
19) -> impl Stream<Item = Result<Key<'_, K>>> + Send + use<'a, K, P>
20where
21	P: Serialize + ?Sized + Debug,
22	K: Deserialize<'a> + Send,
23{
24	self.keys_from_raw(from)
25		.map(result_deserialize_key::<K>)
26}
27
28#[implement(super::Map)]
29#[tracing::instrument(skip(self), level = "trace")]
30pub fn keys_from_raw<P>(
31	self: &Arc<Self>,
32	from: &P,
33) -> impl Stream<Item = Result<Key<'_>>> + Send + use<'_, P>
34where
35	P: Serialize + ?Sized + Debug,
36{
37	let key = serialize_key(from).expect("failed to serialize query key");
38	self.raw_keys_from(&key)
39}
40
41#[implement(super::Map)]
42pub fn keys_raw_from<'a, K, P>(
43	self: &'a Arc<Self>,
44	from: &P,
45) -> impl Stream<Item = Result<Key<'_, K>>> + Send + use<'a, K, P>
46where
47	P: AsRef<[u8]> + ?Sized + Debug + Sync,
48	K: Deserialize<'a> + Send,
49{
50	self.raw_keys_from(from)
51		.map(result_deserialize_key::<K>)
52}
53
54#[implement(super::Map)]
55#[tracing::instrument(skip(self, from), fields(%self), level = "trace")]
56pub fn raw_keys_from<P>(
57	self: &Arc<Self>,
58	from: &P,
59) -> impl Stream<Item = Result<Key<'_>>> + Send + use<'_, P>
60where
61	P: AsRef<[u8]> + ?Sized + Debug,
62{
63	use crate::pool::Seek;
64
65	let opts = super::iter_options_default(&self.engine);
66	let state = stream::State::new(self, opts);
67	if is_cached(self, from) {
68		let state = state.init_fwd(from.as_ref().into());
69		return Either::Left(
70			task::consume_budget()
71				.map(move |()| stream::Keys::<'_>::from(state))
72				.into_stream()
73				.flatten(),
74		);
75	}
76
77	let seek = Seek {
78		map: self.clone(),
79		dir: Direction::Forward,
80		key: Some(from.as_ref().into()),
81		state: crate::pool::into_send_seek(state),
82		res: None,
83	};
84
85	Either::Right(
86		self.engine
87			.pool
88			.execute_iter(seek)
89			.ok_into::<stream::Keys<'_>>()
90			.into_stream()
91			.try_flatten(),
92	)
93}