Skip to main content

tuwunel_core/utils/stream/
try_tools.rs

1//! TryStreamTools for futures::TryStream
2#![expect(clippy::type_complexity)]
3
4use futures::{TryStream, TryStreamExt, future, future::Ready, stream::TryTakeWhile};
5
6use crate::Result;
7
8/// TryStreamTools
9pub trait TryTools<T, E, S>
10where
11	S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + ?Sized,
12	Self: TryStream + Sized,
13{
14	fn try_take(
15		self,
16		n: usize,
17	) -> TryTakeWhile<
18		Self,
19		Ready<Result<bool, S::Error>>,
20		impl FnMut(&S::Ok) -> Ready<Result<bool, S::Error>>,
21	>;
22}
23
24impl<T, E, S> TryTools<T, E, S> for S
25where
26	S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + ?Sized,
27	Self: TryStream + Sized,
28{
29	#[inline]
30	fn try_take(
31		self,
32		mut n: usize,
33	) -> TryTakeWhile<
34		Self,
35		Ready<Result<bool, S::Error>>,
36		impl FnMut(&S::Ok) -> Ready<Result<bool, S::Error>>,
37	> {
38		self.try_take_while(move |_| {
39			let res = future::ok(n > 0);
40			n = n.saturating_sub(1);
41			res
42		})
43	}
44}