Skip to main content

tuwunel_service/client/
mod.rs

1use std::{
2	net::IpAddr,
3	ops::Deref,
4	sync::{Arc, LazyLock},
5	time::Duration,
6};
7
8use bytes::{Bytes, BytesMut};
9use ipaddress::{IPAddress, ipv4::from_u32 as ipv4_from_u32};
10use reqwest::{Certificate, Client, ClientBuilder, dns::Resolve, header::HeaderValue, redirect};
11use tuwunel_core::{Config, Err, Result, debug, either::Either, err, implement, trace};
12
13use crate::{Services, resolver::Validating, service};
14
15type DisableEncoding = fn(ClientBuilder) -> ClientBuilder;
16
17pub struct Clients {
18	pub default: Client,
19	pub url_preview: Client,
20	pub extern_media: Client,
21	pub well_known: Client,
22	pub federation: Client,
23	pub synapse: Client,
24	pub sender: Client,
25	pub appservice: Client,
26	pub pusher: Client,
27	pub oauth: Client,
28}
29
30pub struct Service {
31	pub clients: LazyLock<Clients, Box<dyn FnOnce() -> Clients + Send>>,
32
33	pub cidr_range_denylist: Arc<[IPAddress]>,
34}
35
36impl Deref for Service {
37	type Target = Clients;
38
39	fn deref(&self) -> &Self::Target { &self.clients }
40}
41
42impl crate::Service for Service {
43	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
44		let config = &args.server.config;
45
46		Ok(Arc::new(Self {
47			clients: LazyLock::new(Box::new({
48				let services = args.services.clone();
49
50				move || make_clients(&services).expect("failed to construct clients")
51			})),
52
53			cidr_range_denylist: config
54				.ip_range_denylist
55				.iter()
56				.map(IPAddress::parse)
57				.inspect(|cidr| trace!("Denied CIDR range: {cidr:?}"))
58				.collect::<Result<Vec<_>, String>>()
59				.map(Arc::from)
60				.map_err(|e| err!(Config("ip_range_denylist", e)))?,
61		}))
62	}
63
64	fn name(&self) -> &str { service::make_name(std::module_path!()) }
65}
66
67fn make_clients(services: &Services) -> Result<Clients> {
68	macro_rules! with {
69		($builder:ident => $make:expr) => {{
70			let $builder = base(&services.config, None)?;
71			$make.build()?
72		}};
73		($name:literal, $builder:ident => $make:expr) => {{
74			let $builder = base(&services.config, Some($name))?;
75			$make.build()?
76		}};
77	}
78
79	Ok(Clients {
80		default: with!(cb => cb.dns_resolver(Arc::clone(&services.resolver.resolver))),
81
82		url_preview: with!("preview", cb => {
83			let interface = &services
84				.config
85				.url_preview_bound_interface;
86
87			let bind_addr = interface.clone().and_then(Either::left);
88			let bind_iface = interface.clone().and_then(Either::right);
89
90			let resolver = Validating::new(
91				Arc::clone(&services.resolver.resolver),
92				Arc::clone(&services.client.cidr_range_denylist),
93			);
94
95			builder_interface(cb, bind_iface.as_deref())?
96				.local_address(bind_addr)
97				.dns_resolver(resolver)
98				.redirect(redirect::Policy::limited(3))
99		}),
100
101		extern_media: with!(cb => cb
102			.dns_resolver(Validating::new(
103				Arc::clone(&services.resolver.resolver),
104				Arc::clone(&services.client.cidr_range_denylist),
105			))
106			.redirect(redirect::Policy::limited(3))),
107
108		well_known: with!(cb => cb
109			.dns_resolver(Arc::clone(&services.resolver.resolver))
110			.connect_timeout(Duration::from_secs(
111				services.config.well_known_conn_timeout,
112			))
113			.read_timeout(Duration::from_secs(services.config.well_known_timeout))
114			.timeout(Duration::from_secs(services.config.well_known_timeout))
115			.pool_max_idle_per_host(0)
116			.redirect(redirect::Policy::limited(4))),
117
118		federation: with!(cb => cb
119			.dns_resolver(Arc::clone(&services.resolver.resolver.hooked))
120			.read_timeout(Duration::from_secs(services.config.federation_timeout))
121			.pool_max_idle_per_host(services.config.federation_idle_per_host.into())
122			.pool_idle_timeout(Duration::from_secs(
123				services.config.federation_idle_timeout,
124			))
125			.redirect(redirect::Policy::limited(3))),
126
127		synapse: with!(cb => cb
128			.dns_resolver(Arc::clone(&services.resolver.resolver.hooked))
129			.read_timeout(Duration::from_secs(305))
130			.pool_max_idle_per_host(0)
131			.redirect(redirect::Policy::limited(3))),
132
133		sender: with!(cb => cb
134			.dns_resolver(Arc::clone(&services.resolver.resolver.hooked))
135			.read_timeout(Duration::from_secs(services.config.sender_timeout))
136			.timeout(Duration::from_secs(services.config.sender_timeout))
137			.pool_max_idle_per_host(1)
138			.pool_idle_timeout(Duration::from_secs(
139				services.config.sender_idle_timeout,
140			))
141			.redirect(redirect::Policy::limited(2))),
142
143		appservice: with!(cb => cb
144			.dns_resolver(appservice_resolver(services))
145			.connect_timeout(Duration::from_secs(5))
146			.read_timeout(Duration::from_secs(services.config.appservice_timeout))
147			.timeout(Duration::from_secs(services.config.appservice_timeout))
148			.pool_max_idle_per_host(1)
149			.pool_idle_timeout(Duration::from_secs(
150				services.config.appservice_idle_timeout,
151			))
152			.redirect(redirect::Policy::limited(2))),
153
154		pusher: with!(cb => cb
155			.dns_resolver(Validating::new(
156				Arc::clone(&services.resolver.resolver),
157				Arc::clone(&services.client.cidr_range_denylist),
158			))
159			.pool_max_idle_per_host(1)
160			.pool_idle_timeout(Duration::from_secs(
161				services.config.pusher_idle_timeout,
162			))
163			.redirect(redirect::Policy::limited(2))),
164
165		oauth: with!(cb => cb
166			.dns_resolver(Arc::clone(&services.resolver.resolver))
167			.redirect(redirect::Policy::limited(0))
168			.pool_max_idle_per_host(1)),
169	})
170}
171
172fn base(config: &Config, name: Option<&str>) -> Result<ClientBuilder> {
173	let user_agent = tuwunel_core::version::user_agent();
174	let user_agent: HeaderValue = name
175		.map(|name| format!("{user_agent} {name}").try_into())
176		.unwrap_or_else(|| user_agent.try_into())?;
177
178	let builder = Client::builder()
179		.connect_timeout(Duration::from_secs(config.request_conn_timeout))
180		.read_timeout(Duration::from_secs(config.request_timeout))
181		.timeout(Duration::from_secs(config.request_total_timeout))
182		.pool_idle_timeout(Duration::from_secs(config.request_idle_timeout))
183		.pool_max_idle_per_host(config.request_idle_per_host.into())
184		.user_agent(user_agent)
185		.redirect(redirect::Policy::limited(6))
186		.danger_accept_invalid_certs(config.allow_invalid_tls_certificates)
187		.tls_certs_merge(
188			webpki_root_certs::TLS_SERVER_ROOT_CERTS
189				.iter()
190				.map(|der| {
191					Certificate::from_der(der).expect("certificate must be valid der encoding")
192				}),
193		)
194		.connection_verbose(cfg!(debug_assertions))
195		// Check if env var is set to avoid locking the keyfile mutex on every connection open
196		.tls_sslkeylogfile(std::env::var_os("SSLKEYLOGFILE").is_some());
197
198	let encodings: [(bool, DisableEncoding); 3] = [
199		(config.request_gzip, ClientBuilder::no_gzip),
200		(config.request_brotli, ClientBuilder::no_brotli),
201		(config.request_zstd, ClientBuilder::no_zstd),
202	];
203
204	let builder = encodings
205		.into_iter()
206		.filter(|(enabled, _)| !enabled)
207		.fold(builder, |builder, (_, disable)| disable(builder));
208
209	match config.proxy.to_proxy()? {
210		| Some(proxy) => Ok(builder.proxy(proxy)),
211		| _ => Ok(builder),
212	}
213}
214
215/// Buffer a remote response body, rejecting any response larger than `limit`
216/// bytes. reqwest enforces no response-size limit, so an unbounded `bytes()`
217/// lets a peer drive an allocator abort; refuse an oversized advertised length
218/// and hold the same bound while streaming for when that length is absent.
219pub async fn read_response_capped(
220	mut response: reqwest::Response,
221	limit: usize,
222) -> Result<Bytes> {
223	let mut body = match response.content_length() {
224		| Some(len) if len > limit.try_into().unwrap_or(u64::MAX) => {
225			debug!(%len, %limit, "rejecting response: advertised body exceeds limit");
226			return Err!(BadServerResponse(
227				"Response body length {len} exceeds the {limit} byte limit"
228			));
229		},
230		| Some(len) => BytesMut::with_capacity(usize::try_from(len).unwrap_or(limit)),
231		| None => BytesMut::new(),
232	};
233	while let Some(chunk) = response.chunk().await? {
234		if body.len().saturating_add(chunk.len()) > limit {
235			debug!(%limit, "rejecting response: streamed body exceeds limit");
236			return Err!(BadServerResponse("Response body exceeds the {limit} byte limit"));
237		}
238
239		body.extend_from_slice(&chunk);
240	}
241
242	Ok(body.freeze())
243}
244
245#[cfg(any(
246	target_os = "android",
247	target_os = "fuchsia",
248	target_os = "linux"
249))]
250fn builder_interface(builder: ClientBuilder, config: Option<&str>) -> Result<ClientBuilder> {
251	if let Some(iface) = config {
252		Ok(builder.interface(iface))
253	} else {
254		Ok(builder)
255	}
256}
257
258#[cfg(not(any(
259	target_os = "android",
260	target_os = "fuchsia",
261	target_os = "linux"
262)))]
263fn builder_interface(builder: ClientBuilder, config: Option<&str>) -> Result<ClientBuilder> {
264	use tuwunel_core::Err;
265
266	if let Some(iface) = config {
267		Err!("Binding to network-interface {iface:?} by name is not supported on this platform.")
268	} else {
269		Ok(builder)
270	}
271}
272
273fn appservice_resolver(services: &Services) -> Arc<dyn Resolve> {
274	if services.server.config.dns_passthru_appservices {
275		services.resolver.resolver.passthru.clone()
276	} else {
277		services.resolver.resolver.clone()
278	}
279}
280
281#[inline]
282#[must_use]
283#[implement(Service)]
284pub fn valid_cidr_range(&self, ip: &IPAddress) -> bool {
285	self.cidr_range_denylist
286		.iter()
287		.all(|cidr| !cidr.includes(ip))
288}
289
290#[inline]
291#[must_use]
292#[implement(Service)]
293pub fn valid_cidr_range_ip(&self, ip: IpAddr) -> bool {
294	let addr = ipaddress_from_std(ip);
295	self.cidr_range_denylist
296		.iter()
297		.all(|cidr| !cidr.includes(&addr))
298}
299
300#[must_use]
301pub(crate) fn ipaddress_from_std(ip: IpAddr) -> IPAddress {
302	match ip {
303		| IpAddr::V4(v4) =>
304			ipv4_from_u32(u32::from(v4), 32).expect("/32 is always a valid prefix"),
305		// ipv6::from_int would skip the regex parser but pulls in num-bigint.
306		| IpAddr::V6(v6) =>
307			IPAddress::parse(v6.to_string()).expect("Ipv6Addr Display output parses"),
308	}
309}