Skip to main content

tuwunel_database/engine/
cf_opts.rs

1use rocksdb::{
2	BlockBasedIndexType, BlockBasedOptions, BlockBasedPinningTier, Cache,
3	DBCompressionType as CompressionType, DataBlockIndexType, FifoCompactOptions,
4	LruCacheOptions, Options, UniversalCompactOptions, UniversalCompactionStopStyle,
5};
6use tuwunel_core::{Config, Result, err, utils::math::Expected};
7
8use super::{
9	context::{ColCache, ColCaches, SHARED_POOL},
10	descriptor::{CacheDisp, Descriptor},
11};
12use crate::{Context, util::map_err};
13
/// Sentinel value for the configured compression levels. When the configured
/// (or bottommost) compression level equals this value and the selected
/// algorithm is Zstd, `set_compression()` leaves the descriptor's own level
/// untouched; otherwise the configured value replaces it.
///
/// NOTE(review): 32767 appears to coincide with RocksDB's
/// `kDefaultCompressionLevel` -- confirm against the RocksDB headers.
pub(super) const SENTINEL_COMPRESSION_LEVEL: i32 = 32767;
15
16/// Adjust options for the specific column by name. Provide the result of
17/// db_options() as the argument to this function and use the return value in
18/// the arguments to open the specific column.
19pub(crate) fn cf_options(ctx: &Context, opts: Options, desc: &Descriptor) -> Result<Options> {
20	let cache = get_cache(ctx, desc);
21	let config = &ctx.server.config;
22	descriptor_cf_options(opts, *desc, config, cache.as_ref())
23}
24
25fn descriptor_cf_options(
26	mut opts: Options,
27	mut desc: Descriptor,
28	config: &Config,
29	cache: Option<&Cache>,
30) -> Result<Options> {
31	set_compression(&mut desc, config);
32	set_table_options(&mut opts, &desc, cache)?;
33
34	opts.set_min_write_buffer_number(1);
35	opts.set_max_write_buffer_number(2);
36	opts.set_write_buffer_size(desc.write_size);
37
38	opts.set_target_file_size_base(desc.file_size);
39	opts.set_target_file_size_multiplier(desc.file_shape);
40
41	opts.set_level_zero_file_num_compaction_trigger(desc.level0_width);
42	opts.set_level_compaction_dynamic_level_bytes(false);
43	opts.set_ttl(desc.ttl);
44
45	opts.set_max_bytes_for_level_base(desc.level_size);
46	opts.set_max_bytes_for_level_multiplier(1.0);
47	opts.set_max_bytes_for_level_multiplier_additional(&desc.level_shape);
48
49	opts.set_disable_auto_compactions(desc.ignored);
50	opts.set_compaction_style(desc.compaction);
51	opts.set_compaction_pri(desc.compaction_pri);
52	opts.set_universal_compaction_options(&uc_options(&desc));
53	opts.set_fifo_compaction_options(&fifo_options(&desc));
54
55	let compression_shape: Vec<_> = desc
56		.compression_shape
57		.into_iter()
58		.map(|val| (val > 0).then_some(desc.compression))
59		.map(|val| val.unwrap_or(CompressionType::None))
60		.collect();
61
62	opts.set_compression_type(desc.compression);
63	opts.set_compression_per_level(compression_shape.as_slice());
64	opts.set_compression_options(-14, desc.compression_level, 0, 0); // -14 w_bits used by zlib.
65	if let Some(&bottommost_level) = desc.bottommost_level.as_ref() {
66		opts.set_bottommost_compression_type(desc.compression);
67		opts.set_bottommost_zstd_max_train_bytes(0, true);
68		opts.set_bottommost_compression_options(
69			-14, // -14 w_bits is only read by zlib.
70			bottommost_level,
71			0,
72			0,
73			true,
74		);
75	}
76
77	opts.set_options_from_string("{{arena_block_size=2097152;}}")
78		.map_err(map_err)?;
79
80	#[cfg(debug_assertions)]
81	opts.set_options_from_string(
82		"{{paranoid_checks=true;paranoid_file_checks=true;force_consistency_checks=true;\
83		 verify_sst_unique_id_in_manifest=true;}}",
84	)
85	.map_err(map_err)?;
86
87	Ok(opts)
88}
89
90fn set_table_options(opts: &mut Options, desc: &Descriptor, cache: Option<&Cache>) -> Result {
91	let mut table = table_options(desc, cache.is_some());
92
93	if let Some(cache) = cache {
94		table.set_block_cache(cache);
95	} else {
96		table.disable_cache();
97	}
98
99	let prepopulate = if desc.write_to_cache { "kFlushOnly" } else { "kDisable" };
100
101	let string = format!(
102		"{{block_based_table_factory={{num_file_reads_for_auto_readahead={0};\
103		 max_auto_readahead_size={1};initial_auto_readahead_size={2};\
104		 enable_index_compression={3};prepopulate_block_cache={4}}}}}",
105		desc.auto_readahead_thresh,
106		desc.auto_readahead_max,
107		desc.auto_readahead_init,
108		desc.compressed_index,
109		prepopulate,
110	);
111
112	opts.set_options_from_string(&string)
113		.map_err(map_err)?;
114
115	opts.set_block_based_table_factory(&table);
116
117	Ok(())
118}
119
120fn set_compression(desc: &mut Descriptor, config: &Config) {
121	desc.compression = match config.rocksdb_compression_algo.as_ref() {
122		| "snappy" => CompressionType::Snappy,
123		| "zlib" => CompressionType::Zlib,
124		| "bz2" => CompressionType::Bz2,
125		| "lz4" => CompressionType::Lz4,
126		| "lz4hc" => CompressionType::Lz4hc,
127		| "none" => CompressionType::None,
128		| _ => CompressionType::Zstd,
129	};
130
131	let can_override_level = config.rocksdb_compression_level == SENTINEL_COMPRESSION_LEVEL
132		&& desc.compression == CompressionType::Zstd;
133
134	if !can_override_level {
135		desc.compression_level = config.rocksdb_compression_level;
136	}
137
138	let can_override_bottom = config.rocksdb_bottommost_compression_level
139		== SENTINEL_COMPRESSION_LEVEL
140		&& desc.compression == CompressionType::Zstd;
141
142	if !can_override_bottom {
143		desc.bottommost_level = Some(config.rocksdb_bottommost_compression_level);
144	}
145
146	if !config.rocksdb_bottommost_compression {
147		desc.bottommost_level = None;
148	}
149}
150
151fn fifo_options(desc: &Descriptor) -> FifoCompactOptions {
152	let mut opts = FifoCompactOptions::default();
153	opts.set_max_table_files_size(desc.limit_size);
154
155	opts
156}
157
158fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
159	let mut opts = UniversalCompactOptions::default();
160	opts.set_stop_style(UniversalCompactionStopStyle::Total);
161	opts.set_min_merge_width(desc.merge_width.0);
162	opts.set_max_merge_width(desc.merge_width.1);
163	opts.set_max_size_amplification_percent(10000);
164	opts.set_compression_size_percent(-1);
165	opts.set_size_ratio(1);
166
167	opts
168}
169
170fn table_options(desc: &Descriptor, has_cache: bool) -> BlockBasedOptions {
171	let mut opts = BlockBasedOptions::default();
172
173	opts.set_block_size(desc.block_size);
174	opts.set_metadata_block_size(desc.index_size);
175
176	opts.set_cache_index_and_filter_blocks(has_cache);
177	opts.set_pin_top_level_index_and_filter(false);
178	opts.set_pin_l0_filter_and_index_blocks_in_cache(false);
179	opts.set_partition_pinning_tier(BlockBasedPinningTier::None);
180	opts.set_unpartitioned_pinning_tier(BlockBasedPinningTier::None);
181	opts.set_top_level_index_pinning_tier(BlockBasedPinningTier::None);
182
183	opts.set_partition_filters(true);
184	opts.set_use_delta_encoding(false);
185	opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
186
187	opts.set_data_block_index_type(match desc.block_index_hashing {
188		| None if desc.index_size > 512 => DataBlockIndexType::BinaryAndHash,
189		| Some(enable) if enable => DataBlockIndexType::BinaryAndHash,
190		| Some(_) | None => DataBlockIndexType::BinarySearch,
191	});
192
193	opts
194}
195
196fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
197	if desc.dropped {
198		return None;
199	}
200
201	// Some cache capacities are overridden by server config in a strange but
202	// legacy-compat way
203	let config = &ctx.server.config;
204	let cap = match desc.name {
205		| "eventid_pduid" => Some(config.eventid_pdu_cache_capacity),
206		| "eventid_shorteventid" => Some(config.eventidshort_cache_capacity),
207		| "shorteventid_eventid" => Some(config.shorteventid_cache_capacity),
208		| "shortstatekey_statekey" => Some(config.shortstatekey_cache_capacity),
209		| "statekey_shortstatekey" => Some(config.statekeyshort_cache_capacity),
210		| "servernameevent_data" => Some(config.servernameevent_data_cache_capacity),
211		| "pduid_pdu" | "eventid_outlierpdu" => Some(config.pdu_cache_capacity),
212		| "shorteventid_authchain" | "authchainkey_authchain" =>
213			Some(config.auth_chain_cache_capacity),
214		| _ => None,
215	}
216	.map(TryInto::try_into)
217	.transpose()
218	.expect("u32 to usize");
219
220	let ent_size: usize = desc
221		.key_size_hint
222		.unwrap_or_default()
223		.expected_add(desc.val_size_hint.unwrap_or_default());
224
225	let size = match cap {
226		| Some(cap) => cache_size(config, cap, ent_size),
227		| _ => desc.cache_size,
228	};
229
230	let shard_bits: i32 = desc
231		.cache_shards
232		.ilog2()
233		.try_into()
234		.expect("u32 to i32 conversion");
235
236	debug_assert!(shard_bits <= 10, "cache shards probably too large");
237	let mut cache_opts = LruCacheOptions::default();
238	cache_opts.set_num_shard_bits(shard_bits);
239	cache_opts.set_capacity(size);
240
241	let mut caches = ctx.col_cache.lock().expect("locked");
242	register_pool(&mut caches, desc, || Cache::new_lru_cache_opts(&cache_opts))
243}
244
245/// Returns the cache for `desc`'s pool; `build_cache` is invoked only when a
246/// new pool must be created.
247pub(crate) fn register_pool(
248	caches: &mut ColCaches,
249	desc: &Descriptor,
250	build_cache: impl FnOnce() -> Cache,
251) -> Option<Cache> {
252	match desc.cache_disp {
253		| CacheDisp::Unique if desc.cache_size == 0 => None,
254		| CacheDisp::Unique => {
255			let cache = build_cache();
256			caches.insert(desc.name, ColCache {
257				cache: cache.clone(),
258				participants: vec![desc.name],
259			});
260
261			Some(cache)
262		},
263
264		| CacheDisp::SharedWith(other) => Some(match caches.get_mut(other) {
265			| Some(pool) => {
266				pool.participants.push(desc.name);
267				pool.cache.clone()
268			},
269			| None => {
270				let new = build_cache();
271				caches.insert(desc.name, ColCache {
272					cache: new.clone(),
273					participants: vec![desc.name],
274				});
275
276				new
277			},
278		}),
279
280		| CacheDisp::Shared => {
281			let pool = caches
282				.get_mut(SHARED_POOL)
283				.expect("shared cache must already exist");
284
285			pool.participants.push(desc.name);
286			Some(pool.cache.clone())
287		},
288	}
289}
290
291pub(crate) fn cache_size(config: &Config, base_size: u32, entity_size: usize) -> usize {
292	cache_size_f64(config, f64::from(base_size), entity_size)
293}
294
295#[expect(
296	clippy::as_conversions,
297	clippy::cast_sign_loss,
298	clippy::cast_possible_truncation
299)]
300pub(crate) fn cache_size_f64(config: &Config, base_size: f64, entity_size: usize) -> usize {
301	let ents = base_size * config.cache_capacity_modifier;
302
303	(ents as usize)
304		.checked_mul(entity_size)
305		.ok_or_else(|| err!(Config("cache_capacity_modifier", "Cache size is too large.")))
306		.expect("invalid cache size")
307}