Skip to main content

tuwunel_service/
services.rs

1use std::{fmt, sync::Arc};
2
3use futures::{StreamExt, TryStreamExt};
4use tokio::sync::Mutex;
5use tuwunel_core::{
6	Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream,
7};
8use tuwunel_database::Database;
9
10pub(crate) use crate::OnceServices;
11use crate::{
12	account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
13	key_backups,
14	manager::Manager,
15	media, membership, oauth, presence, pusher, registration_tokens, resolver,
16	rooms::{self, retention},
17	sending, server_keys,
18	service::{Args, Service},
19	storage, sync, transaction_ids, uiaa, users,
20};
21
/// Container holding one shared handle to every service, together with the
/// server and database handles they were built from.
///
/// Note that the field order here is not the order in which `build`
/// initializes the services.
pub struct Services {
	pub account_data: Arc<account_data::Service>,
	pub admin: Arc<admin::Service>,
	pub appservice: Arc<appservice::Service>,
	pub config: Arc<config::Service>,
	pub client: Arc<client::Service>,
	pub emergency: Arc<emergency::Service>,
	pub globals: Arc<globals::Service>,
	pub key_backups: Arc<key_backups::Service>,
	pub media: Arc<media::Service>,
	pub presence: Arc<presence::Service>,
	pub pusher: Arc<pusher::Service>,
	pub resolver: Arc<resolver::Service>,
	pub alias: Arc<rooms::alias::Service>,
	pub auth_chain: Arc<rooms::auth_chain::Service>,
	pub delete: Arc<rooms::delete::Service>,
	pub directory: Arc<rooms::directory::Service>,
	pub event_handler: Arc<rooms::event_handler::Service>,
	pub lazy_loading: Arc<rooms::lazy_loading::Service>,
	pub metadata: Arc<rooms::metadata::Service>,
	pub pdu_metadata: Arc<rooms::pdu_metadata::Service>,
	pub read_receipt: Arc<rooms::read_receipt::Service>,
	pub search: Arc<rooms::search::Service>,
	pub short: Arc<rooms::short::Service>,
	pub spaces: Arc<rooms::spaces::Service>,
	pub state: Arc<rooms::state::Service>,
	pub state_accessor: Arc<rooms::state_accessor::Service>,
	pub state_cache: Arc<rooms::state_cache::Service>,
	pub state_compressor: Arc<rooms::state_compressor::Service>,
	pub storage: Arc<storage::Service>,
	pub threads: Arc<rooms::threads::Service>,
	pub timeline: Arc<rooms::timeline::Service>,
	pub typing: Arc<rooms::typing::Service>,
	pub federation: Arc<federation::Service>,
	pub sending: Arc<sending::Service>,
	pub server_keys: Arc<server_keys::Service>,
	pub sync: Arc<sync::Service>,
	pub transaction_ids: Arc<transaction_ids::Service>,
	pub uiaa: Arc<uiaa::Service>,
	pub users: Arc<users::Service>,
	pub membership: Arc<membership::Service>,
	pub deactivate: Arc<deactivate::Service>,
	pub oauth: Arc<oauth::Service>,
	pub retention: Arc<retention::Service>,
	pub registration_tokens: Arc<registration_tokens::Service>,

