Skip to main content

tuwunel_core/alloc/
je.rs

1//! jemalloc allocator
2
3use std::{
4	alloc::Layout,
5	cell::OnceCell,
6	ffi::{CStr, c_char, c_void},
7	fmt::Debug,
8	panic::catch_unwind,
9	process::abort,
10	sync::{
11		Mutex,
12		atomic::{AtomicBool, AtomicU64, Ordering},
13	},
14};
15
16use jevmalloc as jemalloc;
17use jevmalloc::{ctl as mallctl, ffi};
18
19use crate::{
20	Result,
21	arrayvec::ArrayVec,
22	err, is_equal_to, is_nonzero,
23	utils::{BoolExt, math, math::Tried},
24};
25
/// jemalloc startup options, exported under the unmangled symbol
/// `malloc_conf` (the global jemalloc consults for its configuration string).
///
/// The value is a comma-separated `key:value` list; the trailing `0` element
/// supplies the NUL terminator. Only compiled with the `jemalloc_conf`
/// feature.
///
/// NOTE(review): jemalloc reads this symbol as a C `const char *`, so this
/// relies on a `&[u8]` static placing its data pointer first. Also confirm the
/// exported name matches the symbol prefix jevmalloc builds jemalloc with.
#[cfg(feature = "jemalloc_conf")]
#[unsafe(no_mangle)]
pub static malloc_conf: &[u8] = const_str::concat_bytes!(
	"tcache:true",
	",percpu_arena:percpu",
	",metadata_thp:always",
	",background_thread:true",
	",max_background_threads:-1",
	",lg_extent_max_active_fit:4",
	",oversize_threshold:2097152",
	",tcache_max:524288",
	",dirty_decay_ms:16000",
	",muzzy_decay_ms:144000",
	// The profiling fragment (`_MALLOC_CONF_PROF`) is currently not appended.
	//MALLOC_CONF_PROF,
	// NUL terminator required by the C side.
	0
);
42
// Profiling fragment intended for `malloc_conf`. Only x86_64 builds with the
// `jemalloc_prof` feature contribute `,prof_active:false`; every other
// `jemalloc_conf` build gets an empty fragment. The leading underscore marks it
// as unused: the entry in the `malloc_conf` concatenation is commented out.
#[cfg(all(
	feature = "jemalloc_conf",
	feature = "jemalloc_prof",
	target_arch = "x86_64",
))]
const _MALLOC_CONF_PROF: &str = ",prof_active:false";
#[cfg(all(
	feature = "jemalloc_conf",
	any(not(feature = "jemalloc_prof"), not(target_arch = "x86_64")),
))]
const _MALLOC_CONF_PROF: &str = "";
54
55type Name = ArrayVec<u8, NAME_MAX>;
56type Key = ArrayVec<usize, KEY_SEGS>;
57
58const NAME_MAX: usize = 128;
59const KEY_SEGS: usize = 8;
60
/// Registers jemalloc as the process-wide Rust global allocator.
#[global_allocator]
static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc;
/// Held by `set()` across a mallctl write and the epoch bump that follows it.
static CONTROL: Mutex<()> = Mutex::new(());

