Skip to main content

tuwunel_database/pool/
configure.rs

1use std::{path::PathBuf, sync::Arc};
2
3use tuwunel_core::{
4	Config, Server, at, debug,
5	debug::INFO_SPAN_LEVEL,
6	debug_info, debug_warn, expected, info, is_equal_to,
7	utils::{
8		BoolExt,
9		math::usize_from_f64,
10		result::LogDebugErr,
11		stream,
12		stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
13		sys::{
14			compute::{available_parallelism, cores_available, is_core_available},
15			max_threads,
16			storage::{self, MultiDevice},
17		},
18	},
19};
20
21use super::{QUEUE_LIMIT, WORKER_LIMIT};
22
23/// Determine storage hardware capabilities of the system for configuring the
24/// shape of the database frontend threadpool.
25///
26/// Returns a tuple of:
27/// - `topology` Vector mapping hardware cores to hardware queues. Systems with
28///   fewer queues than cores will see queue ID's repeated. Systems with the
29///   same or more queues as cores will usually see a 1:1 association of core
30///   ID's to queue ID's. Systems with sparse core assignments will see 0 for
31///   core ID positions not available to the process. Systems where detection
32///   failed will see a default of 1:1 core identity as a best-guess maintaining
33///   core locality.
34/// - `workers` Vector mapping hardware queues to the number of threads to spawn
35///   in service of that queue. Systems with fewer queues than cores will set an
36///   affinity mask for each thread to multiple cores based on the topology.
37///   Systems with equal or more hardware queues than cores will set a single
38///   affinity for each thread.
39/// - `queues` Vector of software mpmc queues to create and the size of each
40///   queue. Each indice is associated with a thread-pool of workers which it
41///   feeds requests from various tokio tasks. When this queue reaches capacity
42///   the tokio task must yield.
43#[tracing::instrument(
44	level = INFO_SPAN_LEVEL,
45	skip_all,
46	ret(level = "trace"),
47)]
48pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) {
49	let config = &server.config;
50	let num_cores = available_parallelism();
51
52	let cores_max = cores_available()
53		.last()
54		.unwrap_or(0)
55		.saturating_add(1);
56
57	let path: PathBuf = config.database_path.clone();
58	let device_name = storage::name_from_path(&path)
59		.log_debug_err()
60		.ok();
61
62	let devices = storage::md_discover(&path);
63	let topology_detected = devices.md.is_empty().is_false();
64	debug!(?topology_detected, ?device_name, ?devices);
65
66	let default_worker_count = topology_detected
67		.is_false()
68		.then_some(config.db_pool_workers)
69		.map(|workers| workers.saturating_mul(num_cores));
70
71	let total_tags = sum_total_tags(&devices, default_worker_count);
72	let topology = compute_topology(&devices, topology_detected, cores_max);
73	let max_workers =
74		compute_max_workers(&devices, default_worker_count, config.db_pool_max_workers);
75
76	let chan_limit = expected!(max_workers / num_cores)
77		.saturating_sub(8)
78		.saturating_add(1)
79		.next_multiple_of(8);
80
81	let workers =
82		compute_workers(&devices, config, default_worker_count, topology.len(), chan_limit);
83
84	let queues: Vec<usize> = workers
85		.iter()
86		.map(|count| {
87			count
88				.saturating_mul(config.db_pool_queue_mult)
89				.min(QUEUE_LIMIT.1)
90		})
91		.collect();
92
93	let total_workers = workers.iter().sum::<usize>();
94	let total_capacity = queues.iter().sum::<usize>();
95	let num_queues = queues.iter().filter(|&&cap| cap > 0).count();
96
97	if config.stream_width_scale > 0.0 {
98		update_stream_width(server, num_queues, total_workers, total_capacity);
99	}
100
101	log_topology(
102		topology_detected,
103		device_name.as_deref(),
104		num_cores,
105		&topology,
106		&workers,
107		&queues,
108		num_queues,
109		total_workers,
110		total_tags,
111		total_capacity,
112	);
113
114	assert!(total_workers > 0, "some workers expected");
115	debug_assert!(
116		total_workers <= max_workers || !topology_detected,
117		"spawning too many workers"
118	);
119
120	assert!(!queues.is_empty(), "some queues expected");
121	assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected");
122
123	(topology, workers, queues)
124}
125
126/// Sum the total number of possible tags. Without hardware detection this
127/// reduces to the default worker count. The thread-worker model never
128/// approaches actual NVMe capacity, but the value still informs request
129/// capacity downstream.
130fn sum_total_tags(devices: &MultiDevice, default_worker_count: Option<usize>) -> usize {
131	devices
132		.md
133		.iter()
134		.flat_map(|md| md.mq.iter())
135		.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
136		.filter_map(|mq| mq.nr_tags)
137		.chain(default_worker_count)
138		.fold(0_usize, usize::saturating_add)
139}
140
141/// Map cores to their associated hardware queue. Shared queues repeat across
142/// cores; sparse unavailable cores default to 0; undetected hardware falls back
143/// to the core identity as a best-guess maintaining core locality.
144fn compute_topology(
145	devices: &MultiDevice,
146	topology_detected: bool,
147	cores_max: usize,
148) -> Vec<usize> {
149	devices
150		.md
151		.iter()
152		.flat_map(|md| md.mq.iter())
153		.fold(vec![0; cores_max], |mut topology, mq| {
154			mq.cpu_list
155				.iter()
156				.filter(|&&id| id < cores_max)
157				.filter(|&&id| is_core_available(id))
158				.for_each(|&id| {
159					topology[id] = mq.id;
160				});
161
162			topology
163		})
164		.into_iter()
165		.enumerate()
166		.map(|(core_id, queue_id)| {
167			topology_detected
168				.then_some(queue_id)
169				.unwrap_or(core_id)
170		})
171		.collect()
172}
173
174/// Determine an ideal max worker count based on true capacity. The true value
175/// is rarely attainable in a thread-worker model so the result is clamped by
176/// both the rlimit-derived budget and the static `WORKER_LIMIT` range.
177fn compute_max_workers(
178	devices: &MultiDevice,
179	default_worker_count: Option<usize>,
180	max_workers_cfg: usize,
181) -> usize {
182	let max_threads = max_threads()
183		.map(at!(0))
184		.unwrap_or(usize::MAX)
185		.saturating_div(3);
186
187	devices
188		.md
189		.iter()
190		.flat_map(|md| md.mq.iter())
191		.filter_map(|mq| mq.nr_tags)
192		.chain(default_worker_count)
193		.fold(0_usize, usize::saturating_add)
194		.min(max_workers_cfg)
195		.clamp(WORKER_LIMIT.0, max_threads)
196		.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1)
197}
198
199/// Determine the worker groupings. Each indice represents a hardware queue and
200/// contains the number of workers which will service it. The vector is
201/// truncated to the number of cores on systems with multiple hardware queues
202/// per core, and the per-pool count is capped well below NVMe capacity.
203fn compute_workers(
204	devices: &MultiDevice,
205	config: &Config,
206	default_worker_count: Option<usize>,
207	topology_len: usize,
208	chan_limit: usize,
209) -> Vec<usize> {
210	let default_workers = default_worker_count
211		.into_iter()
212		.cycle()
213		.enumerate()
214		.map(move |(core_id, count)| {
215			is_core_available(core_id)
216				.then_some(count)
217				.unwrap_or(0)
218				.min(chan_limit)
219		});
220
221	devices
222		.md
223		.iter()
224		.inspect(|md| debug!(?md))
225		.flat_map(|md| md.mq.iter())
226		.map(|mq| {
227			let shares = mq
228				.cpu_list
229				.iter()
230				.filter(|&&id| is_core_available(id))
231				.count();
232
233			let conf_limit = config
234				.db_pool_workers_limit
235				.saturating_mul(shares);
236
237			let hard_limit = devices
238				.md
239				.iter()
240				.filter(|_| shares > 0)
241				.fold(0_usize, |acc, mq| {
242					mq.nr_requests
243						.map(|nr| nr.min(conf_limit))
244						.or(Some(conf_limit))
245						.map(|nr| acc.saturating_add(nr))
246						.unwrap_or(acc)
247				});
248
249			let tags = mq
250				.nr_tags
251				.unwrap_or(WORKER_LIMIT.0)
252				.min(hard_limit)
253				.min(chan_limit);
254
255			debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit);
256
257			tags
258		})
259		.chain(default_workers)
260		.take(topology_len)
261		.collect()
262}
263
264#[expect(clippy::too_many_arguments)]
265fn log_topology(
266	topology_detected: bool,
267	device_name: Option<&str>,
268	num_cores: usize,
269	topology: &[usize],
270	workers: &[usize],
271	queues: &[usize],
272	num_queues: usize,
273	total_workers: usize,
274	total_tags: usize,
275	total_capacity: usize,
276) {
277	if topology_detected {
278		debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",);
279		info!(
280			device_name = ?device_name.unwrap_or("None"),
281			?num_queues,
282			?total_workers,
283			?total_tags,
284			?total_capacity,
285			stream_width = ?stream::automatic_width(),
286			amplification = ?stream::automatic_amplification(),
287			"Frontend topology",
288		);
289	} else {
290		debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)");
291		debug_warn!(
292			device_name = ?device_name.unwrap_or("None"),
293			?total_workers,
294			?total_capacity,
295			stream_width = ?stream::automatic_width(),
296			amplification = ?stream::automatic_amplification(),
297			"Storage hardware not detected for database directory; assuming defaults.",
298		);
299	}
300}
301
302#[expect(clippy::as_conversions, clippy::cast_precision_loss)]
303fn update_stream_width(
304	server: &Arc<Server>,
305	num_queues: usize,
306	total_workers: usize,
307	_total_capacity: usize,
308) {
309	assert!(num_queues > 0, "Expected at least one queue.");
310	assert!(total_workers > 0, "Expected some workers.");
311
312	let config = &server.config;
313	let scale: f64 = config.stream_width_scale.min(100.0).into();
314	let max_width = expected!(total_workers / num_queues);
315
316	let old_width = stream::automatic_width();
317	let old_scale_width = expected!(old_width * num_queues);
318
319	let new_scale = total_workers as f64 / old_scale_width as f64;
320	let new_scale = new_scale.clamp(1.0, 4.0);
321	let new_scale_width = new_scale * old_width as f64;
322	let new_scale_width = usize_from_f64(new_scale_width)
323		.expect("failed to convert f64 to usize")
324		.next_multiple_of(8);
325
326	let req_width = usize_from_f64(scale * new_scale_width as f64)
327		.expect("failed to convert f64 to usize")
328		.next_multiple_of(4)
329		.min(max_width)
330		.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
331
332	let req_amp = new_scale * config.stream_amplification as f64;
333	let req_amp = usize_from_f64(req_amp * scale)
334		.expect("failed to convert f64 to usize")
335		.next_multiple_of(64)
336		.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
337
338	let (old_width, new_width) = stream::set_width(req_width);
339	let (old_amp, new_amp) = stream::set_amplification(req_amp);
340	debug!(
341		config_scale = ?config.stream_width_scale,
342		?old_width,
343		?new_scale,
344		?new_width,
345		?old_amp,
346		?new_amp,
347		"Updated global stream width"
348	);
349}