Skip to main content

tuwunel_core/utils/stream/
ready.rs

1//! Synchronous combinator extensions to futures::Stream
2#![expect(clippy::type_complexity)]
3
4use futures::{
5	future::{FutureExt, Ready, ready},
6	stream::{
7		All, Any, Filter, FilterMap, Fold, ForEach, Scan, SkipWhile, Stream, StreamExt, TakeWhile,
8	},
9};
10
11/// Synchronous combinators to augment futures::StreamExt. Most Stream
12/// combinators take asynchronous arguments, but often only simple predicates
13/// are required to steer a Stream like an Iterator. This suite provides a
14/// convenience to reduce boilerplate by de-cluttering non-async predicates.
15///
16/// This interface is not necessarily complete; feel free to add as-needed.
17pub trait ReadyExt<Item>
18where
19	Self: Stream<Item = Item> + Sized,
20{
21	fn ready_all<F>(self, f: F) -> All<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
22	where
23		F: Fn(Item) -> bool;
24
25	fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
26	where
27		F: Fn(Item) -> bool;
28
29	fn ready_find<'a, F>(self, f: F) -> impl Future<Output = Option<Item>> + Send
30	where
31		Self: Send + Unpin + 'a,
32		F: Fn(&Item) -> bool + Send + 'a,
33		Item: Send;
34
35	fn ready_find_map<'a, F, U>(self, f: F) -> impl Future<Output = Option<U>> + Send
36	where
37		Self: Send + Unpin + 'a,
38		F: Fn(Item) -> Option<U> + Send + 'a,
39		Item: Send,
40		U: Send;
41
42	fn ready_filter<'a, F>(
43		self,
44		f: F,
45	) -> Filter<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
46	where
47		F: Fn(&Item) -> bool + 'a;
48
49	fn ready_filter_map<F, U>(
50		self,
51		f: F,
52	) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(Item) -> Ready<Option<U>>>
53	where
54		F: Fn(Item) -> Option<U>;
55
56	fn ready_fold<T, F>(
57		self,
58		init: T,
59		f: F,
60	) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
61	where
62		F: Fn(T, Item) -> T;
63
64	fn ready_fold_default<T, F>(
65		self,
66		f: F,
67	) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
68	where
69		F: Fn(T, Item) -> T,
70		T: Default;
71
72	fn ready_for_each<F>(self, f: F) -> ForEach<Self, Ready<()>, impl FnMut(Item) -> Ready<()>>
73	where
74		F: FnMut(Item);
75
76	fn ready_take_while<'a, F>(
77		self,
78		f: F,
79	) -> TakeWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
80	where
81		F: Fn(&Item) -> bool + 'a;
82
83	fn ready_scan<B, T, F>(
84		self,
85		init: T,
86		f: F,
87	) -> Scan<Self, T, Ready<Option<B>>, impl FnMut(&mut T, Item) -> Ready<Option<B>>>
88	where
89		F: Fn(&mut T, Item) -> Option<B>;
90
91	fn ready_scan_each<T, F>(
92		self,
93		init: T,
94		f: F,
95	) -> Scan<Self, T, Ready<Option<Item>>, impl FnMut(&mut T, Item) -> Ready<Option<Item>>>
96	where
97		F: Fn(&mut T, &Item);
98
99	fn ready_skip_while<'a, F>(
100		self,
101		f: F,
102	) -> SkipWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
103	where
104		F: Fn(&Item) -> bool + 'a;
105}
106
107impl<Item, S> ReadyExt<Item> for S
108where
109	S: Stream<Item = Item> + Sized,
110{
111	#[inline]
112	fn ready_all<F>(self, f: F) -> All<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
113	where
114		F: Fn(Item) -> bool,
115	{
116		self.all(move |t| ready(f(t)))
117	}
118
119	#[inline]
120	fn ready_any<F>(self, f: F) -> Any<Self, Ready<bool>, impl FnMut(Item) -> Ready<bool>>
121	where
122		F: Fn(Item) -> bool,
123	{
124		self.any(move |t| ready(f(t)))
125	}
126
127	#[inline]
128	fn ready_find<'a, F>(self, f: F) -> impl Future<Output = Option<Item>> + Send
129	where
130		Self: Send + Unpin + 'a,
131		F: Fn(&Item) -> bool + Send + 'a,
132		Item: Send,
133	{
134		self.ready_filter(f)
135			.take(1)
136			.into_future()
137			.map(|(curr, _next)| curr)
138	}
139
140	#[inline]
141	fn ready_find_map<'a, F, U>(self, f: F) -> impl Future<Output = Option<U>> + Send
142	where
143		Self: Send + Unpin + 'a,
144		F: Fn(Item) -> Option<U> + Send + 'a,
145		Item: Send,
146		U: Send,
147	{
148		self.ready_filter_map(f)
149			.take(1)
150			.into_future()
151			.map(|(curr, _next)| curr)
152	}
153
154	#[inline]
155	fn ready_filter<'a, F>(
156		self,
157		f: F,
158	) -> Filter<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
159	where
160		F: Fn(&Item) -> bool + 'a,
161	{
162		self.filter(move |t| ready(f(t)))
163	}
164
165	#[inline]
166	fn ready_filter_map<F, U>(
167		self,
168		f: F,
169	) -> FilterMap<Self, Ready<Option<U>>, impl FnMut(Item) -> Ready<Option<U>>>
170	where
171		F: Fn(Item) -> Option<U>,
172	{
173		self.filter_map(move |t| ready(f(t)))
174	}
175
176	#[inline]
177	fn ready_fold<T, F>(
178		self,
179		init: T,
180		f: F,
181	) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
182	where
183		F: Fn(T, Item) -> T,
184	{
185		self.fold(init, move |a, t| ready(f(a, t)))
186	}
187
188	#[inline]
189	fn ready_fold_default<T, F>(
190		self,
191		f: F,
192	) -> Fold<Self, Ready<T>, T, impl FnMut(T, Item) -> Ready<T>>
193	where
194		F: Fn(T, Item) -> T,
195		T: Default,
196	{
197		self.ready_fold(T::default(), f)
198	}
199
200	#[inline]
201	#[expect(clippy::unit_arg)]
202	fn ready_for_each<F>(
203		self,
204		mut f: F,
205	) -> ForEach<Self, Ready<()>, impl FnMut(Item) -> Ready<()>>
206	where
207		F: FnMut(Item),
208	{
209		self.for_each(move |t| ready(f(t)))
210	}
211
212	#[inline]
213	fn ready_take_while<'a, F>(
214		self,
215		f: F,
216	) -> TakeWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
217	where
218		F: Fn(&Item) -> bool + 'a,
219	{
220		self.take_while(move |t| ready(f(t)))
221	}
222
223	#[inline]
224	fn ready_scan<B, T, F>(
225		self,
226		init: T,
227		f: F,
228	) -> Scan<Self, T, Ready<Option<B>>, impl FnMut(&mut T, Item) -> Ready<Option<B>>>
229	where
230		F: Fn(&mut T, Item) -> Option<B>,
231	{
232		self.scan(init, move |s, t| ready(f(s, t)))
233	}
234
235	#[inline]
236	fn ready_scan_each<T, F>(
237		self,
238		init: T,
239		f: F,
240	) -> Scan<Self, T, Ready<Option<Item>>, impl FnMut(&mut T, Item) -> Ready<Option<Item>>>
241	where
242		F: Fn(&mut T, &Item),
243	{
244		self.ready_scan(init, move |s, t| {
245			f(s, &t);
246			Some(t)
247		})
248	}
249
250	#[inline]
251	fn ready_skip_while<'a, F>(
252		self,
253		f: F,
254	) -> SkipWhile<Self, Ready<bool>, impl FnMut(&Item) -> Ready<bool> + 'a>
255	where
256		F: Fn(&Item) -> bool + 'a,
257	{
258		self.skip_while(move |t| ready(f(t)))
259	}
260}