Skip to main content

tuwunel_service/rooms/event_handler/
mod.rs

1mod acl_check;
2mod backoff;
3mod fetch_auth;
4mod fetch_prev;
5mod fetch_state;
6mod handle_incoming_pdu;
7mod handle_outlier_pdu;
8mod handle_prev_pdu;
9mod outlier_state;
10mod parse_incoming_pdu;
11mod policy_server;
12mod resolve_state;
13mod state_at_incoming;
14mod upgrade_outlier_pdu;
15
16use std::{fmt::Write, num::NonZeroUsize, sync::Arc};
17
18use async_trait::async_trait;
19use ruma::{EventId, OwnedRoomId, RoomVersionId, events::AnyStrippedStateEvent, serde::Raw};
20use tuwunel_core::{Result, implement, matrix::PduEvent, utils::MutexMap};
21use tuwunel_database::Map;
22
23pub struct Service {
24	pub mutex_federation: RoomMutexMap,
25	services: Arc<crate::services::OnceServices>,
26	db: Data,
27}
28
29struct Data {
30	eventid_backoff: Arc<Map>,
31	eventid_policysigstate: Arc<Map>,
32	eventid_resolvedstate: Arc<Map>,
33}
34
35type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
36
/// Upper bound on the number of distinct candidate servers tried for a single
/// fetch. This is *not* a count of retries against any one server.
const EVENT_FETCH_ATTEMPT_LIMIT: NonZeroUsize = NonZeroUsize::new(3).unwrap();
39
40#[async_trait]
41impl crate::Service for Service {
42	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
43		Ok(Arc::new(Self {
44			mutex_federation: RoomMutexMap::new(),
45			services: args.services.clone(),
46			db: Data {
47				eventid_backoff: args.db["eventid_backoff"].clone(),
48				eventid_policysigstate: args.db["eventid_policysigstate"].clone(),
49				eventid_resolvedstate: args.db["eventid_resolvedstate"].clone(),
50			},
51		}))
52	}
53
54	async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
55		let mutex_federation = self.mutex_federation.len();
56		writeln!(out, "- federation_mutex: {mutex_federation}")?;
57
58		Ok(())
59	}
60
61	async fn clear_cache(&self) {
62		self.db.eventid_backoff.clear().await;
63		self.db.eventid_resolvedstate.clear().await;
64	}
65
66	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
67}
68
69#[implement(Service)]
70#[tracing::instrument(
71	name = "exists",
72	level = "trace",
73	ret(level = "trace"),
74	skip_all,
75	fields(%event_id)
76)]
77async fn event_exists(&self, event_id: &EventId) -> bool {
78	self.services.timeline.pdu_exists(event_id).await
79}
80
81#[implement(Service)]
82#[tracing::instrument(
83	name = "fetch",
84	level = "trace",
85	skip_all,
86	fields(%event_id)
87)]
88async fn event_fetch(&self, event_id: &EventId) -> Result<PduEvent> {
89	self.services.timeline.get_pdu(event_id).await
90}
91
92/// Extract a room's version from the create event in a stripped-state list (as
93/// stored for an out-of-band invite or knock).
94fn room_version_of(stripped: &[Raw<AnyStrippedStateEvent>]) -> Option<RoomVersionId> {
95	stripped
96		.iter()
97		.find_map(|event| match event.deserialize() {
98			| Ok(AnyStrippedStateEvent::RoomCreate(create)) => Some(create.content.room_version),
99			| _ => None,
100		})
101}