/// Allocation counter advanced by the allocation hooks while counting is on.
static GLOBAL_ALLOCS: AtomicU64 = AtomicU64::new(0);
/// When true, the allocation hooks increment `GLOBAL_ALLOCS`.
// NOTE(review): nothing visible in this file ever stores `true` here.
static COUNT_GLOBAL_ALLOCS: AtomicBool = AtomicBool::new(false);
/// When true, the allocation hooks write one line per allocation to stdout.
// NOTE(review): nothing visible in this file ever stores `true` here.
static TRACE_GLOBAL_ALLOCS: AtomicBool = AtomicBool::new(false);
68
/// Installs this module's allocation hooks into jevmalloc's hook slots.
///
/// Marked with `crate::ctor` (presumably a re-export of a load-time
/// constructor attribute — confirm), so it runs without an explicit call.
#[crate::ctor]
fn _static_initialization() {
	// SAFETY: Mutable static globals in jemalloc crate; must be initialized
	// properly and uniquely.
	unsafe {
		jemalloc::hook::ALLOC = Some(global_alloc_hook);
		jemalloc::hook::ALLOC_ZEROED = Some(global_alloc_zeroed_hook);
	};
}
78
/// Summarizes jemalloc's global statistics in MiB.
///
/// `#[cfg(disable)]` keeps this implementation out of every normal build; the
/// stub below is what callers actually get. Kept for reference.
#[must_use]
#[cfg(disable)]
//#[cfg(feature = "jemalloc_stats")]
pub fn memory_usage() -> Option<String> {
	use mallctl::stats;

	// Bytes -> MiB. Read failures and values too large for u32 KiB become 0.
	let mibs = |input: Result<usize, mallctl::Error>| {
		let input = input.unwrap_or_default();
		let kibs = input / 1024;
		let kibs = u32::try_from(kibs).unwrap_or_default();
		let kibs = f64::from(kibs);
		kibs / 1024.0
	};

	// Acquire the epoch; ensure latest stats are pulled in
	acq_epoch().ok()?;

	let allocated = mibs(stats::allocated::read());
	let active = mibs(stats::active::read());
	let mapped = mibs(stats::mapped::read());
	let metadata = mibs(stats::metadata::read());
	let resident = mibs(stats::resident::read());
	let retained = mibs(stats::retained::read());
	Some(format!(
		"allocated: {allocated:.2} MiB\nactive: {active:.2} MiB\nmapped: {mapped:.2} \
		 MiB\nmetadata: {metadata:.2} MiB\nresident: {resident:.2} MiB\nretained: {retained:.2} \
		 MiB\n"
	))
}
108
/// Human-readable allocator memory summary.
///
/// The jemalloc-backed implementation is compiled out, so this build always
/// reports `None`.
#[must_use]
//#[cfg(not(feature = "jemalloc_stats"))]
pub fn memory_usage() -> Option<String> {
	Option::None
}
112
113pub fn memory_stats(opts: &str) -> Option<String> {
114	const MAX_LENGTH: usize = 1_048_576;
115
116	let mut str = String::new();
117	let opaque = std::ptr::from_mut(&mut str).cast::<c_void>();
118	let opts_p: *const c_char = std::ffi::CString::new(opts)
119		.expect("cstring")
120		.into_raw()
121		.cast_const();
122
123	// Acquire the epoch; ensure latest stats are pulled in
124	acq_epoch().ok()?;
125
126	// SAFETY: calls malloc_stats_print() with our string instance which must remain
127	// in this frame. https://docs.rs/tikv-jemalloc-sys/latest/tikv_jemalloc_sys/fn.malloc_stats_print.html
128	unsafe { ffi::malloc_stats_print(Some(malloc_stats_cb), opaque, opts_p) };
129
130	str.truncate(MAX_LENGTH);
131
132	Some(str)
133}
134
135unsafe extern "C" fn malloc_stats_cb(opaque: *mut c_void, msg: *const c_char) {
136	catch_unwind(move || handle_malloc_stats(opaque, msg))
137		.map_err(|_| abort())
138		.ok();
139}
140
/// Appends one chunk of `malloc_stats_print()` output to the `String` that
/// `opaque` points at; invalid UTF-8 is replaced with U+FFFD.
///
/// Panics if `opaque` is null.
fn handle_malloc_stats(opaque: *mut c_void, msg: *const c_char) {
	// SAFETY: we have to trust the opaque points to our String
	let sink = unsafe { opaque.cast::<String>().as_mut() };
	let sink: &mut String = sink.expect("failed to cast void* to &mut String");

	// SAFETY: we have to trust the string is null terminated.
	let chunk = unsafe { CStr::from_ptr(msg) }.to_string_lossy();
	sink.push_str(&chunk);
}
156
157fn global_alloc_hook(layout: Layout) {
158	catch_unwind(move || handle_global_alloc(layout))
159		.map_err(|_| abort())
160		.ok();
161}
162
163fn global_alloc_zeroed_hook(layout: Layout) {
164	catch_unwind(move || handle_global_alloc(layout))
165		.map_err(|_| abort())
166		.ok();
167}
168
/// Shared body of both allocation hooks: optionally counts the allocation and
/// optionally writes a trace line for it to stdout.
///
/// Panics (buffer overflow, failed `write(2)`) are caught by the calling hook,
/// which aborts.
fn handle_global_alloc(layout: Layout) {
	use std::io::Write;

	use libc::{STDOUT_FILENO, write};

	// `bool -> u64` adds 1 while counting is enabled and 0 otherwise;
	// `count` is the counter value before this increment.
	let do_count = COUNT_GLOBAL_ALLOCS.load(Ordering::Relaxed);
	let count = GLOBAL_ALLOCS.fetch_add(do_count.into(), Ordering::Relaxed);

	if TRACE_GLOBAL_ALLOCS.load(Ordering::Relaxed) {
		// Fixed stack buffer and raw write(2) rather than `String`/`println!`,
		// presumably to avoid allocating from inside an allocation hook.
		let mut buf = ArrayVec::<u8, 128>::new();
		writeln!(&mut buf, "{count} align={} size={}", layout.align(), layout.size())
			.expect("writeln! to buffer failed");

		// SAFETY: Valid ptr and len from buf for writing to stdout.
		unsafe { write(STDOUT_FILENO, buf.as_ptr().cast::<c_void>(), buf.len()) }
			.ge(&0)
			.into_result()
			.expect("write(2) error");
	}
}
189
190#[inline]
191#[must_use]
192pub fn global_alloc_count() -> u64 { GLOBAL_ALLOCS.load(Ordering::Relaxed) }
193
/// Resolves a mallctl name into its MIB `Key`, cached in a thread-local cell.
///
/// The `thread_local!` is declared inside the expansion, so each call site
/// gets its own cache slot, keyed by call site and not by name. `$name` must
/// therefore be the same string every time a given expansion runs. A
/// runtime-varying name would return the key of whichever name that site
/// resolved first on the current thread. Panics if translation fails.
macro_rules! mallctl {
	($name:expr_2021) => {{
		thread_local! {
			static KEY: OnceCell<Key> = OnceCell::default();
		};

		// Translate once per thread per call site, then hand out copies.
		KEY.with(|once| {
			once.get_or_init(move || key($name).expect("failed to translate name into mib key"))
				.clone()
		})
	}};
}
206
207pub mod this_thread {
208	use super::{Debug, Key, OnceCell, Result, is_nonzero, key, math};
209
210	thread_local! {
211		static ALLOCATED_BYTES: OnceCell<&'static u64> = const { OnceCell::new() };
212		static DEALLOCATED_BYTES: OnceCell<&'static u64> = const { OnceCell::new() };
213	}
214
215	pub fn trim() -> Result { decay().and_then(|()| purge()) }
216
217	pub fn purge() -> Result { notify(mallctl!("arena.0.purge")) }
218
219	pub fn decay() -> Result { notify(mallctl!("arena.0.decay")) }
220
221	pub fn idle() -> Result { super::notify(&mallctl!("thread.idle")) }
222
223	pub fn flush() -> Result { super::notify(&mallctl!("thread.tcache.flush")) }
224
225	pub fn set_muzzy_decay(decay_ms: isize) -> Result<isize> {
226		set(mallctl!("arena.0.muzzy_decay_ms"), decay_ms)
227	}
228
229	pub fn get_muzzy_decay() -> Result<isize> { get(mallctl!("arena.0.muzzy_decay_ms")) }
230
231	pub fn set_dirty_decay(decay_ms: isize) -> Result<isize> {
232		set(mallctl!("arena.0.dirty_decay_ms"), decay_ms)
233	}
234
235	pub fn get_dirty_decay() -> Result<isize> { get(mallctl!("arena.0.dirty_decay_ms")) }
236
237	pub fn cache_enable(enable: bool) -> Result<bool> {
238		super::set::<u8>(&mallctl!("thread.tcache.enabled"), enable.into()).map(is_nonzero!())
239	}
240
241	pub fn is_cache_enabled() -> Result<bool> {
242		super::get::<u8>(&mallctl!("thread.tcache.enabled")).map(is_nonzero!())
243	}
244
245	pub fn set_arena(id: usize) -> Result<usize> {
246		super::set::<u32>(&mallctl!("thread.arena"), id.try_into()?).and_then(math::try_into)
247	}
248
249	pub fn arena_id() -> Result<usize> {
250		super::get::<u32>(&mallctl!("thread.arena")).and_then(math::try_into)
251	}
252
253	pub fn prof_enable(enable: bool) -> Result<bool> {
254		super::set::<u8>(&mallctl!("thread.prof.active"), enable.into()).map(is_nonzero!())
255	}
256
257	pub fn is_prof_enabled() -> Result<bool> {
258		super::get::<u8>(&mallctl!("thread.prof.active")).map(is_nonzero!())
259	}
260
261	pub fn reset_peak() -> Result { super::notify(&mallctl!("thread.peak.reset")) }
262
263	pub fn peak() -> Result<u64> { super::get(&mallctl!("thread.peak.read")) }
264
265	#[inline]
266	#[must_use]
267	pub fn allocated() -> u64 {
268		*ALLOCATED_BYTES.with(|once| init_tls_cell(once, "thread.allocatedp"))
269	}
270
271	#[inline]
272	#[must_use]
273	pub fn deallocated() -> u64 {
274		*DEALLOCATED_BYTES.with(|once| init_tls_cell(once, "thread.deallocatedp"))
275	}
276
277	fn notify(key: Key) -> Result { super::notify_by_arena(Some(arena_id()?), key) }
278
279	fn set<T>(key: Key, val: T) -> Result<T>
280	where
281		T: Copy + Debug,
282	{
283		super::set_by_arena(Some(arena_id()?), key, val)
284	}
285
286	fn get<T>(key: Key) -> Result<T>
287	where
288		T: Copy + Debug,
289	{
290		super::get_by_arena(Some(arena_id()?), key)
291	}
292
293	fn init_tls_cell(cell: &OnceCell<&'static u64>, name: &str) -> &'static u64 {
294		cell.get_or_init(|| {
295			let ptr: *const u64 = super::get(&mallctl!(name)).expect("failed to obtain pointer");
296
297			// SAFETY: ptr points directly to the internal state of jemalloc for this thread
298			unsafe { ptr.as_ref() }.expect("pointer must not be null")
299		})
300	}
301}
302
303pub fn stats_reset() -> Result { notify(&mallctl!("stats.mutexes.reset")) }
304
305pub fn prof_reset() -> Result { notify(&mallctl!("prof.reset")) }
306
307pub fn prof_dump() -> Result { notify(&mallctl!("prof.dump")) }
308
309pub fn prof_gdump(enable: bool) -> Result<bool> {
310	set::<u8>(&mallctl!("prof.gdump"), enable.into()).map(is_nonzero!())
311}
312
313pub fn prof_enable(enable: bool) -> Result<bool> {
314	set::<u8>(&mallctl!("prof.active"), enable.into()).map(is_nonzero!())
315}
316
317pub fn is_prof_enabled() -> Result<bool> {
318	get::<u8>(&mallctl!("prof.active")).map(is_nonzero!())
319}
320
321pub fn prof_interval() -> Result<u64> {
322	get::<u64>(&mallctl!("prof.interval")).and_then(math::try_into)
323}
324
325pub fn trim<I: Into<Option<usize>> + Copy>(arena: I) -> Result {
326	decay(arena).and_then(|()| purge(arena))
327}
328
329pub fn purge<I: Into<Option<usize>>>(arena: I) -> Result {
330	notify_by_arena(arena.into(), mallctl!("arena.4096.purge"))
331}
332
333pub fn decay<I: Into<Option<usize>>>(arena: I) -> Result {
334	notify_by_arena(arena.into(), mallctl!("arena.4096.decay"))
335}
336
337pub fn set_muzzy_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
338	match arena.into() {
339		| Some(arena) =>
340			set_by_arena(Some(arena), mallctl!("arena.4096.muzzy_decay_ms"), decay_ms),
341		| _ => set(&mallctl!("arenas.muzzy_decay_ms"), decay_ms),
342	}
343}
344
345pub fn set_dirty_decay<I: Into<Option<usize>>>(arena: I, decay_ms: isize) -> Result<isize> {
346	match arena.into() {
347		| Some(arena) =>
348			set_by_arena(Some(arena), mallctl!("arena.4096.dirty_decay_ms"), decay_ms),
349		| _ => set(&mallctl!("arenas.dirty_decay_ms"), decay_ms),
350	}
351}
352
353pub fn background_thread_enable(enable: bool) -> Result<bool> {
354	set::<u8>(&mallctl!("background_thread"), enable.into()).map(is_nonzero!())
355}
356
357#[inline]
358#[must_use]
359pub fn is_affine_arena() -> bool { is_percpu_arena() || is_phycpu_arena() }
360
361#[inline]
362#[must_use]
363pub fn is_percpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("percpu")) }
364
365#[inline]
366#[must_use]
367pub fn is_phycpu_arena() -> bool { percpu_arenas().is_ok_and(is_equal_to!("phycpu")) }
368
369pub fn percpu_arenas() -> Result<&'static str> {
370	let ptr = get::<*const c_char>(&mallctl!("opt.percpu_arena"))?;
371	//SAFETY: ptr points to a null-terminated string returned for opt.percpu_arena.
372	let cstr = unsafe { CStr::from_ptr(ptr) };
373	cstr.to_str().map_err(Into::into)
374}
375
376pub fn arenas() -> Result<usize> {
377	get::<u32>(&mallctl!("arenas.narenas")).and_then(math::try_into)
378}
379
380pub fn inc_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 1_u64) }
381
382pub fn acq_epoch() -> Result<u64> { xchg(&mallctl!("epoch"), 0_u64) }
383
384fn notify_by_arena(id: Option<usize>, mut key: Key) -> Result {
385	key[1] = id.unwrap_or(4096);
386	notify(&key)
387}
388
389fn set_by_arena<T>(id: Option<usize>, mut key: Key, val: T) -> Result<T>
390where
391	T: Copy + Debug,
392{
393	key[1] = id.unwrap_or(4096);
394	set(&key, val)
395}
396
397fn get_by_arena<T>(id: Option<usize>, mut key: Key) -> Result<T>
398where
399	T: Copy + Debug,
400{
401	key[1] = id.unwrap_or(4096);
402	get(&key)
403}
404
405fn notify(key: &Key) -> Result { xchg(key, ()) }
406
407fn set<T>(key: &Key, val: T) -> Result<T>
408where
409	T: Copy + Debug,
410{
411	let _lock = CONTROL.lock()?;
412	let res = xchg(key, val)?;
413	inc_epoch()?;
414
415	Ok(res)
416}
417
418#[tracing::instrument(
419	name = "get",
420	level = "trace"
421	skip_all,
422	fields(?key)
423)]
424fn get<T>(key: &Key) -> Result<T>
425where
426	T: Copy + Debug,
427{
428	acq_epoch()?;
429
430	// SAFETY: T must be perfectly valid to receive value.
431	unsafe { mallctl::raw::read_mib(key.as_slice()) }.map_err(map_err)
432}
433
434#[tracing::instrument(
435	name = "xchg",
436	level = "trace"
437	skip_all,
438	fields(?key, ?val)
439)]
440fn xchg<T>(key: &Key, val: T) -> Result<T>
441where
442	T: Copy + Debug,
443{
444	// SAFETY: T must be the exact expected type.
445	unsafe { mallctl::raw::update_mib(key.as_slice(), val) }.map_err(map_err)
446}
447
448fn key(name: &str) -> Result<Key> {
449	// tikv asserts the output buffer length is tight to the number of required mibs
450	// so we slice that down here.
451	let segs = name
452		.chars()
453		.filter(is_equal_to!(&'.'))
454		.count()
455		.try_add(1)?;
456
457	let name = self::name(name)?;
458	let mut buf = [0_usize; KEY_SEGS];
459	mallctl::raw::name_to_mib(name.as_slice(), &mut buf[0..segs])
460		.map_err(map_err)
461		.map(move |()| buf.into_iter().take(segs).collect())
462}
463
464fn name(name: &str) -> Result<Name> {
465	let mut buf = Name::new();
466	buf.try_extend_from_slice(name.as_bytes())?;
467	buf.try_extend_from_slice(b"\0")?;
468
469	Ok(buf)
470}
471
472fn map_err(error: jemalloc::ctl::Error) -> crate::Error { err!("mallctl: {}", error.to_string()) }