Skip to main content

tuwunel_service/pusher/
request.rs

1use std::{fmt::Debug, mem};
2
3use bytes::BytesMut;
4use ipaddress::IPAddress;
5use ruma::api::{
6	IncomingResponse, OutgoingRequest, auth_scheme::AuthScheme, path_builder::PathBuilder,
7};
8use tuwunel_core::{
9	Err, Result, debug_warn, err, implement, trace, utils::string_from_bytes, warn,
10};
11
12#[implement(super::Service)]
13#[tracing::instrument(level = "debug", skip_all)]
14pub(super) async fn send_request<T>(&self, dest: &str, request: T) -> Result<T::IncomingResponse>
15where
16	T: OutgoingRequest + Debug + Send,
17	for<'a> T::Authentication: AuthScheme<Input<'a> = ()>,
18	for<'a> T::PathBuilder: PathBuilder<Input<'a> = ()>,
19{
20	let dest = dest.replace(&self.services.config.notification_push_path, "");
21	trace!("Push gateway destination: {dest}");
22
23	let http_request = request
24		.try_into_http_request::<BytesMut>(&dest, (), ())
25		.map_err(|e| {
26			err!(BadServerResponse(warn!(
27				"Failed to find destination {dest} for push gateway: {e}"
28			)))
29		})?
30		.map(BytesMut::freeze);
31
32	let reqwest_request = reqwest::Request::try_from(http_request)?;
33	if let Some(url_host) = reqwest_request.url().host_str() {
34		trace!("Checking request URL for IP");
35		if let Ok(ip) = IPAddress::parse(url_host)
36			&& !self.services.client.valid_cidr_range(&ip)
37		{
38			return Err!(BadServerResponse("Not allowed to send requests to this IP"));
39		}
40	}
41
42	let response = self
43		.services
44		.client
45		.pusher
46		.execute(reqwest_request)
47		.await;
48
49	match response {
50		| Ok(mut response) => {
51			// reqwest::Response -> http::Response conversion
52
53			trace!("Checking response destination's IP");
54			if let Some(remote_addr) = response.remote_addr()
55				&& let Ok(ip) = IPAddress::parse(remote_addr.ip().to_string())
56				&& !self.services.client.valid_cidr_range(&ip)
57			{
58				return Err!(BadServerResponse("Not allowed to send requests to this IP"));
59			}
60
61			let status = response.status();
62			let mut http_response_builder = http::Response::builder()
63				.status(status)
64				.version(response.version());
65
66			mem::swap(
67				response.headers_mut(),
68				http_response_builder
69					.headers_mut()
70					.expect("http::response::Builder is usable"),
71			);
72
73			let body = response.bytes().await?; // TODO: handle timeout
74
75			if !status.is_success() {
76				debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body));
77				return Err!(BadServerResponse(warn!(
78					"Push gateway {dest} returned unsuccessful HTTP response: {status}"
79				)));
80			}
81
82			let response = T::IncomingResponse::try_from_http_response(
83				http_response_builder
84					.body(body)
85					.expect("reqwest body is valid http body"),
86			);
87
88			response.map_err(|e| {
89				err!(BadServerResponse(warn!(
90					"Push gateway {dest} returned invalid response: {e}"
91				)))
92			})
93		},
94		| Err(e) => {
95			warn!("Could not send request to pusher {dest}: {e}");
96			Err(e.into())
97		},
98	}
99}