Skip to main content

tuwunel_service/rooms/retention/
mod.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use futures::{Stream, TryStreamExt};
5use ruma::{CanonicalJsonObject, EventId};
6use tuwunel_core::{
7	Result, debug_info, expected, implement,
8	matrix::pdu::PduEvent,
9	utils::{TryReadyExt, time::now},
10};
11use tuwunel_database::{Deserialized, Json, Map};
12
13use crate::rooms::timeline::RoomMutexGuard;
14
/// Retention service for the original (unredacted) form of redacted events.
///
/// Stores original PDUs keyed by event id and keeps a time-ordered index so
/// the background worker can expire them once the retention period elapses.
pub struct Service {
	/// Shared handle to the other services; used here to read the config and
	/// to wait for server shutdown.
	services: Arc<crate::services::OnceServices>,
	/// Maps an event id to the JSON of the original PDU.
	eventid_originalpdu: Arc<Map>,
	/// Index keyed by `(timestamp in seconds, event id)` with an empty value;
	/// the timestamp is taken when the original PDU is saved.
	timeredacted_eventid: Arc<Map>,
}
20
21#[async_trait]
22impl crate::Service for Service {
23	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
24		Ok(Arc::new(Self {
25			services: args.services.clone(),
26			eventid_originalpdu: args.db["eventid_originalpdu"].clone(),
27			timeredacted_eventid: args.db["timeredacted_eventid"].clone(),
28		}))
29	}
30
31	async fn worker(self: Arc<Self>) -> Result {
32		loop {
33			let retention_seconds = self.services.config.redaction_retention_seconds;
34
35			if retention_seconds != 0 {
36				debug_info!("Cleaning up retained events");
37
38				let now = now().as_secs();
39				let count = self
40					.timeredacted_eventid
41					.keys::<(u64, &EventId)>()
42					.ready_try_take_while(|(time_redacted, _)| {
43						let time_redacted = *time_redacted;
44						Ok(expected!(time_redacted + retention_seconds) < now)
45					})
46					.ready_try_fold_default(|count: usize, (time_redacted, event_id)| {
47						self.eventid_originalpdu.remove(event_id);
48						self.timeredacted_eventid
49							.del((time_redacted, event_id));
50						Ok(count.saturating_add(1))
51					})
52					.await?;
53
54				debug_info!(?count, "Finished cleaning up retained events");
55			}
56
57			tokio::select! {
58				() = tokio::time::sleep(Duration::from_hours(1)) => {},
59				() = self.services.server.until_shutdown() => return Ok(())
60			};
61		}
62	}
63
64	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
65}
66
67#[implement(Service)]
68pub async fn get_original_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
69	self.eventid_originalpdu
70		.get(event_id)
71		.await?
72		.deserialized()
73}
74
75#[implement(Service)]
76pub async fn get_original_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
77	self.eventid_originalpdu
78		.get(event_id)
79		.await?
80		.deserialized()
81}
82
83#[implement(Service)]
84pub async fn save_original_pdu(
85	&self,
86	event_id: &EventId,
87	pdu: &CanonicalJsonObject,
88	_state_lock: &RoomMutexGuard,
89) {
90	if !self.services.config.save_unredacted_events {
91		return;
92	}
93
94	if self
95		.eventid_originalpdu
96		.exists(event_id)
97		.await
98		.is_ok()
99	{
100		return;
101	}
102
103	let now = now().as_secs();
104
105	self.eventid_originalpdu
106		.raw_put(event_id, Json(pdu));
107
108	self.timeredacted_eventid
109		.put_raw((now, event_id), []);
110}
111
112#[implement(Service)]
113pub fn retained_pdus_raw(&self) -> impl Stream<Item = Result<&[u8]>> + Send {
114	self.eventid_originalpdu
115		.raw_stream()
116		.map_ok(|x| x.1)
117}