tuwunel_database/
stream.rs1mod items;
2mod items_rev;
3mod keys;
4mod keys_rev;
5
6use std::{mem::replace, sync::Arc};
7
8use rocksdb::{DBRawIteratorWithThreadMode, ReadOptions};
9use tuwunel_core::Result;
10
11pub(crate) use self::{items::Items, items_rev::ItemsRev, keys::Keys, keys_rev::KeysRev};
12use crate::{
13 Map, Slice,
14 engine::Db,
15 keyval::{Key, KeyVal, Val},
16 util::{is_incomplete, map_err},
17};
18
19pub(crate) struct State<'a> {
20 inner: Inner<'a>,
21 seek: bool,
22 init: bool,
23}
24
25pub(crate) trait Cursor<'a, T>: Send {
26 fn state(&self) -> &State<'a>;
27
28 fn state_mut(&mut self) -> &mut State<'a>;
29
30 fn count(&self) -> (usize, Option<usize>);
31
32 fn fetch(&self) -> Option<T>;
33
34 fn seek(&mut self);
35
36 #[inline]
37 fn get(&self) -> Option<Result<T>> {
38 self.fetch()
39 .map(Ok)
40 .or_else(|| self.state().status().map(map_err).map(Err))
41 }
42
43 #[inline]
44 fn seek_and_get(&mut self) -> Option<Result<T>> {
45 self.seek();
46 self.get()
47 }
48}
49
50type Inner<'a> = DBRawIteratorWithThreadMode<'a, Db>;
51type From<'a> = Option<Key<'a>>;
52
53impl<'a> State<'a> {
54 #[inline]
55 pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self {
56 Self {
57 init: true,
58 seek: false,
59 inner: map
60 .engine()
61 .db
62 .raw_iterator_cf_opt(&map.cf(), opts),
63 }
64 }
65
66 #[inline]
67 #[tracing::instrument(level = "trace", skip_all)]
68 pub(super) fn init_fwd(mut self, from: From<'_>) -> Self {
69 debug_assert!(self.init, "init must be set to make this call");
70 debug_assert!(!self.seek, "seek must not be set to make this call");
71
72 if let Some(key) = from {
73 self.inner.seek(key);
74 } else {
75 self.inner.seek_to_first();
76 }
77
78 self.seek = true;
79 self
80 }
81
82 #[inline]
83 #[tracing::instrument(level = "trace", skip_all)]
84 pub(super) fn init_rev(mut self, from: From<'_>) -> Self {
85 debug_assert!(self.init, "init must be set to make this call");
86 debug_assert!(!self.seek, "seek must not be set to make this call");
87
88 if let Some(key) = from {
89 self.inner.seek_for_prev(key);
90 } else {
91 self.inner.seek_to_last();
92 }
93
94 self.seek = true;
95 self
96 }
97
98 #[inline]
99 #[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
100 pub(super) fn seek_fwd(&mut self) {
101 if !replace(&mut self.init, false) {
102 self.inner.next();
103 } else if !self.seek {
104 self.inner.seek_to_first();
105 }
106 }
107
108 #[inline]
109 #[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
110 pub(super) fn seek_rev(&mut self) {
111 if !replace(&mut self.init, false) {
112 self.inner.prev();
113 } else if !self.seek {
114 self.inner.seek_to_last();
115 }
116 }
117
118 #[inline]
119 #[expect(clippy::unused_self)]
120 #[tracing::instrument(level = "trace", skip_all, ret)]
121 pub(super) fn count_fwd(&self) -> (usize, Option<usize>) { (0, None) }
122
123 #[inline]
124 #[expect(clippy::unused_self)]
125 #[tracing::instrument(level = "trace", skip_all, ret)]
126 pub(super) fn count_rev(&self) -> (usize, Option<usize>) { (0, None) }
127
128 #[inline]
129 fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key() }
130
131 #[inline]
132 fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value() }
133
134 #[inline]
135 fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item() }
136
137 pub(super) fn is_incomplete(&self) -> bool {
138 matches!(self.status(), Some(e) if is_incomplete(&e))
139 }
140
141 #[inline]
142 pub(super) fn status(&self) -> Option<rocksdb::Error> { self.inner.status().err() }
143
144 #[inline]
145 pub(super) fn valid(&self) -> bool { self.inner.valid() }
146}
147
148fn keyval_longevity<'a, 'b: 'a>(item: KeyVal<'a>) -> KeyVal<'b> {
149 (slice_longevity::<'a, 'b>(item.0), slice_longevity::<'a, 'b>(item.1))
150}
151
152fn slice_longevity<'a, 'b: 'a>(item: &'a Slice) -> &'b Slice {
153 unsafe { std::mem::transmute(item) }
169}