tuwunel_core/metrics/
mod.rs1use std::sync::{
2 Arc,
3 atomic::{AtomicU32, AtomicU64},
4};
5
6use tokio::runtime;
7#[cfg(tokio_unstable)]
8use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
9use tokio_metrics::{TaskMetrics, TaskMonitor};
10
11pub struct Metrics {
12 _runtime: Option<runtime::Handle>,
13
14 runtime_metrics: Option<runtime::RuntimeMetrics>,
15
16 task_monitor: Option<TaskMonitor>,
17
18 task_intervals: std::sync::Mutex<Option<Box<dyn Iterator<Item = TaskMetrics> + Send>>>,
19
20 #[cfg(tokio_unstable)]
21 _runtime_monitor: Option<RuntimeMonitor>,
22
23 #[cfg(tokio_unstable)]
24 runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
25
26 pub requests_count: AtomicU64,
28 pub requests_handle_finished: AtomicU64,
29 pub requests_handle_active: AtomicU32,
30 pub requests_panic: AtomicU32,
31}
32
33impl Metrics {
34 #[must_use]
35 pub fn new(runtime: Option<&runtime::Handle>) -> Arc<Self> {
36 #[cfg(tokio_unstable)]
37 let runtime_monitor = runtime.map(RuntimeMonitor::new);
38
39 #[cfg(tokio_unstable)]
40 let runtime_intervals = runtime_monitor
41 .as_ref()
42 .map(RuntimeMonitor::intervals);
43
44 let task_monitor = cfg!(tokio_unstable).then(|| {
45 TaskMonitor::builder()
46 .with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
47 .with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
48 .clone()
49 .build()
50 });
51
52 let task_intervals = task_monitor.as_ref().map(
53 |task_monitor| -> Box<dyn Iterator<Item = TaskMetrics> + Send> {
54 Box::new(task_monitor.intervals())
55 },
56 );
57
58 Arc::new(Self {
59 _runtime: runtime.cloned(),
60
61 runtime_metrics: runtime.map(runtime::Handle::metrics),
62
63 task_monitor,
64
65 task_intervals: task_intervals.into(),
66
67 #[cfg(tokio_unstable)]
68 _runtime_monitor: runtime_monitor,
69
70 #[cfg(tokio_unstable)]
71 runtime_intervals: std::sync::Mutex::new(runtime_intervals),
72
73 requests_count: AtomicU64::new(0),
74 requests_handle_finished: AtomicU64::new(0),
75 requests_handle_active: AtomicU32::new(0),
76 requests_panic: AtomicU32::new(0),
77 })
78 }
79
80 #[inline]
81 pub async fn instrument<F, Output>(&self, f: F) -> Output
82 where
83 F: Future<Output = Output>,
84 {
85 if let Some(monitor) = self.task_metrics() {
86 monitor.instrument(f).await
87 } else {
88 f.await
89 }
90 }
91
92 pub fn task_interval(&self) -> Option<TaskMetrics> {
93 self.task_intervals
94 .lock()
95 .expect("locked")
96 .as_mut()
97 .and_then(Iterator::next)
98 }
99
100 #[cfg(tokio_unstable)]
101 pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
102 self.runtime_intervals
103 .lock()
104 .expect("locked")
105 .as_mut()
106 .map(Iterator::next)
107 .expect("next interval")
108 }
109
110 #[inline]
111 pub fn num_workers(&self) -> usize {
112 self.runtime_metrics()
113 .map_or(0, runtime::RuntimeMetrics::num_workers)
114 }
115
116 #[inline]
117 pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
118
119 #[inline]
120 pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> {
121 self.runtime_metrics.as_ref()
122 }
123}