Skip to main content

tuwunel_core/utils/stream/
tools.rs

1//! StreamTools for futures::Stream
2
3use std::{collections::HashMap, hash::Hash};
4
5use futures::{Future, Stream, StreamExt};
6
7use super::ReadyExt;
8use crate::expected;
9
10/// StreamTools
11///
12/// This interface is not necessarily complete; feel free to add as-needed.
13pub trait Tools<Item>
14where
15	Self: Stream<Item = Item> + Send + Sized,
16	<Self as Stream>::Item: Send,
17{
18	fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
19	where
20		<Self as Stream>::Item: Eq + Hash;
21
22	fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
23	where
24		F: Fn(Item) -> K + Send,
25		K: Eq + Hash + Send;
26
27	fn counts_by_with_cap<const CAP: usize, K, F>(
28		self,
29		f: F,
30	) -> impl Future<Output = HashMap<K, usize>> + Send
31	where
32		F: Fn(Item) -> K + Send,
33		K: Eq + Hash + Send;
34
35	fn counts_with_cap<const CAP: usize>(
36		self,
37	) -> impl Future<Output = HashMap<Item, usize>> + Send
38	where
39		<Self as Stream>::Item: Eq + Hash;
40
41	fn fold_default<T, F, Fut>(self, f: F) -> impl Future<Output = T> + Send
42	where
43		F: Fn(T, Item) -> Fut + Send,
44		Fut: Future<Output = T> + Send,
45		T: Default + Send;
46}
47
48impl<Item, S> Tools<Item> for S
49where
50	S: Stream<Item = Item> + Send + Sized,
51	<Self as Stream>::Item: Send,
52{
53	#[inline]
54	fn counts(self) -> impl Future<Output = HashMap<Item, usize>> + Send
55	where
56		<Self as Stream>::Item: Eq + Hash,
57	{
58		self.counts_with_cap::<0>()
59	}
60
61	#[inline]
62	fn counts_by<K, F>(self, f: F) -> impl Future<Output = HashMap<K, usize>> + Send
63	where
64		F: Fn(Item) -> K + Send,
65		K: Eq + Hash + Send,
66	{
67		self.counts_by_with_cap::<0, K, F>(f)
68	}
69
70	#[inline]
71	fn counts_by_with_cap<const CAP: usize, K, F>(
72		self,
73		f: F,
74	) -> impl Future<Output = HashMap<K, usize>> + Send
75	where
76		F: Fn(Item) -> K + Send,
77		K: Eq + Hash + Send,
78	{
79		self.map(f).counts_with_cap::<CAP>()
80	}
81
82	#[inline]
83	fn counts_with_cap<const CAP: usize>(
84		self,
85	) -> impl Future<Output = HashMap<Item, usize>> + Send
86	where
87		<Self as Stream>::Item: Eq + Hash,
88	{
89		self.ready_fold(HashMap::with_capacity(CAP), |mut counts, item| {
90			let entry = counts.entry(item).or_default();
91			let value = *entry;
92			*entry = expected!(value + 1);
93			counts
94		})
95	}
96
97	#[inline]
98	fn fold_default<T, F, Fut>(self, f: F) -> impl Future<Output = T> + Send
99	where
100		F: Fn(T, Item) -> Fut + Send,
101		Fut: Future<Output = T> + Send,
102		T: Default + Send,
103	{
104		self.fold(T::default(), f)
105	}
106}