Skip to main content

tuwunel_core/alloc/
je.rs

1//! jemalloc allocator
2
3use std::{
4	alloc::Layout,
5	cell::OnceCell,
6	ffi::{CStr, c_char, c_void},
7	fmt::Debug,
8	panic::catch_unwind,
9	process::abort,
10	sync::{
11		Mutex,
12		atomic::{AtomicBool, AtomicU64, Ordering},
13	},
14};
15
16use jevmalloc as jemalloc;
17use jevmalloc::{ctl as mallctl, ffi};
18
19use crate::{
20	Result,
21	arrayvec::ArrayVec,
22	err, is_equal_to, is_nonzero,
23	utils::{BoolExt, math, math::Tried},
24};
25
/// Build-time jemalloc option string, exported under the unmangled symbol
/// `malloc_conf` when the `jemalloc_conf` feature is enabled.
///
/// The value is a comma-separated list of `key:value` options followed by a
/// terminating zero byte.
// NOTE(review): presumably jemalloc picks this symbol up as its compiled-in
// configuration during initialization -- confirm against the jemalloc manual.
#[unsafe(no_mangle)]
#[used]
#[cfg(feature = "jemalloc_conf")]
pub static malloc_conf: &[u8] = const_str::concat_bytes!(
	"tcache:true",
	",percpu_arena:percpu",
	",metadata_thp:always",
	",background_thread:true",
	",max_background_threads:-1",
	",lg_extent_max_active_fit:4",
	",oversize_threshold:2097152",
	",tcache_max:524288",
	",dirty_decay_ms:16000",
	",muzzy_decay_ms:144000",
	// The profiling fragment is intentionally not appended at present:
	//MALLOC_CONF_PROF,
	0
);
43
/// Profiling fragment for `malloc_conf`, selected when both the
/// `jemalloc_conf` and `jemalloc_prof` features are enabled on x86_64.
#[cfg(all(feature = "jemalloc_conf", feature = "jemalloc_prof", target_arch = "x86_64"))]
const _MALLOC_CONF_PROF: &str = ",prof_active:false";

