tuwunel_database/map/
stream_prefix.rs1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{Stream, StreamExt, TryStreamExt, future};
4use serde::{Deserialize, Serialize};
5use tuwunel_core::{Result, implement};
6
7use crate::keyval::{KeyVal, result_deserialize, serialize_key};
8
9#[implement(super::Map)]
14pub fn stream_prefix<'a, K, V, P>(
15 self: &'a Arc<Self>,
16 prefix: &P,
17) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + use<'a, K, V, P>
18where
19 P: Serialize + ?Sized + Debug,
20 K: Deserialize<'a> + Send,
21 V: Deserialize<'a> + Send,
22{
23 self.stream_prefix_raw(prefix)
24 .map(result_deserialize::<K, V>)
25}
26
27#[implement(super::Map)]
32#[tracing::instrument(skip(self), level = "trace")]
33pub fn stream_prefix_raw<P>(
34 self: &Arc<Self>,
35 prefix: &P,
36) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + use<'_, P>
37where
38 P: Serialize + ?Sized + Debug,
39{
40 let key = serialize_key(prefix).expect("failed to serialize query key");
41 self.raw_stream_from(&key)
42 .try_take_while(move |(k, _): &KeyVal<'_>| future::ok(k.starts_with(&key)))
43}
44
45#[implement(super::Map)]
50pub fn stream_raw_prefix<'a, K, V, P>(
51 self: &'a Arc<Self>,
52 prefix: &'a P,
53) -> impl Stream<Item = Result<KeyVal<'_, K, V>>> + Send + 'a
54where
55 P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
56 K: Deserialize<'a> + Send + 'a,
57 V: Deserialize<'a> + Send + 'a,
58{
59 self.raw_stream_prefix(prefix)
60 .map(result_deserialize::<K, V>)
61}
62
63#[implement(super::Map)]
68pub fn raw_stream_prefix<'a, P>(
69 self: &'a Arc<Self>,
70 prefix: &'a P,
71) -> impl Stream<Item = Result<KeyVal<'_>>> + Send + 'a
72where
73 P: AsRef<[u8]> + ?Sized + Debug + Sync + 'a,
74{
75 self.raw_stream_from(prefix)
76 .try_take_while(|(k, _): &KeyVal<'_>| future::ok(k.starts_with(prefix.as_ref())))
77}