Skip to main content

tuwunel_core/utils/stream/
broadband.rs

1//! Broadband stream combinator extensions to futures::Stream
2
3use std::convert::identity;
4
5use futures::{
6	Future,
7	stream::{Stream, StreamExt},
8};
9
10use super::{ReadyExt, automatic_width};
11
12/// Concurrency extensions to augment futures::StreamExt. broad_ combinators
13/// produce out-of-order
14pub trait BroadbandExt<Item>
15where
16	Self: Stream<Item = Item> + Send + Sized,
17{
18	fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
19	where
20		N: Into<Option<usize>>,
21		F: Fn(Item) -> Fut + Send,
22		Fut: Future<Output = bool> + Send;
23
24	fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
25	where
26		N: Into<Option<usize>>,
27		F: Fn(Item) -> Fut + Send,
28		Fut: Future<Output = bool> + Send;
29
30	/// Concurrent filter_map(); unordered results
31	fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
32	where
33		N: Into<Option<usize>>,
34		F: Fn(Item) -> Fut + Send,
35		Fut: Future<Output = Option<U>> + Send,
36		U: Send;
37
38	/// Concurrent find_map(); unordered result
39	fn broadn_find_map<'a, F, Fut, U, N>(
40		self,
41		n: N,
42		f: F,
43	) -> impl Future<Output = Option<U>> + Send
44	where
45		N: Into<Option<usize>>,
46		F: Fn(Item) -> Fut + Send + 'a,
47		Fut: Future<Output = Option<U>> + Send,
48		U: Send + 'a,
49		Self: Unpin + 'a;
50
51	fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
52	where
53		N: Into<Option<usize>>,
54		F: Fn(Item) -> Fut + Send,
55		Fut: Stream<Item = U> + Send + Unpin,
56		U: Send;
57
58	fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
59	where
60		N: Into<Option<usize>>,
61		F: Fn(Item) -> Fut + Send,
62		Fut: Future<Output = U> + Send,
63		U: Send;
64
65	#[inline]
66	fn broad_all<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
67	where
68		F: Fn(Item) -> Fut + Send,
69		Fut: Future<Output = bool> + Send,
70	{
71		self.broadn_all(None, f)
72	}
73
74	#[inline]
75	fn broad_any<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
76	where
77		F: Fn(Item) -> Fut + Send,
78		Fut: Future<Output = bool> + Send,
79	{
80		self.broadn_any(None, f)
81	}
82
83	#[inline]
84	fn broad_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
85	where
86		F: Fn(Item) -> Fut + Send,
87		Fut: Future<Output = Option<U>> + Send,
88		U: Send,
89	{
90		self.broadn_filter_map(None, f)
91	}
92
93	#[inline]
94	fn broad_find_map<'a, F, Fut, U>(self, f: F) -> impl Future<Output = Option<U>> + Send
95	where
96		F: Fn(Item) -> Fut + Send + 'a,
97		Fut: Future<Output = Option<U>> + Send,
98		U: Send + 'a,
99		Self: Unpin + 'a,
100	{
101		self.broadn_find_map(None, f)
102	}
103
104	#[inline]
105	fn broad_flat_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
106	where
107		F: Fn(Item) -> Fut + Send,
108		Fut: Stream<Item = U> + Send + Unpin,
109		U: Send,
110	{
111		self.broadn_flat_map(None, f)
112	}
113
114	#[inline]
115	fn broad_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
116	where
117		F: Fn(Item) -> Fut + Send,
118		Fut: Future<Output = U> + Send,
119		U: Send,
120	{
121		self.broadn_then(None, f)
122	}
123}
124
125impl<Item, S> BroadbandExt<Item> for S
126where
127	S: Stream<Item = Item> + Send + Sized,
128{
129	#[inline]
130	fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
131	where
132		N: Into<Option<usize>>,
133		F: Fn(Item) -> Fut + Send,
134		Fut: Future<Output = bool> + Send,
135	{
136		self.map(f)
137			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
138			.ready_all(identity)
139	}
140
141	#[inline]
142	fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
143	where
144		N: Into<Option<usize>>,
145		F: Fn(Item) -> Fut + Send,
146		Fut: Future<Output = bool> + Send,
147	{
148		self.map(f)
149			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
150			.ready_any(identity)
151	}
152
153	#[inline]
154	fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
155	where
156		N: Into<Option<usize>>,
157		F: Fn(Item) -> Fut + Send,
158		Fut: Future<Output = Option<U>> + Send,
159		U: Send,
160	{
161		self.map(f)
162			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
163			.ready_filter_map(identity)
164	}
165
166	#[inline]
167	fn broadn_find_map<'a, F, Fut, U, N>(
168		self,
169		n: N,
170		f: F,
171	) -> impl Future<Output = Option<U>> + Send
172	where
173		N: Into<Option<usize>>,
174		F: Fn(Item) -> Fut + Send + 'a,
175		Fut: Future<Output = Option<U>> + Send,
176		U: Send + 'a,
177		Self: Unpin + 'a,
178	{
179		self.map(f)
180			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
181			.ready_find_map(identity)
182	}
183
184	#[inline]
185	fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
186	where
187		N: Into<Option<usize>>,
188		F: Fn(Item) -> Fut + Send,
189		Fut: Stream<Item = U> + Send + Unpin,
190		U: Send,
191	{
192		self.flat_map_unordered(n.into().unwrap_or_else(automatic_width), f)
193	}
194
195	#[inline]
196	fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
197	where
198		N: Into<Option<usize>>,
199		F: Fn(Item) -> Fut + Send,
200		Fut: Future<Output = U> + Send,
201		U: Send,
202	{
203		self.map(f)
204			.buffer_unordered(n.into().unwrap_or_else(automatic_width))
205	}
206}