Skip to main content

tuwunel_database/map/
stream_prefix.rs

1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{Stream, StreamExt, TryStreamExt, future};
4use serde::{Deserialize, Serialize};
5use tuwunel_core::{Result, implement};
6
7use crate::keyval::{KeyVal, result_deserialize, serialize_key};
8
9/// Iterate key-value entries in the map where the key matches a prefix.
10///
11/// - Query is serialized
12/// - Result is deserialized
13#[implement(super::Map)]
14pub fn stream_prefix<'a, K, V, P>(
15	self: &'a Arc<Self>,
16	prefix: &P,
17) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
18where
19	P: Serialize + ?Sized + Debug,
20	K: Deserialize<'a> + Send,
21	V: Deserialize<'a> + Send,
22{
23	self.stream_prefix_raw(prefix)
24		.map(result_deserialize::<K, V>)
25}
26
27/// Iterate key-value entries in the map where the key matches a prefix.
28///
29/// - Query is serialized
30/// - Result is raw
31#[implement(super::Map)]
32#[tracing::instrument(skip(self), level = "trace")]
33pub fn stream_prefix_raw<P>(
34	self: &Arc<Self>,
35	prefix: &P,
36) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
37where
38	P: Serialize + ?Sized + Debug,
39{
40	let key = serialize_key(prefix).expect("failed to serialize query key");
41	self.raw_stream_from(&key)
42		.try_take_while(move |(k, _): &KeyVal<'_>| future::ok(k.starts_with(&key)))
43}
44
45/// Iterate key-value entries in the map where the key matches a prefix.
46///
47/// - Query is raw
48/// - Result is deserialized
49#[implement(super::Map)]
50pub fn stream_raw_prefix<'a, K, V, P>(
51	self: &'a Arc<Self>,
52	prefix: &'a P,
53) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a
54where
55	P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
56	K: Deserialize<'a> + Send + 'a,
57	V: Deserialize<'a> + Send + 'a,
58{
59	self.raw_stream_prefix(prefix)
60		.map(result_deserialize::<K, V>)
61}
62
63/// Iterate key-value entries in the map where the key matches a prefix.
64///
65/// - Query is raw
66/// - Result is raw
67#[implement(super::Map)]
68pub fn raw_stream_prefix<'a, P>(
69	self: &'a Arc<Self>,
70	prefix: &'a P,
71) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a
72where
73	P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
74{
75	self.raw_stream_from(prefix)
76		.try_take_while(|(k, _): &KeyVal<'_>| future::ok(k.starts_with(prefix.as_ref())))
77}