tuwunel_database/map/
get.rs1use std::{convert::AsRef, fmt::Debug, sync::Arc};
2
3use futures::{
4 Future, FutureExt, TryFutureExt,
5 future::{Either, ready},
6};
7use rocksdb::{DBPinnableSlice, ReadOptions};
8use tokio::task;
9use tuwunel_core::{Err, Result, err, implement, utils::result::MapExpect};
10
11use crate::{
12 Handle,
13 util::{is_incomplete, map_err, or_else},
14};
15
16#[implement(super::Map)]
19#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
20pub fn get<K>(
21 self: &Arc<Self>,
22 key: &K,
23) -> impl Future<Output = Result<Handle<'_>>> + Send + use<'_, K>
24where
25 K: AsRef<[u8]> + Debug + ?Sized,
26{
27 use crate::pool::Get;
28
29 let cached = self.get_cached(key);
30 if matches!(cached, Err(_) | Ok(Some(_))) {
31 return Either::Left(
32 task::consume_budget().map(move |()| cached.map_expect("data found in cache")),
33 );
34 }
35
36 debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
37 let cmd = Get {
38 map: self.clone(),
39 key: [key.as_ref().into()].into(),
40 res: None,
41 };
42
43 Either::Right(
44 self.engine
45 .pool
46 .execute_get(cmd)
47 .and_then(|mut res| ready(res.remove(0))),
48 )
49}
50
51#[implement(super::Map)]
53#[tracing::instrument(skip(self, key), name = "cache", level = "trace")]
54pub(crate) fn get_cached<K>(&self, key: &K) -> Result<Option<Handle<'_>>>
55where
56 K: AsRef<[u8]> + Debug + ?Sized,
57{
58 let res = self.get_blocking_opts(key, &self.cache_read_options);
59 cached_handle_from(res)
60}
61
62#[implement(super::Map)]
66#[tracing::instrument(skip(self, key), name = "blocking", level = "trace")]
67pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
68where
69 K: AsRef<[u8]> + ?Sized,
70{
71 let res = self.get_blocking_opts(key, &self.read_options);
72 handle_from(res)
73}
74
75#[implement(super::Map)]
76fn get_blocking_opts<K>(
77 &self,
78 key: &K,
79 read_options: &ReadOptions,
80) -> Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>
81where
82 K: AsRef<[u8]> + ?Sized,
83{
84 self.engine
85 .db
86 .get_pinned_cf_opt(&self.cf(), key, read_options)
87}
88
89#[inline]
90pub(super) fn handle_from(
91 result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
92) -> Result<Handle<'_>> {
93 result
94 .map_err(map_err)?
95 .map(Handle::from)
96 .ok_or(err!(Request(NotFound("Not found in database"))))
97}
98
99#[inline]
100pub(super) fn cached_handle_from(
101 result: Result<Option<DBPinnableSlice<'_>>, rocksdb::Error>,
102) -> Result<Option<Handle<'_>>> {
103 match result {
104 | Ok(None) => Err!(Request(NotFound("Not found in database"))),
106
107 | Ok(Some(result)) => Ok(Some(Handle::from(result))),
109
110 | Err(error) if is_incomplete(&error) => Ok(None),
112
113 | Err(error) => or_else(error),
115 }
116}