tuwunel_database/engine/
context.rs1use std::{
2 collections::BTreeMap,
3 fs::remove_dir_all,
4 path::Path,
5 sync::{Arc, Mutex},
6};
7
8use rocksdb::{Cache, Env, LruCacheOptions};
9use tuwunel_core::{
10 Result, Server, debug,
11 utils::{math::usize_from_f64, result::LogErr},
12};
13
14use crate::{or_else, pool::Pool};
15
16pub(crate) const SHARED_POOL: &str = "Shared";
19
20pub(crate) struct ColCache {
26 pub(crate) cache: Cache,
27 pub(crate) participants: Vec<&'static str>,
28}
29
30pub(crate) type ColCaches = BTreeMap<&'static str, ColCache>;
33
34pub(crate) struct Context {
39 pub(crate) pool: Arc<Pool>,
40 pub(crate) col_cache: Mutex<ColCaches>,
41 pub(crate) row_cache: Mutex<Cache>,
42 pub(crate) row_cache_capacity: usize,
45 pub(crate) env: Mutex<Env>,
46 pub(crate) server: Arc<Server>,
47}
48
49impl Context {
50 pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
51 let config = &server.config;
52 let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
53
54 let col_shard_bits = 7;
55 let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
56
57 let row_shard_bits = 7;
58 let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
59
60 let mut row_cache_opts = LruCacheOptions::default();
61 row_cache_opts.set_num_shard_bits(row_shard_bits);
62 row_cache_opts.set_capacity(row_cache_capacity_bytes);
63 let row_cache = Cache::new_lru_cache_opts(&row_cache_opts);
64
65 let mut col_cache_opts = LruCacheOptions::default();
66 col_cache_opts.set_num_shard_bits(col_shard_bits);
67 col_cache_opts.set_capacity(col_cache_capacity_bytes);
68 let col_cache = Cache::new_lru_cache_opts(&col_cache_opts);
69 let shared = ColCache {
70 cache: col_cache,
71 participants: Vec::new(),
72 };
73 let col_cache: ColCaches = [(SHARED_POOL, shared)].into();
74
75 let mut env = Env::new().or_else(or_else)?;
76
77 if config.rocksdb_compaction_prio_idle {
78 env.lower_thread_pool_cpu_priority();
79 }
80
81 if config.rocksdb_compaction_ioprio_idle {
82 env.lower_thread_pool_io_priority();
83 }
84
85 Ok(Arc::new(Self {
86 pool: Pool::new(server)?,
87 col_cache: col_cache.into(),
88 row_cache: row_cache.into(),
89 row_cache_capacity: row_cache_capacity_bytes,
90 env: env.into(),
91 server: server.clone(),
92 }))
93 }
94}
95
96impl Drop for Context {
97 #[cold]
98 fn drop(&mut self) {
99 debug!("Closing frontend pool");
100 self.pool.close();
101
102 let mut env = self.env.lock().expect("locked");
103
104 debug!("Shutting down background threads");
105 env.set_high_priority_background_threads(0);
106 env.set_low_priority_background_threads(0);
107 env.set_bottom_priority_background_threads(0);
108 env.set_background_threads(0);
109
110 debug!("Joining background threads...");
111 env.join_all_threads();
112
113 after_close(self, &self.server.config.database_path)
114 .expect("Failed to execute after_close handler");
115 }
116}
117
118pub(super) fn before_open(ctx: &Arc<Context>, path: &Path) -> Result {
120 if ctx.server.config.test.contains("fresh") {
121 match delete_database_for_testing(ctx, path) {
122 | Err(e) if !e.is_not_found() => return Err(e),
123 | _ => (),
124 }
125 }
126
127 Ok(())
128}
129
130fn after_close(ctx: &Context, path: &Path) -> Result {
133 if ctx.server.config.test.contains("cleanup") {
134 delete_database_for_testing(ctx, path)
135 .log_err()
136 .ok();
137 }
138
139 Ok(())
140}
141
142#[tracing::instrument(level = "debug", skip_all)]
146fn delete_database_for_testing(ctx: &Context, path: &Path) -> Result {
147 let config = &ctx.server.config;
148 let localhost = config
149 .server_name
150 .as_str()
151 .starts_with("localhost");
152
153 if !cfg!(test) && !localhost {
154 return Ok(());
155 }
156
157 debug_assert!(
158 config.test.contains("cleanup") | config.test.contains("fresh"),
159 "missing any test directive legitimating this call.",
160 );
161
162 remove_dir_all(path).map_err(Into::into)
163}