	/// Service manager; starts out as `None` and is filled in by `start`.
	manager: Mutex<Option<Arc<Manager>>>,
	/// Server handle this container was built with.
	pub server: Arc<Server>,
	/// Database handle opened during `build`.
	pub db: Arc<Database>,
}
72
73#[implement(Services)]
74pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
75	let db = Database::open(&server).await?;
76	let services = Arc::new(OnceServices::default());
77	let args = Args {
78		db: &db,
79		server: &server,
80		services: &services,
81	};
82
83	let res = Arc::new(Self {
84		account_data: account_data::Service::build(&args)?,
85		admin: admin::Service::build(&args)?,
86		appservice: appservice::Service::build(&args)?,
87		resolver: resolver::Service::build(&args)?,
88		client: client::Service::build(&args)?,
89		config: config::Service::build(&args)?,
90		emergency: emergency::Service::build(&args)?,
91		globals: globals::Service::build(&args)?,
92		key_backups: key_backups::Service::build(&args)?,
93		media: media::Service::build(&args)?,
94		presence: presence::Service::build(&args)?,
95		pusher: pusher::Service::build(&args)?,
96		alias: rooms::alias::Service::build(&args)?,
97		auth_chain: rooms::auth_chain::Service::build(&args)?,
98		delete: rooms::delete::Service::build(&args)?,
99		directory: rooms::directory::Service::build(&args)?,
100		event_handler: rooms::event_handler::Service::build(&args)?,
101		lazy_loading: rooms::lazy_loading::Service::build(&args)?,
102		metadata: rooms::metadata::Service::build(&args)?,
103		pdu_metadata: rooms::pdu_metadata::Service::build(&args)?,
104		read_receipt: rooms::read_receipt::Service::build(&args)?,
105		search: rooms::search::Service::build(&args)?,
106		short: rooms::short::Service::build(&args)?,
107		spaces: rooms::spaces::Service::build(&args)?,
108		state: rooms::state::Service::build(&args)?,
109		state_accessor: rooms::state_accessor::Service::build(&args)?,
110		state_cache: rooms::state_cache::Service::build(&args)?,
111		state_compressor: rooms::state_compressor::Service::build(&args)?,
112		storage: storage::Service::build(&args)?,
113		threads: rooms::threads::Service::build(&args)?,
114		timeline: rooms::timeline::Service::build(&args)?,
115		typing: rooms::typing::Service::build(&args)?,
116		federation: federation::Service::build(&args)?,
117		sending: sending::Service::build(&args)?,
118		server_keys: server_keys::Service::build(&args)?,
119		sync: sync::Service::build(&args)?,
120		transaction_ids: transaction_ids::Service::build(&args)?,
121		uiaa: uiaa::Service::build(&args)?,
122		users: users::Service::build(&args)?,
123		membership: membership::Service::build(&args)?,
124		deactivate: deactivate::Service::build(&args)?,
125		oauth: oauth::Service::build(&args)?,
126		retention: retention::Service::build(&args)?,
127		registration_tokens: registration_tokens::Service::build(&args)?,
128
129		manager: Mutex::new(None),
130		server,
131		db,
132	});
133
134	Ok(services.set(res))
135}
136
137#[implement(Services)]
138pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
139	macro_rules! cast {
140		($s:expr) => {
141			<Arc<dyn Service> as Into<_>>::into($s.clone())
142		};
143	}
144
145	[
146		cast!(self.account_data),
147		cast!(self.admin),
148		cast!(self.appservice),
149		cast!(self.resolver),
150		cast!(self.client),
151		cast!(self.config),
152		cast!(self.emergency),
153		cast!(self.globals),
154		cast!(self.key_backups),
155		cast!(self.media),
156		cast!(self.presence),
157		cast!(self.pusher),
158		cast!(self.alias),
159		cast!(self.auth_chain),
160		cast!(self.delete),
161		cast!(self.directory),
162		cast!(self.event_handler),
163		cast!(self.lazy_loading),
164		cast!(self.metadata),
165		cast!(self.pdu_metadata),
166		cast!(self.read_receipt),
167		cast!(self.search),
168		cast!(self.short),
169		cast!(self.spaces),
170		cast!(self.state),
171		cast!(self.state_accessor),
172		cast!(self.state_cache),
173		cast!(self.state_compressor),
174		cast!(self.storage),
175		cast!(self.threads),
176		cast!(self.timeline),
177		cast!(self.typing),
178		cast!(self.federation),
179		cast!(self.sending),
180		cast!(self.server_keys),
181		cast!(self.sync),
182		cast!(self.transaction_ids),
183		cast!(self.uiaa),
184		cast!(self.users),
185		cast!(self.membership),
186		cast!(self.deactivate),
187		cast!(self.oauth),
188		cast!(self.retention),
189		cast!(self.registration_tokens),
190	]
191	.into_iter()
192}
193
194impl fmt::Debug for Services {
195	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196		f.debug_struct("Services").finish()
197	}
198}
199
200#[implement(Services)]
201pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
202	debug_info!("Starting services...");
203
204	super::migrations::migrations(self).await?;
205
206	self.manager
207		.lock()
208		.await
209		.insert(Manager::new(self))
210		.clone()
211		.start()
212		.await?;
213
214	debug_info!("Services startup complete.");
215
216	Ok(Arc::clone(self))
217}
218
219#[implement(Services)]
220pub async fn stop(&self) {
221	info!("Shutting down services...");
222
223	self.interrupt().await;
224	if let Some(manager) = self.manager.lock().await.as_ref() {
225		manager.stop().await;
226	}
227
228	debug_info!("Services shutdown complete.");
229}
230
231#[implement(Services)]
232pub(crate) async fn interrupt(&self) {
233	debug!("Interrupting services...");
234	for service in self.services() {
235		trace!(
236			name = ?service.name(),
237			"Interrupting Service"
238		);
239
240		service.interrupt().await;
241	}
242}
243
244#[implement(Services)]
245pub async fn poll(&self) -> Result {
246	if let Some(manager) = self.manager.lock().await.as_ref() {
247		trace!("Polling service manager...");
248		return manager.poll().await;
249	}
250
251	Ok(())
252}
253
254#[implement(Services)]
255pub async fn clear_cache(&self) {
256	self.services()
257		.stream()
258		.for_each(async |service| {
259			service.clear_cache().await;
260		})
261		.await;
262}
263
264#[implement(Services)]
265pub async fn memory_usage(&self) -> Result<String> {
266	self.services()
267		.try_stream()
268		.try_fold(String::new(), async |mut out, service| {
269			service.memory_usage(&mut out).await?;
270			Ok(out)
271		})
272		.await
273}