Skip to main content

tuwunel_database/
stream.rs

1mod items;
2mod items_rev;
3mod keys;
4mod keys_rev;
5
6use std::{mem::replace, sync::Arc};
7
8use rocksdb::{DBRawIteratorWithThreadMode, ReadOptions};
9use tuwunel_core::Result;
10
11pub(crate) use self::{items::Items, items_rev::ItemsRev, keys::Keys, keys_rev::KeysRev};
12use crate::{
13	Map, Slice,
14	engine::Db,
15	keyval::{Key, KeyVal, Val},
16	util::{is_incomplete, map_err},
17};
18
19pub(crate) struct State<'a> {
20	inner: Inner<'a>,
21	seek: bool,
22	init: bool,
23}
24
25pub(crate) trait Cursor<'a, T>: Send {
26	fn state(&self) -> &State<'a>;
27
28	fn state_mut(&mut self) -> &mut State<'a>;
29
30	fn count(&self) -> (usize, Option<usize>);
31
32	fn fetch(&self) -> Option<T>;
33
34	fn seek(&mut self);
35
36	#[inline]
37	fn get(&self) -> Option<Result<T>> {
38		self.fetch()
39			.map(Ok)
40			.or_else(|| self.state().status().map(map_err).map(Err))
41	}
42
43	#[inline]
44	fn seek_and_get(&mut self) -> Option<Result<T>> {
45		self.seek();
46		self.get()
47	}
48}
49
50type Inner<'a> = DBRawIteratorWithThreadMode<'a, Db>;
51type From<'a> = Option<Key<'a>>;
52
53impl<'a> State<'a> {
54	#[inline]
55	pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self {
56		Self {
57			init: true,
58			seek: false,
59			inner: map
60				.engine()
61				.db
62				.raw_iterator_cf_opt(&map.cf(), opts),
63		}
64	}
65
66	#[inline]
67	#[tracing::instrument(level = "trace", skip_all)]
68	pub(super) fn init_fwd(mut self, from: From<'_>) -> Self {
69		debug_assert!(self.init, "init must be set to make this call");
70		debug_assert!(!self.seek, "seek must not be set to make this call");
71
72		if let Some(key) = from {
73			self.inner.seek(key);
74		} else {
75			self.inner.seek_to_first();
76		}
77
78		self.seek = true;
79		self
80	}
81
82	#[inline]
83	#[tracing::instrument(level = "trace", skip_all)]
84	pub(super) fn init_rev(mut self, from: From<'_>) -> Self {
85		debug_assert!(self.init, "init must be set to make this call");
86		debug_assert!(!self.seek, "seek must not be set to make this call");
87
88		if let Some(key) = from {
89			self.inner.seek_for_prev(key);
90		} else {
91			self.inner.seek_to_last();
92		}
93
94		self.seek = true;
95		self
96	}
97
98	#[inline]
99	#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
100	pub(super) fn seek_fwd(&mut self) {
101		if !replace(&mut self.init, false) {
102			self.inner.next();
103		} else if !self.seek {
104			self.inner.seek_to_first();
105		}
106	}
107
108	#[inline]
109	#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
110	pub(super) fn seek_rev(&mut self) {
111		if !replace(&mut self.init, false) {
112			self.inner.prev();
113		} else if !self.seek {
114			self.inner.seek_to_last();
115		}
116	}
117
118	#[inline]
119	#[expect(clippy::unused_self)]
120	#[tracing::instrument(level = "trace", skip_all, ret)]
121	pub(super) fn count_fwd(&self) -> (usize, Option<usize>) { (0, None) }
122
123	#[inline]
124	#[expect(clippy::unused_self)]
125	#[tracing::instrument(level = "trace", skip_all, ret)]
126	pub(super) fn count_rev(&self) -> (usize, Option<usize>) { (0, None) }
127
128	#[inline]
129	fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key() }
130
131	#[inline]
132	fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value() }
133
134	#[inline]
135	fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item() }
136
137	pub(super) fn is_incomplete(&self) -> bool {
138		matches!(self.status(), Some(e) if is_incomplete(&e))
139	}
140
141	#[inline]
142	pub(super) fn status(&self) -> Option<rocksdb::Error> { self.inner.status().err() }
143
144	#[inline]
145	pub(super) fn valid(&self) -> bool { self.inner.valid() }
146}
147
148fn keyval_longevity<'a, 'b: 'a>(item: KeyVal<'a>) -> KeyVal<'b> {
149	(slice_longevity::<'a, 'b>(item.0), slice_longevity::<'a, 'b>(item.1))
150}
151
152fn slice_longevity<'a, 'b: 'a>(item: &'a Slice) -> &'b Slice {
153	// SAFETY: The lifetime of the data returned by the rocksdb cursor is only valid
154	// between each movement of the cursor. It is hereby unsafely extended to match
155	// the lifetime of the cursor itself. This is due to the limitation of the
156	// Stream trait where the Item is incapable of conveying a lifetime; this is due
157	// to GAT's being unstable during its development. This unsafety can be removed
158	// as soon as this limitation is addressed by an upcoming version.
159	//
160	// We have done our best to mitigate the implications of this in conjunction
161	// with the deserialization API such that borrows being held across movements of
162	// the cursor do not happen accidentally. The compiler will still error when
163	// values herein produced try to leave a closure passed to a StreamExt API. But
164	// escapes can happen if you explicitly and intentionally attempt it, and there
165	// will be no compiler error or warning. This is primarily the case with
166	// calling collect() without a preceding map(ToOwned::to_owned). A collection
167	// of references here is illegal, but this will not be enforced by the compiler.
168	unsafe { std::mem::transmute(item) }
169}