Skip to main content

tuwunel_core/metrics/
mod.rs

1pub mod dump;
2
3use std::sync::{
4	Arc,
5	atomic::{AtomicU32, AtomicU64},
6};
7
8use tokio::runtime;
9#[cfg(tokio_unstable)]
10use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
11use tokio_metrics::{TaskMetrics, TaskMonitor};
12
13pub struct Metrics {
14	_runtime: Option<runtime::Handle>,
15
16	runtime_metrics: Option<runtime::RuntimeMetrics>,
17
18	task_monitor: Option<TaskMonitor>,
19
20	task_intervals: std::sync::Mutex<Option<Box<dyn Iterator<Item = TaskMetrics> + Send>>>,
21
22	#[cfg(tokio_unstable)]
23	_runtime_monitor: Option<RuntimeMonitor>,
24
25	#[cfg(tokio_unstable)]
26	runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
27
28	// TODO: move stats
29	pub requests_count: AtomicU64,
30	pub requests_handle_finished: AtomicU64,
31	pub requests_handle_active: AtomicU32,
32	pub requests_panic: AtomicU32,
33}
34
35impl Metrics {
36	#[must_use]
37	pub fn new(runtime: Option<&runtime::Handle>) -> Arc<Self> {
38		#[cfg(tokio_unstable)]
39		let runtime_monitor = runtime.map(RuntimeMonitor::new);
40
41		#[cfg(tokio_unstable)]
42		let runtime_intervals = runtime_monitor
43			.as_ref()
44			.map(RuntimeMonitor::intervals);
45
46		let task_monitor = cfg!(tokio_unstable).then(|| {
47			TaskMonitor::builder()
48				.with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
49				.with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
50				.clone()
51				.build()
52		});
53
54		let task_intervals = task_monitor.as_ref().map(
55			|task_monitor| -> Box<dyn Iterator<Item = TaskMetrics> + Send> {
56				Box::new(task_monitor.intervals())
57			},
58		);
59
60		Arc::new(Self {
61			_runtime: runtime.cloned(),
62
63			runtime_metrics: runtime.map(runtime::Handle::metrics),
64
65			task_monitor,
66
67			task_intervals: task_intervals.into(),
68
69			#[cfg(tokio_unstable)]
70			_runtime_monitor: runtime_monitor,
71
72			#[cfg(tokio_unstable)]
73			runtime_intervals: std::sync::Mutex::new(runtime_intervals),
74
75			requests_count: AtomicU64::new(0),
76			requests_handle_finished: AtomicU64::new(0),
77			requests_handle_active: AtomicU32::new(0),
78			requests_panic: AtomicU32::new(0),
79		})
80	}
81
82	#[inline]
83	pub async fn instrument<F, Output>(&self, f: F) -> Output
84	where
85		F: Future<Output = Output>,
86	{
87		if let Some(monitor) = self.task_metrics() {
88			monitor.instrument(f).await
89		} else {
90			f.await
91		}
92	}
93
94	pub fn task_interval(&self) -> Option<TaskMetrics> {
95		self.task_intervals
96			.lock()
97			.expect("locked")
98			.as_mut()
99			.and_then(Iterator::next)
100	}
101
102	#[cfg(tokio_unstable)]
103	pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
104		self.runtime_intervals
105			.lock()
106			.expect("locked")
107			.as_mut()
108			.map(Iterator::next)
109			.expect("next interval")
110	}
111
112	#[inline]
113	pub fn num_workers(&self) -> usize {
114		self.runtime_metrics()
115			.map_or(0, runtime::RuntimeMetrics::num_workers)
116	}
117
118	#[inline]
119	pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
120
121	#[inline]
122	pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> {
123		self.runtime_metrics.as_ref()
124	}
125}