Skip to main content

tuwunel_service/pusher/
request.rs

1use std::{fmt::Debug, mem};
2
3use bytes::BytesMut;
4use ipaddress::IPAddress;
5use ruma::api::{
6	IncomingResponse, OutgoingRequest, auth_scheme::AuthScheme, path_builder::PathBuilder,
7};
8use tuwunel_core::{
9	Err, Result, debug_warn, err, implement, trace, utils::string_from_bytes, warn,
10};
11
12use crate::client::read_response_capped;
13
14#[implement(super::Service)]
15#[tracing::instrument(level = "debug", skip_all)]
16pub(super) async fn send_request<T>(&self, dest: &str, request: T) -> Result<T::IncomingResponse>
17where
18	T: OutgoingRequest + Debug + Send,
19	for<'a> T::Authentication: AuthScheme<Input<'a> = ()>,
20	for<'a> T::PathBuilder: PathBuilder<Input<'a> = ()>,
21{
22	let dest = dest.replace(&self.services.config.notification_push_path, "");
23	trace!("Push gateway destination: {dest}");
24
25	let http_request = request
26		.try_into_http_request::<BytesMut>(&dest, (), ())
27		.map_err(|e| {
28			err!(BadServerResponse(warn!(
29				"Failed to find destination {dest} for push gateway: {e}"
30			)))
31		})?
32		.map(BytesMut::freeze);
33
34	let reqwest_request = reqwest::Request::try_from(http_request)?;
35	if let Some(url_host) = reqwest_request.url().host_str() {
36		trace!("Checking request URL for IP");
37		if let Ok(ip) = IPAddress::parse(url_host)
38			&& !self.services.client.valid_cidr_range(&ip)
39		{
40			return Err!(BadServerResponse("Not allowed to send requests to this IP"));
41		}
42	}
43
44	let response = self
45		.services
46		.client
47		.pusher
48		.execute(reqwest_request)
49		.await;
50
51	match response {
52		| Ok(mut response) => {
53			// reqwest::Response -> http::Response conversion
54
55			trace!("Checking response destination's IP");
56			if let Some(remote_addr) = response.remote_addr()
57				&& let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string())
58				&& !self.services.client.valid_cidr_range(&ip)
59			{
60				return Err!(BadServerResponse("Not allowed to send requests to this IP"));
61			}
62
63			let status = response.status();
64			let mut http_response_builder = http::Response::builder()
65				.status(status)
66				.version(response.version());
67
68			mem::swap(
69				response.headers_mut(),
70				http_response_builder
71					.headers_mut()
72					.expect("http::response::Builder is usable"),
73			);
74
75			let limit = self.services.config.max_response_size;
76			let body = read_response_capped(response, limit).await?;
77
78			if !status.is_success() {
79				debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
80				return Err!(BadServerResponse(warn!(
81					"Push gateway {dest} returned unsuccessful HTTP response: {status}"
82				)));
83			}
84
85			let response = T::IncomingResponse::try_from_http_response(
86				http_response_builder
87					.body(body)
88					.expect("reqwest body is valid http body"),
89			);
90
91			response.map_err(|e| {
92				err!(BadServerResponse(warn!(
93					"Push gateway {dest} returned invalid response: {e}"
94				)))
95			})
96		},
97		| Err(e) => {
98			warn!("Could not send request to pusher {dest}: {e}");
99			Err(e.into())
100		},
101	}
102}