tuwunel_core/utils/stream/
tools.rs1use std::{collections::HashMap, hash::Hash};
4
5use futures::{Future, Stream, StreamExt};
6
7use super::ReadyExt;
8use crate::expected;
9
10pub trait Tools<Item>
14where
15 Self: Stream<Item = Item> + Send + Sized,
16 <Self as Stream>::Item: Send,
17{
18 fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
19 where
20 <Self as Stream>::Item: Eq + Hash;
21
22 fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
23 where
24 F: Fn(Item) -> K + Send,
25 K: Eq + Hash + Send;
26
27 fn counts_by_with_cap<const CAP: usize, K, F>(
28 self,
29 f: F,
30 ) -> impl Future<Output = HashMap<K, usize>> + Send
31 where
32 F: Fn(Item) -> K + Send,
33 K: Eq + Hash + Send;
34
35 fn counts_with_cap<const CAP: usize>(
36 self,
37 ) -> impl Future<Output = HashMap<Item, usize>> + Send
38 where
39 <Self as Stream>::Item: Eq + Hash;
40
41 fn fold_default<T, F, Fut>(self, f: F) -> impl Future<Output = T> + Send
42 where
43 F: Fn(T, Item) -> Fut + Send,
44 Fut: Future<Output = T> + Send,
45 T: Default + Send;
46}
47
48impl<Item, S> Tools<Item> for S
49where
50 S: Stream<Item = Item> + Send + Sized,
51 <Self as Stream>::Item: Send,
52{
53 #[inline]
54 fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
55 where
56 <Self as Stream>::Item: Eq + Hash,
57 {
58 self.counts_with_cap::<0>()
59 }
60
61 #[inline]
62 fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
63 where
64 F: Fn(Item) -> K + Send,
65 K: Eq + Hash + Send,
66 {
67 self.counts_by_with_cap::<0, K, F>(f)
68 }
69
70 #[inline]
71 fn counts_by_with_cap<const CAP: usize, K, F>(
72 self,
73 f: F,
74 ) -> impl Future<Output = HashMap<K, usize>> + Send
75 where
76 F: Fn(Item) -> K + Send,
77 K: Eq + Hash + Send,
78 {
79 self.map(f).counts_with_cap::<CAP>()
80 }
81
82 #[inline]
83 fn counts_with_cap<const CAP: usize>(
84 self,
85 ) -> impl Future<Output = HashMap<Item, usize>> + Send
86 where
87 <Self as Stream>::Item: Eq + Hash,
88 {
89 self.ready_fold(HashMap::with_capacity(CAP), |mut counts, item| {
90 let entry = counts.entry(item).or_default();
91 let value = *entry;
92 *entry = expected!(value + 1);
93 counts
94 })
95 }
96
97 #[inline]
98 fn fold_default<T, F, Fut>(self, f: F) -> impl Future<Output = T> + Send
99 where
100 F: Fn(T, Item) -> Fut + Send,
101 Fut: Future<Output = T> + Send,
102 T: Default + Send,
103 {
104 self.fold(T::default(), f)
105 }
106}