1use std::{fmt, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use tokio::sync::Mutex;
5use tuwunel_core::{
6 Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream,
7};
8use tuwunel_database::Database;
9
10pub(crate) use crate::OnceServices;
11use crate::{
12 account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
13 key_backups,
14 manager::Manager,
15 media, membership, oauth, presence, pusher, registration_tokens, resolver,
16 rooms::{self, retention},
17 sending, server_keys,
18 service::{Args, Service},
19 storage, sync, transaction_ids, uiaa, users,
20};
21
22pub struct Services {
23 pub account_data: Arc<account_data::Service>,
24 pub admin: Arc<admin::Service>,
25 pub appservice: Arc<appservice::Service>,
26 pub config: Arc<config::Service>,
27 pub client: Arc<client::Service>,
28 pub emergency: Arc<emergency::Service>,
29 pub globals: Arc<globals::Service>,
30 pub key_backups: Arc<key_backups::Service>,
31 pub media: Arc<media::Service>,
32 pub presence: Arc<presence::Service>,
33 pub pusher: Arc<pusher::Service>,
34 pub resolver: Arc<resolver::Service>,
35 pub alias: Arc<rooms::alias::Service>,
36 pub auth_chain: Arc<rooms::auth_chain::Service>,
37 pub delete: Arc<rooms::delete::Service>,
38 pub directory: Arc<rooms::directory::Service>,
39 pub event_handler: Arc<rooms::event_handler::Service>,
40 pub lazy_loading: Arc<rooms::lazy_loading::Service>,
41 pub metadata: Arc<rooms::metadata::Service>,
42 pub pdu_metadata: Arc<rooms::pdu_metadata::Service>,
43 pub read_receipt: Arc<rooms::read_receipt::Service>,
44 pub search: Arc<rooms::search::Service>,
45 pub short: Arc<rooms::short::Service>,
46 pub spaces: Arc<rooms::spaces::Service>,
47 pub state: Arc<rooms::state::Service>,
48 pub state_accessor: Arc<rooms::state_accessor::Service>,
49 pub state_cache: Arc<rooms::state_cache::Service>,
50 pub state_compressor: Arc<rooms::state_compressor::Service>,
51 pub storage: Arc<storage::Service>,
52 pub threads: Arc<rooms::threads::Service>,
53 pub timeline: Arc<rooms::timeline::Service>,
54 pub typing: Arc<rooms::typing::Service>,
55 pub federation: Arc<federation::Service>,
56 pub sending: Arc<sending::Service>,
57 pub server_keys: Arc<server_keys::Service>,
58 pub sync: Arc<sync::Service>,
59 pub transaction_ids: Arc<transaction_ids::Service>,
60 pub uiaa: Arc<uiaa::Service>,
61 pub users: Arc<users::Service>,
62 pub membership: Arc<membership::Service>,
63 pub deactivate: Arc<deactivate::Service>,
64 pub oauth: Arc<oauth::Service>,
65 pub retention: Arc<retention::Service>,
66 pub registration_tokens: Arc<registration_tokens::Service>,
67
68 manager: Mutex<Option<Arc<Manager>>>,
69 pub server: Arc<Server>,
70 pub db: Arc<Database>,
71}
72
73#[implement(Services)]
74pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
75 let db = Database::open(&server).await?;
76 let services = Arc::new(OnceServices::default());
77 let args = Args {
78 db: &db,
79 server: &server,
80 services: &services,
81 };
82
83 let res = Arc::new(Self {
84 account_data: account_data::Service::build(&args)?,
85 admin: admin::Service::build(&args)?,
86 appservice: appservice::Service::build(&args)?,
87 resolver: resolver::Service::build(&args)?,
88 client: client::Service::build(&args)?,
89 config: config::Service::build(&args)?,
90 emergency: emergency::Service::build(&args)?,
91 globals: globals::Service::build(&args)?,
92 key_backups: key_backups::Service::build(&args)?,
93 media: media::Service::build(&args)?,
94 presence: presence::Service::build(&args)?,
95 pusher: pusher::Service::build(&args)?,
96 alias: rooms::alias::Service::build(&args)?,
97 auth_chain: rooms::auth_chain::Service::build(&args)?,
98 delete: rooms::delete::Service::build(&args)?,
99 directory: rooms::directory::Service::build(&args)?,
100 event_handler: rooms::event_handler::Service::build(&args)?,
101 lazy_loading: rooms::lazy_loading::Service::build(&args)?,
102 metadata: rooms::metadata::Service::build(&args)?,
103 pdu_metadata: rooms::pdu_metadata::Service::build(&args)?,
104 read_receipt: rooms::read_receipt::Service::build(&args)?,
105 search: rooms::search::Service::build(&args)?,
106 short: rooms::short::Service::build(&args)?,
107 spaces: rooms::spaces::Service::build(&args)?,
108 state: rooms::state::Service::build(&args)?,
109 state_accessor: rooms::state_accessor::Service::build(&args)?,
110 state_cache: rooms::state_cache::Service::build(&args)?,
111 state_compressor: rooms::state_compressor::Service::build(&args)?,
112 storage: storage::Service::build(&args)?,
113 threads: rooms::threads::Service::build(&args)?,
114 timeline: rooms::timeline::Service::build(&args)?,
115 typing: rooms::typing::Service::build(&args)?,
116 federation: federation::Service::build(&args)?,
117 sending: sending::Service::build(&args)?,
118 server_keys: server_keys::Service::build(&args)?,
119 sync: sync::Service::build(&args)?,
120 transaction_ids: transaction_ids::Service::build(&args)?,
121 uiaa: uiaa::Service::build(&args)?,
122 users: users::Service::build(&args)?,
123 membership: membership::Service::build(&args)?,
124 deactivate: deactivate::Service::build(&args)?,
125 oauth: oauth::Service::build(&args)?,
126 retention: retention::Service::build(&args)?,
127 registration_tokens: registration_tokens::Service::build(&args)?,
128
129 manager: Mutex::new(None),
130 server,
131 db,
132 });
133
134 Ok(services.set(res))
135}
136
137#[implement(Services)]
138pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
139 macro_rules! cast {
140 ($s:expr) => {
141 <Arc<dyn Service> as Into<_>>::into($s.clone())
142 };
143 }
144
145 [
146 cast!(self.account_data),
147 cast!(self.admin),
148 cast!(self.appservice),
149 cast!(self.resolver),
150 cast!(self.client),
151 cast!(self.config),
152 cast!(self.emergency),
153 cast!(self.globals),
154 cast!(self.key_backups),
155 cast!(self.media),
156 cast!(self.presence),
157 cast!(self.pusher),
158 cast!(self.alias),
159 cast!(self.auth_chain),
160 cast!(self.delete),
161 cast!(self.directory),
162 cast!(self.event_handler),
163 cast!(self.lazy_loading),
164 cast!(self.metadata),
165 cast!(self.pdu_metadata),
166 cast!(self.read_receipt),
167 cast!(self.search),
168 cast!(self.short),
169 cast!(self.spaces),
170 cast!(self.state),
171 cast!(self.state_accessor),
172 cast!(self.state_cache),
173 cast!(self.state_compressor),
174 cast!(self.storage),
175 cast!(self.threads),
176 cast!(self.timeline),
177 cast!(self.typing),
178 cast!(self.federation),
179 cast!(self.sending),
180 cast!(self.server_keys),
181 cast!(self.sync),
182 cast!(self.transaction_ids),
183 cast!(self.uiaa),
184 cast!(self.users),
185 cast!(self.membership),
186 cast!(self.deactivate),
187 cast!(self.oauth),
188 cast!(self.retention),
189 cast!(self.registration_tokens),
190 ]
191 .into_iter()
192}
193
194impl fmt::Debug for Services {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("Services").finish()
197 }
198}
199
200#[implement(Services)]
201pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
202 debug_info!("Starting services...");
203
204 super::migrations::migrations(self).await?;
205
206 self.manager
207 .lock()
208 .await
209 .insert(Manager::new(self))
210 .clone()
211 .start()
212 .await?;
213
214 debug_info!("Services startup complete.");
215
216 Ok(Arc::clone(self))
217}
218
219#[implement(Services)]
220pub async fn stop(&self) {
221 info!("Shutting down services...");
222
223 self.interrupt().await;
224 if let Some(manager) = self.manager.lock().await.as_ref() {
225 manager.stop().await;
226 }
227
228 debug_info!("Services shutdown complete.");
229}
230
231#[implement(Services)]
232pub(crate) async fn interrupt(&self) {
233 debug!("Interrupting services...");
234 for service in self.services() {
235 trace!(
236 name = ?service.name(),
237 "Interrupting Service"
238 );
239
240 service.interrupt().await;
241 }
242}
243
244#[implement(Services)]
245pub async fn poll(&self) -> Result {
246 if let Some(manager) = self.manager.lock().await.as_ref() {
247 trace!("Polling service manager...");
248 return manager.poll().await;
249 }
250
251 Ok(())
252}
253
254#[implement(Services)]
255pub async fn clear_cache(&self) {
256 self.services()
257 .stream()
258 .for_each(async |service| {
259 service.clear_cache().await;
260 })
261 .await;
262}
263
264#[implement(Services)]
265pub async fn memory_usage(&self) -> Result<String> {
266 self.services()
267 .try_stream()
268 .try_fold(String::new(), async |mut out, service| {
269 service.memory_usage(&mut out).await?;
270 Ok(out)
271 })
272 .await
273}