Skip to main content

tuwunel_database/map/
get_batch.rs

1use std::{convert::AsRef, sync::Arc};
2
3use futures::{Stream, StreamExt, TryStreamExt};
4use rocksdb::{DBPinnableSlice, ReadOptions};
5use tuwunel_core::{
6	Result, implement,
7	utils::{
8		IterStream,
9		stream::{WidebandExt, automatic_amplification, automatic_width},
10	},
11};
12
13use super::get::{cached_handle_from, handle_from};
14use crate::Handle;
15
16pub trait Get<'a, K, S>
17where
18	Self: Sized,
19	S: Stream<Item = K> + Send + 'a,
20	K: AsRef<[u8]> + Send + Sync + 'a,
21{
22	fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a;
23}
24
25impl<'a, K, S> Get<'a, K, S> for S
26where
27	Self: Sized,
28	S: Stream<Item = K> + Send + 'a,
29	K: AsRef<[u8]> + Send + Sync + 'a,
30{
31	#[inline]
32	fn get(self, map: &'a Arc<super::Map>) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a {
33		map.get_batch(self)
34	}
35}
36
37#[implement(super::Map)]
38#[tracing::instrument(skip(self, keys), level = "trace")]
39pub(crate) fn get_batch<'a, S, K>(
40	self: &'a Arc<Self>,
41	keys: S,
42) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
43where
44	S: Stream<Item = K> + Send + 'a,
45	K: AsRef<[u8]> + Send + Sync + 'a,
46{
47	use crate::pool::Get;
48
49	keys.ready_chunks(automatic_amplification())
50		.widen_then(automatic_width(), |chunk| {
51			self.engine.pool.execute_get(Get {
52				map: self.clone(),
53				res: None,
54				key: chunk
55					.iter()
56					.map(AsRef::as_ref)
57					.map(Into::into)
58					.collect(),
59			})
60		})
61		.map_ok(|results| results.into_iter().stream())
62		.try_flatten()
63}
64
65#[implement(super::Map)]
66#[tracing::instrument(name = "batch_cached", level = "trace", skip_all)]
67pub(crate) fn _get_batch_cached<'a, I, K>(
68	&self,
69	keys: I,
70) -> impl Iterator<Item = Result<Option<Handle<'_>>>> + Send + use<'_, I, K>
71where
72	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
73	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
74{
75	self.get_batch_blocking_opts(keys, &self.cache_read_options)
76		.map(cached_handle_from)
77}
78
79#[implement(super::Map)]
80#[tracing::instrument(name = "batch_blocking", level = "trace", skip_all)]
81pub(crate) fn get_batch_blocking<'a, I, K>(
82	&self,
83	keys: I,
84) -> impl Iterator<Item = Result<Handle<'_>>> + Send + use<'_, I, K>
85where
86	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
87	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
88{
89	self.get_batch_blocking_opts(keys, &self.read_options)
90		.map(handle_from)
91}
92
93#[implement(super::Map)]
94fn get_batch_blocking_opts<'a, I, K>(
95	&self,
96	keys: I,
97	read_options: &ReadOptions,
98) -> impl Iterator<Item = Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>> + Send + use<'_, I, K>
99where
100	I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
101	K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
102{
103	// Optimization can be `true` if key vector is pre-sorted **by the column
104	// comparator**.
105	const SORTED: bool = false;
106
107	self.engine
108		.db
109		.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
110		.into_iter()
111}