tuwunel_database/
engine.rs1mod backup;
11mod cf_opts;
12pub(crate) mod context;
13mod db_opts;
14pub(crate) mod descriptor;
15mod events;
16mod files;
17mod logger;
18mod memory_usage;
19mod open;
20mod repair;
21#[cfg(test)]
22mod tests;
23
24use std::{
25 ffi::CStr,
26 sync::{
27 Arc,
28 atomic::{AtomicU32, Ordering},
29 },
30};
31
32use rocksdb::{
33 AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
34 WaitForCompactOptions,
35};
36use tuwunel_core::{Err, Result, debug, info, warn};
37
38use crate::{
39 Context,
40 pool::Pool,
41 util::{map_err, result},
42};
43
/// Owner of the opened RocksDB handle together with the state the methods in
/// this module consult (read-only/secondary flags and the cork counter).
///
/// NOTE(review): Rust drops struct fields in declaration order, so `db` is
/// dropped before `pool` and `ctx`; confirm nothing relies on that before
/// reordering fields.
pub struct Engine {
    /// The underlying RocksDB handle (see the `Db` alias).
    pub(crate) db: Db,

    /// Shared handle to the crate's `Pool`.
    /// NOTE(review): presumably the worker pool servicing database requests;
    /// not used in this file, confirm against `crate::pool`.
    pub(crate) pool: Arc<Pool>,

    /// Shared handle to the crate's `Context`.
    /// NOTE(review): presumably state shared across the database's lifetime;
    /// not used in this file, confirm against `context.rs`.
    pub(crate) ctx: Arc<Context>,

    /// Read-only flag; one of the two inputs to `is_read_only()`.
    pub(super) read_only: bool,

    /// Secondary flag; returned by `is_secondary()` and also makes
    /// `is_read_only()` return `true`.
    pub(super) secondary: bool,

    /// Checksum flag.
    /// NOTE(review): presumably whether checksum verification is enabled; it
    /// is not read anywhere in this file, confirm at the use sites.
    pub(crate) checksums: bool,

    /// Number of outstanding corks: incremented by `cork()`, decremented by
    /// `uncork()`, and compared against zero by `corked()`.
    corks: AtomicU32,
}
71
/// Concrete RocksDB handle type used throughout the crate: the
/// `MultiThreaded` flavor of `DBWithThreadMode`.
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
74
75impl Engine {
76 #[tracing::instrument(
81 level = "info",
82 skip_all,
83 fields(
84 sequence = ?self.current_sequence(),
85 ),
86 )]
87 pub fn wait_compactions_blocking(&self) -> Result {
88 let mut opts = WaitForCompactOptions::default();
89 opts.set_abort_on_pause(true);
90 opts.set_flush(false);
91 opts.set_timeout(0);
92
93 self.db.wait_for_compact(&opts).map_err(map_err)
94 }
95
96 #[tracing::instrument(
101 level = "info",
102 skip_all,
103 fields(
104 sequence = ?self.current_sequence(),
105 ),
106 )]
107 pub fn sort(&self) -> Result {
108 let flushoptions = rocksdb::FlushOptions::default();
109 result(DBCommon::flush_opt(&self.db, &flushoptions))
110 }
111
112 #[tracing::instrument(
117 level = "debug",
118 skip_all,
119 fields(
120 sequence = ?self.current_sequence(),
121 ),
122 )]
123 pub fn update(&self) -> Result {
124 self.db
125 .try_catch_up_with_primary()
126 .map_err(map_err)
127 }
128
129 #[tracing::instrument(level = "info", skip_all)]
134 pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) }
135
136 #[tracing::instrument(level = "debug", skip_all)]
141 pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
142
143 #[inline]
145 pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); }
146
147 #[inline]
149 pub(crate) fn uncork(&self) { self.corks.fetch_sub(1, Ordering::Relaxed); }
150
151 #[inline]
160 pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 }
161
162 pub(crate) fn property_integer(
166 &self,
167 cf: &impl AsColumnFamilyRef,
168 name: &CStr,
169 ) -> Result<u64> {
170 result(self.db.property_int_value_cf(cf, name))
171 .and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
172 }
173
174 pub(crate) fn property(&self, cf: &impl AsColumnFamilyRef, name: &str) -> Result<String> {
176 result(self.db.property_value_cf(cf, name))
177 .and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
178 }
179
180 pub(crate) fn cf(&self, name: &str) -> Arc<BoundColumnFamily<'_>> {
184 self.db
185 .cf_handle(name)
186 .expect("column must be described prior to database open")
187 }
188
189 #[inline]
191 #[must_use]
192 pub fn has_cf(&self, name: &str) -> bool { self.db.cf_handle(name).is_some() }
193
194 #[inline]
196 #[must_use]
197 #[tracing::instrument(
198 name = "sequence",
199 level = "debug",
200 skip_all,
201 fields(sequence)
202 )]
203 pub fn current_sequence(&self) -> u64 {
204 let sequence = self.db.latest_sequence_number();
205
206 #[cfg(debug_assertions)]
207 tracing::Span::current().record("sequence", sequence);
208
209 sequence
210 }
211
212 #[inline]
214 #[must_use]
215 pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
216
217 #[inline]
219 #[must_use]
220 pub fn is_secondary(&self) -> bool { self.secondary }
221}
222
223impl Drop for Engine {
224 #[cold]
225 fn drop(&mut self) {
226 const BLOCKING: bool = true;
227
228 debug!("Waiting for background tasks to finish...");
229 self.db.cancel_all_background_work(BLOCKING);
230
231 info!(
232 sequence = %self.current_sequence(),
233 "Closing database..."
234 );
235 }
236}