1use rocksdb::{
2 BlockBasedIndexType, BlockBasedOptions, BlockBasedPinningTier, Cache,
3 DBCompressionType as CompressionType, DataBlockIndexType, FifoCompactOptions,
4 LruCacheOptions, Options, UniversalCompactOptions, UniversalCompactionStopStyle,
5};
6use tuwunel_core::{Config, Result, err, utils::math::Expected};
7
8use super::{
9 context::{ColCache, ColCaches, SHARED_POOL},
10 descriptor::{CacheDisp, Descriptor},
11};
12use crate::{Context, util::map_err};
13
/// Compression level treated as "not configured" by `set_compression`: when a
/// configured level equals this value and the algorithm is Zstd, the
/// descriptor's own level is left in place instead of being overwritten.
///
/// NOTE(review): presumably mirrors RocksDB's default-compression-level marker
/// (32767) — confirm against the RocksDB headers.
pub(super) const SENTINEL_COMPRESSION_LEVEL: i32 = 32767;
15
16pub(crate) fn cf_options(ctx: &Context, opts: Options, desc: &Descriptor) -> Result<Options> {
20 let cache = get_cache(ctx, desc);
21 let config = &ctx.server.config;
22 descriptor_cf_options(opts, *desc, config, cache.as_ref())
23}
24
25fn descriptor_cf_options(
26 mut opts: Options,
27 mut desc: Descriptor,
28 config: &Config,
29 cache: Option<&Cache>,
30) -> Result<Options> {
31 set_compression(&mut desc, config);
32 set_table_options(&mut opts, &desc, cache)?;
33
34 opts.set_min_write_buffer_number(1);
35 opts.set_max_write_buffer_number(2);
36 opts.set_write_buffer_size(desc.write_size);
37
38 opts.set_target_file_size_base(desc.file_size);
39 opts.set_target_file_size_multiplier(desc.file_shape);
40
41 opts.set_level_zero_file_num_compaction_trigger(desc.level0_width);
42 opts.set_level_compaction_dynamic_level_bytes(false);
43 opts.set_ttl(desc.ttl);
44
45 opts.set_max_bytes_for_level_base(desc.level_size);
46 opts.set_max_bytes_for_level_multiplier(1.0);
47 opts.set_max_bytes_for_level_multiplier_additional(&desc.level_shape);
48
49 opts.set_disable_auto_compactions(desc.ignored);
50 opts.set_compaction_style(desc.compaction);
51 opts.set_compaction_pri(desc.compaction_pri);
52 opts.set_universal_compaction_options(&uc_options(&desc));
53 opts.set_fifo_compaction_options(&fifo_options(&desc));
54
55 let compression_shape: Vec<_> = desc
56 .compression_shape
57 .into_iter()
58 .map(|val| (val > 0).then_some(desc.compression))
59 .map(|val| val.unwrap_or(CompressionType::None))
60 .collect();
61
62 opts.set_compression_type(desc.compression);
63 opts.set_compression_per_level(compression_shape.as_slice());
64 opts.set_compression_options(-14, desc.compression_level, 0, 0); if let Some(&bottommost_level) = desc.bottommost_level.as_ref() {
66 opts.set_bottommost_compression_type(desc.compression);
67 opts.set_bottommost_zstd_max_train_bytes(0, true);
68 opts.set_bottommost_compression_options(
69 -14, bottommost_level,
71 0,
72 0,
73 true,
74 );
75 }
76
77 opts.set_options_from_string("{{arena_block_size=2097152;}}")
78 .map_err(map_err)?;
79
80 #[cfg(debug_assertions)]
81 opts.set_options_from_string(
82 "{{paranoid_checks=true;paranoid_file_checks=true;force_consistency_checks=true;\
83 verify_sst_unique_id_in_manifest=true;}}",
84 )
85 .map_err(map_err)?;
86
87 Ok(opts)
88}
89
90fn set_table_options(opts: &mut Options, desc: &Descriptor, cache: Option<&Cache>) -> Result {
91 let mut table = table_options(desc, cache.is_some());
92
93 if let Some(cache) = cache {
94 table.set_block_cache(cache);
95 } else {
96 table.disable_cache();
97 }
98
99 let prepopulate = if desc.write_to_cache { "kFlushOnly" } else { "kDisable" };
100
101 let string = format!(
102 "{{block_based_table_factory={{num_file_reads_for_auto_readahead={0};\
103 max_auto_readahead_size={1};initial_auto_readahead_size={2};\
104 enable_index_compression={3};prepopulate_block_cache={4}}}}}",
105 desc.auto_readahead_thresh,
106 desc.auto_readahead_max,
107 desc.auto_readahead_init,
108 desc.compressed_index,
109 prepopulate,
110 );
111
112 opts.set_options_from_string(&string)
113 .map_err(map_err)?;
114
115 opts.set_block_based_table_factory(&table);
116
117 Ok(())
118}
119
120fn set_compression(desc: &mut Descriptor, config: &Config) {
121 desc.compression = match config.rocksdb_compression_algo.as_ref() {
122 | "snappy" => CompressionType::Snappy,
123 | "zlib" => CompressionType::Zlib,
124 | "bz2" => CompressionType::Bz2,
125 | "lz4" => CompressionType::Lz4,
126 | "lz4hc" => CompressionType::Lz4hc,
127 | "none" => CompressionType::None,
128 | _ => CompressionType::Zstd,
129 };
130
131 let can_override_level = config.rocksdb_compression_level == SENTINEL_COMPRESSION_LEVEL
132 && desc.compression == CompressionType::Zstd;
133
134 if !can_override_level {
135 desc.compression_level = config.rocksdb_compression_level;
136 }
137
138 let can_override_bottom = config.rocksdb_bottommost_compression_level
139 == SENTINEL_COMPRESSION_LEVEL
140 && desc.compression == CompressionType::Zstd;
141
142 if !can_override_bottom {
143 desc.bottommost_level = Some(config.rocksdb_bottommost_compression_level);
144 }
145
146 if !config.rocksdb_bottommost_compression {
147 desc.bottommost_level = None;
148 }
149}
150
151fn fifo_options(desc: &Descriptor) -> FifoCompactOptions {
152 let mut opts = FifoCompactOptions::default();
153 opts.set_max_table_files_size(desc.limit_size);
154
155 opts
156}
157
158fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
159 let mut opts = UniversalCompactOptions::default();
160 opts.set_stop_style(UniversalCompactionStopStyle::Total);
161 opts.set_min_merge_width(desc.merge_width.0);
162 opts.set_max_merge_width(desc.merge_width.1);
163 opts.set_max_size_amplification_percent(10000);
164 opts.set_compression_size_percent(-1);
165 opts.set_size_ratio(1);
166
167 opts
168}
169
170fn table_options(desc: &Descriptor, has_cache: bool) -> BlockBasedOptions {
171 let mut opts = BlockBasedOptions::default();
172
173 opts.set_block_size(desc.block_size);
174 opts.set_metadata_block_size(desc.index_size);
175
176 opts.set_cache_index_and_filter_blocks(has_cache);
177 opts.set_pin_top_level_index_and_filter(false);
178 opts.set_pin_l0_filter_and_index_blocks_in_cache(false);
179 opts.set_partition_pinning_tier(BlockBasedPinningTier::None);
180 opts.set_unpartitioned_pinning_tier(BlockBasedPinningTier::None);
181 opts.set_top_level_index_pinning_tier(BlockBasedPinningTier::None);
182
183 opts.set_partition_filters(true);
184 opts.set_use_delta_encoding(false);
185 opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
186
187 opts.set_data_block_index_type(match desc.block_index_hashing {
188 | None if desc.index_size > 512 => DataBlockIndexType::BinaryAndHash,
189 | Some(enable) if enable => DataBlockIndexType::BinaryAndHash,
190 | Some(_) | None => DataBlockIndexType::BinarySearch,
191 });
192
193 opts
194}
195
196fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
197 if desc.dropped {
198 return None;
199 }
200
201 let config = &ctx.server.config;
204 let cap = match desc.name {
205 | "eventid_pduid" => Some(config.eventid_pdu_cache_capacity),
206 | "eventid_shorteventid" => Some(config.eventidshort_cache_capacity),
207 | "shorteventid_eventid" => Some(config.shorteventid_cache_capacity),
208 | "shortstatekey_statekey" => Some(config.shortstatekey_cache_capacity),
209 | "statekey_shortstatekey" => Some(config.statekeyshort_cache_capacity),
210 | "servernameevent_data" => Some(config.servernameevent_data_cache_capacity),
211 | "pduid_pdu" | "eventid_outlierpdu" => Some(config.pdu_cache_capacity),
212 | "shorteventid_authchain" | "authchainkey_authchain" =>
213 Some(config.auth_chain_cache_capacity),
214 | _ => None,
215 }
216 .map(TryInto::try_into)
217 .transpose()
218 .expect("u32 to usize");
219
220 let ent_size: usize = desc
221 .key_size_hint
222 .unwrap_or_default()
223 .expected_add(desc.val_size_hint.unwrap_or_default());
224
225 let size = match cap {
226 | Some(cap) => cache_size(config, cap, ent_size),
227 | _ => desc.cache_size,
228 };
229
230 let shard_bits: i32 = desc
231 .cache_shards
232 .ilog2()
233 .try_into()
234 .expect("u32 to i32 conversion");
235
236 debug_assert!(shard_bits <= 10, "cache shards probably too large");
237 let mut cache_opts = LruCacheOptions::default();
238 cache_opts.set_num_shard_bits(shard_bits);
239 cache_opts.set_capacity(size);
240
241 let mut caches = ctx.col_cache.lock().expect("locked");
242 register_pool(&mut caches, desc, || Cache::new_lru_cache_opts(&cache_opts))
243}
244
245pub(crate) fn register_pool(
248 caches: &mut ColCaches,
249 desc: &Descriptor,
250 build_cache: impl FnOnce() -> Cache,
251) -> Option<Cache> {
252 match desc.cache_disp {
253 | CacheDisp::Unique if desc.cache_size == 0 => None,
254 | CacheDisp::Unique => {
255 let cache = build_cache();
256 caches.insert(desc.name, ColCache {
257 cache: cache.clone(),
258 participants: vec![desc.name],
259 });
260
261 Some(cache)
262 },
263
264 | CacheDisp::SharedWith(other) => Some(match caches.get_mut(other) {
265 | Some(pool) => {
266 pool.participants.push(desc.name);
267 pool.cache.clone()
268 },
269 | None => {
270 let new = build_cache();
271 caches.insert(desc.name, ColCache {
272 cache: new.clone(),
273 participants: vec![desc.name],
274 });
275
276 new
277 },
278 }),
279
280 | CacheDisp::Shared => {
281 let pool = caches
282 .get_mut(SHARED_POOL)
283 .expect("shared cache must already exist");
284
285 pool.participants.push(desc.name);
286 Some(pool.cache.clone())
287 },
288 }
289}
290
291pub(crate) fn cache_size(config: &Config, base_size: u32, entity_size: usize) -> usize {
292 cache_size_f64(config, f64::from(base_size), entity_size)
293}
294
295#[expect(
296 clippy::as_conversions,
297 clippy::cast_sign_loss,
298 clippy::cast_possible_truncation
299)]
300pub(crate) fn cache_size_f64(config: &Config, base_size: f64, entity_size: usize) -> usize {
301 let ents = base_size * config.cache_capacity_modifier;
302
303 (ents as usize)
304 .checked_mul(entity_size)
305 .ok_or_else(|| err!(Config("cache_capacity_modifier", "Cache size is too large.")))
306 .expect("invalid cache size")
307}