1use std::{
2 sync::{Arc, Weak, atomic::Ordering},
3 time::Duration,
4};
5
6use futures::{FutureExt, future::join, pin_mut};
7use tuwunel_core::{
8 Error, Result, Server, debug, debug_error, debug_info, error, info, utils::BoolExt,
9};
10use tuwunel_service::Services;
11
12use crate::{handle::ServerHandle, serve};
13
14#[tracing::instrument(skip_all)]
16pub(crate) async fn run(services: Arc<Services>) -> Result {
17 let server = &services.server;
18 debug!("Start");
19
20 tuwunel_admin::init(&services.admin);
22
23 services.admin.startup_execute().await?;
25
26 let handle = ServerHandle::new();
28 let sigs = server
29 .runtime()
30 .spawn(signal(server.clone(), handle.clone()));
31 #[cfg(all(feature = "systemd", target_os = "linux"))]
32 let watchdog = server.runtime().spawn(start_systemd_watchdog());
33
34 let non_listener = services
35 .config
36 .listening
37 .is_false()
38 .then_async(|| server.until_shutdown().map(Ok));
39
40 let listener = services.config.listening.then_async(|| {
41 server
42 .runtime()
43 .spawn(serve::serve(services.clone(), handle))
44 .map(|res| res.map_err(Error::from).unwrap_or_else(Err))
45 });
46
47 debug!("Running");
49 pin_mut!(listener, non_listener);
50 let res = tokio::select! {
51 res = join(&mut listener, &mut non_listener) => {
52 res.0.unwrap_or(res.1.unwrap_or(Ok(())))
53 },
54 res = services.poll() => {
55 server.until_shutdown().await;
56 handle_services_finish(server, res, listener.await)
57 },
58 };
59
60 #[cfg(all(feature = "systemd", target_os = "linux"))]
62 {
63 watchdog.abort();
64 _ = watchdog.await;
65 };
66
67 sigs.abort();
68 _ = sigs.await;
69
70 tuwunel_admin::fini(&services.admin);
72
73 debug_info!("Finish");
74 res
75}
76
77#[tracing::instrument(skip_all)]
79pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
80 debug!("Starting...");
81
82 let services = Services::build(server).await?.start().await?;
83
84 #[cfg(all(feature = "systemd", target_os = "linux"))]
85 sd_notify::notify(&[sd_notify::NotifyState::Ready])
86 .expect("failed to notify systemd of ready state");
87
88 debug!("Started");
89 Ok(services)
90}
91
92#[tracing::instrument(skip_all)]
94pub(crate) async fn stop(services: Arc<Services>) -> Result {
95 debug!("Shutting down...");
96
97 #[cfg(all(feature = "systemd", target_os = "linux"))]
98 unsafe { sd_notify::notify_and_unset_env(&[sd_notify::NotifyState::Stopping]) }
102 .expect("failed to notify systemd of stopping state");
103
104 services.stop().await;
107
108 debug!("Cleaning up...");
112 let db = Arc::downgrade(&services.db);
113 if let Err(services) = Arc::try_unwrap(services) {
114 debug_error!(
115 "{} dangling references to Services after shutdown",
116 Arc::strong_count(&services)
117 );
118 }
119
120 if Weak::strong_count(&db) > 0 {
121 debug_error!(
122 "{} dangling references to Database after shutdown",
123 Weak::strong_count(&db)
124 );
125 }
126
127 info!("Shutdown complete.");
128 Ok(())
129}
130
131#[tracing::instrument(skip_all)]
132async fn signal(server: Arc<Server>, handle: ServerHandle) {
133 server.until_shutdown().await;
134 handle_shutdown(&server, &handle);
135}
136
137fn handle_shutdown(server: &Arc<Server>, handle: &ServerHandle) {
138 let timeout = server.config.client_shutdown_timeout;
139 let timeout = Duration::from_secs(timeout);
140 debug!(
141 ?timeout,
142 handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
143 "Notifying for graceful shutdown"
144 );
145
146 handle.graceful_shutdown(Some(timeout));
147}
148
149fn handle_services_finish(
150 server: &Arc<Server>,
151 result: Result,
152 listener: Option<Result>,
153) -> Result {
154 debug!("Service manager finished: {result:?}");
155
156 if server.is_running()
157 && let Err(e) = server.shutdown()
158 {
159 error!("Failed to send shutdown signal: {e}");
160 }
161
162 if let Some(Err(e)) = listener {
163 error!("Client listener task finished with error: {e}");
164 }
165
166 result
167}
168
169#[cfg(all(feature = "systemd", target_os = "linux"))]
170async fn start_systemd_watchdog() {
171 use tokio::time::MissedTickBehavior;
172
173 let Some(watchdog) = sd_notify::watchdog_enabled() else {
174 return;
175 };
176
177 let watchdog_usec = u64::try_from(watchdog.as_micros()).unwrap_or(u64::MAX);
178 let interval_usec = (watchdog_usec / 2).max(1);
179 let interval = Duration::from_micros(interval_usec);
180
181 let mut ticker = tokio::time::interval(interval);
182 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
183 loop {
184 ticker.tick().await;
185
186 if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
187 error!("failed to notify systemd watchdog state: {e}");
188 }
189 }
190}