tuwunel_database/
engine.rs1mod backup;
2mod cf_opts;
3pub(crate) mod context;
4mod db_opts;
5pub(crate) mod descriptor;
6mod events;
7mod files;
8mod logger;
9mod memory_usage;
10mod open;
11mod repair;
12#[cfg(test)]
13mod tests;
14
15use std::{
16 ffi::CStr,
17 sync::{
18 Arc,
19 atomic::{AtomicU32, Ordering},
20 },
21};
22
23use rocksdb::{
24 AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
25 WaitForCompactOptions,
26};
27use tuwunel_core::{Err, Result, debug, info, warn};
28
29use crate::{
30 Context,
31 pool::Pool,
32 util::{map_err, result},
33};
34
35pub struct Engine {
36 pub(crate) db: Db,
37 pub(crate) pool: Arc<Pool>,
38 pub(crate) ctx: Arc<Context>,
39 pub(super) read_only: bool,
40 pub(super) secondary: bool,
41 pub(crate) checksums: bool,
42 corks: AtomicU32,
43}
44
45pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
46
47impl Engine {
48 #[tracing::instrument(
49 level = "info",
50 skip_all,
51 fields(
52 sequence = ?self.current_sequence(),
53 ),
54 )]
55 pub fn wait_compactions_blocking(&self) -> Result {
56 let mut opts = WaitForCompactOptions::default();
57 opts.set_abort_on_pause(true);
58 opts.set_flush(false);
59 opts.set_timeout(0);
60
61 self.db.wait_for_compact(&opts).map_err(map_err)
62 }
63
64 #[tracing::instrument(
65 level = "info",
66 skip_all,
67 fields(
68 sequence = ?self.current_sequence(),
69 ),
70 )]
71 pub fn sort(&self) -> Result {
72 let flushoptions = rocksdb::FlushOptions::default();
73 result(DBCommon::flush_opt(&self.db, &flushoptions))
74 }
75
76 #[tracing::instrument(
77 level = "debug",
78 skip_all,
79 fields(
80 sequence = ?self.current_sequence(),
81 ),
82 )]
83 pub fn update(&self) -> Result {
84 self.db
85 .try_catch_up_with_primary()
86 .map_err(map_err)
87 }
88
89 #[tracing::instrument(level = "info", skip_all)]
90 pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) }
91
92 #[tracing::instrument(level = "debug", skip_all)]
93 pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
94
95 #[inline]
96 pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); }
97
98 #[inline]
99 pub(crate) fn uncork(&self) { self.corks.fetch_sub(1, Ordering::Relaxed); }
100
101 #[inline]
102 pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 }
103
104 pub(crate) fn property_integer(
108 &self,
109 cf: &impl AsColumnFamilyRef,
110 name: &CStr,
111 ) -> Result<u64> {
112 result(self.db.property_int_value_cf(cf, name))
113 .and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
114 }
115
116 pub(crate) fn property(&self, cf: &impl AsColumnFamilyRef, name: &str) -> Result<String> {
118 result(self.db.property_value_cf(cf, name))
119 .and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
120 }
121
122 pub(crate) fn cf(&self, name: &str) -> Arc<BoundColumnFamily<'_>> {
123 self.db
124 .cf_handle(name)
125 .expect("column must be described prior to database open")
126 }
127
128 #[inline]
129 #[must_use]
130 #[tracing::instrument(
131 name = "sequence",
132 level = "debug",
133 skip_all,
134 fields(sequence)
135 )]
136 pub fn current_sequence(&self) -> u64 {
137 let sequence = self.db.latest_sequence_number();
138
139 #[cfg(debug_assertions)]
140 tracing::Span::current().record("sequence", sequence);
141
142 sequence
143 }
144
145 #[inline]
146 #[must_use]
147 pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
148
149 #[inline]
150 #[must_use]
151 pub fn is_secondary(&self) -> bool { self.secondary }
152}
153
154impl Drop for Engine {
155 #[cold]
156 fn drop(&mut self) {
157 const BLOCKING: bool = true;
158
159 debug!("Waiting for background tasks to finish...");
160 self.db.cancel_all_background_work(BLOCKING);
161
162 info!(
163 sequence = %self.current_sequence(),
164 "Closing database..."
165 );
166 }
167}