tuwunel_service/rooms/state_res/resolve/
auth_difference.rs1use std::{borrow::Borrow, collections::BTreeMap};
2
3use futures::{FutureExt, Stream};
4use ruma::EventId;
5use tuwunel_core::utils::stream::{IterStream, ReadyExt};
6
7use super::AuthSet;
8
/// Running tally built up while folding over a sequence of auth sets.
///
/// Tracks how many sets have been merged in and, for every id encountered,
/// how many times that id was seen across those sets.
struct Counts<Id>
where
	Id: Ord,
{
	/// Occurrence count per id, incremented once for each time the id is
	/// yielded by a merged set.
	by_id: BTreeMap<Id, usize>,

	/// Number of sets merged so far.
	total: usize,
}
13
14impl<Id: Ord> Default for Counts<Id> {
15 fn default() -> Self { Self { by_id: BTreeMap::new(), total: 0 } }
16}
17
18impl<Id: Ord> Counts<Id> {
19 fn merge(mut self, set: AuthSet<Id>) -> Self {
20 self.total = self.total.saturating_add(1);
21 for id in set {
22 let count = self.by_id.entry(id).or_default();
23
24 *count = count.saturating_add(1);
25 }
26
27 self
28 }
29}
30
31#[tracing::instrument(level = "debug", skip_all)]
50pub(super) fn auth_difference<'a, AuthSets, Id>(auth_sets: AuthSets) -> impl Stream<Item = Id>
51where
52 AuthSets: Stream<Item = AuthSet<Id>>,
53 Id: Borrow<EventId> + Clone + Eq + Ord + Send + 'a,
54{
55 auth_sets
56 .ready_fold_default(Counts::<Id>::merge)
57 .map(|Counts { by_id, total }: Counts<Id>| {
58 by_id
59 .into_iter()
60 .filter_map(move |(id, count)| (count < total).then_some(id))
61 .stream()
62 })
63 .flatten_stream()
64}