tuwunel_service/rooms/state_res/resolve/
iterative_auth_check.rs1use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
2use ruma::{
3 EventId, OwnedEventId,
4 events::{StateEventType, TimelineEventType},
5 room_version_rules::RoomVersionRules,
6};
7use tuwunel_core::{
8 Error, Result, debug_warn, err, error,
9 matrix::{Event, EventTypeExt, StateKey},
10 smallvec::SmallVec,
11 trace,
12 utils::stream::{IterStream, ReadyExt, TryReadyExt, TryWidebandExt},
13};
14
15use super::{
16 super::{auth_types_for_event, check_state_dependent_auth_rules},
17 StateMap,
18};
19
20#[tracing::instrument(
45 name = "iterative_auth",
46 level = "debug",
47 skip_all,
48 fields(
49 states = ?state.len(),
50 )
51)]
52pub(super) async fn iterative_auth_check<'b, SortedPowerEvents, Fetch, Fut, Pdu>(
53 rules: &RoomVersionRules,
54 events: SortedPowerEvents,
55 state: StateMap<OwnedEventId>,
56 fetch: &Fetch,
57) -> Result<StateMap<OwnedEventId>>
58where
59 SortedPowerEvents: Stream<Item = &'b EventId> + Send,
60 Fetch: Fn(OwnedEventId) -> Fut + Sync,
61 Fut: Future<Output = Result<Pdu>> + Send,
62 Pdu: Event,
63{
64 events
65 .map(Ok)
66 .wide_and_then(async |event_id| {
67 let event = fetch(event_id.to_owned()).await?;
68 let state_key: StateKey = event
69 .state_key()
70 .ok_or_else(|| err!(Request(InvalidParam("Missing state_key"))))?
71 .into();
72
73 Ok((event_id, state_key, event))
74 })
75 .try_fold(state, |state, (event_id, state_key, event)| {
76 auth_check(rules, state, event_id, state_key, event, fetch)
77 })
78 .await
79}
80
81#[tracing::instrument(
82 name = "check",
83 level = "trace",
84 skip_all,
85 fields(
86 %event_id,
87 ?state_key,
88 )
89)]
90async fn auth_check<Fetch, Fut, Pdu>(
91 rules: &RoomVersionRules,
92 mut state: StateMap<OwnedEventId>,
93 event_id: &EventId,
94 state_key: StateKey,
95 event: Pdu,
96 fetch: &Fetch,
97) -> Result<StateMap<OwnedEventId>>
98where
99 Fetch: Fn(OwnedEventId) -> Fut + Sync,
100 Fut: Future<Output = Result<Pdu>> + Send,
101 Pdu: Event,
102{
103 let Ok(auth_types) = auth_types_for_event(
104 event.event_type(),
105 event.sender(),
106 Some(&state_key),
107 event.content(),
108 &rules.authorization,
109 true,
110 )
111 .inspect_err(|e| error!("failed to get auth types for event: {e}")) else {
112 return Ok(state);
113 };
114
115 let auth_types_events = auth_types
116 .stream()
117 .ready_filter_map(|key| {
118 state
119 .get(&key)
120 .map(move |auth_event_id| (auth_event_id, key))
121 })
122 .filter_map(async |(id, key)| {
123 fetch(id.clone())
124 .inspect_err(|e| debug_warn!(%id, "missing auth event: {e}"))
125 .inspect_err(|e| debug_assert!(!cfg!(test), "missing auth {id:?}: {e:?}"))
126 .map_ok(move |auth_event| (key, auth_event))
127 .await
128 .ok()
129 })
130 .ready_filter_map(|(key, auth_event)| {
131 auth_event
132 .rejected()
133 .eq(&false)
134 .then_some((key, auth_event))
135 })
136 .map(Ok);
137
138 let also_need_create_event = *event.event_type() != TimelineEventType::RoomCreate
141 && rules
142 .authorization
143 .room_create_event_id_as_room_id;
144
145 let also_create_id: Option<OwnedEventId> = also_need_create_event
146 .then(|| event.room_id().as_event_id().ok())
147 .flatten();
148
149 let auth_events = event
150 .auth_events()
151 .chain(also_create_id.as_deref().into_iter())
152 .stream()
153 .filter_map(async |id| {
154 fetch(id.to_owned())
155 .inspect_err(|e| debug_warn!(%id, "missing auth event: {e}"))
156 .inspect_err(|e| debug_assert!(!cfg!(test), "missing auth {id:?}: {e:?}"))
157 .await
158 .ok()
159 })
160 .map(Result::<Pdu, Error>::Ok)
161 .ready_try_filter_map(|auth_event| {
162 let state_key = auth_event
163 .state_key()
164 .ok_or_else(|| err!(Request(InvalidParam("Missing state_key"))))?;
165
166 let key_val = auth_event
167 .rejected()
168 .eq(&false)
169 .then_some((auth_event.event_type().with_state_key(state_key), auth_event));
170
171 Ok(key_val)
172 });
173
174 let auth_events = auth_events
175 .chain(auth_types_events)
176 .try_collect()
177 .map_ok(|mut vec: SmallVec<[_; 4]>| {
178 vec.sort_by(|a, b| a.0.cmp(&b.0));
179 vec.reverse();
180 vec.dedup_by(|a, b| a.0.eq(&b.0));
181 vec
182 })
183 .await?;
184
185 let fetch_state = async |ty: StateEventType, key: StateKey| -> Result<Pdu> {
186 trace!(?ty, ?key, auth_events = auth_events.len(), "fetch state");
187 auth_events
188 .binary_search_by(|a| ty.cmp(&a.0.0).then(key.cmp(&a.0.1)))
189 .map(|i| auth_events[i].1.clone())
190 .map_err(|_| err!(Request(NotFound("Missing auth_event {ty:?},{key:?}"))))
191 };
192
193 if check_state_dependent_auth_rules(rules, &event, &fetch_state)
195 .await
196 .inspect_err(|e| {
197 debug_warn!(
198 %event_id,
199 sender = %event.sender(),
200 event_type = ?event.event_type(),
201 ?state_key,
202 "event failed auth check: {e}"
203 );
204 })
205 .is_ok()
206 {
207 let key = event.event_type().with_state_key(state_key);
208 state.insert(key, event_id.to_owned());
209 }
210
211 Ok(state)
212}