Skip to main content

tuwunel_core/utils/stream/
wideband.rs

1//! Wideband stream combinator extensions to futures::Stream
2
3use std::convert::identity;
4
5use futures::{
6	Future,
7	stream::{Stream, StreamExt},
8};
9
10use super::{ReadyExt, automatic_width};
11
12/// Concurrency extensions to augment futures::StreamExt. wideband_ combinators
13/// produce in-order.
14pub trait WidebandExt<Item>
15where
16	Self: Stream<Item = Item> + Send + Sized,
17{
18	/// Concurrent filter_map(); ordered results
19	fn widen_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
20	where
21		N: Into<Option<usize>>,
22		F: Fn(Item) -> Fut + Send,
23		Fut: Future<Output = Option<U>> + Send,
24		U: Send;
25
26	fn widen_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
27	where
28		N: Into<Option<usize>>,
29		F: Fn(Item) -> Fut + Send,
30		Fut: Future<Output = U> + Send,
31		U: Send;
32
33	#[inline]
34	fn wide_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
35	where
36		F: Fn(Item) -> Fut + Send,
37		Fut: Future<Output = Option<U>> + Send,
38		U: Send,
39	{
40		self.widen_filter_map(None, f)
41	}
42
43	#[inline]
44	fn wide_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
45	where
46		F: Fn(Item) -> Fut + Send,
47		Fut: Future<Output = U> + Send,
48		U: Send,
49	{
50		self.widen_then(None, f)
51	}
52}
53
54impl<Item, S> WidebandExt<Item> for S
55where
56	S: Stream<Item = Item> + Send + Sized,
57{
58	#[inline]
59	fn widen_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
60	where
61		N: Into<Option<usize>>,
62		F: Fn(Item) -> Fut + Send,
63		Fut: Future<Output = Option<U>> + Send,
64		U: Send,
65	{
66		self.map(f)
67			.buffered(n.into().unwrap_or_else(automatic_width))
68			.ready_filter_map(identity)
69	}
70
71	#[inline]
72	fn widen_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
73	where
74		N: Into<Option<usize>>,
75		F: Fn(Item) -> Fut + Send,
76		Fut: Future<Output = U> + Send,
77		U: Send,
78	{
79		self.map(f)
80			.buffered(n.into().unwrap_or_else(automatic_width))
81	}
82}