Skip to main content

tuwunel_database/map/
stream.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either};
4use rocksdb::Direction;
5use serde::Deserialize;
6use tokio::task;
7use tuwunel_core::{Result, implement};
8
9use crate::{keyval, keyval::KeyVal, stream};
10
11/// Iterate key-value entries in the map from the beginning.
12///
13/// - Result is deserialized
14#[implement(super::Map)]
15pub fn stream<'a, K, V>(
16	self: &'a Arc<Self>,
17) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send
18where
19	K: Deserialize<'a> + Send,
20	V: Deserialize<'a> + Send,
21{
22	self.raw_stream()
23		.map(keyval::result_deserialize::<K, V>)
24}
25
26/// Iterate key-value entries in the map from the beginning.
27///
28/// - Result is raw
29#[implement(super::Map)]
30#[tracing::instrument(skip(self), fields(%self), level = "trace")]
31pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
32	use crate::pool::Seek;
33
34	let opts = super::iter_options_default(&self.engine);
35	let state = stream::State::new(self, opts);
36	if is_cached(self) {
37		let state = state.init_fwd(None);
38		return Either::Left(
39			task::consume_budget()
40				.map(move |()| stream::Items::<'_>::from(state))
41				.into_stream()
42				.flatten(),
43		);
44	}
45
46	let seek = Seek {
47		map: self.clone(),
48		dir: Direction::Forward,
49		state: crate::pool::into_send_seek(state),
50		key: None,
51		res: None,
52	};
53
54	Either::Right(
55		self.engine
56			.pool
57			.execute_iter(seek)
58			.ok_into::<stream::Items<'_>>()
59			.into_stream()
60			.try_flatten(),
61	)
62}
63
64#[tracing::instrument(
65    name = "cached",
66    level = "trace",
67    skip_all,
68    fields(%map),
69)]
70pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
71	let opts = super::cache_iter_options_default(&map.engine);
72	let state = stream::State::new(map, opts).init_fwd(None);
73
74	!state.is_incomplete()
75}