tuwunel_database/map/
qry_batch.rs1use std::{fmt::Debug, sync::Arc};
2
3use futures::{Stream, StreamExt, TryStreamExt};
4use serde::Serialize;
5use tuwunel_core::{
6 Result, implement,
7 utils::{
8 IterStream,
9 stream::{WidebandExt, automatic_amplification, automatic_width},
10 },
11};
12
13use crate::{Handle, keyval::KeyBuf, ser};
14
15pub trait Qry<'a, K, S>
16where
17 S: Stream<Item = K> + Send + 'a,
18 K: Serialize + Debug,
19{
20 fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
21}
22
23impl<'a, K, S> Qry<'a, K, S> for S
24where
25 Self: 'a,
26 S: Stream<Item = K> + Send + 'a,
27 K: Serialize + Debug + 'a,
28{
29 #[inline]
30 fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
31 map.qry_batch(self)
32 }
33}
34
35#[implement(super::Map)]
36#[tracing::instrument(skip(self, keys), level = "trace")]
37pub(crate) fn qry_batch<'a, S, K>(
38 self: &'a Arc<Self>,
39 keys: S,
40) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
41where
42 S: Stream<Item = K> + Send + 'a,
43 K: Serialize + Debug + 'a,
44{
45 use crate::pool::Get;
46
47 keys.ready_chunks(automatic_amplification())
48 .widen_then(automatic_width(), |chunk| {
49 let keys = chunk
50 .iter()
51 .map(ser::serialize_to::<KeyBuf, _>)
52 .map(|result| result.expect("failed to serialize query key"))
53 .collect();
54
55 self.engine
56 .pool
57 .execute_get(Get { map: self.clone(), key: keys, res: None })
58 })
59 .map_ok(|results| results.into_iter().stream())
60 .try_flatten()
61}