tuwunel_service/pusher/
request.rs1use std::{fmt::Debug, mem};
2
3use bytes::BytesMut;
4use ipaddress::IPAddress;
5use ruma::api::{
6 IncomingResponse, OutgoingRequest, auth_scheme::AuthScheme, path_builder::PathBuilder,
7};
8use tuwunel_core::{
9 Err, Result, debug_warn, err, implement, trace, utils::string_from_bytes, warn,
10};
11
12use crate::client::read_response_capped;
13
14#[implement(super::Service)]
15#[tracing::instrument(level = "debug", skip_all)]
16pub(super) async fn send_request<T>(&self, dest: &str, request: T) -> Result<T::IncomingResponse>
17where
18 T: OutgoingRequest + Debug + Send,
19 for<'a> T::Authentication: AuthScheme<Input<'a> = ()>,
20 for<'a> T::PathBuilder: PathBuilder<Input<'a> = ()>,
21{
22 let dest = dest.replace(&self.services.config.notification_push_path, "");
23 trace!("Push gateway destination: {dest}");
24
25 let http_request = request
26 .try_into_http_request::<BytesMut>(&dest, (), ())
27 .map_err(|e| {
28 err!(BadServerResponse(warn!(
29 "Failed to find destination {dest} for push gateway: {e}"
30 )))
31 })?
32 .map(BytesMut::freeze);
33
34 let reqwest_request = reqwest::Request::try_from(http_request)?;
35 if let Some(url_host) = reqwest_request.url().host_str() {
36 trace!("Checking request URL for IP");
37 if let Ok(ip) = IPAddress::parse(url_host)
38 && !self.services.client.valid_cidr_range(&ip)
39 {
40 return Err!(BadServerResponse("Not allowed to send requests to this IP"));
41 }
42 }
43
44 let response = self
45 .services
46 .client
47 .pusher
48 .execute(reqwest_request)
49 .await;
50
51 match response {
52 | Ok(mut response) => {
53 trace!("Checking response destination's IP");
56 if let Some(remote_addr) = response.remote_addr()
57 && let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string())
58 && !self.services.client.valid_cidr_range(&ip)
59 {
60 return Err!(BadServerResponse("Not allowed to send requests to this IP"));
61 }
62
63 let status = response.status();
64 let mut http_response_builder = http::Response::builder()
65 .status(status)
66 .version(response.version());
67
68 mem::swap(
69 response.headers_mut(),
70 http_response_builder
71 .headers_mut()
72 .expect("http::response::Builder is usable"),
73 );
74
75 let limit = self.services.config.max_response_size;
76 let body = read_response_capped(response, limit).await?;
77
78 if !status.is_success() {
79 debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
80 return Err!(BadServerResponse(warn!(
81 "Push gateway {dest} returned unsuccessful HTTP response: {status}"
82 )));
83 }
84
85 let response = T::IncomingResponse::try_from_http_response(
86 http_response_builder
87 .body(body)
88 .expect("reqwest body is valid http body"),
89 );
90
91 response.map_err(|e| {
92 err!(BadServerResponse(warn!(
93 "Push gateway {dest} returned invalid response: {e}"
94 )))
95 })
96 },
97 | Err(e) => {
98 warn!("Could not send request to pusher {dest}: {e}");
99 Err(e.into())
100 },
101 }
102}