1use std::{
2 sync::{
3 Arc,
4 atomic::{AtomicBool, Ordering},
5 },
6 time::SystemTime,
7};
8
9use ruma::OwnedServerName;
10use tokio::{runtime, sync::broadcast};
11
12use crate::{Err, Result, config, config::Config, log::Logging, metrics::Metrics};
13
14pub struct Server {
16 pub name: OwnedServerName,
19
20 pub config: config::Manager,
22
23 pub started: SystemTime,
25
26 pub stopping: AtomicBool,
29
30 pub reloading: AtomicBool,
33
34 pub restarting: AtomicBool,
36
37 pub runtime: Option<runtime::Handle>,
39
40 pub signal: broadcast::Sender<&'static str>,
42
43 pub log: Logging,
45
46 pub metrics: Arc<Metrics>,
48}
49
50impl Server {
51 #[must_use]
52 pub fn new(
53 config: Config,
54 runtime: Option<&runtime::Handle>,
55 log: Logging,
56 metrics: Arc<Metrics>,
57 ) -> Self {
58 Self {
59 name: config.server_name.clone(),
60 config: config::Manager::new(config),
61 started: SystemTime::now(),
62 stopping: AtomicBool::new(false),
63 reloading: AtomicBool::new(false),
64 restarting: AtomicBool::new(false),
65 runtime: runtime.cloned(),
66 signal: broadcast::channel::<&'static str>(1).0,
67 log,
68 metrics,
69 }
70 }
71
72 pub fn reload(&self) -> Result {
73 if cfg!(any(not(tuwunel_mods), not(feature = "tuwunel_mods"))) {
74 return Err!("Reloading not enabled");
75 }
76
77 if self.reloading.swap(true, Ordering::AcqRel) {
78 return Err!("Reloading already in progress");
79 }
80
81 if self.stopping.swap(true, Ordering::AcqRel) {
82 return Err!("Shutdown already in progress");
83 }
84
85 self.signal("SIGINT").inspect_err(|_| {
86 self.stopping.store(false, Ordering::Release);
87 self.reloading.store(false, Ordering::Release);
88 })
89 }
90
91 pub fn restart(&self) -> Result {
92 if self.restarting.swap(true, Ordering::AcqRel) {
93 return Err!("Restart already in progress");
94 }
95
96 self.shutdown().inspect_err(|_| {
97 self.restarting.store(false, Ordering::Release);
98 })
99 }
100
101 pub fn shutdown(&self) -> Result {
102 if self.stopping.swap(true, Ordering::AcqRel) {
103 return Err!("Shutdown already in progress");
104 }
105
106 self.signal("SIGTERM").inspect_err(|_| {
107 self.stopping.store(false, Ordering::Release);
108 })
109 }
110
111 pub fn signal(&self, sig: &'static str) -> Result {
112 self.signal.send(sig).ok();
113 Ok(())
114 }
115
116 #[inline]
117 pub async fn until_shutdown(self: &Arc<Self>) {
118 while self.is_running() {
119 self.signal.subscribe().recv().await.ok();
120 }
121 }
122
123 #[inline]
124 pub fn runtime(&self) -> &runtime::Handle {
125 self.runtime
126 .as_ref()
127 .expect("runtime handle available in Server")
128 }
129
130 #[inline]
131 pub fn check_running(&self) -> Result {
132 use std::{io, io::ErrorKind::Interrupted};
133
134 self.is_running()
135 .then_some(())
136 .ok_or_else(|| io::Error::new(Interrupted, "Server shutting down"))
137 .map_err(Into::into)
138 }
139
140 #[inline]
141 pub fn is_running(&self) -> bool { !self.is_stopping() }
142
143 #[inline]
144 pub fn is_stopping(&self) -> bool { self.stopping.load(Ordering::Relaxed) }
145
146 #[inline]
147 pub fn is_reloading(&self) -> bool { self.reloading.load(Ordering::Relaxed) }
148
149 #[inline]
150 pub fn is_restarting(&self) -> bool { self.restarting.load(Ordering::Relaxed) }
151
152 #[inline]
153 pub fn is_ours(&self, name: &str) -> bool { name == self.config.server_name }
154}