Skip to main content

tuwunel_admin/query/
sending.rs

1use clap::Subcommand;
2use futures::StreamExt;
3use ruma::{OwnedServerName, OwnedUserId};
4use tuwunel_core::{Err, Result};
5use tuwunel_service::sending::Destination;
6
7use crate::Context;
8
9#[derive(Debug, Subcommand)]
10/// All the getters and iterators from src/service/sending/
11pub(crate) enum SendingCommand {
12	/// - Queries database for all `servercurrentevent_data`
13	ActiveRequests,
14
15	/// - Queries database for `servercurrentevent_data` but for a specific
16	///   destination
17	///
18	/// This command takes only *one* format of these arguments:
19	///
20	/// appservice_id
21	/// server_name
22	/// user_id AND push_key
23	///
24	/// See src/service/sending/dest.rs for the definition of the `Destination`
25	/// enum
26	ActiveRequestsFor {
27		#[arg(short, long)]
28		appservice_id: Option<String>,
29		#[arg(short, long)]
30		server_name: Option<OwnedServerName>,
31		#[arg(short, long)]
32		user_id: Option<OwnedUserId>,
33		#[arg(short, long)]
34		push_key: Option<String>,
35	},
36
37	/// - Queries database for `servernameevent_data` which are the queued up
38	///   requests that will eventually be sent
39	///
40	/// This command takes only *one* format of these arguments:
41	///
42	/// appservice_id
43	/// server_name
44	/// user_id AND push_key
45	///
46	/// See src/service/sending/dest.rs for the definition of the `Destination`
47	/// enum
48	QueuedRequests {
49		#[arg(short, long)]
50		appservice_id: Option<String>,
51		#[arg(short, long)]
52		server_name: Option<OwnedServerName>,
53		#[arg(short, long)]
54		user_id: Option<OwnedUserId>,
55		#[arg(short, long)]
56		push_key: Option<String>,
57	},
58
59	GetLatestEduCount {
60		server_name: OwnedServerName,
61	},
62}
63
64/// All the getters and iterators from src/service/sending/
65pub(super) async fn process(subcommand: SendingCommand, context: &Context<'_>) -> Result {
66	let services = context.services;
67
68	match subcommand {
69		| SendingCommand::ActiveRequests => {
70			let timer = tokio::time::Instant::now();
71			let results = services.sending.db.active_requests();
72			let active_requests = results.collect::<Vec<_>>().await;
73			let query_time = timer.elapsed();
74
75			context
76				.write_str(&format!(
77					"Query completed in {query_time:?}:\n\n```rs\n{active_requests:#?}\n```"
78				))
79				.await
80		},
81		| SendingCommand::QueuedRequests {
82			appservice_id,
83			server_name,
84			user_id,
85			push_key,
86		} => {
87			if appservice_id.is_none()
88				&& server_name.is_none()
89				&& user_id.is_none()
90				&& push_key.is_none()
91			{
92				return Err!(
93					"An appservice ID, server name, or a user ID with push key must be \
94					 specified via arguments. See --help for more details.",
95				);
96			}
97			let timer = tokio::time::Instant::now();
98			let results = match (appservice_id, server_name, user_id, push_key) {
99				| (Some(appservice_id), None, None, None) => {
100					if appservice_id.is_empty() {
101						return Err!(
102							"An appservice ID, server name, or a user ID with push key must be \
103							 specified via arguments. See --help for more details.",
104						);
105					}
106
107					services
108						.sending
109						.db
110						.queued_requests(&Destination::Appservice(appservice_id))
111				},
112				| (None, Some(server_name), None, None) => services
113					.sending
114					.db
115					.queued_requests(&Destination::Federation(server_name)),
116				| (None, None, Some(user_id), Some(push_key)) => {
117					if push_key.is_empty() {
118						return Err!(
119							"An appservice ID, server name, or a user ID with push key must be \
120							 specified via arguments. See --help for more details.",
121						);
122					}
123
124					services
125						.sending
126						.db
127						.queued_requests(&Destination::Push(user_id, push_key))
128				},
129				| (Some(_), Some(_), Some(_), Some(_)) => {
130					return Err!(
131						"An appservice ID, server name, or a user ID with push key must be \
132						 specified via arguments. Not all of them See --help for more details.",
133					);
134				},
135				| _ => {
136					return Err!(
137						"An appservice ID, server name, or a user ID with push key must be \
138						 specified via arguments. See --help for more details.",
139					);
140				},
141			};
142
143			let queued_requests = results.collect::<Vec<_>>().await;
144			let query_time = timer.elapsed();
145
146			context
147				.write_str(&format!(
148					"Query completed in {query_time:?}:\n\n```rs\n{queued_requests:#?}\n```"
149				))
150				.await
151		},
152		| SendingCommand::ActiveRequestsFor {
153			appservice_id,
154			server_name,
155			user_id,
156			push_key,
157		} => {
158			if appservice_id.is_none()
159				&& server_name.is_none()
160				&& user_id.is_none()
161				&& push_key.is_none()
162			{
163				return Err!(
164					"An appservice ID, server name, or a user ID with push key must be \
165					 specified via arguments. See --help for more details.",
166				);
167			}
168
169			let timer = tokio::time::Instant::now();
170			let results = match (appservice_id, server_name, user_id, push_key) {
171				| (Some(appservice_id), None, None, None) => {
172					if appservice_id.is_empty() {
173						return Err!(
174							"An appservice ID, server name, or a user ID with push key must be \
175							 specified via arguments. See --help for more details.",
176						);
177					}
178
179					services
180						.sending
181						.db
182						.active_requests_for(&Destination::Appservice(appservice_id))
183				},
184				| (None, Some(server_name), None, None) => services
185					.sending
186					.db
187					.active_requests_for(&Destination::Federation(server_name)),
188				| (None, None, Some(user_id), Some(push_key)) => {
189					if push_key.is_empty() {
190						return Err!(
191							"An appservice ID, server name, or a user ID with push key must be \
192							 specified via arguments. See --help for more details.",
193						);
194					}
195
196					services
197						.sending
198						.db
199						.active_requests_for(&Destination::Push(user_id, push_key))
200				},
201				| (Some(_), Some(_), Some(_), Some(_)) => {
202					return Err!(
203						"An appservice ID, server name, or a user ID with push key must be \
204						 specified via arguments. Not all of them See --help for more details.",
205					);
206				},
207				| _ => {
208					return Err!(
209						"An appservice ID, server name, or a user ID with push key must be \
210						 specified via arguments. See --help for more details.",
211					);
212				},
213			};
214
215			let active_requests = results.collect::<Vec<_>>().await;
216			let query_time = timer.elapsed();
217
218			context
219				.write_str(&format!(
220					"Query completed in {query_time:?}:\n\n```rs\n{active_requests:#?}\n```"
221				))
222				.await
223		},
224		| SendingCommand::GetLatestEduCount { server_name } => {
225			let timer = tokio::time::Instant::now();
226			let results = services
227				.sending
228				.db
229				.get_latest_educount(&server_name)
230				.await;
231			let query_time = timer.elapsed();
232
233			context
234				.write_str(&format!(
235					"Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"
236				))
237				.await
238		},
239	}
240}