tuwunel_core/utils/stream/
try_parallel.rs1use futures::{TryFutureExt, stream::TryStream};
4use tokio::{runtime, task::JoinError};
5
6use super::TryBroadbandExt;
7use crate::{Error, Result, utils::sys::available_parallelism};
8
9pub trait TryParallelExt<T, E>
15where
16 Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
17 E: From<JoinError> + From<Error> + Send + 'static,
18 T: Send + 'static,
19{
20 fn paralleln_and_then<U, F, N, H>(
21 self,
22 h: H,
23 n: N,
24 f: F,
25 ) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
26 where
27 N: Into<Option<usize>>,
28 H: Into<Option<runtime::Handle>>,
29 F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
30 U: Send + 'static;
31
32 fn parallel_and_then<U, F, H>(
33 self,
34 h: H,
35 f: F,
36 ) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
37 where
38 H: Into<Option<runtime::Handle>>,
39 F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
40 U: Send + 'static,
41 {
42 self.paralleln_and_then(h, None, f)
43 }
44}
45
46impl<T, E, S> TryParallelExt<T, E> for S
47where
48 S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
49 E: From<JoinError> + From<Error> + Send + 'static,
50 T: Send + 'static,
51{
52 fn paralleln_and_then<U, F, N, H>(
53 self,
54 h: H,
55 n: N,
56 f: F,
57 ) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
58 where
59 N: Into<Option<usize>>,
60 H: Into<Option<runtime::Handle>>,
61 F: Fn(Self::Ok) -> Result<U, E> + Clone + Send + 'static,
62 U: Send + 'static,
63 {
64 let n = n.into().unwrap_or_else(available_parallelism);
65 let h = h.into().unwrap_or_else(runtime::Handle::current);
66 self.broadn_and_then(n, move |val| {
67 let (h, f) = (h.clone(), f.clone());
68 async move {
69 h.spawn_blocking(move || f(val))
70 .map_err(E::from)
71 .await?
72 }
73 })
74 }
75}