Skip to main content

tuwunel_core/utils/stream/
try_broadband.rs

1//! Synchronous combinator extensions to futures::TryStream
2
3use futures::{TryFuture, TryStream, TryStreamExt};
4
5use super::automatic_width;
6use crate::Result;
7
8/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
9/// produce out-of-order
10pub trait TryBroadbandExt<T, E>
11where
12	Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
13{
14	fn broadn_and_then<U, F, Fut, N>(
15		self,
16		n: N,
17		f: F,
18	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
19	where
20		N: Into<Option<usize>>,
21		F: Fn(Self::Ok) -> Fut + Send,
22		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send;
23
24	fn broad_and_then<U, F, Fut>(
25		self,
26		f: F,
27	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
28	where
29		F: Fn(Self::Ok) -> Fut + Send,
30		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
31	{
32		self.broadn_and_then(None, f)
33	}
34}
35
36impl<T, E, S> TryBroadbandExt<T, E> for S
37where
38	S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
39{
40	fn broadn_and_then<U, F, Fut, N>(
41		self,
42		n: N,
43		f: F,
44	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
45	where
46		N: Into<Option<usize>>,
47		F: Fn(Self::Ok) -> Fut + Send,
48		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
49	{
50		self.map_ok(f)
51			.try_buffer_unordered(n.into().unwrap_or_else(automatic_width))
52	}
53}