/// Empty profiling fragment for every other `jemalloc_conf` build, i.e. when
/// profiling is disabled or the target is not x86_64.
#[cfg(all(
	feature = "jemalloc_conf",
	not(all(feature = "jemalloc_prof", target_arch = "x86_64")),
))]
const _MALLOC_CONF_PROF: &str = "";
55
56type Name = ArrayVec<u8, NAME_MAX>;
57type Key = ArrayVec<usize, KEY_SEGS>;
58
59const NAME_MAX: usize = 128;
60const KEY_SEGS: usize = 8;
61
62#[global_allocator]
63static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
64static CONTROL: Mutex<()> = Mutex::new(());
65
66static GLOBAL_ALLOCS: AtomicU64 = AtomicU64::new(0);
67static COUNT_GLOBAL_ALLOCS: AtomicBool = AtomicBool::new(false);
68static TRACE_GLOBAL_ALLOCS: AtomicBool = AtomicBool::new(false);
69
70#[crate::ctor(unsafe)]
71fn _static_initialization() {
72	// SAFETY: Mutable static globals in jemalloc crate; must be initialized
73	// properly and uniquely.
74	unsafe {
75		jemalloc::hook::ALLOC = Some(global_alloc_hook);
76		jemalloc::hook::ALLOC_ZEROED = Some(global_alloc_zeroed_hook);
77	};
78}
79
/// Formats a short summary of jemalloc's global statistics in MiB.
///
/// Returns `None` when the statistics epoch cannot be refreshed. Individual
/// statistics that fail to read are reported as zero.
///
/// This variant is compiled out (`cfg(disable)`).
#[cfg(disable)]
#[must_use]
//#[cfg(feature = "jemalloc_stats")]
pub fn memory_usage() -> Option<String> {
	use mallctl::stats;

	/// Converts a byte-count reading to MiB; errors and overflow yield zero.
	fn to_mib(reading: Result<usize, mallctl::Error>) -> f64 {
		let bytes = reading.unwrap_or_default();
		let kib = u32::try_from(bytes / 1024).unwrap_or_default();
		f64::from(kib) / 1024.0
	}

	// Acquire the epoch first so the statistics below are up to date.
	if acq_epoch().is_err() {
		return None;
	}

	let readings = [
		("allocated", to_mib(stats::allocated::read())),
		("active", to_mib(stats::active::read())),
		("mapped", to_mib(stats::mapped::read())),
		("metadata", to_mib(stats::metadata::read())),
		("resident", to_mib(stats::resident::read())),
		("retained", to_mib(stats::retained::read())),
	];

	let report: String = readings
		.iter()
		.map(|(label, mib)| format!("{label}: {mib:.2} MiB\n"))
		.collect();

	Some(report)
}
109
/// Reports allocator memory usage as human-readable text.
///
/// The statistics-backed implementation is compiled out, so this variant
/// always returns `None`.
#[must_use]
//#[cfg(not(feature = "jemalloc_stats"))]
pub fn memory_usage() -> Option<String> {
	None
}
113
114pub fn memory_stats(opts: &str) -> Option<String> {
115	const MAX_LENGTH: usize = 1_048_576;
116
117	let mut str = String::new();
118	let opaque = std::ptr::from_mut(&mut str).cast::<c_void>();
119	let opts_p: *const c_char = std::ffi::CString::new(opts)
120		.expect("cstring")
121		.into_raw()
122		.cast_const();
123
124	// Acquire the epoch; ensure latest stats are pulled in
125	acq_epoch().ok()?;
126
127	// SAFETY: calls malloc_stats_print() with our string instance which must remain
128	// in this frame. https://docs.rs/tikv-jemalloc-sys/latest/tikv_jemalloc_sys/fn.malloc_stats_print.html
129	unsafe { ffi::malloc_stats_print(Some(malloc_stats_cb), opaque, opts_p) };
130
131	str.truncate(MAX_LENGTH);
132
133	Some(str)
134}
135
136unsafe extern "C" fn malloc_stats_cb(opaque: *mut c_void, msg: *const c_char) {
137	catch_unwind(move || handle_malloc_stats(opaque, msg))
138		.map_err(|_| abort())
139		.ok();
140}
141
/// Appends one message from `malloc_stats_print()` to the output string.
///
/// `opaque` must point to the live `String` that collects the report and
/// `msg` must point to a NUL-terminated C string. Bytes that are not valid
/// UTF-8 are replaced with U+FFFD.
///
/// # Panics
///
/// Panics when either pointer is null.
fn handle_malloc_stats(opaque: *mut c_void, msg: *const c_char) {
	// SAFETY: we have to trust the opaque points to our String
	let res: &mut String = unsafe { opaque.cast::<String>().as_mut() }
		.expect("failed to cast void* to &mut String");

	// Reading a C string through a null pointer is undefined behaviour; fail in
	// the same checked way as for a null `opaque`.
	assert!(!msg.is_null(), "malloc stats message must not be null");

	// SAFETY: msg is non-null (checked above); we have to trust the string is
	// null terminated.
	let msg = unsafe { CStr::from_ptr(msg) };

	res.push_str(&msg.to_string_lossy());
}
157
158fn global_alloc_hook(layout: Layout) {
159	catch_unwind(move || handle_global_alloc(layout))
160		.map_err(|_| abort())
161		.ok();
162}
163
164fn global_alloc_zeroed_hook(layout: Layout) {
165	catch_unwind(move || handle_global_alloc(layout))
166		.map_err(|_| abort())
167		.ok();
168}
169
170fn handle_global_alloc(layout: Layout) {
171	use std::io::Write;
172
173	use libc::{STDOUT_FILENO, write};
174
175	let do_count = COUNT_GLOBAL_ALLOCS.load(Ordering::Relaxed);
176	let count = GLOBAL_ALLOCS.fetch_add(do_count.into(), Ordering::Relaxed);
177
178	if TRACE_GLOBAL_ALLOCS.load(Ordering::Relaxed) {
179		let mut buf = ArrayVec::<u8, 128>::new();
180		writeln!(&mut buf, "{count} align={} size={}", layout.align(), layout.size())
181			.expect("writeln! to buffer failed");
182
183		// SAFETY: Valid ptr and len from buf for writing to stdout.
184		unsafe { write(STDOUT_FILENO, buf.as_ptr().cast::<c_void>(), buf.len()) }
185			.ge(&0)
186			.into_result()
187			.expect("write(2) error");
188	}
189}
190
191#[inline]
192#[must_use]
193pub fn global_alloc_count() -> u64 { GLOBAL_ALLOCS.load(Ordering::Relaxed) }
194
/// Translates a mallctl name into its [`Key`] and returns a copy of it,
/// caching the translation per thread.
///
/// The cache is a thread-local declared at each expansion site and is filled
/// only on first use, so any one expansion must always be given the same
/// name; a later call with a different name returns the key cached first.
///
/// Panics when the name cannot be translated.
macro_rules! mallctl {
	($name:expr_2021) => {{
		thread_local! {
			static KEY: OnceCell<Key> = OnceCell::default();
		}

		KEY.with(|cell| {
			let cached = cell.get_or_init(move || {
				key($name).expect("failed to translate name into mib key")
			});

			cached.clone()
		})
	}};
}
207
208pub mod this_thread {
209	use super::{Debug, Key, OnceCell, Result, is_nonzero, key, math};
210
211	thread_local! {
212		static ALLOCATED_BYTES: OnceCell<&'static u64> = const { OnceCell::new() };
213		static DEALLOCATED_BYTES: OnceCell<&'static u64> = const { OnceCell::new() };
214	}
215
216	pub fn trim() -> Result { decay().and_then(|()| purge()) }
217
218	pub fn purge() -> Result { notify(mallctl!("arena.0.purge")) }
219
220	pub fn decay() -> Result { notify(mallctl!("arena.0.decay")) }
221
222	pub fn idle() -> Result { super::notify(&mallctl!("thread.idle")) }
223
224	pub fn flush() -> Result { super::notify(&mallctl!("thread.tcache.flush")) }
225
226	pub fn set_muzzy_decay(decay_ms: isize) -> Result<isize> {
227		set(mallctl!("arena.0.muzzy_decay_ms"), decay_ms)
228	}
229
230	pub fn get_muzzy_decay() -> Result<isize> { get(mallctl!("arena.0.muzzy_decay_ms")) }
231
232	pub fn set_dirty_decay(decay_ms: isize) -> Result<isize> {
233		set(mallctl!("arena.0.dirty_decay_ms"), decay_ms)
234	}
235
236	pub fn get_dirty_decay() -> Result<isize> { get(mallctl!("arena.0.dirty_decay_ms")) }
237
238	pub fn cache_enable(enable: bool) -> Result<bool> {
239		super::set::<u8>(&mallctl!("thread.tcache.enabled"), enable.into()).map(is_nonzero!())
240	}
241
242	pub fn is_cache_enabled() -> Result<bool> {
243		super::get::<u8>(&mallctl!("thread.tcache.enabled")).map(is_nonzero!())
244	}
245
246	pub fn set_arena(id: usize) -> Result<usize> {
247		super::set::<u32>(&mallctl!("thread.arena"), id.try_into()?).and_then(math::try_into)
248	}
249
250	pub fn arena_id() -> Result<usize> {
251		super::get::<u32>(&mallctl!("thread.arena")).and_then(math::try_into)
252	}
253
254	pub fn prof_enable(enable: bool) -> Result<bool> {
255		super::set::<u8>(&mallctl!("thread.prof.active"), enable.into()).map(is_nonzero!())
256	}
257
258	pub fn is_prof_enabled() -> Result<bool> {
259		super::get::<u8>(&mallctl!("thread.prof.active")).map(is_nonzero!())
260	}
261
262	pub fn reset_peak() -> Result { super::notify(&mallctl!("thread.peak.reset")) }
263
264	pub fn peak() -> Result<u64> { super::get(&mallctl!("thread.peak.read")) }
265
266	#[inline]
267	#[must_use]
268	pub fn allocated() -> u64 {
269		*ALLOCATED_BYTES.with(|once| init_tls_cell(once, "thread.allocatedp"))
270	}
271
272	#[inline]
273	#[must_use]
274	pub fn deallocated() -> u64 {
275		*DEALLOCATED_BYTES.with(|once| init_tls_cell(once, "thread.deallocatedp"))
276	}
277
278	fn notify(key: Key) -> Result { super::notify_by_arena(Some(arena_id()?), key) }
279
280	fn set<T>(key: Key, val: T) -> Result<T>
281	where
282		T: Copy + Debug,
283	{
284		super::set_by_arena(Some(arena_id()?), key, val)
285	}
286
287	fn get<T>(key: Key) -> Result<T>
288	where
289		T: Copy + Debug,
290	{
291		super::get_by_arena(Some(arena_id()?), key)
292	}
293
294	fn init_tls_cell(cell: &OnceCell<&'static u64>, name: &str) -> &'static u64 {
295		cell.get_or_init(|| {
296			let ptr: *const u64 = super::get(&mallctl!(name)).expect("failed to obtain pointer");
297
298			// SAFETY: ptr points directly to the internal state of jemalloc for this thread
299			unsafe { ptr.as_ref() }.expect("pointer must not be null")
300		})
301	}
302}
303
304pub fn stats_reset() -> Result { notify(&mallctl!("stats.mutexes.reset")) }
305
306pub fn prof_reset() -> Result { notify(&mallctl!("prof.reset")) }
307
308pub fn prof_dump() -> Result { notify(&mallctl!("prof.dump")) }
309
310pub fn prof_gdump(enable: bool) -> Result<bool> {
311	set::<u8>(&mallctl!("prof.gdump"), enable.into()).map(is_nonzero!())
312}
313
314pub fn prof_enable(enable: bool) -> Result<bool> {
315	set::<u8>(&mallctl!("prof.active"), enable.into()).map(is_nonzero!())
316}
317
318pub fn is_prof_enabled() -> Result<bool> {
319	get::<u8>(&mallctl!("prof.active")).map(is_nonzero!())
320}
321
322pub fn prof_interval() -> Result<u64> {
323	get::<u64>(&mallctl!("prof.interval")).and_then(math::try_into)
324}
325
326pub fn trim<I: Into<Option<usize>> + Copy>(arena: I) -> Result {
327	decay(arena).and_then(|()| purge(arena))
328}
329
330pub fn purge<I: Into<Option<usize>>>(arena: I) -> Result {
331	notify_by_arena(arena.into(), mallctl!("arena.4096.purge"))
332}
333
334pub fn decay<I: Into<Option<usize>>>(arena: I) -> Result {
335	notify_by_arena(arena.into(), mallctl!("arena.4096.decay"))
336}
337
338pub fn set_muzzy_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
339	match arena.into() {
340		| Some(arena) =>
341			set_by_arena(Some(arena), mallctl!("arena.4096.muzzy_decay_ms"), decay_ms),
342		| _ => set(&mallctl!("arenas.muzzy_decay_ms"), decay_ms),
343	}
344}
345
346pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
347	match arena.into() {
348		| Some(arena) =>
349			set_by_arena(Some(arena), mallctl!("arena.4096.dirty_decay_ms"), decay_ms),
350		| _ => set(&mallctl!("arenas.dirty_decay_ms"), decay_ms),
351	}
352}
353
354pub fn background_thread_enable(enable: bool) -> Result<bool> {
355	set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
356}
357
358#[inline]
359#[must_use]
360pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }
361
362#[inline]
363#[must_use]
364pub fn is_percpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("percpu")) }
365
366#[inline]
367#[must_use]
368pub fn is_phycpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("phycpu")) }
369
370pub fn percpu_arenas() -> Result<&'static str> {
371	let ptr = get::<*const c_char>(&mallctl!("opt.percpu_arena"))?;
372	//SAFETY: ptr points to a null-terminated string returned for opt.percpu_arena.
373	let cstr = unsafe { CStr::from_ptr(ptr) };
374	cstr.to_str().map_err(Into::into)
375}
376
377pub fn arenas() -> Result<usize> {
378	get::<u32>(&mallctl!("arenas.narenas")).and_then(math::try_into)
379}
380
381pub fn inc_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 1_u64) }
382
383pub fn acq_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 0_u64) }
384
385fn notify_by_arena(id: Option<usize>, mut key: Key) -> Result {
386	key[1] = id.unwrap_or(4096);
387	notify(&key)
388}
389
390fn set_by_arena<T>(id: Option<usize>, mut key: Key, val: T) -> Result<T>
391where
392	T: Copy + Debug,
393{
394	key[1] = id.unwrap_or(4096);
395	set(&key, val)
396}
397
398fn get_by_arena<T>(id: Option<usize>, mut key: Key) -> Result<T>
399where
400	T: Copy + Debug,
401{
402	key[1] = id.unwrap_or(4096);
403	get(&key)
404}
405
406fn notify(key: &Key) -> Result { xchg(key, ()) }
407
408fn set<T>(key: &Key, val: T) -> Result<T>
409where
410	T: Copy + Debug,
411{
412	let _lock = CONTROL.lock()?;
413	let res = xchg(key, val)?;
414	inc_epoch()?;
415
416	Ok(res)
417}
418
419#[tracing::instrument(
420	name = "get",
421	level = "trace"
422	skip_all,
423	fields(?key)
424)]
425fn get<T>(key: &Key) -> Result<T>
426where
427	T: Copy + Debug,
428{
429	acq_epoch()?;
430
431	// SAFETY: T must be perfectly valid to receive value.
432	unsafe { mallctl::raw::read_mib(key.as_slice()) }.map_err(map_err)
433}
434
435#[tracing::instrument(
436	name = "xchg",
437	level = "trace"
438	skip_all,
439	fields(?key, ?val)
440)]
441fn xchg<T>(key: &Key, val: T) -> Result<T>
442where
443	T: Copy + Debug,
444{
445	// SAFETY: T must be the exact expected type.
446	unsafe { mallctl::raw::update_mib(key.as_slice(), val) }.map_err(map_err)
447}
448
449fn key(name: &str) -> Result<Key> {
450	// tikv asserts the output buffer length is tight to the number of required mibs
451	// so we slice that down here.
452	let segs = name
453		.chars()
454		.filter(is_equal_to!(&'.'))
455		.count()
456		.try_add(1)?;
457
458	let name = self::name(name)?;
459	let mut buf = [0_usize; KEY_SEGS];
460	mallctl::raw::name_to_mib(name.as_slice(), &mut buf[0..segs])
461		.map_err(map_err)
462		.map(move |()| buf.into_iter().take(segs).collect())
463}
464
465fn name(name: &str) -> Result<Name> {
466	let mut buf = Name::new();
467	buf.try_extend_from_slice(name.as_bytes())?;
468	buf.try_extend_from_slice(b"\0")?;
469
470	Ok(buf)
471}
472
473fn map_err(error: jemalloc::ctl::Error) -> crate::Error { err!("mallctl: {}", error.to_string()) }