Skip to main content

tuwunel_router/
run.rs

1use std::{
2	sync::{Arc, Weak, atomic::Ordering},
3	time::Duration,
4};
5
6use futures::{FutureExt, future::join, pin_mut};
7use tuwunel_core::{
8	Error, Result, Server, debug, debug_error, debug_info, error, info, utils::BoolExt,
9};
10use tuwunel_service::Services;
11
12use crate::{handle::ServerHandle, serve};
13
14/// Main loop base
15#[tracing::instrument(skip_all)]
16pub(crate) async fn run(services: Arc<Services>) -> Result {
17	let server = &services.server;
18	debug!("Start");
19
20	// Install the admin room callback here for now
21	tuwunel_admin::init(&services.admin).await;
22
23	// Execute configured startup commands.
24	services.admin.startup_execute().await?;
25
26	// Setup shutdown/signal handling
27	let handle = ServerHandle::new();
28	let sigs = server
29		.runtime()
30		.spawn(signal(server.clone(), handle.clone()));
31	#[cfg(all(feature = "systemd", target_os = "linux"))]
32	let watchdog = server.runtime().spawn(start_systemd_watchdog());
33
34	let non_listener = services
35		.config
36		.listening
37		.is_false()
38		.then_async(|| server.until_shutdown().map(Ok));
39
40	let listener = services.config.listening.then_async(|| {
41		server
42			.runtime()
43			.spawn(serve::serve(services.clone(), handle))
44			.map(|res| res.map_err(Error::from).unwrap_or_else(Err))
45	});
46
47	// Focal point
48	debug!("Running");
49	pin_mut!(listener, non_listener);
50	let res = tokio::select! {
51		res = join(&mut listener, &mut non_listener) => {
52			res.0.unwrap_or(res.1.unwrap_or(Ok(())))
53		},
54		res = services.poll() => {
55			server.until_shutdown().await;
56			handle_services_finish(server, res, listener.await)
57		},
58	};
59
60	// Join watchdog and the signal handler before we leave.
61	#[cfg(all(feature = "systemd", target_os = "linux"))]
62	{
63		watchdog.abort();
64		_ = watchdog.await;
65	};
66
67	sigs.abort();
68	_ = sigs.await;
69
70	// Remove the admin room callback
71	tuwunel_admin::fini(&services.admin).await;
72
73	debug_info!("Finish");
74	res
75}
76
77/// Async initializations
78#[tracing::instrument(skip_all)]
79pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
80	debug!("Starting...");
81
82	let services = Services::build(server).await?.start().await?;
83
84	#[cfg(all(feature = "systemd", target_os = "linux"))]
85	sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
86		.expect("failed to notify systemd of ready state");
87
88	debug!("Started");
89	Ok(services)
90}
91
92/// Async destructions
93#[tracing::instrument(skip_all)]
94pub(crate) async fn stop(services: Arc<Services>) -> Result {
95	debug!("Shutting down...");
96
97	#[cfg(all(feature = "systemd", target_os = "linux"))]
98	sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
99		.expect("failed to notify systemd of stopping state");
100
101	// Wait for all completions before dropping or we'll lose them to the module
102	// unload and explode.
103	services.stop().await;
104
105	// Check that Services and Database will drop as expected, The complex of Arc's
106	// used for various components can easily lead to references being held
107	// somewhere improperly; this can hang shutdowns.
108	debug!("Cleaning up...");
109	let db = Arc::downgrade(&services.db);
110	if let Err(services) = Arc::try_unwrap(services) {
111		debug_error!(
112			"{} dangling references to Services after shutdown",
113			Arc::strong_count(&services)
114		);
115	}
116
117	if Weak::strong_count(&db) > 0 {
118		debug_error!(
119			"{} dangling references to Database after shutdown",
120			Weak::strong_count(&db)
121		);
122	}
123
124	info!("Shutdown complete.");
125	Ok(())
126}
127
128#[tracing::instrument(skip_all)]
129async fn signal(server: Arc<Server>, handle: ServerHandle) {
130	server.until_shutdown().await;
131	handle_shutdown(&server, &handle);
132}
133
134fn handle_shutdown(server: &Arc<Server>, handle: &ServerHandle) {
135	let timeout = server.config.client_shutdown_timeout;
136	let timeout = Duration::from_secs(timeout);
137	debug!(
138		?timeout,
139		handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
140		"Notifying for graceful shutdown"
141	);
142
143	handle.graceful_shutdown(Some(timeout));
144}
145
146fn handle_services_finish(
147	server: &Arc<Server>,
148	result: Result,
149	listener: Option<Result>,
150) -> Result {
151	debug!("Service manager finished: {result:?}");
152
153	if server.is_running()
154		&& let Err(e) = server.shutdown()
155	{
156		error!("Failed to send shutdown signal: {e}");
157	}
158
159	if let Some(Err(e)) = listener {
160		error!("Client listener task finished with error: {e}");
161	}
162
163	result
164}
165
/// Periodically sends the watchdog notification to systemd.
///
/// Returns immediately when `sd_notify` reports the watchdog as disabled or
/// its timeout as zero. Otherwise it notifies at half the reported timeout
/// and loops forever; the task that spawned it is expected to abort it.
#[cfg(all(feature = "systemd", target_os = "linux"))]
async fn start_systemd_watchdog() {
	use tokio::time::{MissedTickBehavior, interval};

	let mut timeout_usec = 0;
	let enabled = sd_notify::watchdog_enabled(false, &mut timeout_usec);
	if !(enabled && timeout_usec != 0) {
		return;
	}

	// Notify twice per timeout window; never let the period reach zero.
	let period = Duration::from_micros((timeout_usec / 2).max(1));

	let mut ticker = interval(period);
	ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

	loop {
		ticker.tick().await;

		// A failed notification is only logged; the next tick tries again.
		let pinged = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]);
		if let Err(e) = pinged {
			error!("failed to notify systemd watchdog state: {e}");
		}
	}
}