Skip to main content

tuwunel_core/utils/stream/
try_parallel.rs

//! Parallelism stream combinator extensions to `futures::Stream`.
2
3use futures::{TryFutureExt, stream::TryStream};
4use tokio::{runtime, task::JoinError};
5
6use super::TryBroadbandExt;
7use crate::{Error, Result, utils::sys::available_parallelism};
8
9/// Parallelism extensions to augment futures::StreamExt. These combinators are
10/// for computation-oriented workloads, unlike -band combinators for I/O
11/// workloads; these default to the available compute parallelism for the
12/// system. Threads are currently drawn from the tokio-spawn pool. Results are
13/// unordered.
14pub trait TryParallelExt<T, E>
15where
16	Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
17	E: From<JoinError> + From<Error> + Send + 'static,
18	T: Send + 'static,
19{
20	fn paralleln_and_then<U, F, N, H>(
21		self,
22		h: H,
23		n: N,
24		f: F,
25	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
26	where
27		N: Into<Option<usize>>,
28		H: Into<Option<runtime::Handle>>,
29		F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
30		U: Send + 'static;
31
32	fn parallel_and_then<U, F, H>(
33		self,
34		h: H,
35		f: F,
36	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
37	where
38		H: Into<Option<runtime::Handle>>,
39		F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
40		U: Send + 'static,
41	{
42		self.paralleln_and_then(h, None, f)
43	}
44}
45
46impl<T, E, S> TryParallelExt<T, E> for S
47where
48	S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
49	E: From<JoinError> + From<Error> + Send + 'static,
50	T: Send + 'static,
51{
52	fn paralleln_and_then<U, F, N, H>(
53		self,
54		h: H,
55		n: N,
56		f: F,
57	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
58	where
59		N: Into<Option<usize>>,
60		H: Into<Option<runtime::Handle>>,
61		F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
62		U: Send + 'static,
63	{
64		let n = n.into().unwrap_or_else(available_parallelism);
65		let h = h.into().unwrap_or_else(runtime::Handle::current);
66		self.broadn_and_then(n, move |val| {
67			let (h, f) = (h.clone(), f.clone());
68			async move {
69				h.spawn_blocking(move || f(val))
70					.map_err(E::from)
71					.await?
72			}
73		})
74	}
75}