Skip to main content

tuwunel_database/map/
qry_batch.rs

1use std::{fmt::Debug, sync::Arc};
2
3use futures::{Stream, StreamExt, TryStreamExt};
4use serde::Serialize;
5use tuwunel_core::{
6	Result, implement,
7	utils::{
8		IterStream,
9		stream::{WidebandExt, automatic_amplification, automatic_width},
10	},
11};
12
13use crate::{Handle, keyval::KeyBuf, ser};
14
15pub trait Qry<'a, K, S>
16where
17	S: Stream<Item = K> + Send + 'a,
18	K: Serialize + Debug,
19{
20	fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
21}
22
23impl<'a, K, S> Qry<'a, K, S> for S
24where
25	Self: 'a,
26	S: Stream<Item = K> + Send + 'a,
27	K: Serialize + Debug + 'a,
28{
29	#[inline]
30	fn qry(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
31		map.qry_batch(self)
32	}
33}
34
35#[implement(super::Map)]
36#[tracing::instrument(skip(self, keys), level = "trace")]
37pub(crate) fn qry_batch<'a, S, K>(
38	self: &'a Arc<Self>,
39	keys: S,
40) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
41where
42	S: Stream<Item = K> + Send + 'a,
43	K: Serialize + Debug + 'a,
44{
45	use crate::pool::Get;
46
47	keys.ready_chunks(automatic_amplification())
48		.widen_then(automatic_width(), |chunk| {
49			let keys = chunk
50				.iter()
51				.map(ser::serialize_to::<KeyBuf, _>)
52				.map(|result| result.expect("failed to serialize query key"))
53				.collect();
54
55			self.engine
56				.pool
57				.execute_get(Get { map: self.clone(), key: keys, res: None })
58		})
59		.map_ok(|results| results.into_iter().stream())
60		.try_flatten()
61}