tuwunel_core/utils/stream/
broadband.rs1use std::convert::identity;
4
5use futures::{
6 Future,
7 stream::{Stream, StreamExt},
8};
9
10use super::{ReadyExt, automatic_width};
11
12pub trait BroadbandExt<Item>
15where
16 Self: Stream<Item = Item> + Send + Sized,
17{
18 fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
19 where
20 N: Into<Option<usize>>,
21 F: Fn(Item) -> Fut + Send,
22 Fut: Future<Output = bool> + Send;
23
24 fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
25 where
26 N: Into<Option<usize>>,
27 F: Fn(Item) -> Fut + Send,
28 Fut: Future<Output = bool> + Send;
29
30 fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
32 where
33 N: Into<Option<usize>>,
34 F: Fn(Item) -> Fut + Send,
35 Fut: Future<Output = Option<U>> + Send,
36 U: Send;
37
38 fn broadn_find_map<'a, F, Fut, U, N>(
40 self,
41 n: N,
42 f: F,
43 ) -> impl Future<Output = Option<U>> + Send
44 where
45 N: Into<Option<usize>>,
46 F: Fn(Item) -> Fut + Send + 'a,
47 Fut: Future<Output = Option<U>> + Send,
48 U: Send + 'a,
49 Self: Unpin + 'a;
50
51 fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
52 where
53 N: Into<Option<usize>>,
54 F: Fn(Item) -> Fut + Send,
55 Fut: Stream<Item = U> + Send + Unpin,
56 U: Send;
57
58 fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
59 where
60 N: Into<Option<usize>>,
61 F: Fn(Item) -> Fut + Send,
62 Fut: Future<Output = U> + Send,
63 U: Send;
64
65 #[inline]
66 fn broad_all<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
67 where
68 F: Fn(Item) -> Fut + Send,
69 Fut: Future<Output = bool> + Send,
70 {
71 self.broadn_all(None, f)
72 }
73
74 #[inline]
75 fn broad_any<F, Fut>(self, f: F) -> impl Future<Output = bool> + Send
76 where
77 F: Fn(Item) -> Fut + Send,
78 Fut: Future<Output = bool> + Send,
79 {
80 self.broadn_any(None, f)
81 }
82
83 #[inline]
84 fn broad_filter_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
85 where
86 F: Fn(Item) -> Fut + Send,
87 Fut: Future<Output = Option<U>> + Send,
88 U: Send,
89 {
90 self.broadn_filter_map(None, f)
91 }
92
93 #[inline]
94 fn broad_find_map<'a, F, Fut, U>(self, f: F) -> impl Future<Output = Option<U>> + Send
95 where
96 F: Fn(Item) -> Fut + Send + 'a,
97 Fut: Future<Output = Option<U>> + Send,
98 U: Send + 'a,
99 Self: Unpin + 'a,
100 {
101 self.broadn_find_map(None, f)
102 }
103
104 #[inline]
105 fn broad_flat_map<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
106 where
107 F: Fn(Item) -> Fut + Send,
108 Fut: Stream<Item = U> + Send + Unpin,
109 U: Send,
110 {
111 self.broadn_flat_map(None, f)
112 }
113
114 #[inline]
115 fn broad_then<F, Fut, U>(self, f: F) -> impl Stream<Item = U> + Send
116 where
117 F: Fn(Item) -> Fut + Send,
118 Fut: Future<Output = U> + Send,
119 U: Send,
120 {
121 self.broadn_then(None, f)
122 }
123}
124
125impl<Item, S> BroadbandExt<Item> for S
126where
127 S: Stream<Item = Item> + Send + Sized,
128{
129 #[inline]
130 fn broadn_all<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
131 where
132 N: Into<Option<usize>>,
133 F: Fn(Item) -> Fut + Send,
134 Fut: Future<Output = bool> + Send,
135 {
136 self.map(f)
137 .buffer_unordered(n.into().unwrap_or_else(automatic_width))
138 .ready_all(identity)
139 }
140
141 #[inline]
142 fn broadn_any<F, Fut, N>(self, n: N, f: F) -> impl Future<Output = bool> + Send
143 where
144 N: Into<Option<usize>>,
145 F: Fn(Item) -> Fut + Send,
146 Fut: Future<Output = bool> + Send,
147 {
148 self.map(f)
149 .buffer_unordered(n.into().unwrap_or_else(automatic_width))
150 .ready_any(identity)
151 }
152
153 #[inline]
154 fn broadn_filter_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
155 where
156 N: Into<Option<usize>>,
157 F: Fn(Item) -> Fut + Send,
158 Fut: Future<Output = Option<U>> + Send,
159 U: Send,
160 {
161 self.map(f)
162 .buffer_unordered(n.into().unwrap_or_else(automatic_width))
163 .ready_filter_map(identity)
164 }
165
166 #[inline]
167 fn broadn_find_map<'a, F, Fut, U, N>(
168 self,
169 n: N,
170 f: F,
171 ) -> impl Future<Output = Option<U>> + Send
172 where
173 N: Into<Option<usize>>,
174 F: Fn(Item) -> Fut + Send + 'a,
175 Fut: Future<Output = Option<U>> + Send,
176 U: Send + 'a,
177 Self: Unpin + 'a,
178 {
179 self.map(f)
180 .buffer_unordered(n.into().unwrap_or_else(automatic_width))
181 .ready_find_map(identity)
182 }
183
184 #[inline]
185 fn broadn_flat_map<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
186 where
187 N: Into<Option<usize>>,
188 F: Fn(Item) -> Fut + Send,
189 Fut: Stream<Item = U> + Send + Unpin,
190 U: Send,
191 {
192 self.flat_map_unordered(n.into().unwrap_or_else(automatic_width), f)
193 }
194
195 #[inline]
196 fn broadn_then<F, Fut, U, N>(self, n: N, f: F) -> impl Stream<Item = U> + Send
197 where
198 N: Into<Option<usize>>,
199 F: Fn(Item) -> Fut + Send,
200 Fut: Future<Output = U> + Send,
201 U: Send,
202 {
203 self.map(f)
204 .buffer_unordered(n.into().unwrap_or_else(automatic_width))
205 }
206}