tuwunel_database/engine/
db_opts.rs1use std::{cmp, convert::TryFrom};
2
3use rocksdb::{Cache, DBRecoveryMode, Env, LogLevel, Options, statistics::StatsLevel};
4use tuwunel_core::{Config, Result, utils};
5
6use super::{cf_opts::cache_size_f64, events::Events, logger::handle as handle_log};
7use crate::util::map_err;
8
9pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Result<Options> {
15 const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) {
16 StatsLevel::ExceptDetailedTimers
17 } else {
18 StatsLevel::DisableAll
19 };
20
21 let mut opts = Options::default();
22
23 set_logging_defaults(&mut opts, config);
25 opts.add_event_listener(Events::new(config, env));
26
27 opts.set_max_background_jobs(num_threads::<i32>(config)?);
29 opts.set_max_subcompactions(num_threads::<u32>(config)?);
30 opts.set_avoid_unnecessary_blocking_io(true);
31 opts.set_max_file_opening_threads(0);
32
33 opts.set_manual_wal_flush(true);
35 opts.set_atomic_flush(config.rocksdb_atomic_flush);
36 opts.set_enable_pipelined_write(!config.rocksdb_atomic_flush);
37 if config.rocksdb_direct_io {
38 opts.set_use_direct_reads(true);
39 opts.set_use_direct_io_for_flush_and_compaction(true);
40 }
41 if config.rocksdb_optimize_for_spinning_disks {
42 opts.set_skip_stats_update_on_db_open(true);
43 } else {
45 opts.set_max_file_opening_threads(num_threads(config)?);
46 opts.set_compaction_readahead_size(1024 * 512);
47 }
48
49 opts.set_row_cache(row_cache);
51 opts.set_db_write_buffer_size(cache_size_f64(
52 config,
53 config.db_write_buffer_capacity_mb,
54 1_048_576,
55 ));
56
57 opts.set_table_cache_num_shard_bits(7);
59 opts.set_wal_size_limit_mb(1024);
60 opts.set_max_total_wal_size(1024 * 1024 * 512);
61 opts.set_writable_file_max_buffer_size(1024 * 1024 * 2);
62 if !config.rocksdb_allow_fallocate {
63 opts.set_options_from_string("allow_fallocate=false")
64 .map_err(map_err)?;
65 }
66
67 opts.set_disable_auto_compactions(!config.rocksdb_compaction);
69 opts.create_missing_column_families(true);
70 opts.create_if_missing(true);
71
72 opts.set_statistics_level(match config.rocksdb_stats_level {
73 | 0 => StatsLevel::DisableAll,
74 | 1 => DEFAULT_STATS_LEVEL,
75 | 2 => StatsLevel::ExceptHistogramOrTimers,
76 | 3 => StatsLevel::ExceptTimers,
77 | 4 => StatsLevel::ExceptDetailedTimers,
78 | 5 => StatsLevel::ExceptTimeForMutex,
79 | 6_u8..=u8::MAX => StatsLevel::All,
80 });
81
82 opts.set_report_bg_io_stats(match config.rocksdb_stats_level {
83 | 0..=1 => false,
84 | 2_u8..=u8::MAX => true,
85 });
86
87 opts.set_wal_recovery_mode(match config.rocksdb_recovery_mode {
93 | 0 => DBRecoveryMode::AbsoluteConsistency,
94 | 1 => DBRecoveryMode::TolerateCorruptedTailRecords,
95 | 2 => DBRecoveryMode::PointInTime,
96 | 3 => DBRecoveryMode::SkipAnyCorruptedRecord,
97 | 4_u8..=u8::MAX => unimplemented!(),
98 });
99
100 opts.set_track_and_verify_wals_in_manifest(true);
105
106 opts.set_paranoid_checks(config.rocksdb_paranoid_file_checks);
107
108 opts.set_env(env);
109
110 Ok(opts)
111}
112
113fn set_logging_defaults(opts: &mut Options, config: &Config) {
114 let rocksdb_log_level = match config.rocksdb_log_level.as_ref() {
115 | "debug" => LogLevel::Debug,
116 | "info" => LogLevel::Info,
117 | "warn" => LogLevel::Warn,
118 | "fatal" => LogLevel::Fatal,
119 | _ => LogLevel::Error,
120 };
121
122 opts.set_log_level(rocksdb_log_level);
123 opts.set_max_log_file_size(config.rocksdb_max_log_file_size);
124 opts.set_log_file_time_to_roll(config.rocksdb_log_time_to_roll);
125 opts.set_keep_log_file_num(config.rocksdb_max_log_files);
126 opts.set_stats_dump_period_sec(0);
127
128 if config.rocksdb_log_stderr {
129 opts.set_stderr_logger(rocksdb_log_level, "rocksdb");
130 } else {
131 opts.set_callback_logger(rocksdb_log_level, handle_log);
132 }
133}
134
135fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
136 const MIN_PARALLELISM: usize = 2;
137
138 let requested = if config.rocksdb_parallelism_threads != 0 {
139 config.rocksdb_parallelism_threads
140 } else {
141 utils::available_parallelism()
142 };
143
144 utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
145}