Skip to main content

tuwunel_database/pool/
configure.rs

1use std::{path::PathBuf, sync::Arc};
2
3use tuwunel_core::{
4	Server, at, debug,
5	debug::INFO_SPAN_LEVEL,
6	debug_info, debug_warn, expected, info, is_equal_to,
7	utils::{
8		BoolExt,
9		math::usize_from_f64,
10		result::LogDebugErr,
11		stream,
12		stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
13		sys::{
14			compute::{available_parallelism, cores_available, is_core_available},
15			max_threads, storage,
16		},
17	},
18};
19
20use super::{QUEUE_LIMIT, WORKER_LIMIT};
21
22/// Determine storage hardware capabilities of the system for configuring the
23/// shape of the database frontend threadpool.
24///
25/// Returns a tuple of:
26/// - `topology` Vector mapping hardware cores to hardware queues. Systems with
27///   fewer queues than cores will see queue ID's repeated. Systems with the
28///   same or more queues as cores will usually see a 1:1 association of core
29///   ID's to queue ID's. Systems with sparse core assignments will see 0 for
30///   core ID positions not available to the process. Systems where detection
31///   failed will see a default of 1:1 core identity as a best-guess maintaining
32///   core locality.
33/// - `workers` Vector mapping hardware queues to the number of threads to spawn
34///   in service of that queue. Systems with fewer queues than cores will set an
35///   affinity mask for each thread to multiple cores based on the topology.
36///   Systems with equal or more hardware queues than cores will set a single
37///   affinity for each thread.
38/// - `queues` Vector of software mpmc queues to create and the size of each
39///   queue. Each indice is associated with a thread-pool of workers which it
40///   feeds requests from various tokio tasks. When this queue reaches capacity
41///   the tokio task must yield.
42#[tracing::instrument(
43	level = INFO_SPAN_LEVEL,
44	skip_all,
45	ret(level = "trace"),
46)]
47pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) {
48	let config = &server.config;
49	let num_cores = available_parallelism();
50
51	// Determine the maximum number of cores. The total number of cores available to
52	// the process may be less on systems with sparse core assignments, but this
53	// still serves as an upper-bound.
54	let cores_max = cores_available()
55		.last()
56		.unwrap_or(0)
57		.saturating_add(1);
58
59	// This finds the block device and gathers all the properties we need.
60	let path: PathBuf = config.database_path.clone();
61	let device_name = storage::name_from_path(&path)
62		.log_debug_err()
63		.ok();
64
65	let devices = storage::md_discover(&path);
66	let topology_detected = devices.md.is_empty().is_false();
67	debug!(?topology_detected, ?device_name, ?devices);
68
69	// The default worker count is masked-on if we didn't find better information.
70	let default_worker_count = topology_detected
71		.is_false()
72		.then_some(config.db_pool_workers)
73		.map(|workers| workers.saturating_mul(num_cores));
74
75	// Sum the total number of possible tags. When no hardware detected this will
76	// default to the default_worker_count. Note well that the thread-worker model
77	// we use will never approach actual NVMe capacity as with io_uring or even
78	// close to userspace drivers. We still take some cues from this value which
79	// does give us actual request capacity.
80	let total_tags = devices
81		.md
82		.iter()
83		.flat_map(|md| md.mq.iter())
84		.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
85		.filter_map(|mq| mq.nr_tags)
86		.chain(default_worker_count)
87		.fold(0_usize, usize::saturating_add);
88
89	// Determine the CPU affinities of each hardware queue. Each indice is a core
90	// and each value is the associated hardware queue. On systems which share
91	// queues between cores some values will be repeated; on systems with multiple
92	// queues per core the affinities are assumed to match and we don't require a
93	// vector of vectors. Sparse unavailable cores default to 0. Undetected hardware
94	// defaults to the core identity as a best-guess.
95	let topology: Vec<usize> = devices
96		.md
97		.iter()
98		.flat_map(|md| md.mq.iter())
99		.fold(vec![0; cores_max], |mut topology, mq| {
100			mq.cpu_list
101				.iter()
102				.filter(|&&id| id < cores_max)
103				.filter(|&&id| is_core_available(id))
104				.for_each(|&id| {
105					topology[id] = mq.id;
106				});
107
108			topology
109		})
110		.into_iter()
111		.enumerate()
112		.map(|(core_id, queue_id)| {
113			topology_detected
114				.then_some(queue_id)
115				.unwrap_or(core_id)
116		})
117		.collect();
118
119	// Query getrlimit(2) to impose any additional restriction, divide to leave room
120	// for other threads in the process.
121	let max_threads = max_threads()
122		.map(at!(0))
123		.unwrap_or(usize::MAX)
124		.saturating_div(3);
125
126	// Determine an ideal max worker count based on true capacity. As stated prior
127	// the true value is rarely attainable in any thread-worker model, and clamped.
128	let max_workers = devices
129		.md
130		.iter()
131		.flat_map(|md| md.mq.iter())
132		.filter_map(|mq| mq.nr_tags)
133		.chain(default_worker_count.into_iter())
134		.fold(0_usize, usize::saturating_add)
135		.min(config.db_pool_max_workers)
136		.clamp(WORKER_LIMIT.0, max_threads)
137		.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
138
139	// Tamper for the total number of workers by reducing the count for each group.
140	let chan_limit = expected!(max_workers / num_cores)
141		.saturating_sub(8)
142		.saturating_add(1)
143		.next_multiple_of(8);
144
145	// Default workers vector without detection.
146	let default_workers = default_worker_count
147		.into_iter()
148		.cycle()
149		.enumerate()
150		.map(|(core_id, count)| {
151			is_core_available(core_id)
152				.then_some(count)
153				.unwrap_or(0)
154				.min(chan_limit)
155		});
156
157	// Determine the worker groupings. Each indice represents a hardware queue and
158	// contains the number of workers which will service it. This vector is
159	// truncated to the number of cores on systems which have multiple hardware
160	// queues per core. The number of workers is then truncated to a maximum for
161	// each pool; as stated prior, this will usually be less than NVMe capacity.
162	let workers: Vec<usize> = devices
163		.md
164		.iter()
165		.inspect(|md| debug!(?md))
166		.flat_map(|md| md.mq.iter())
167		.map(|mq| {
168			let shares = mq
169				.cpu_list
170				.iter()
171				.filter(|&&id| is_core_available(id))
172				.count();
173
174			let conf_limit = config
175				.db_pool_workers_limit
176				.saturating_mul(shares);
177
178			let hard_limit = devices
179				.md
180				.iter()
181				.filter(|_| shares > 0)
182				.fold(0_usize, |acc, mq| {
183					mq.nr_requests
184						.map(|nr| nr.min(conf_limit))
185						.or(Some(conf_limit))
186						.map(|nr| acc.saturating_add(nr))
187						.unwrap_or(acc)
188				});
189
190			let tags = mq
191				.nr_tags
192				.unwrap_or(WORKER_LIMIT.0)
193				.min(hard_limit)
194				.min(chan_limit);
195
196			debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit);
197
198			tags
199		})
200		.chain(default_workers)
201		.take(topology.len())
202		.collect();
203
204	// Determine our software queue size for each hardware queue. This is the mpmc
205	// between the tokio worker and the pool worker.
206	let queues: Vec<usize> = workers
207		.iter()
208		.map(|count| {
209			count
210				.saturating_mul(config.db_pool_queue_mult)
211				.min(QUEUE_LIMIT.1)
212		})
213		.collect();
214
215	// Total number of workers to spawn.
216	let total_workers = workers.iter().sum::<usize>();
217
218	// Total capacity of all software queues.
219	let total_capacity = queues.iter().sum::<usize>();
220
221	// Discount queues with zero capacity for a proper denominator.
222	let num_queues = queues.iter().filter(|&&cap| cap > 0).count();
223
224	// After computing all of the above we can update the global automatic stream
225	// width, hopefully with a better value tailored to this system.
226	if config.stream_width_scale > 0.0 {
227		update_stream_width(server, num_queues, total_workers, total_capacity);
228	}
229
230	if topology_detected {
231		debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",);
232		info!(
233			device_name = ?device_name.as_deref().unwrap_or("None"),
234			?num_queues,
235			?total_workers,
236			?total_tags,
237			?total_capacity,
238			stream_width = ?stream::automatic_width(),
239			amplification = ?stream::automatic_amplification(),
240			"Frontend topology",
241		);
242	} else {
243		debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)");
244		debug_warn!(
245			device_name = ?device_name.as_deref().unwrap_or("None"),
246			?total_workers,
247			?total_capacity,
248			stream_width = ?stream::automatic_width(),
249			amplification = ?stream::automatic_amplification(),
250			"Storage hardware not detected for database directory; assuming defaults.",
251		);
252	}
253
254	assert!(total_workers > 0, "some workers expected");
255	debug_assert!(
256		total_workers <= max_workers || !topology_detected,
257		"spawning too many workers"
258	);
259
260	assert!(!queues.is_empty(), "some queues expected");
261	assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected");
262
263	(topology, workers, queues)
264}
265
266#[expect(clippy::as_conversions, clippy::cast_precision_loss)]
267fn update_stream_width(
268	server: &Arc<Server>,
269	num_queues: usize,
270	total_workers: usize,
271	_total_capacity: usize,
272) {
273	assert!(num_queues > 0, "Expected at least one queue.");
274	assert!(total_workers > 0, "Expected some workers.");
275
276	let config = &server.config;
277	let scale: f64 = config.stream_width_scale.min(100.0).into();
278	let max_width = expected!(total_workers / num_queues);
279
280	let old_width = stream::automatic_width();
281	let old_scale_width = expected!(old_width * num_queues);
282
283	let new_scale = total_workers as f64 / old_scale_width as f64;
284	let new_scale = new_scale.clamp(1.0, 4.0);
285	let new_scale_width = new_scale * old_width as f64;
286	let new_scale_width = usize_from_f64(new_scale_width)
287		.expect("failed to convert f64 to usize")
288		.next_multiple_of(8);
289
290	let req_width = usize_from_f64(scale * new_scale_width as f64)
291		.expect("failed to convert f64 to usize")
292		.next_multiple_of(4)
293		.min(max_width)
294		.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
295
296	let req_amp = new_scale * config.stream_amplification as f64;
297	let req_amp = usize_from_f64(req_amp * scale)
298		.expect("failed to convert f64 to usize")
299		.next_multiple_of(64)
300		.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
301
302	let (old_width, new_width) = stream::set_width(req_width);
303	let (old_amp, new_amp) = stream::set_amplification(req_amp);
304	debug!(
305		config_scale = ?config.stream_width_scale,
306		?old_width,
307		?new_scale,
308		?new_width,
309		?old_amp,
310		?new_amp,
311		"Updated global stream width"
312	);
313}