1use std::{fmt, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use tokio::sync::Mutex;
5use tuwunel_core::{
6 Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream,
7};
8use tuwunel_database::Database;
9
10pub(crate) use crate::OnceServices;
11use crate::{
12 account_data, admin, appservice, client, config, deactivate, emergency, federation, fetcher,
13 globals, key_backups,
14 manager::Manager,
15 media, membership, oauth, presence, profile, pusher, registration_tokens, resolver,
16 rooms::{self, retention},
17 sending, sendmail, server_keys,
18 service::{Args, Service},
19 storage, sync, threepid, transaction_ids, uiaa, users,
20};
21
22pub struct Services {
23 pub account_data: Arc<account_data::Service>,
24 pub admin: Arc<admin::Service>,
25 pub appservice: Arc<appservice::Service>,
26 pub config: Arc<config::Service>,
27 pub client: Arc<client::Service>,
28 pub emergency: Arc<emergency::Service>,
29 pub fetcher: Arc<fetcher::Service>,
30 pub globals: Arc<globals::Service>,
31 pub key_backups: Arc<key_backups::Service>,
32 pub media: Arc<media::Service>,
33 pub presence: Arc<presence::Service>,
34 pub pusher: Arc<pusher::Service>,
35 pub resolver: Arc<resolver::Service>,
36 pub alias: Arc<rooms::alias::Service>,
37 pub auth_chain: Arc<rooms::auth_chain::Service>,
38 pub delete: Arc<rooms::delete::Service>,
39 pub directory: Arc<rooms::directory::Service>,
40 pub event_handler: Arc<rooms::event_handler::Service>,
41 pub lazy_loading: Arc<rooms::lazy_loading::Service>,
42 pub metadata: Arc<rooms::metadata::Service>,
43 pub pdu_metadata: Arc<rooms::pdu_metadata::Service>,
44 pub read_receipt: Arc<rooms::read_receipt::Service>,
45 pub search: Arc<rooms::search::Service>,
46 pub short: Arc<rooms::short::Service>,
47 pub spaces: Arc<rooms::spaces::Service>,
48 pub state: Arc<rooms::state::Service>,
49 pub state_accessor: Arc<rooms::state_accessor::Service>,
50 pub state_cache: Arc<rooms::state_cache::Service>,
51 pub state_compressor: Arc<rooms::state_compressor::Service>,
52 pub storage: Arc<storage::Service>,
53 pub threads: Arc<rooms::threads::Service>,
54 pub timeline: Arc<rooms::timeline::Service>,
55 pub typing: Arc<rooms::typing::Service>,
56 pub federation: Arc<federation::Service>,
57 pub sending: Arc<sending::Service>,
58 pub server_keys: Arc<server_keys::Service>,
59 pub sync: Arc<sync::Service>,
60 pub transaction_ids: Arc<transaction_ids::Service>,
61 pub uiaa: Arc<uiaa::Service>,
62 pub users: Arc<users::Service>,
63 pub membership: Arc<membership::Service>,
64 pub deactivate: Arc<deactivate::Service>,
65 pub oauth: Arc<oauth::Service>,
66 pub retention: Arc<retention::Service>,
67 pub registration_tokens: Arc<registration_tokens::Service>,
68 pub sendmail: Arc<sendmail::Service>,
69 pub threepid: Arc<threepid::Service>,
70 pub profile: Arc<profile::Service>,
71
72 manager: Mutex<Option<Arc<Manager>>>,
73 pub server: Arc<Server>,
74 pub db: Arc<Database>,
75}
76
77#[implement(Services)]
78pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
79 let db = Database::open(&server).await?;
80 let services = Arc::new(OnceServices::default());
81 let args = Args {
82 db: &db,
83 server: &server,
84 services: &services,
85 };
86
87 let res = Arc::new(Self {
88 account_data: account_data::Service::build(&args)?,
89 admin: admin::Service::build(&args)?,
90 appservice: appservice::Service::build(&args)?,
91 resolver: resolver::Service::build(&args)?,
92 client: client::Service::build(&args)?,
93 config: config::Service::build(&args)?,
94 emergency: emergency::Service::build(&args)?,
95 fetcher: fetcher::Service::build(&args)?,
96 globals: globals::Service::build(&args)?,
97 key_backups: key_backups::Service::build(&args)?,
98 media: media::Service::build(&args)?,
99 presence: presence::Service::build(&args)?,
100 pusher: pusher::Service::build(&args)?,
101 alias: rooms::alias::Service::build(&args)?,
102 auth_chain: rooms::auth_chain::Service::build(&args)?,
103 delete: rooms::delete::Service::build(&args)?,
104 directory: rooms::directory::Service::build(&args)?,
105 event_handler: rooms::event_handler::Service::build(&args)?,
106 lazy_loading: rooms::lazy_loading::Service::build(&args)?,
107 metadata: rooms::metadata::Service::build(&args)?,
108 pdu_metadata: rooms::pdu_metadata::Service::build(&args)?,
109 read_receipt: rooms::read_receipt::Service::build(&args)?,
110 search: rooms::search::Service::build(&args)?,
111 short: rooms::short::Service::build(&args)?,
112 spaces: rooms::spaces::Service::build(&args)?,
113 state: rooms::state::Service::build(&args)?,
114 state_accessor: rooms::state_accessor::Service::build(&args)?,
115 state_cache: rooms::state_cache::Service::build(&args)?,
116 state_compressor: rooms::state_compressor::Service::build(&args)?,
117 storage: storage::Service::build(&args)?,
118 threads: rooms::threads::Service::build(&args)?,
119 timeline: rooms::timeline::Service::build(&args)?,
120 typing: rooms::typing::Service::build(&args)?,
121 federation: federation::Service::build(&args)?,
122 sending: sending::Service::build(&args)?,
123 server_keys: server_keys::Service::build(&args)?,
124 sync: sync::Service::build(&args)?,
125 transaction_ids: transaction_ids::Service::build(&args)?,
126 uiaa: uiaa::Service::build(&args)?,
127 users: users::Service::build(&args)?,
128 membership: membership::Service::build(&args)?,
129 deactivate: deactivate::Service::build(&args)?,
130 oauth: oauth::Service::build(&args)?,
131 retention: retention::Service::build(&args)?,
132 registration_tokens: registration_tokens::Service::build(&args)?,
133 sendmail: sendmail::Service::build(&args)?,
134 threepid: threepid::Service::build(&args)?,
135 profile: profile::Service::build(&args)?,
136
137 manager: Mutex::new(None),
138 server,
139 db,
140 });
141
142 Ok(services.set(res))
143}
144
145#[implement(Services)]
146pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
147 macro_rules! cast {
148 ($s:expr) => {
149 <Arc<dyn Service> as Into<_>>::into($s.clone())
150 };
151 }
152
153 [
154 cast!(self.account_data),
155 cast!(self.admin),
156 cast!(self.appservice),
157 cast!(self.resolver),
158 cast!(self.client),
159 cast!(self.config),
160 cast!(self.emergency),
161 cast!(self.fetcher),
162 cast!(self.globals),
163 cast!(self.key_backups),
164 cast!(self.media),
165 cast!(self.presence),
166 cast!(self.pusher),
167 cast!(self.alias),
168 cast!(self.auth_chain),
169 cast!(self.delete),
170 cast!(self.directory),
171 cast!(self.event_handler),
172 cast!(self.lazy_loading),
173 cast!(self.metadata),
174 cast!(self.pdu_metadata),
175 cast!(self.read_receipt),
176 cast!(self.search),
177 cast!(self.short),
178 cast!(self.spaces),
179 cast!(self.state),
180 cast!(self.state_accessor),
181 cast!(self.state_cache),
182 cast!(self.state_compressor),
183 cast!(self.storage),
184 cast!(self.threads),
185 cast!(self.timeline),
186 cast!(self.typing),
187 cast!(self.federation),
188 cast!(self.sending),
189 cast!(self.server_keys),
190 cast!(self.sync),
191 cast!(self.transaction_ids),
192 cast!(self.uiaa),
193 cast!(self.users),
194 cast!(self.membership),
195 cast!(self.deactivate),
196 cast!(self.oauth),
197 cast!(self.retention),
198 cast!(self.registration_tokens),
199 cast!(self.profile),
200 ]
201 .into_iter()
202}
203
204impl fmt::Debug for Services {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206 f.debug_struct("Services").finish()
207 }
208}
209
210#[implement(Services)]
211pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
212 debug_info!("Starting services...");
213
214 super::migrations::migrations(self).await?;
215
216 self.manager
217 .lock()
218 .await
219 .insert(Manager::new(self))
220 .clone()
221 .start()
222 .await?;
223
224 debug_info!("Services startup complete.");
225
226 Ok(Arc::clone(self))
227}
228
229#[implement(Services)]
230pub async fn stop(&self) {
231 info!("Shutting down services...");
232
233 self.interrupt().await;
234 if let Some(manager) = self.manager.lock().await.as_ref() {
235 manager.stop().await;
236 }
237
238 debug_info!("Services shutdown complete.");
239}
240
241#[implement(Services)]
242pub(crate) async fn interrupt(&self) {
243 debug!("Interrupting services...");
244 for service in self.services() {
245 trace!(
246 name = ?service.name(),
247 "Interrupting Service"
248 );
249
250 service.interrupt().await;
251 }
252}
253
254#[implement(Services)]
255pub async fn poll(&self) -> Result {
256 if let Some(manager) = self.manager.lock().await.as_ref() {
257 trace!("Polling service manager...");
258 return manager.poll().await;
259 }
260
261 Ok(())
262}
263
264#[implement(Services)]
265pub async fn clear_cache(&self) {
266 self.services()
267 .stream()
268 .for_each(async |service| {
269 service.clear_cache().await;
270 })
271 .await;
272}
273
274#[implement(Services)]
275pub async fn memory_usage(&self) -> Result<String> {
276 self.services()
277 .try_stream()
278 .try_fold(String::new(), async |mut out, service| {
279 service.memory_usage(&mut out).await?;
280 Ok(out)
281 })
282 .await
283}