tuwunel_service/rooms/retention/
mod.rs1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use futures::{Stream, TryStreamExt};
5use ruma::{CanonicalJsonObject, EventId};
6use tuwunel_core::{
7 Result, debug_info, expected, implement,
8 matrix::pdu::PduEvent,
9 utils::{TryReadyExt, time::now},
10};
11use tuwunel_database::{Deserialized, Json, Map};
12
13use crate::rooms::timeline::RoomMutexGuard;
14
15pub struct Service {
16 services: Arc<crate::services::OnceServices>,
17 eventid_originalpdu: Arc<Map>,
18 timeredacted_eventid: Arc<Map>,
19}
20
21#[async_trait]
22impl crate::Service for Service {
23 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
24 Ok(Arc::new(Self {
25 services: args.services.clone(),
26 eventid_originalpdu: args.db["eventid_originalpdu"].clone(),
27 timeredacted_eventid: args.db["timeredacted_eventid"].clone(),
28 }))
29 }
30
31 async fn worker(self: Arc<Self>) -> Result {
32 loop {
33 let retention_seconds = self.services.config.redaction_retention_seconds;
34
35 if retention_seconds != 0 {
36 debug_info!("Cleaning up retained events");
37
38 let now = now().as_secs();
39 let count = self
40 .timeredacted_eventid
41 .keys::<(u64, &EventId)>()
42 .ready_try_take_while(|(time_redacted, _)| {
43 let time_redacted = *time_redacted;
44 Ok(expected!(time_redacted + retention_seconds) < now)
45 })
46 .ready_try_fold_default(|count: usize, (time_redacted, event_id)| {
47 self.eventid_originalpdu.remove(event_id);
48 self.timeredacted_eventid
49 .del((time_redacted, event_id));
50 Ok(count.saturating_add(1))
51 })
52 .await?;
53
54 debug_info!(?count, "Finished cleaning up retained events");
55 }
56
57 tokio::select! {
58 () = tokio::time::sleep(Duration::from_hours(1)) => {},
59 () = self.services.server.until_shutdown() => return Ok(())
60 };
61 }
62 }
63
64 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
65}
66
67#[implement(Service)]
68pub async fn get_original_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
69 self.eventid_originalpdu
70 .get(event_id)
71 .await?
72 .deserialized()
73}
74
75#[implement(Service)]
76pub async fn get_original_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
77 self.eventid_originalpdu
78 .get(event_id)
79 .await?
80 .deserialized()
81}
82
83#[implement(Service)]
84pub async fn save_original_pdu(
85 &self,
86 event_id: &EventId,
87 pdu: &CanonicalJsonObject,
88 _state_lock: &RoomMutexGuard,
89) {
90 if !self.services.config.save_unredacted_events {
91 return;
92 }
93
94 if self
95 .eventid_originalpdu
96 .exists(event_id)
97 .await
98 .is_ok()
99 {
100 return;
101 }
102
103 let now = now().as_secs();
104
105 self.eventid_originalpdu
106 .raw_put(event_id, Json(pdu));
107
108 self.timeredacted_eventid
109 .put_raw((now, event_id), []);
110}
111
112#[implement(Service)]
113pub fn retained_pdus_raw(&self) -> impl Stream<Item = Result<&[u8]>> + Send {
114 self.eventid_originalpdu
115 .raw_stream()
116 .map_ok(|x| x.1)
117}