1use std::{path::PathBuf, sync::Arc};
2
3use tuwunel_core::{
4 Server, at, debug,
5 debug::INFO_SPAN_LEVEL,
6 debug_info, debug_warn, expected, info, is_equal_to,
7 utils::{
8 BoolExt,
9 math::usize_from_f64,
10 result::LogDebugErr,
11 stream,
12 stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
13 sys::{
14 compute::{available_parallelism, cores_available, is_core_available},
15 max_threads, storage,
16 },
17 },
18};
19
20use super::{QUEUE_LIMIT, WORKER_LIMIT};
21
22#[tracing::instrument(
43 level = INFO_SPAN_LEVEL,
44 skip_all,
45 ret(level = "trace"),
46)]
47pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) {
48 let config = &server.config;
49 let num_cores = available_parallelism();
50
51 let cores_max = cores_available()
55 .last()
56 .unwrap_or(0)
57 .saturating_add(1);
58
59 let path: PathBuf = config.database_path.clone();
61 let device_name = storage::name_from_path(&path)
62 .log_debug_err()
63 .ok();
64
65 let devices = storage::md_discover(&path);
66 let topology_detected = devices.md.is_empty().is_false();
67 debug!(?topology_detected, ?device_name, ?devices);
68
69 let default_worker_count = topology_detected
71 .is_false()
72 .then_some(config.db_pool_workers)
73 .map(|workers| workers.saturating_mul(num_cores));
74
75 let total_tags = devices
81 .md
82 .iter()
83 .flat_map(|md| md.mq.iter())
84 .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
85 .filter_map(|mq| mq.nr_tags)
86 .chain(default_worker_count)
87 .fold(0_usize, usize::saturating_add);
88
89 let topology: Vec<usize> = devices
96 .md
97 .iter()
98 .flat_map(|md| md.mq.iter())
99 .fold(vec![0; cores_max], |mut topology, mq| {
100 mq.cpu_list
101 .iter()
102 .filter(|&&id| id < cores_max)
103 .filter(|&&id| is_core_available(id))
104 .for_each(|&id| {
105 topology[id] = mq.id;
106 });
107
108 topology
109 })
110 .into_iter()
111 .enumerate()
112 .map(|(core_id, queue_id)| {
113 topology_detected
114 .then_some(queue_id)
115 .unwrap_or(core_id)
116 })
117 .collect();
118
119 let max_threads = max_threads()
122 .map(at!(0))
123 .unwrap_or(usize::MAX)
124 .saturating_div(3);
125
126 let max_workers = devices
129 .md
130 .iter()
131 .flat_map(|md| md.mq.iter())
132 .filter_map(|mq| mq.nr_tags)
133 .chain(default_worker_count.into_iter())
134 .fold(0_usize, usize::saturating_add)
135 .min(config.db_pool_max_workers)
136 .clamp(WORKER_LIMIT.0, max_threads)
137 .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
138
139 let chan_limit = expected!(max_workers / num_cores)
141 .saturating_sub(8)
142 .saturating_add(1)
143 .next_multiple_of(8);
144
145 let default_workers = default_worker_count
147 .into_iter()
148 .cycle()
149 .enumerate()
150 .map(|(core_id, count)| {
151 is_core_available(core_id)
152 .then_some(count)
153 .unwrap_or(0)
154 .min(chan_limit)
155 });
156
157 let workers: Vec<usize> = devices
163 .md
164 .iter()
165 .inspect(|md| debug!(?md))
166 .flat_map(|md| md.mq.iter())
167 .map(|mq| {
168 let shares = mq
169 .cpu_list
170 .iter()
171 .filter(|&&id| is_core_available(id))
172 .count();
173
174 let conf_limit = config
175 .db_pool_workers_limit
176 .saturating_mul(shares);
177
178 let hard_limit = devices
179 .md
180 .iter()
181 .filter(|_| shares > 0)
182 .fold(0_usize, |acc, mq| {
183 mq.nr_requests
184 .map(|nr| nr.min(conf_limit))
185 .or(Some(conf_limit))
186 .map(|nr| acc.saturating_add(nr))
187 .unwrap_or(acc)
188 });
189
190 let tags = mq
191 .nr_tags
192 .unwrap_or(WORKER_LIMIT.0)
193 .min(hard_limit)
194 .min(chan_limit);
195
196 debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit);
197
198 tags
199 })
200 .chain(default_workers)
201 .take(topology.len())
202 .collect();
203
204 let queues: Vec<usize> = workers
207 .iter()
208 .map(|count| {
209 count
210 .saturating_mul(config.db_pool_queue_mult)
211 .min(QUEUE_LIMIT.1)
212 })
213 .collect();
214
215 let total_workers = workers.iter().sum::<usize>();
217
218 let total_capacity = queues.iter().sum::<usize>();
220
221 let num_queues = queues.iter().filter(|&&cap| cap > 0).count();
223
224 if config.stream_width_scale > 0.0 {
227 update_stream_width(server, num_queues, total_workers, total_capacity);
228 }
229
230 if topology_detected {
231 debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",);
232 info!(
233 device_name = ?device_name.as_deref().unwrap_or("None"),
234 ?num_queues,
235 ?total_workers,
236 ?total_tags,
237 ?total_capacity,
238 stream_width = ?stream::automatic_width(),
239 amplification = ?stream::automatic_amplification(),
240 "Frontend topology",
241 );
242 } else {
243 debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)");
244 debug_warn!(
245 device_name = ?device_name.as_deref().unwrap_or("None"),
246 ?total_workers,
247 ?total_capacity,
248 stream_width = ?stream::automatic_width(),
249 amplification = ?stream::automatic_amplification(),
250 "Storage hardware not detected for database directory; assuming defaults.",
251 );
252 }
253
254 assert!(total_workers > 0, "some workers expected");
255 debug_assert!(
256 total_workers <= max_workers || !topology_detected,
257 "spawning too many workers"
258 );
259
260 assert!(!queues.is_empty(), "some queues expected");
261 assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected");
262
263 (topology, workers, queues)
264}
265
266#[expect(clippy::as_conversions, clippy::cast_precision_loss)]
267fn update_stream_width(
268 server: &Arc<Server>,
269 num_queues: usize,
270 total_workers: usize,
271 _total_capacity: usize,
272) {
273 assert!(num_queues > 0, "Expected at least one queue.");
274 assert!(total_workers > 0, "Expected some workers.");
275
276 let config = &server.config;
277 let scale: f64 = config.stream_width_scale.min(100.0).into();
278 let max_width = expected!(total_workers / num_queues);
279
280 let old_width = stream::automatic_width();
281 let old_scale_width = expected!(old_width * num_queues);
282
283 let new_scale = total_workers as f64 / old_scale_width as f64;
284 let new_scale = new_scale.clamp(1.0, 4.0);
285 let new_scale_width = new_scale * old_width as f64;
286 let new_scale_width = usize_from_f64(new_scale_width)
287 .expect("failed to convert f64 to usize")
288 .next_multiple_of(8);
289
290 let req_width = usize_from_f64(scale * new_scale_width as f64)
291 .expect("failed to convert f64 to usize")
292 .next_multiple_of(4)
293 .min(max_width)
294 .clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
295
296 let req_amp = new_scale * config.stream_amplification as f64;
297 let req_amp = usize_from_f64(req_amp * scale)
298 .expect("failed to convert f64 to usize")
299 .next_multiple_of(64)
300 .clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
301
302 let (old_width, new_width) = stream::set_width(req_width);
303 let (old_amp, new_amp) = stream::set_amplification(req_amp);
304 debug!(
305 config_scale = ?config.stream_width_scale,
306 ?old_width,
307 ?new_scale,
308 ?new_width,
309 ?old_amp,
310 ?new_amp,
311 "Updated global stream width"
312 );
313}