1use clap::Subcommand;
2use futures::StreamExt;
3use ruma::{OwnedServerName, OwnedUserId};
4use tuwunel_core::{Err, Result};
5use tuwunel_service::sending::Destination;
6
7use crate::Context;
8
9#[derive(Debug, Subcommand)]
10pub(crate) enum SendingCommand {
12 ActiveRequests,
14
15 ActiveRequestsFor {
27 #[arg(short, long)]
28 appservice_id: Option<String>,
29 #[arg(short, long)]
30 server_name: Option<OwnedServerName>,
31 #[arg(short, long)]
32 user_id: Option<OwnedUserId>,
33 #[arg(short, long)]
34 push_key: Option<String>,
35 },
36
37 QueuedRequests {
49 #[arg(short, long)]
50 appservice_id: Option<String>,
51 #[arg(short, long)]
52 server_name: Option<OwnedServerName>,
53 #[arg(short, long)]
54 user_id: Option<OwnedUserId>,
55 #[arg(short, long)]
56 push_key: Option<String>,
57 },
58
59 GetLatestEduCount {
60 server_name: OwnedServerName,
61 },
62}
63
64pub(super) async fn process(subcommand: SendingCommand, context: &Context<'_>) -> Result {
66 let services = context.services;
67
68 match subcommand {
69 | SendingCommand::ActiveRequests => {
70 let timer = tokio::time::Instant::now();
71 let results = services.sending.db.active_requests();
72 let active_requests = results.collect::<Vec<_>>().await;
73 let query_time = timer.elapsed();
74
75 context
76 .write_str(&format!(
77 "Query completed in {query_time:?}:\n\n```rs\n{active_requests:#?}\n```"
78 ))
79 .await
80 },
81 | SendingCommand::QueuedRequests {
82 appservice_id,
83 server_name,
84 user_id,
85 push_key,
86 } => {
87 if appservice_id.is_none()
88 && server_name.is_none()
89 && user_id.is_none()
90 && push_key.is_none()
91 {
92 return Err!(
93 "An appservice ID, server name, or a user ID with push key must be \
94 specified via arguments. See --help for more details.",
95 );
96 }
97 let timer = tokio::time::Instant::now();
98 let results = match (appservice_id, server_name, user_id, push_key) {
99 | (Some(appservice_id), None, None, None) => {
100 if appservice_id.is_empty() {
101 return Err!(
102 "An appservice ID, server name, or a user ID with push key must be \
103 specified via arguments. See --help for more details.",
104 );
105 }
106
107 services
108 .sending
109 .db
110 .queued_requests(&Destination::Appservice(appservice_id))
111 },
112 | (None, Some(server_name), None, None) => services
113 .sending
114 .db
115 .queued_requests(&Destination::Federation(server_name)),
116 | (None, None, Some(user_id), Some(push_key)) => {
117 if push_key.is_empty() {
118 return Err!(
119 "An appservice ID, server name, or a user ID with push key must be \
120 specified via arguments. See --help for more details.",
121 );
122 }
123
124 services
125 .sending
126 .db
127 .queued_requests(&Destination::Push(user_id, push_key))
128 },
129 | (Some(_), Some(_), Some(_), Some(_)) => {
130 return Err!(
131 "An appservice ID, server name, or a user ID with push key must be \
132 specified via arguments. Not all of them See --help for more details.",
133 );
134 },
135 | _ => {
136 return Err!(
137 "An appservice ID, server name, or a user ID with push key must be \
138 specified via arguments. See --help for more details.",
139 );
140 },
141 };
142
143 let queued_requests = results.collect::<Vec<_>>().await;
144 let query_time = timer.elapsed();
145
146 context
147 .write_str(&format!(
148 "Query completed in {query_time:?}:\n\n```rs\n{queued_requests:#?}\n```"
149 ))
150 .await
151 },
152 | SendingCommand::ActiveRequestsFor {
153 appservice_id,
154 server_name,
155 user_id,
156 push_key,
157 } => {
158 if appservice_id.is_none()
159 && server_name.is_none()
160 && user_id.is_none()
161 && push_key.is_none()
162 {
163 return Err!(
164 "An appservice ID, server name, or a user ID with push key must be \
165 specified via arguments. See --help for more details.",
166 );
167 }
168
169 let timer = tokio::time::Instant::now();
170 let results = match (appservice_id, server_name, user_id, push_key) {
171 | (Some(appservice_id), None, None, None) => {
172 if appservice_id.is_empty() {
173 return Err!(
174 "An appservice ID, server name, or a user ID with push key must be \
175 specified via arguments. See --help for more details.",
176 );
177 }
178
179 services
180 .sending
181 .db
182 .active_requests_for(&Destination::Appservice(appservice_id))
183 },
184 | (None, Some(server_name), None, None) => services
185 .sending
186 .db
187 .active_requests_for(&Destination::Federation(server_name)),
188 | (None, None, Some(user_id), Some(push_key)) => {
189 if push_key.is_empty() {
190 return Err!(
191 "An appservice ID, server name, or a user ID with push key must be \
192 specified via arguments. See --help for more details.",
193 );
194 }
195
196 services
197 .sending
198 .db
199 .active_requests_for(&Destination::Push(user_id, push_key))
200 },
201 | (Some(_), Some(_), Some(_), Some(_)) => {
202 return Err!(
203 "An appservice ID, server name, or a user ID with push key must be \
204 specified via arguments. Not all of them See --help for more details.",
205 );
206 },
207 | _ => {
208 return Err!(
209 "An appservice ID, server name, or a user ID with push key must be \
210 specified via arguments. See --help for more details.",
211 );
212 },
213 };
214
215 let active_requests = results.collect::<Vec<_>>().await;
216 let query_time = timer.elapsed();
217
218 context
219 .write_str(&format!(
220 "Query completed in {query_time:?}:\n\n```rs\n{active_requests:#?}\n```"
221 ))
222 .await
223 },
224 | SendingCommand::GetLatestEduCount { server_name } => {
225 let timer = tokio::time::Instant::now();
226 let results = services
227 .sending
228 .db
229 .get_latest_educount(&server_name)
230 .await;
231 let query_time = timer.elapsed();
232
233 context
234 .write_str(&format!(
235 "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"
236 ))
237 .await
238 },
239 }
240}