Skip to main content

tuwunel_database/
engine.rs

1mod backup;
2mod cf_opts;
3pub(crate) mod context;
4mod db_opts;
5pub(crate) mod descriptor;
6mod events;
7mod files;
8mod logger;
9mod memory_usage;
10mod open;
11mod repair;
12#[cfg(test)]
13mod tests;
14
15use std::{
16	ffi::CStr,
17	sync::{
18		Arc,
19		atomic::{AtomicU32, Ordering},
20	},
21};
22
23use rocksdb::{
24	AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
25	WaitForCompactOptions,
26};
27use tuwunel_core::{Err, Result, debug, info, warn};
28
29use crate::{
30	Context,
31	pool::Pool,
32	util::{map_err, result},
33};
34
35pub struct Engine {
36	pub(crate) db: Db,
37	pub(crate) pool: Arc<Pool>,
38	pub(crate) ctx: Arc<Context>,
39	pub(super) read_only: bool,
40	pub(super) secondary: bool,
41	pub(crate) checksums: bool,
42	corks: AtomicU32,
43}
44
45pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
46
47impl Engine {
48	#[tracing::instrument(
49		level = "info",
50		skip_all,
51		fields(
52			sequence = ?self.current_sequence(),
53		),
54	)]
55	pub fn wait_compactions_blocking(&self) -> Result {
56		let mut opts = WaitForCompactOptions::default();
57		opts.set_abort_on_pause(true);
58		opts.set_flush(false);
59		opts.set_timeout(0);
60
61		self.db.wait_for_compact(&opts).map_err(map_err)
62	}
63
64	#[tracing::instrument(
65		level = "info",
66		skip_all,
67		fields(
68			sequence = ?self.current_sequence(),
69		),
70	)]
71	pub fn sort(&self) -> Result {
72		let flushoptions = rocksdb::FlushOptions::default();
73		result(DBCommon::flush_opt(&self.db, &flushoptions))
74	}
75
76	#[tracing::instrument(
77		level = "debug",
78		skip_all,
79		fields(
80			sequence = ?self.current_sequence(),
81		),
82	)]
83	pub fn update(&self) -> Result {
84		self.db
85			.try_catch_up_with_primary()
86			.map_err(map_err)
87	}
88
89	#[tracing::instrument(level = "info", skip_all)]
90	pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) }
91
92	#[tracing::instrument(level = "debug", skip_all)]
93	pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
94
95	#[inline]
96	pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); }
97
98	#[inline]
99	pub(crate) fn uncork(&self) { self.corks.fetch_sub(1, Ordering::Relaxed); }
100
101	#[inline]
102	pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 }
103
104	/// Query for database property by null-terminated name which is expected to
105	/// have a result with an integer representation. This is intended for
106	/// low-overhead programmatic use.
107	pub(crate) fn property_integer(
108		&self,
109		cf: &impl AsColumnFamilyRef,
110		name: &CStr,
111	) -> Result<u64> {
112		result(self.db.property_int_value_cf(cf, name))
113			.and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
114	}
115
116	/// Query for database property by name receiving the result in a string.
117	pub(crate) fn property(&self, cf: &impl AsColumnFamilyRef, name: &str) -> Result<String> {
118		result(self.db.property_value_cf(cf, name))
119			.and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok))
120	}
121
122	pub(crate) fn cf(&self, name: &str) -> Arc<BoundColumnFamily<'_>> {
123		self.db
124			.cf_handle(name)
125			.expect("column must be described prior to database open")
126	}
127
128	#[inline]
129	#[must_use]
130	#[tracing::instrument(
131		name = "sequence",
132		level = "debug",
133		skip_all,
134		fields(sequence)
135	)]
136	pub fn current_sequence(&self) -> u64 {
137		let sequence = self.db.latest_sequence_number();
138
139		#[cfg(debug_assertions)]
140		tracing::Span::current().record("sequence", sequence);
141
142		sequence
143	}
144
145	#[inline]
146	#[must_use]
147	pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
148
149	#[inline]
150	#[must_use]
151	pub fn is_secondary(&self) -> bool { self.secondary }
152}
153
154impl Drop for Engine {
155	#[cold]
156	fn drop(&mut self) {
157		const BLOCKING: bool = true;
158
159		debug!("Waiting for background tasks to finish...");
160		self.db.cancel_all_background_work(BLOCKING);
161
162		info!(
163			sequence = %self.current_sequence(),
164			"Closing database..."
165		);
166	}
167}