Skip to main content

tuwunel_router/
run.rs

1use std::{
2	sync::{Arc, Weak, atomic::Ordering},
3	time::Duration,
4};
5
6use futures::{FutureExt, future::join, pin_mut};
7use tuwunel_core::{
8	Error, Result, Server, debug, debug_error, debug_info, error, info, utils::BoolExt,
9};
10use tuwunel_service::Services;
11
12use crate::{handle::ServerHandle, serve};
13
14/// Main loop base
15#[tracing::instrument(skip_all)]
16pub(crate) async fn run(services: Arc<Services>) -> Result {
17	let server = &services.server;
18	debug!("Start");
19
20	// Install the admin command root here for now
21	tuwunel_admin::init(&services.admin);
22
23	// Execute configured startup commands.
24	services.admin.startup_execute().await?;
25
26	// Setup shutdown/signal handling
27	let handle = ServerHandle::new();
28	let sigs = server
29		.runtime()
30		.spawn(signal(server.clone(), handle.clone()));
31	#[cfg(all(feature = "systemd", target_os = "linux"))]
32	let watchdog = server.runtime().spawn(start_systemd_watchdog());
33
34	let non_listener = services
35		.config
36		.listening
37		.is_false()
38		.then_async(|| server.until_shutdown().map(Ok));
39
40	let listener = services.config.listening.then_async(|| {
41		server
42			.runtime()
43			.spawn(serve::serve(services.clone(), handle))
44			.map(|res| res.map_err(Error::from).unwrap_or_else(Err))
45	});
46
47	// Focal point
48	debug!("Running");
49	pin_mut!(listener, non_listener);
50	let res = tokio::select! {
51		res = join(&mut listener, &mut non_listener) => {
52			res.0.unwrap_or(res.1.unwrap_or(Ok(())))
53		},
54		res = services.poll() => {
55			server.until_shutdown().await;
56			handle_services_finish(server, res, listener.await)
57		},
58	};
59
60	// Join watchdog and the signal handler before we leave.
61	#[cfg(all(feature = "systemd", target_os = "linux"))]
62	{
63		watchdog.abort();
64		_ = watchdog.await;
65	};
66
67	sigs.abort();
68	_ = sigs.await;
69
70	// Remove the admin command root
71	tuwunel_admin::fini(&services.admin);
72
73	debug_info!("Finish");
74	res
75}
76
77/// Async initializations
78#[tracing::instrument(skip_all)]
79pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
80	debug!("Starting...");
81
82	let services = Services::build(server).await?.start().await?;
83
84	#[cfg(all(feature = "systemd", target_os = "linux"))]
85	sd_notify::notify(&[sd_notify::NotifyState::Ready])
86		.expect("failed to notify systemd of ready state");
87
88	debug!("Started");
89	Ok(services)
90}
91
92/// Async destructions
93#[tracing::instrument(skip_all)]
94pub(crate) async fn stop(services: Arc<Services>) -> Result {
95	debug!("Shutting down...");
96
97	#[cfg(all(feature = "systemd", target_os = "linux"))]
98	// SAFETY: clears NOTIFY_SOCKET from the process environment. Safe because no
99	// other thread reads or writes that variable; this matches the previous
100	// `notify(unset_env=true, ...)` semantics from sd-notify 0.4.
101	unsafe { sd_notify::notify_and_unset_env(&[sd_notify::NotifyState::Stopping]) }
102		.expect("failed to notify systemd of stopping state");
103
104	// Wait for all completions before dropping or we'll lose them to the module
105	// unload and explode.
106	services.stop().await;
107
108	// Check that Services and Database will drop as expected, The complex of Arc's
109	// used for various components can easily lead to references being held
110	// somewhere improperly; this can hang shutdowns.
111	debug!("Cleaning up...");
112	let db = Arc::downgrade(&services.db);
113	if let Err(services) = Arc::try_unwrap(services) {
114		debug_error!(
115			"{} dangling references to Services after shutdown",
116			Arc::strong_count(&services)
117		);
118	}
119
120	if Weak::strong_count(&db) > 0 {
121		debug_error!(
122			"{} dangling references to Database after shutdown",
123			Weak::strong_count(&db)
124		);
125	}
126
127	info!("Shutdown complete.");
128	Ok(())
129}
130
131#[tracing::instrument(skip_all)]
132async fn signal(server: Arc<Server>, handle: ServerHandle) {
133	server.until_shutdown().await;
134	handle_shutdown(&server, &handle);
135}
136
137fn handle_shutdown(server: &Arc<Server>, handle: &ServerHandle) {
138	let timeout = server.config.client_shutdown_timeout;
139	let timeout = Duration::from_secs(timeout);
140	debug!(
141		?timeout,
142		handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
143		"Notifying for graceful shutdown"
144	);
145
146	handle.graceful_shutdown(Some(timeout));
147}
148
149fn handle_services_finish(
150	server: &Arc<Server>,
151	result: Result,
152	listener: Option<Result>,
153) -> Result {
154	debug!("Service manager finished: {result:?}");
155
156	if server.is_running()
157		&& let Err(e) = server.shutdown()
158	{
159		error!("Failed to send shutdown signal: {e}");
160	}
161
162	if let Some(Err(e)) = listener {
163		error!("Client listener task finished with error: {e}");
164	}
165
166	result
167}
168
/// Periodically sends the systemd watchdog keep-alive.
///
/// Returns immediately when `sd_notify::watchdog_enabled()` reports no
/// watchdog. Otherwise it loops forever, notifying at half the watchdog
/// timeout; the caller ends it by aborting the task.
#[cfg(all(feature = "systemd", target_os = "linux"))]
async fn start_systemd_watchdog() {
	use tokio::time::MissedTickBehavior;

	let Some(timeout) = sd_notify::watchdog_enabled() else {
		return;
	};

	// Half the timeout in microseconds, saturating on overflow and clamped to
	// at least one microsecond so the ticker period is never zero.
	let half_usec = u64::try_from(timeout.as_micros()).unwrap_or(u64::MAX) / 2;
	let period = Duration::from_micros(half_usec.max(1));

	let mut ticker = tokio::time::interval(period);
	ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

	loop {
		ticker.tick().await;

		// A failed keep-alive is logged and retried on the next tick.
		let pinged = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]);
		if let Err(e) = pinged {
			error!("failed to notify systemd watchdog state: {e}");
		}
	}
}