1use std::{
2 sync::{Arc, Weak, atomic::Ordering},
3 time::Duration,
4};
5
6use futures::{FutureExt, future::join, pin_mut};
7use tuwunel_core::{
8 Error, Result, Server, debug, debug_error, debug_info, error, info, utils::BoolExt,
9};
10use tuwunel_service::Services;
11
12use crate::{handle::ServerHandle, serve};
13
14#[tracing::instrument(skip_all)]
16pub(crate) async fn run(services: Arc<Services>) -> Result {
17 let server = &services.server;
18 debug!("Start");
19
20 tuwunel_admin::init(&services.admin).await;
22
23 services.admin.startup_execute().await?;
25
26 let handle = ServerHandle::new();
28 let sigs = server
29 .runtime()
30 .spawn(signal(server.clone(), handle.clone()));
31 #[cfg(all(feature = "systemd", target_os = "linux"))]
32 let watchdog = server.runtime().spawn(start_systemd_watchdog());
33
34 let non_listener = services
35 .config
36 .listening
37 .is_false()
38 .then_async(|| server.until_shutdown().map(Ok));
39
40 let listener = services.config.listening.then_async(|| {
41 server
42 .runtime()
43 .spawn(serve::serve(services.clone(), handle))
44 .map(|res| res.map_err(Error::from).unwrap_or_else(Err))
45 });
46
47 debug!("Running");
49 pin_mut!(listener, non_listener);
50 let res = tokio::select! {
51 res = join(&mut listener, &mut non_listener) => {
52 res.0.unwrap_or(res.1.unwrap_or(Ok(())))
53 },
54 res = services.poll() => {
55 server.until_shutdown().await;
56 handle_services_finish(server, res, listener.await)
57 },
58 };
59
60 #[cfg(all(feature = "systemd", target_os = "linux"))]
62 {
63 watchdog.abort();
64 _ = watchdog.await;
65 };
66
67 sigs.abort();
68 _ = sigs.await;
69
70 tuwunel_admin::fini(&services.admin).await;
72
73 debug_info!("Finish");
74 res
75}
76
77#[tracing::instrument(skip_all)]
79pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
80 debug!("Starting...");
81
82 let services = Services::build(server).await?.start().await?;
83
84 #[cfg(all(feature = "systemd", target_os = "linux"))]
85 sd_notify::notify(false, &[sd_notify::NotifyState::Ready])
86 .expect("failed to notify systemd of ready state");
87
88 debug!("Started");
89 Ok(services)
90}
91
92#[tracing::instrument(skip_all)]
94pub(crate) async fn stop(services: Arc<Services>) -> Result {
95 debug!("Shutting down...");
96
97 #[cfg(all(feature = "systemd", target_os = "linux"))]
98 sd_notify::notify(true, &[sd_notify::NotifyState::Stopping])
99 .expect("failed to notify systemd of stopping state");
100
101 services.stop().await;
104
105 debug!("Cleaning up...");
109 let db = Arc::downgrade(&services.db);
110 if let Err(services) = Arc::try_unwrap(services) {
111 debug_error!(
112 "{} dangling references to Services after shutdown",
113 Arc::strong_count(&services)
114 );
115 }
116
117 if Weak::strong_count(&db) > 0 {
118 debug_error!(
119 "{} dangling references to Database after shutdown",
120 Weak::strong_count(&db)
121 );
122 }
123
124 info!("Shutdown complete.");
125 Ok(())
126}
127
128#[tracing::instrument(skip_all)]
129async fn signal(server: Arc<Server>, handle: ServerHandle) {
130 server.until_shutdown().await;
131 handle_shutdown(&server, &handle);
132}
133
134fn handle_shutdown(server: &Arc<Server>, handle: &ServerHandle) {
135 let timeout = server.config.client_shutdown_timeout;
136 let timeout = Duration::from_secs(timeout);
137 debug!(
138 ?timeout,
139 handle_active = ?server.metrics.requests_handle_active.load(Ordering::Relaxed),
140 "Notifying for graceful shutdown"
141 );
142
143 handle.graceful_shutdown(Some(timeout));
144}
145
146fn handle_services_finish(
147 server: &Arc<Server>,
148 result: Result,
149 listener: Option<Result>,
150) -> Result {
151 debug!("Service manager finished: {result:?}");
152
153 if server.is_running()
154 && let Err(e) = server.shutdown()
155 {
156 error!("Failed to send shutdown signal: {e}");
157 }
158
159 if let Some(Err(e)) = listener {
160 error!("Client listener task finished with error: {e}");
161 }
162
163 result
164}
165
166#[cfg(all(feature = "systemd", target_os = "linux"))]
167async fn start_systemd_watchdog() {
168 use tokio::time::MissedTickBehavior;
169
170 let mut watchdog_usec = 0;
171 if !sd_notify::watchdog_enabled(false, &mut watchdog_usec) || watchdog_usec == 0 {
172 return;
173 }
174
175 let interval_usec = (watchdog_usec / 2).max(1);
176 let interval = Duration::from_micros(interval_usec);
177
178 let mut ticker = tokio::time::interval(interval);
179 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
180 loop {
181 ticker.tick().await;
182
183 if let Err(e) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) {
184 error!("failed to notify systemd watchdog state: {e}");
185 }
186 }
187}