tuwunel_core/metrics/
mod.rs1pub mod dump;
2
3use std::sync::{
4 Arc,
5 atomic::{AtomicU32, AtomicU64},
6};
7
8use tokio::runtime;
9#[cfg(tokio_unstable)]
10use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
11use tokio_metrics::{TaskMetrics, TaskMonitor};
12
13pub struct Metrics {
14 _runtime: Option<runtime::Handle>,
15
16 runtime_metrics: Option<runtime::RuntimeMetrics>,
17
18 task_monitor: Option<TaskMonitor>,
19
20 task_intervals: std::sync::Mutex<Option<Box<dyn Iterator<Item = TaskMetrics> + Send>>>,
21
22 #[cfg(tokio_unstable)]
23 _runtime_monitor: Option<RuntimeMonitor>,
24
25 #[cfg(tokio_unstable)]
26 runtime_intervals: std::sync::Mutex<Option<RuntimeIntervals>>,
27
28 pub requests_count: AtomicU64,
30 pub requests_handle_finished: AtomicU64,
31 pub requests_handle_active: AtomicU32,
32 pub requests_panic: AtomicU32,
33}
34
35impl Metrics {
36 #[must_use]
37 pub fn new(runtime: Option<&runtime::Handle>) -> Arc<Self> {
38 #[cfg(tokio_unstable)]
39 let runtime_monitor = runtime.map(RuntimeMonitor::new);
40
41 #[cfg(tokio_unstable)]
42 let runtime_intervals = runtime_monitor
43 .as_ref()
44 .map(RuntimeMonitor::intervals);
45
46 let task_monitor = cfg!(tokio_unstable).then(|| {
47 TaskMonitor::builder()
48 .with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
49 .with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
50 .clone()
51 .build()
52 });
53
54 let task_intervals = task_monitor.as_ref().map(
55 |task_monitor| -> Box<dyn Iterator<Item = TaskMetrics> + Send> {
56 Box::new(task_monitor.intervals())
57 },
58 );
59
60 Arc::new(Self {
61 _runtime: runtime.cloned(),
62
63 runtime_metrics: runtime.map(runtime::Handle::metrics),
64
65 task_monitor,
66
67 task_intervals: task_intervals.into(),
68
69 #[cfg(tokio_unstable)]
70 _runtime_monitor: runtime_monitor,
71
72 #[cfg(tokio_unstable)]
73 runtime_intervals: std::sync::Mutex::new(runtime_intervals),
74
75 requests_count: AtomicU64::new(0),
76 requests_handle_finished: AtomicU64::new(0),
77 requests_handle_active: AtomicU32::new(0),
78 requests_panic: AtomicU32::new(0),
79 })
80 }
81
82 #[inline]
83 pub async fn instrument<F, Output>(&self, f: F) -> Output
84 where
85 F: Future<Output = Output>,
86 {
87 if let Some(monitor) = self.task_metrics() {
88 monitor.instrument(f).await
89 } else {
90 f.await
91 }
92 }
93
94 pub fn task_interval(&self) -> Option<TaskMetrics> {
95 self.task_intervals
96 .lock()
97 .expect("locked")
98 .as_mut()
99 .and_then(Iterator::next)
100 }
101
102 #[cfg(tokio_unstable)]
103 pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
104 self.runtime_intervals
105 .lock()
106 .expect("locked")
107 .as_mut()
108 .map(Iterator::next)
109 .expect("next interval")
110 }
111
112 #[inline]
113 pub fn num_workers(&self) -> usize {
114 self.runtime_metrics()
115 .map_or(0, runtime::RuntimeMetrics::num_workers)
116 }
117
118 #[inline]
119 pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
120
121 #[inline]
122 pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> {
123 self.runtime_metrics.as_ref()
124 }
125}