Skip to main content

tuwunel_database/
engine.rs

1//! RocksDB engine: database-wide operations and shared resources.
2//!
3//! `Engine` owns the opened RocksDB instance together with the worker pool, the
4//! shared open-time context, and the flags fixed at open (read-only, secondary,
5//! checksums). Per-column-family reads and writes go through `Map`; the methods
6//! here act on the database as a whole: WAL flush and sync, memtable flush,
7//! manual compaction and primary catch-up, property queries, and the cork
8//! counter that coalesces WAL writes (see the `cork` module).
9
10mod backup;
11mod cf_opts;
12pub(crate) mod context;
13mod db_opts;
14pub(crate) mod descriptor;
15mod events;
16mod files;
17mod logger;
18mod memory_usage;
19mod open;
20mod repair;
21#[cfg(test)]
22mod tests;
23
24use std::{
25	ffi::CStr,
26	sync::{
27		Arc,
28		atomic::{AtomicU32, Ordering},
29	},
30};
31
32use rocksdb::{
33	AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
34	WaitForCompactOptions,
35};
36use tuwunel_core::{Err, Result, debug, info, warn};
37
38use crate::{
39	Context,
40	pool::Pool,
41	util::{map_err, result},
42};
43
44/// Handle to the opened RocksDB database and its shared resources.
45///
46/// One `Engine` exists per database, shared behind an `Arc` by every `Map`.
47pub struct Engine {
48	/// The opened RocksDB instance.
49	pub(crate) db: Db,
50
51	/// Thread pool offloading uncached, blocking database requests from the
52	/// tokio workers.
53	pub(crate) pool: Arc<Pool>,
54
55	/// Resources constructed before the database is opened and outliving it
56	/// (block caches, environment, column descriptors).
57	pub(crate) ctx: Arc<Context>,
58
59	/// Database was opened read-only; writes are rejected.
60	pub(super) read_only: bool,
61
62	/// Database was opened as a secondary follower of a primary instance.
63	pub(super) secondary: bool,
64
65	/// Verify block checksums on read.
66	pub(crate) checksums: bool,
67
68	/// Live cork count; nonzero suppresses the per-write WAL flush.
69	corks: AtomicU32,
70}
71
72/// Backing RocksDB type: multi-threaded column-family access, no transactions.
73pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
74
75impl Engine {
76	/// Block until outstanding background compactions finish.
77	///
78	/// Waits without a timeout and does not flush first; aborts the wait if
79	/// compaction has been paused.
80	#[tracing::instrument(
81		level = "info",
82		skip_all,
83		fields(
84			sequence = ?self.current_sequence(),
85		),
86	)]
87	pub fn wait_compactions_blocking(&self) -> Result {
88		let mut opts = WaitForCompactOptions::default();
89		opts.set_abort_on_pause(true);
90		opts.set_flush(false);
91		opts.set_timeout(0);
92
93		self.db.wait_for_compact(&opts).map_err(map_err)
94	}
95
96	/// Flush the memtables to SST files.
97	///
98	/// Forces buffered writes out of memory into the on-disk LSM tree; distinct
99	/// from `flush` and `sync`, which act on the write-ahead log.
100	#[tracing::instrument(
101		level = "info",
102		skip_all,
103		fields(
104			sequence = ?self.current_sequence(),
105		),
106	)]
107	pub fn sort(&self) -> Result {
108		let flushoptions = rocksdb::FlushOptions::default();
109		result(DBCommon::flush_opt(&self.db, &flushoptions))
110	}
111
112	/// Catch a secondary instance up to the primary's latest writes.
113	///
114	/// Replays the primary's newly appended WAL into this instance's view;
115	/// meaningful only when the database was opened as a secondary.
116	#[tracing::instrument(
117		level = "debug",
118		skip_all,
119		fields(
120			sequence = ?self.current_sequence(),
121		),
122	)]
123	pub fn update(&self) -> Result {
124		self.db
125			.try_catch_up_with_primary()
126			.map_err(map_err)
127	}
128
129	/// Flush the write-ahead log and fsync it to disk.
130	///
131	/// Once this returns the buffered writes survive power loss. Heavier than
132	/// `flush`, which stops at the OS page cache.
133	#[tracing::instrument(level = "info", skip_all)]
134	pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) }
135
136	/// Flush the buffered write-ahead log to the OS without an fsync.
137	///
138	/// Pushes WAL bytes to the page cache (durable against process crash, not
139	/// power loss). This is the per-write flush that corking suppresses.
140	#[tracing::instrument(level = "debug", skip_all)]
141	pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
142
143	/// Increment the cork count, suppressing the per-write WAL flush.
144	#[inline]
145	pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); }
146
147	/// Decrement the cork count; the per-write flush resumes at zero.
148	#[inline]
149	pub(crate) fn uncork(&self) { self.corks.fetch_sub(1, Ordering::Relaxed); }
150
151	/// Whether any cork is currently held.
152	///
153	/// When true, `Map` insert and remove skip their post-write WAL flush so
154	/// the records coalesce into one batch. Corking is purely a backend
155	/// write-buffering signal: it never changes application logic or any
156	/// observable database API behavior, because a write lands in the memtable
157	/// synchronously and reads back regardless of WAL flush state. See the
158	/// `cork` module.
159	#[inline]
160	pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 }
161
162	/// Query for database property by null-terminated name which is expected to
163	/// have a result with an integer representation. This is intended for
164	/// low-overhead programmatic use.
165	pub(crate) fn property_integer(
166		&self,
167		cf: &impl AsColumnFamilyRef,
168		name: &CStr,
169	) -> Result<u64> {
170		result(self.db.property_int_value_cf(cf, name))
171			.and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
172	}
173
174	/// Query for database property by name receiving the result in a string.
175	pub(crate) fn property(&self, cf: &impl AsColumnFamilyRef, name: &str) -> Result<String> {
176		result(self.db.property_value_cf(cf, name))
177			.and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
178	}
179
180	/// Look up a column-family handle by name.
181	///
182	/// Panics if the family was not described before the database was opened.
183	pub(crate) fn cf(&self, name: &str) -> Arc<BoundColumnFamily<'_>> {
184		self.db
185			.cf_handle(name)
186			.expect("column must be described prior to database open")
187	}
188
189	/// Whether a column family with this name exists.
190	#[inline]
191	#[must_use]
192	pub fn has_cf(&self, name: &str) -> bool { self.db.cf_handle(name).is_some() }
193
194	/// The latest RocksDB sequence number, a monotonic counter of writes.
195	#[inline]
196	#[must_use]
197	#[tracing::instrument(
198		name = "sequence",
199		level = "debug",
200		skip_all,
201		fields(sequence)
202	)]
203	pub fn current_sequence(&self) -> u64 {
204		let sequence = self.db.latest_sequence_number();
205
206		#[cfg(debug_assertions)]
207		tracing::Span::current().record("sequence", sequence);
208
209		sequence
210	}
211
212	/// Whether writes are rejected: true for a read-only or secondary open.
213	#[inline]
214	#[must_use]
215	pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
216
217	/// Whether the database was opened as a secondary follower of a primary.
218	#[inline]
219	#[must_use]
220	pub fn is_secondary(&self) -> bool { self.secondary }
221}
222
223impl Drop for Engine {
224	#[cold]
225	fn drop(&mut self) {
226		const BLOCKING: bool = true;
227
228		debug!("Waiting for background tasks to finish...");
229		self.db.cancel_all_background_work(BLOCKING);
230
231		info!(
232			sequence = %self.current_sequence(),
233			"Closing database..."
234		);
235	}
236}