tuwunel_service/pusher/
request.rs1use std::{fmt::Debug, mem};
2
3use bytes::BytesMut;
4use ipaddress::IPAddress;
5use ruma::api::{
6 IncomingResponse, OutgoingRequest, auth_scheme::AuthScheme, path_builder::PathBuilder,
7};
8use tuwunel_core::{
9 Err, Result, debug_warn, err, implement, trace, utils::string_from_bytes, warn,
10};
11
12#[implement(super::Service)]
13#[tracing::instrument(level = "debug", skip_all)]
14pub(super) async fn send_request<T>(&self, dest: &str, request: T) -> Result<T::IncomingResponse>
15where
16 T: OutgoingRequest + Debug + Send,
17 for<'a> T::Authentication: AuthScheme<Input<'a> = ()>,
18 for<'a> T::PathBuilder: PathBuilder<Input<'a> = ()>,
19{
20 let dest = dest.replace(&self.services.config.notification_push_path, "");
21 trace!("Push gateway destination: {dest}");
22
23 let http_request = request
24 .try_into_http_request::<BytesMut>(&dest, (), ())
25 .map_err(|e| {
26 err!(BadServerResponse(warn!(
27 "Failed to find destination {dest} for push gateway: {e}"
28 )))
29 })?
30 .map(BytesMut::freeze);
31
32 let reqwest_request = reqwest::Request::try_from(http_request)?;
33 if let Some(url_host) = reqwest_request.url().host_str() {
34 trace!("Checking request URL for IP");
35 if let Ok(ip) = IPAddress::parse(url_host)
36 && !self.services.client.valid_cidr_range(&ip)
37 {
38 return Err!(BadServerResponse("Not allowed to send requests to this IP"));
39 }
40 }
41
42 let response = self
43 .services
44 .client
45 .pusher
46 .execute(reqwest_request)
47 .await;
48
49 match response {
50 | Ok(mut response) => {
51 trace!("Checking response destination's IP");
54 if let Some(remote_addr) = response.remote_addr()
55 && let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string())
56 && !self.services.client.valid_cidr_range(&ip)
57 {
58 return Err!(BadServerResponse("Not allowed to send requests to this IP"));
59 }
60
61 let status = response.status();
62 let mut http_response_builder = http::Response::builder()
63 .status(status)
64 .version(response.version());
65
66 mem::swap(
67 response.headers_mut(),
68 http_response_builder
69 .headers_mut()
70 .expect("http::response::Builder is usable"),
71 );
72
73 let body = response.bytes().await?; if !status.is_success() {
76 debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
77 return Err!(BadServerResponse(warn!(
78 "Push gateway {dest} returned unsuccessful HTTP response: {status}"
79 )));
80 }
81
82 let response = T::IncomingResponse::try_from_http_response(
83 http_response_builder
84 .body(body)
85 .expect("reqwest body is valid http body"),
86 );
87
88 response.map_err(|e| {
89 err!(BadServerResponse(warn!(
90 "Push gateway {dest} returned invalid response: {e}"
91 )))
92 })
93 },
94 | Err(e) => {
95 warn!("Could not send request to pusher {dest}: {e}");
96 Err(e.into())
97 },
98 }
99}