1use std::{path::PathBuf, sync::Arc};
2
3use tuwunel_core::{
4 Config, Server, at, debug,
5 debug::INFO_SPAN_LEVEL,
6 debug_info, debug_warn, expected, info, is_equal_to,
7 utils::{
8 BoolExt,
9 math::usize_from_f64,
10 result::LogDebugErr,
11 stream,
12 stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
13 sys::{
14 compute::{available_parallelism, cores_available, is_core_available},
15 max_threads,
16 storage::{self, MultiDevice},
17 },
18 },
19};
20
21use super::{QUEUE_LIMIT, WORKER_LIMIT};
22
23#[tracing::instrument(
44 level = INFO_SPAN_LEVEL,
45 skip_all,
46 ret(level = "trace"),
47)]
48pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) {
49 let config = &server.config;
50 let num_cores = available_parallelism();
51
52 let cores_max = cores_available()
53 .last()
54 .unwrap_or(0)
55 .saturating_add(1);
56
57 let path: PathBuf = config.database_path.clone();
58 let device_name = storage::name_from_path(&path)
59 .log_debug_err()
60 .ok();
61
62 let devices = storage::md_discover(&path);
63 let topology_detected = devices.md.is_empty().is_false();
64 debug!(?topology_detected, ?device_name, ?devices);
65
66 let default_worker_count = topology_detected
67 .is_false()
68 .then_some(config.db_pool_workers)
69 .map(|workers| workers.saturating_mul(num_cores));
70
71 let total_tags = sum_total_tags(&devices, default_worker_count);
72 let topology = compute_topology(&devices, topology_detected, cores_max);
73 let max_workers =
74 compute_max_workers(&devices, default_worker_count, config.db_pool_max_workers);
75
76 let chan_limit = expected!(max_workers / num_cores)
77 .saturating_sub(8)
78 .saturating_add(1)
79 .next_multiple_of(8);
80
81 let workers =
82 compute_workers(&devices, config, default_worker_count, topology.len(), chan_limit);
83
84 let queues: Vec<usize> = workers
85 .iter()
86 .map(|count| {
87 count
88 .saturating_mul(config.db_pool_queue_mult)
89 .min(QUEUE_LIMIT.1)
90 })
91 .collect();
92
93 let total_workers = workers.iter().sum::<usize>();
94 let total_capacity = queues.iter().sum::<usize>();
95 let num_queues = queues.iter().filter(|&&cap| cap > 0).count();
96
97 if config.stream_width_scale > 0.0 {
98 update_stream_width(server, num_queues, total_workers, total_capacity);
99 }
100
101 log_topology(
102 topology_detected,
103 device_name.as_deref(),
104 num_cores,
105 &topology,
106 &workers,
107 &queues,
108 num_queues,
109 total_workers,
110 total_tags,
111 total_capacity,
112 );
113
114 assert!(total_workers > 0, "some workers expected");
115 debug_assert!(
116 total_workers <= max_workers || !topology_detected,
117 "spawning too many workers"
118 );
119
120 assert!(!queues.is_empty(), "some queues expected");
121 assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected");
122
123 (topology, workers, queues)
124}
125
126fn sum_total_tags(devices: &MultiDevice, default_worker_count: Option<usize>) -> usize {
131 devices
132 .md
133 .iter()
134 .flat_map(|md| md.mq.iter())
135 .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
136 .filter_map(|mq| mq.nr_tags)
137 .chain(default_worker_count)
138 .fold(0_usize, usize::saturating_add)
139}
140
141fn compute_topology(
145 devices: &MultiDevice,
146 topology_detected: bool,
147 cores_max: usize,
148) -> Vec<usize> {
149 devices
150 .md
151 .iter()
152 .flat_map(|md| md.mq.iter())
153 .fold(vec![0; cores_max], |mut topology, mq| {
154 mq.cpu_list
155 .iter()
156 .filter(|&&id| id < cores_max)
157 .filter(|&&id| is_core_available(id))
158 .for_each(|&id| {
159 topology[id] = mq.id;
160 });
161
162 topology
163 })
164 .into_iter()
165 .enumerate()
166 .map(|(core_id, queue_id)| {
167 topology_detected
168 .then_some(queue_id)
169 .unwrap_or(core_id)
170 })
171 .collect()
172}
173
174fn compute_max_workers(
178 devices: &MultiDevice,
179 default_worker_count: Option<usize>,
180 max_workers_cfg: usize,
181) -> usize {
182 let max_threads = max_threads()
183 .map(at!(0))
184 .unwrap_or(usize::MAX)
185 .saturating_div(3);
186
187 devices
188 .md
189 .iter()
190 .flat_map(|md| md.mq.iter())
191 .filter_map(|mq| mq.nr_tags)
192 .chain(default_worker_count)
193 .fold(0_usize, usize::saturating_add)
194 .min(max_workers_cfg)
195 .clamp(WORKER_LIMIT.0, max_threads)
196 .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1)
197}
198
199fn compute_workers(
204 devices: &MultiDevice,
205 config: &Config,
206 default_worker_count: Option<usize>,
207 topology_len: usize,
208 chan_limit: usize,
209) -> Vec<usize> {
210 let default_workers = default_worker_count
211 .into_iter()
212 .cycle()
213 .enumerate()
214 .map(move |(core_id, count)| {
215 is_core_available(core_id)
216 .then_some(count)
217 .unwrap_or(0)
218 .min(chan_limit)
219 });
220
221 devices
222 .md
223 .iter()
224 .inspect(|md| debug!(?md))
225 .flat_map(|md| md.mq.iter())
226 .map(|mq| {
227 let shares = mq
228 .cpu_list
229 .iter()
230 .filter(|&&id| is_core_available(id))
231 .count();
232
233 let conf_limit = config
234 .db_pool_workers_limit
235 .saturating_mul(shares);
236
237 let hard_limit = devices
238 .md
239 .iter()
240 .filter(|_| shares > 0)
241 .fold(0_usize, |acc, mq| {
242 mq.nr_requests
243 .map(|nr| nr.min(conf_limit))
244 .or(Some(conf_limit))
245 .map(|nr| acc.saturating_add(nr))
246 .unwrap_or(acc)
247 });
248
249 let tags = mq
250 .nr_tags
251 .unwrap_or(WORKER_LIMIT.0)
252 .min(hard_limit)
253 .min(chan_limit);
254
255 debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit);
256
257 tags
258 })
259 .chain(default_workers)
260 .take(topology_len)
261 .collect()
262}
263
264#[expect(clippy::too_many_arguments)]
265fn log_topology(
266 topology_detected: bool,
267 device_name: Option<&str>,
268 num_cores: usize,
269 topology: &[usize],
270 workers: &[usize],
271 queues: &[usize],
272 num_queues: usize,
273 total_workers: usize,
274 total_tags: usize,
275 total_capacity: usize,
276) {
277 if topology_detected {
278 debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",);
279 info!(
280 device_name = ?device_name.unwrap_or("None"),
281 ?num_queues,
282 ?total_workers,
283 ?total_tags,
284 ?total_capacity,
285 stream_width = ?stream::automatic_width(),
286 amplification = ?stream::automatic_amplification(),
287 "Frontend topology",
288 );
289 } else {
290 debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)");
291 debug_warn!(
292 device_name = ?device_name.unwrap_or("None"),
293 ?total_workers,
294 ?total_capacity,
295 stream_width = ?stream::automatic_width(),
296 amplification = ?stream::automatic_amplification(),
297 "Storage hardware not detected for database directory; assuming defaults.",
298 );
299 }
300}
301
302#[expect(clippy::as_conversions, clippy::cast_precision_loss)]
303fn update_stream_width(
304 server: &Arc<Server>,
305 num_queues: usize,
306 total_workers: usize,
307 _total_capacity: usize,
308) {
309 assert!(num_queues > 0, "Expected at least one queue.");
310 assert!(total_workers > 0, "Expected some workers.");
311
312 let config = &server.config;
313 let scale: f64 = config.stream_width_scale.min(100.0).into();
314 let max_width = expected!(total_workers / num_queues);
315
316 let old_width = stream::automatic_width();
317 let old_scale_width = expected!(old_width * num_queues);
318
319 let new_scale = total_workers as f64 / old_scale_width as f64;
320 let new_scale = new_scale.clamp(1.0, 4.0);
321 let new_scale_width = new_scale * old_width as f64;
322 let new_scale_width = usize_from_f64(new_scale_width)
323 .expect("failed to convert f64 to usize")
324 .next_multiple_of(8);
325
326 let req_width = usize_from_f64(scale * new_scale_width as f64)
327 .expect("failed to convert f64 to usize")
328 .next_multiple_of(4)
329 .min(max_width)
330 .clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
331
332 let req_amp = new_scale * config.stream_amplification as f64;
333 let req_amp = usize_from_f64(req_amp * scale)
334 .expect("failed to convert f64 to usize")
335 .next_multiple_of(64)
336 .clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
337
338 let (old_width, new_width) = stream::set_width(req_width);
339 let (old_amp, new_amp) = stream::set_amplification(req_amp);
340 debug!(
341 config_scale = ?config.stream_width_scale,
342 ?old_width,
343 ?new_scale,
344 ?new_width,
345 ?old_amp,
346 ?new_amp,
347 "Updated global stream width"
348 );
349}