tuwunel_service/rooms/event_handler/
mod.rs1mod acl_check;
2mod backoff;
3mod fetch_auth;
4mod fetch_prev;
5mod fetch_state;
6mod handle_incoming_pdu;
7mod handle_outlier_pdu;
8mod handle_prev_pdu;
9mod outlier_state;
10mod parse_incoming_pdu;
11mod policy_server;
12mod resolve_state;
13mod state_at_incoming;
14mod upgrade_outlier_pdu;
15
16use std::{fmt::Write, num::NonZeroUsize, sync::Arc};
17
18use async_trait::async_trait;
19use ruma::{EventId, OwnedRoomId, RoomVersionId, events::AnyStrippedStateEvent, serde::Raw};
20use tuwunel_core::{Result, implement, matrix::PduEvent, utils::MutexMap};
21use tuwunel_database::Map;
22
23pub struct Service {
24 pub mutex_federation: RoomMutexMap,
25 services: Arc<crate::services::OnceServices>,
26 db: Data,
27}
28
29struct Data {
30 eventid_backoff: Arc<Map>,
31 eventid_policysigstate: Arc<Map>,
32 eventid_resolvedstate: Arc<Map>,
33}
34
35type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
36
/// Attempt limit for event fetches, fixed at 3.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// read by the fetch submodules — confirm at the use sites.
const EVENT_FETCH_ATTEMPT_LIMIT: NonZeroUsize = match NonZeroUsize::new(3) {
    | Some(limit) => limit,
    // Unreachable for the literal above; evaluated at compile time.
    | None => panic!("attempt limit must be non-zero"),
};
39
40#[async_trait]
41impl crate::Service for Service {
42 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
43 Ok(Arc::new(Self {
44 mutex_federation: RoomMutexMap::new(),
45 services: args.services.clone(),
46 db: Data {
47 eventid_backoff: args.db["eventid_backoff"].clone(),
48 eventid_policysigstate: args.db["eventid_policysigstate"].clone(),
49 eventid_resolvedstate: args.db["eventid_resolvedstate"].clone(),
50 },
51 }))
52 }
53
54 async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
55 let mutex_federation = self.mutex_federation.len();
56 writeln!(out, "- federation_mutex: {mutex_federation}")?;
57
58 Ok(())
59 }
60
61 async fn clear_cache(&self) {
62 self.db.eventid_backoff.clear().await;
63 self.db.eventid_resolvedstate.clear().await;
64 }
65
66 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
67}
68
69#[implement(Service)]
70#[tracing::instrument(
71 name = "exists",
72 level = "trace",
73 ret(level = "trace"),
74 skip_all,
75 fields(%event_id)
76)]
77async fn event_exists(&self, event_id: &EventId) -> bool {
78 self.services.timeline.pdu_exists(event_id).await
79}
80
81#[implement(Service)]
82#[tracing::instrument(
83 name = "fetch",
84 level = "trace",
85 skip_all,
86 fields(%event_id)
87)]
88async fn event_fetch(&self, event_id: &EventId) -> Result<PduEvent> {
89 self.services.timeline.get_pdu(event_id).await
90}
91
92fn room_version_of(stripped: &[Raw<AnyStrippedStateEvent>]) -> Option<RoomVersionId> {
95 stripped
96 .iter()
97 .find_map(|event| match event.deserialize() {
98 | Ok(AnyStrippedStateEvent::RoomCreate(create)) => Some(create.content.room_version),
99 | _ => None,
100 })
101}