Skip to main content

tuwunel_core/utils/stream/
try_wideband.rs

//! Wideband (concurrent) combinator extensions to futures::TryStream
2
3use futures::{TryFuture, TryStream, TryStreamExt};
4
5use super::automatic_width;
6use crate::Result;
7
8/// Concurrency extensions to augment futures::TryStreamExt. wide_ combinators
9/// produce in-order results
10pub trait TryWidebandExt<T, E>
11where
12	Self: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
13{
14	fn widen_and_then<U, F, Fut, N>(
15		self,
16		n: N,
17		f: F,
18	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
19	where
20		N: Into<Option<usize>>,
21		F: Fn(Self::Ok) -> Fut + Send,
22		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
23		U: Send;
24
25	fn wide_and_then<U, F, Fut>(
26		self,
27		f: F,
28	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
29	where
30		F: Fn(Self::Ok) -> Fut + Send,
31		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
32		U: Send,
33	{
34		self.widen_and_then(None, f)
35	}
36}
37
38impl<T, E, S> TryWidebandExt<T, E> for S
39where
40	S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + Sized,
41	E: Send,
42{
43	fn widen_and_then<U, F, Fut, N>(
44		self,
45		n: N,
46		f: F,
47	) -> impl TryStream<Ok = U, Error = E, Item = Result<U, E>> + Send
48	where
49		N: Into<Option<usize>>,
50		F: Fn(Self::Ok) -> Fut + Send,
51		Fut: TryFuture<Ok = U, Error = E, Output = Result<U, E>> + Send,
52		U: Send,
53	{
54		self.map_ok(f)
55			.try_buffered(n.into().unwrap_or_else(automatic_width))
56	}
57}