Skip to main content

tuwunel_database/engine/
context.rs

1use std::{
2	collections::BTreeMap,
3	fs::remove_dir_all,
4	path::Path,
5	sync::{Arc, Mutex},
6};
7
8use rocksdb::{Cache, Env, LruCacheOptions};
9use tuwunel_core::{
10	Result, Server, debug,
11	utils::{math::usize_from_f64, result::LogErr},
12};
13
14use crate::{or_else, pool::Pool};
15
16/// Name under which the shared block cache (every CF with
17/// `CacheDisp::Shared`) is registered in [`Context::col_cache`].
18pub(crate) const SHARED_POOL: &str = "Shared";
19
20/// One block-cache pool, plus the column families participating in it.
21///
22/// Pools may be shared by multiple CFs (`SHARED_POOL`, symmetric
23/// `CacheDisp::SharedWith` pairs); the participant list lets the admin
24/// surface name them.
25pub(crate) struct ColCache {
26	pub(crate) cache: Cache,
27	pub(crate) participants: Vec<&'static str>,
28}
29
30/// Map of block-cache pools keyed by pool name. The pool name is either
31/// `SHARED_POOL` or the first-arrival CF that created it.
32pub(crate) type ColCaches = BTreeMap<&'static str, ColCache>;
33
34/// Some components are constructed prior to opening the database and must
35/// outlive the database. These can also be shared between database instances
36/// though at the time of this comment we only open one database per process.
37/// These assets are housed in the shared Context.
38pub(crate) struct Context {
39	pub(crate) pool: Arc<Pool>,
40	pub(crate) col_cache: Mutex<ColCaches>,
41	pub(crate) row_cache: Mutex<Cache>,
42	/// Retained because rust-rocksdb's `Cache` binding lacks `get_capacity`;
43	/// needed for the admin renderer's row-cache util%.
44	pub(crate) row_cache_capacity: usize,
45	pub(crate) env: Mutex<Env>,
46	pub(crate) server: Arc<Server>,
47}
48
49impl Context {
50	pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
51		let config = &server.config;
52		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
53
54		let col_shard_bits = 7;
55		let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
56
57		let row_shard_bits = 7;
58		let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
59
60		let mut row_cache_opts = LruCacheOptions::default();
61		row_cache_opts.set_num_shard_bits(row_shard_bits);
62		row_cache_opts.set_capacity(row_cache_capacity_bytes);
63		let row_cache = Cache::new_lru_cache_opts(&row_cache_opts);
64
65		let mut col_cache_opts = LruCacheOptions::default();
66		col_cache_opts.set_num_shard_bits(col_shard_bits);
67		col_cache_opts.set_capacity(col_cache_capacity_bytes);
68		let col_cache = Cache::new_lru_cache_opts(&col_cache_opts);
69		let shared = ColCache {
70			cache: col_cache,
71			participants: Vec::new(),
72		};
73		let col_cache: ColCaches = [(SHARED_POOL, shared)].into();
74
75		let mut env = Env::new().or_else(or_else)?;
76
77		if config.rocksdb_compaction_prio_idle {
78			env.lower_thread_pool_cpu_priority();
79		}
80
81		if config.rocksdb_compaction_ioprio_idle {
82			env.lower_thread_pool_io_priority();
83		}
84
85		Ok(Arc::new(Self {
86			pool: Pool::new(server)?,
87			col_cache: col_cache.into(),
88			row_cache: row_cache.into(),
89			row_cache_capacity: row_cache_capacity_bytes,
90			env: env.into(),
91			server: server.clone(),
92		}))
93	}
94}
95
96impl Drop for Context {
97	#[cold]
98	fn drop(&mut self) {
99		debug!("Closing frontend pool");
100		self.pool.close();
101
102		let mut env = self.env.lock().expect("locked");
103
104		debug!("Shutting down background threads");
105		env.set_high_priority_background_threads(0);
106		env.set_low_priority_background_threads(0);
107		env.set_bottom_priority_background_threads(0);
108		env.set_background_threads(0);
109
110		debug!("Joining background threads...");
111		env.join_all_threads();
112
113		after_close(self, &self.server.config.database_path)
114			.expect("Failed to execute after_close handler");
115	}
116}
117
118/// For unit and integration tests the 'fresh' directive deletes found db.
119pub(super) fn before_open(ctx: &Arc<Context>, path: &Path) -> Result {
120	if ctx.server.config.test.contains("fresh") {
121		match delete_database_for_testing(ctx, path) {
122			| Err(e) if !e.is_not_found() => return Err(e),
123			| _ => (),
124		}
125	}
126
127	Ok(())
128}
129
130/// For unit and integration tests the 'cleanup' directive deletes after close
131/// to cleanup.
132fn after_close(ctx: &Context, path: &Path) -> Result {
133	if ctx.server.config.test.contains("cleanup") {
134		delete_database_for_testing(ctx, path)
135			.log_err()
136			.ok();
137	}
138
139	Ok(())
140}
141
142/// For unit and integration tests; removes the database directory when called.
143/// To prevent misuse, cfg!(test) must be true for a unit test or the
144/// integration test server is named localhost.
145#[tracing::instrument(level = "debug", skip_all)]
146fn delete_database_for_testing(ctx: &Context, path: &Path) -> Result {
147	let config = &ctx.server.config;
148	let localhost = config
149		.server_name
150		.as_str()
151		.starts_with("localhost");
152
153	if !cfg!(test) && !localhost {
154		return Ok(());
155	}
156
157	debug_assert!(
158		config.test.contains("cleanup") | config.test.contains("fresh"),
159		"missing any test directive legitimating this call.",
160	);
161
162	remove_dir_all(path).map_err(Into::into)
163}