Skip to main content

tuwunel_service/
services.rs

1use std::{fmt, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use tokio::sync::Mutex;
5use tuwunel_core::{
6	Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream,
7};
8use tuwunel_database::Database;
9
10pub(crate) use crate::OnceServices;
11use crate::{
12	account_data, admin, appservice, client, config, deactivate, emergency, federation, fetcher,
13	globals, key_backups,
14	manager::Manager,
15	media, membership, oauth, presence, profile, pusher, registration_tokens, resolver,
16	rooms::{self, retention},
17	sending, sendmail, server_keys,
18	service::{Args, Service},
19	storage, sync, threepid, transaction_ids, uiaa, users,
20};
21
22pub struct Services {
23	pub account_data: Arc<account_data::Service>,
24	pub admin: Arc<admin::Service>,
25	pub appservice: Arc<appservice::Service>,
26	pub config: Arc<config::Service>,
27	pub client: Arc<client::Service>,
28	pub emergency: Arc<emergency::Service>,
29	pub fetcher: Arc<fetcher::Service>,
30	pub globals: Arc<globals::Service>,
31	pub key_backups: Arc<key_backups::Service>,
32	pub media: Arc<media::Service>,
33	pub presence: Arc<presence::Service>,
34	pub pusher: Arc<pusher::Service>,
35	pub resolver: Arc<resolver::Service>,
36	pub alias: Arc<rooms::alias::Service>,
37	pub auth_chain: Arc<rooms::auth_chain::Service>,
38	pub delete: Arc<rooms::delete::Service>,
39	pub directory: Arc<rooms::directory::Service>,
40	pub event_handler: Arc<rooms::event_handler::Service>,
41	pub lazy_loading: Arc<rooms::lazy_loading::Service>,
42	pub metadata: Arc<rooms::metadata::Service>,
43	pub pdu_metadata: Arc<rooms::pdu_metadata::Service>,
44	pub read_receipt: Arc<rooms::read_receipt::Service>,
45	pub search: Arc<rooms::search::Service>,
46	pub short: Arc<rooms::short::Service>,
47	pub spaces: Arc<rooms::spaces::Service>,
48	pub state: Arc<rooms::state::Service>,
49	pub state_accessor: Arc<rooms::state_accessor::Service>,
50	pub state_cache: Arc<rooms::state_cache::Service>,
51	pub state_compressor: Arc<rooms::state_compressor::Service>,
52	pub storage: Arc<storage::Service>,
53	pub threads: Arc<rooms::threads::Service>,
54	pub timeline: Arc<rooms::timeline::Service>,
55	pub typing: Arc<rooms::typing::Service>,
56	pub federation: Arc<federation::Service>,
57	pub sending: Arc<sending::Service>,
58	pub server_keys: Arc<server_keys::Service>,
59	pub sync: Arc<sync::Service>,
60	pub transaction_ids: Arc<transaction_ids::Service>,
61	pub uiaa: Arc<uiaa::Service>,
62	pub users: Arc<users::Service>,
63	pub membership: Arc<membership::Service>,
64	pub deactivate: Arc<deactivate::Service>,
65	pub oauth: Arc<oauth::Service>,
66	pub retention: Arc<retention::Service>,
67	pub registration_tokens: Arc<registration_tokens::Service>,
68	pub sendmail: Arc<sendmail::Service>,
69	pub threepid: Arc<threepid::Service>,
70	pub profile: Arc<profile::Service>,
71
72	manager: Mutex<Option<Arc<Manager>>>,
73	pub server: Arc<Server>,
74	pub db: Arc<Database>,
75}
76
77#[implement(Services)]
78pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
79	let db = Database::open(&server).await?;
80	let services = Arc::new(OnceServices::default());
81	let args = Args {
82		db: &db,
83		server: &server,
84		services: &services,
85	};
86
87	let res = Arc::new(Self {
88		account_data: account_data::Service::build(&args)?,
89		admin: admin::Service::build(&args)?,
90		appservice: appservice::Service::build(&args)?,
91		resolver: resolver::Service::build(&args)?,
92		client: client::Service::build(&args)?,
93		config: config::Service::build(&args)?,
94		emergency: emergency::Service::build(&args)?,
95		fetcher: fetcher::Service::build(&args)?,
96		globals: globals::Service::build(&args)?,
97		key_backups: key_backups::Service::build(&args)?,
98		media: media::Service::build(&args)?,
99		presence: presence::Service::build(&args)?,
100		pusher: pusher::Service::build(&args)?,
101		alias: rooms::alias::Service::build(&args)?,
102		auth_chain: rooms::auth_chain::Service::build(&args)?,
103		delete: rooms::delete::Service::build(&args)?,
104		directory: rooms::directory::Service::build(&args)?,
105		event_handler: rooms::event_handler::Service::build(&args)?,
106		lazy_loading: rooms::lazy_loading::Service::build(&args)?,
107		metadata: rooms::metadata::Service::build(&args)?,
108		pdu_metadata: rooms::pdu_metadata::Service::build(&args)?,
109		read_receipt: rooms::read_receipt::Service::build(&args)?,
110		search: rooms::search::Service::build(&args)?,
111		short: rooms::short::Service::build(&args)?,
112		spaces: rooms::spaces::Service::build(&args)?,
113		state: rooms::state::Service::build(&args)?,
114		state_accessor: rooms::state_accessor::Service::build(&args)?,
115		state_cache: rooms::state_cache::Service::build(&args)?,
116		state_compressor: rooms::state_compressor::Service::build(&args)?,
117		storage: storage::Service::build(&args)?,
118		threads: rooms::threads::Service::build(&args)?,
119		timeline: rooms::timeline::Service::build(&args)?,
120		typing: rooms::typing::Service::build(&args)?,
121		federation: federation::Service::build(&args)?,
122		sending: sending::Service::build(&args)?,
123		server_keys: server_keys::Service::build(&args)?,
124		sync: sync::Service::build(&args)?,
125		transaction_ids: transaction_ids::Service::build(&args)?,
126		uiaa: uiaa::Service::build(&args)?,
127		users: users::Service::build(&args)?,
128		membership: membership::Service::build(&args)?,
129		deactivate: deactivate::Service::build(&args)?,
130		oauth: oauth::Service::build(&args)?,
131		retention: retention::Service::build(&args)?,
132		registration_tokens: registration_tokens::Service::build(&args)?,
133		sendmail: sendmail::Service::build(&args)?,
134		threepid: threepid::Service::build(&args)?,
135		profile: profile::Service::build(&args)?,
136
137		manager: Mutex::new(None),
138		server,
139		db,
140	});
141
142	Ok(services.set(res))
143}
144
145#[implement(Services)]
146pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
147	macro_rules! cast {
148		($s:expr) => {
149			<Arc<dyn Service> as Into<_>>::into($s.clone())
150		};
151	}
152
153	[
154		cast!(self.account_data),
155		cast!(self.admin),
156		cast!(self.appservice),
157		cast!(self.resolver),
158		cast!(self.client),
159		cast!(self.config),
160		cast!(self.emergency),
161		cast!(self.fetcher),
162		cast!(self.globals),
163		cast!(self.key_backups),
164		cast!(self.media),
165		cast!(self.presence),
166		cast!(self.pusher),
167		cast!(self.alias),
168		cast!(self.auth_chain),
169		cast!(self.delete),
170		cast!(self.directory),
171		cast!(self.event_handler),
172		cast!(self.lazy_loading),
173		cast!(self.metadata),
174		cast!(self.pdu_metadata),
175		cast!(self.read_receipt),
176		cast!(self.search),
177		cast!(self.short),
178		cast!(self.spaces),
179		cast!(self.state),
180		cast!(self.state_accessor),
181		cast!(self.state_cache),
182		cast!(self.state_compressor),
183		cast!(self.storage),
184		cast!(self.threads),
185		cast!(self.timeline),
186		cast!(self.typing),
187		cast!(self.federation),
188		cast!(self.sending),
189		cast!(self.server_keys),
190		cast!(self.sync),
191		cast!(self.transaction_ids),
192		cast!(self.uiaa),
193		cast!(self.users),
194		cast!(self.membership),
195		cast!(self.deactivate),
196		cast!(self.oauth),
197		cast!(self.retention),
198		cast!(self.registration_tokens),
199		cast!(self.profile),
200	]
201	.into_iter()
202}
203
204impl fmt::Debug for Services {
205	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206		f.debug_struct("Services").finish()
207	}
208}
209
210#[implement(Services)]
211pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
212	debug_info!("Starting services...");
213
214	super::migrations::migrations(self).await?;
215
216	self.manager
217		.lock()
218		.await
219		.insert(Manager::new(self))
220		.clone()
221		.start()
222		.await?;
223
224	debug_info!("Services startup complete.");
225
226	Ok(Arc::clone(self))
227}
228
229#[implement(Services)]
230pub async fn stop(&self) {
231	info!("Shutting down services...");
232
233	self.interrupt().await;
234	if let Some(manager) = self.manager.lock().await.as_ref() {
235		manager.stop().await;
236	}
237
238	debug_info!("Services shutdown complete.");
239}
240
241#[implement(Services)]
242pub(crate) async fn interrupt(&self) {
243	debug!("Interrupting services...");
244	for service in self.services() {
245		trace!(
246			name = ?service.name(),
247			"Interrupting Service"
248		);
249
250		service.interrupt().await;
251	}
252}
253
254#[implement(Services)]
255pub async fn poll(&self) -> Result {
256	if let Some(manager) = self.manager.lock().await.as_ref() {
257		trace!("Polling service manager...");
258		return manager.poll().await;
259	}
260
261	Ok(())
262}
263
264#[implement(Services)]
265pub async fn clear_cache(&self) {
266	self.services()
267		.stream()
268		.for_each(async |service| {
269			service.clear_cache().await;
270		})
271		.await;
272}
273
274#[implement(Services)]
275pub async fn memory_usage(&self) -> Result<String> {
276	self.services()
277		.try_stream()
278		.try_fold(String::new(), async |mut out, service| {
279			service.memory_usage(&mut out).await?;
280			Ok(out)
281		})
282		.await
283}