Skip to main content

tuwunel_database/map/
get.rs

1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{
4	Future, FutureExt, TryFutureExt,
5	future::{Either, ready},
6};
7use rocksdb::{DBPinnableSlice, ReadOptions};
8use tokio::task;
9use tuwunel_core::{Err, Result, err, implement, utils::result::MapExpect};
10
11use crate::{
12	Handle,
13	util::{is_incomplete, map_err, or_else},
14};
15
16/// Fetch a value from the database into cache, returning a reference-handle
17/// asynchronously. The key is referenced directly to perform the query.
18#[implement(super::Map)]
19#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
20pub fn get<K>(
21	self: &Arc<Self>,
22	key: &K,
23) -> impl Future<Output = Result<Handle<'_>>> + Send + use<'_, K>
24where
25	K: AsRef<[u8]> + Debug + ?Sized,
26{
27	use crate::pool::Get;
28
29	let cached = self.get_cached(key);
30	if matches!(cached, Err(_) | Ok(Some(_))) {
31		return Either::Left(
32			task::consume_budget().map(move |()| cached.map_expect("data found in cache")),
33		);
34	}
35
36	debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
37	let cmd = Get {
38		map: self.clone(),
39		key: [key.as_ref().into()].into(),
40		res: None,
41	};
42
43	Either::Right(
44		self.engine
45			.pool
46			.execute_get(cmd)
47			.and_then(|mut res| ready(res.remove(0))),
48	)
49}
50
51/// Fetch a value from the cache without I/O.
52#[implement(super::Map)]
53#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
54pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
55where
56	K: AsRef<[u8]> + Debug + ?Sized,
57{
58	let res = self.get_blocking_opts(key, &self.cache_read_options);
59	cached_handle_from(res)
60}
61
62/// Fetch a value from the database into cache, returning a reference-handle.
63/// The key is referenced directly to perform the query. This is a thread-
64/// blocking call.
65#[implement(super::Map)]
66#[tracing::instrument(skip(self, key), name = "blocking", level = "trace")]
67pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
68where
69	K: AsRef<[u8]> + ?Sized,
70{
71	let res = self.get_blocking_opts(key, &self.read_options);
72	handle_from(res)
73}
74
75#[implement(super::Map)]
76fn get_blocking_opts<K>(
77	&self,
78	key: &K,
79	read_options: &ReadOptions,
80) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>
81where
82	K: AsRef<[u8]> + ?Sized,
83{
84	self.engine
85		.db
86		.get_pinned_cf_opt(&self.cf(), key, read_options)
87}
88
89#[inline]
90pub(super) fn handle_from(
91	result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
92) -> Result<Handle<'_>> {
93	result
94		.map_err(map_err)?
95		.map(Handle::from)
96		.ok_or(err!(Request(NotFound("Not found in database"))))
97}
98
99#[inline]
100pub(super) fn cached_handle_from(
101	result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
102) -> Result<Option<Handle<'_>>> {
103	match result {
104		// cache hit; not found
105		| Ok(None) => Err!(Request(NotFound("Not found in database"))),
106
107		// cache hit; value found
108		| Ok(Some(result)) => Ok(Some(Handle::from(result))),
109
110		// cache miss; unknown
111		| Err(error) if is_incomplete(&error) => Ok(None),
112
113		// some other error occurred
114		| Err(error) => or_else(error),
115	}
116}