tuwunel_core/utils/stream/
wideband.rs1use std::convert::identity;
4
5use futures::{
6 Future,
7 stream::{Stream, StreamExt},
8};
9
10use super::{ReadyExt, automatic_width};
11
12pub trait WidebandExt<Item>
15where
16 Self: Stream<Item = Item> + Send + Sized,
17{
18 fn widen_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
20 where
21 N: Into<Option<usize>>,
22 F: Fn(Item) -> Fut + Send,
23 Fut: Future<Output = Option<U>> + Send,
24 U: Send;
25
26 fn widen_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
27 where
28 N: Into<Option<usize>>,
29 F: Fn(Item) -> Fut + Send,
30 Fut: Future<Output = U> + Send,
31 U: Send;
32
33 #[inline]
34 fn wide_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
35 where
36 F: Fn(Item) -> Fut + Send,
37 Fut: Future<Output = Option<U>> + Send,
38 U: Send,
39 {
40 self.widen_filter_map(None, f)
41 }
42
43 #[inline]
44 fn wide_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
45 where
46 F: Fn(Item) -> Fut + Send,
47 Fut: Future<Output = U> + Send,
48 U: Send,
49 {
50 self.widen_then(None, f)
51 }
52}
53
54impl<Item, S> WidebandExt<Item> for S
55where
56 S: Stream<Item = Item> + Send + Sized,
57{
58 #[inline]
59 fn widen_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
60 where
61 N: Into<Option<usize>>,
62 F: Fn(Item) -> Fut + Send,
63 Fut: Future<Output = Option<U>> + Send,
64 U: Send,
65 {
66 self.map(f)
67 .buffered(n.into().unwrap_or_else(automatic_width))
68 .ready_filter_map(identity)
69 }
70
71 #[inline]
72 fn widen_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
73 where
74 N: Into<Option<usize>>,
75 F: Fn(Item) -> Fut + Send,
76 Fut: Future<Output = U> + Send,
77 U: Send,
78 {
79 self.map(f)
80 .buffered(n.into().unwrap_or_else(automatic_width))
81 }
82}