Skip to main content

tuwunel_core/metrics/
mod.rs

1use std::sync::{
2	Arc,
3	atomic::{AtomicU32, AtomicU64},
4};
5
6use tokio::runtime;
7#[cfg(tokio_unstable)]
8use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
9use tokio_metrics::{TaskMetrics, TaskMonitor};
10
11pub struct Metrics {
12	_runtime: Option<runtime::Handle>,
13
14	runtime_metrics: Option<runtime::RuntimeMetrics>,
15
16	task_monitor: Option<TaskMonitor>,
17
18	task_intervals: std::sync::Mutex<Option<Box<dyn Iterator<Item = TaskMetrics> + Send>>>,
19
20	#[cfg(tokio_unstable)]
21	_runtime_monitor: Option<RuntimeMonitor>,
22
23	#[cfg(tokio_unstable)]
24	runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
25
26	// TODO: move stats
27	pub requests_count: AtomicU64,
28	pub requests_handle_finished: AtomicU64,
29	pub requests_handle_active: AtomicU32,
30	pub requests_panic: AtomicU32,
31}
32
33impl Metrics {
34	#[must_use]
35	pub fn new(runtime: Option<&runtime::Handle>) -> Arc<Self> {
36		#[cfg(tokio_unstable)]
37		let runtime_monitor = runtime.map(RuntimeMonitor::new);
38
39		#[cfg(tokio_unstable)]
40		let runtime_intervals = runtime_monitor
41			.as_ref()
42			.map(RuntimeMonitor::intervals);
43
44		let task_monitor = cfg!(tokio_unstable).then(|| {
45			TaskMonitor::builder()
46				.with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
47				.with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
48				.clone()
49				.build()
50		});
51
52		let task_intervals = task_monitor.as_ref().map(
53			|task_monitor| -> Box<dyn Iterator<Item = TaskMetrics> + Send> {
54				Box::new(task_monitor.intervals())
55			},
56		);
57
58		Arc::new(Self {
59			_runtime: runtime.cloned(),
60
61			runtime_metrics: runtime.map(runtime::Handle::metrics),
62
63			task_monitor,
64
65			task_intervals: task_intervals.into(),
66
67			#[cfg(tokio_unstable)]
68			_runtime_monitor: runtime_monitor,
69
70			#[cfg(tokio_unstable)]
71			runtime_intervals: std::sync::Mutex::new(runtime_intervals),
72
73			requests_count: AtomicU64::new(0),
74			requests_handle_finished: AtomicU64::new(0),
75			requests_handle_active: AtomicU32::new(0),
76			requests_panic: AtomicU32::new(0),
77		})
78	}
79
80	#[inline]
81	pub async fn instrument<F, Output>(&self, f: F) -> Output
82	where
83		F: Future<Output = Output>,
84	{
85		if let Some(monitor) = self.task_metrics() {
86			monitor.instrument(f).await
87		} else {
88			f.await
89		}
90	}
91
92	pub fn task_interval(&self) -> Option<TaskMetrics> {
93		self.task_intervals
94			.lock()
95			.expect("locked")
96			.as_mut()
97			.and_then(Iterator::next)
98	}
99
100	#[cfg(tokio_unstable)]
101	pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
102		self.runtime_intervals
103			.lock()
104			.expect("locked")
105			.as_mut()
106			.map(Iterator::next)
107			.expect("next interval")
108	}
109
110	#[inline]
111	pub fn num_workers(&self) -> usize {
112		self.runtime_metrics()
113			.map_or(0, runtime::RuntimeMetrics::num_workers)
114	}
115
116	#[inline]
117	pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
118
119	#[inline]
120	pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> {
121		self.runtime_metrics.as_ref()
122	}
123}