Skip to main content

tuwunel_service/rooms/event_handler/
outlier_state.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures::StreamExt;
4use ruma::{EventId, OwnedEventId, RoomId, events::StateEventType};
5use tuwunel_core::{debug_warn, implement};
6use tuwunel_database::Deserialized;
7
8use crate::rooms::{short::ShortStateHash, state_compressor::CompressedState};
9
10/// State resolved for an event not in our timeline, keyed by event id, held
11/// only to spare a repeat `/state_ids` fetch on the next walk. The value is a
12/// `shortstatehash` into the shared compressor tables, never the authoritative
13/// `shorteventid_shortstatehash`, so no authoritative state read observes it.
14#[implement(super::Service)]
15pub(super) async fn cached_resolved_state(
16	&self,
17	event_id: &EventId,
18) -> Option<HashMap<u64, OwnedEventId>> {
19	let shortstatehash: ShortStateHash = self
20		.db
21		.eventid_resolvedstate
22		.get(event_id)
23		.await
24		.deserialized()
25		.ok()?;
26
27	let state: HashMap<u64, OwnedEventId> = self
28		.services
29		.state_accessor
30		.state_full_ids(shortstatehash)
31		.collect()
32		.await;
33
34	// A room purge drops the events this map names; the create event goes only in
35	// a full purge, so reject the hit when it is gone and let the caller refetch.
36	let create_shortstatekey = self
37		.services
38		.short
39		.get_shortstatekey(&StateEventType::RoomCreate, "")
40		.await
41		.ok()?;
42
43	let create_event_id = state.get(&create_shortstatekey)?;
44	let create_present = self
45		.services
46		.timeline
47		.pdu_exists(create_event_id)
48		.await;
49
50	create_present.then_some(state)
51}
52
53/// Persist the state resolved for `event_id` over federation so a later walk of
54/// the same event resolves without another fetch. Best effort: a failed
55/// compressor write leaves the next walk to refetch.
56#[implement(super::Service)]
57pub(super) async fn cache_resolved_state(
58	&self,
59	room_id: &RoomId,
60	event_id: &EventId,
61	state: Arc<CompressedState>,
62) {
63	const BUFSIZE: usize = size_of::<ShortStateHash>();
64
65	let Ok(saved) = self
66		.services
67		.state_compressor
68		.save_state(room_id, state)
69		.await
70		.inspect_err(|e| debug_warn!(?event_id, "Failed to cache resolved state: {e}"))
71	else {
72		return;
73	};
74
75	self.db
76		.eventid_resolvedstate
77		.raw_aput::<BUFSIZE, _, _>(event_id, saved.shortstatehash);
78}