Skip to main content

tuwunel_service/rooms/timeline/
pdus.rs

1use std::borrow::Borrow;
2
3use futures::{
4	Stream, TryFutureExt, TryStreamExt,
5	future::Either::{Left, Right},
6};
7use ruma::{MilliSecondsSinceUnixEpoch, RoomId, UInt, UserId, api::Direction};
8use tuwunel_core::{
9	Result, at, err, implement,
10	matrix::pdu::{PduCount, PduEvent},
11	trace,
12	utils::{
13		result::LogErr,
14		stream::{TryIgnore, TryReadyExt, TryWidebandExt},
15	},
16	warn,
17};
18use tuwunel_database::{KeyVal, keyval::Val};
19
20use super::{PduId, RawPduId};
21
/// Item produced by the PDU streams in this module: a `PduCount` paired
/// with the `PduEvent` it belongs to.
22pub type PdusIterItem = (PduCount, PduEvent);
23
24#[implement(super::Service)]
25pub async fn delete_pdus(&self, room_id: &RoomId) -> Result {
26	let current = self
27		.count_to_id(room_id, PduCount::min(), Direction::Forward)
28		.await?;
29
30	let prefix = current.shortroomid();
31	self.db
32		.pduid_pdu
33		.raw_stream_from(&current)
34		.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
35		.ready_try_for_each(move |(key, value)| {
36			let pdu = serde_json::from_slice::<PduEvent>(value)?;
37			let ts: u64 = pdu.origin_server_ts.into();
38			let event_id = &pdu.event_id;
39
40			self.db.pduid_pdu.remove(key);
41			self.db.eventid_pduid.remove(event_id);
42			self.db.eventid_outlierpdu.remove(event_id);
43			self.db.roomid_ts_pducount.del((room_id, ts));
44
45			trace!(?event_id, ?room_id, ?ts, ?key, "Removed");
46
47			Ok(())
48		})
49		.await
50}
51
52#[implement(super::Service)]
53pub fn pdus_near_ts(
54	&self,
55	user_id: Option<&UserId>,
56	room_id: &RoomId,
57	ts: MilliSecondsSinceUnixEpoch,
58	dir: Direction,
59) -> impl Stream<Item = Result<PdusIterItem>> + Send {
60	self.pdu_ids_near_ts(room_id, ts, dir)
61		.map_ok(|(ts, pdu_id)| (ts, pdu_id.into()))
62		.wide_and_then(async |(_, pdu_id): (_, RawPduId)| {
63			self.get_pdu_from_id(&pdu_id)
64				.map_ok(|pdu| (pdu_id, pdu))
65				.await
66		})
67		.ready_and_then(move |item| Self::each_pdu(item, user_id))
68}
69
70#[implement(super::Service)]
71pub fn pdu_ids_near_ts(
72	&self,
73	room_id: &RoomId,
74	ts: MilliSecondsSinceUnixEpoch,
75	dir: Direction,
76) -> impl Stream<Item = Result<(MilliSecondsSinceUnixEpoch, PduId)>> + Send {
77	use Direction::{Backward, Forward};
78
79	type KeyVal<'a> = ((&'a RoomId, UInt), i64);
80
81	let ts: u64 = ts.get().into();
82	let key = (room_id, ts);
83
84	self.services
85		.short
86		.get_shortroomid(room_id)
87		.map_err(|e| err!(Request(NotFound("Room not found: {e:?}"))))
88		.map_ok(move |shortroomid| {
89			match dir {
90				| Forward => Left(self.db.roomid_ts_pducount.stream_from(&key)),
91				| Backward => Right(self.db.roomid_ts_pducount.rev_stream_from(&key)),
92			}
93			.ready_try_take_while(move |((room_id_, _), _): &KeyVal<'_>| Ok(room_id == *room_id_))
94			.map_ok(move |((_, ts), count)| {
95				(MilliSecondsSinceUnixEpoch(ts), PduId { shortroomid, count: count.into() })
96			})
97		})
98		.try_flatten_stream()
99}
100
101/// Returns an iterator over all PDUs in a room. Unknown rooms produce no
102/// items.
103#[implement(super::Service)]
104#[inline]
105pub fn all_pdus<'a>(
106	&'a self,
107	user_id: &'a UserId,
108	room_id: &'a RoomId,
109) -> impl Stream<Item = PdusIterItem> + Send + 'a {
110	self.pdus(Some(user_id), room_id, None)
111		.ignore_err()
112}
113
114/// Returns an iterator over all events and their tokens in a room that
115/// happened after the event with id `from` in order.
116#[implement(super::Service)]
117#[tracing::instrument(skip(self), level = "debug")]
118pub fn pdus<'a>(
119	&'a self,
120	user_id: Option<&'a UserId>,
121	room_id: &'a RoomId,
122	from: Option<PduCount>,
123) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
124	let from = from.unwrap_or_else(PduCount::min);
125	self.count_to_id(room_id, from, Direction::Forward)
126		.map_ok(move |current| {
127			let prefix = current.shortroomid();
128			self.db
129				.pduid_pdu
130				.raw_stream_from(&current)
131				.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
132				.ready_and_then(move |item| Self::each_slice(item, user_id))
133		})
134		.try_flatten_stream()
135}
136
137/// Returns an iterator over all events and their tokens in a room that
138/// happened before the event with id `until` in reverse-order.
139#[implement(super::Service)]
140#[tracing::instrument(skip(self), level = "debug")]
141pub fn pdus_rev<'a>(
142	&'a self,
143	user_id: Option<&'a UserId>,
144	room_id: &'a RoomId,
145	until: Option<PduCount>,
146) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
147	let until = until.unwrap_or_else(PduCount::max);
148	self.count_to_id(room_id, until, Direction::Backward)
149		.map_ok(move |current| {
150			let prefix = current.shortroomid();
151			self.db
152				.pduid_pdu
153				.rev_raw_stream_from(&current)
154				.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
155				.ready_and_then(move |item| Self::each_slice(item, user_id))
156		})
157		.try_flatten_stream()
158}
159
160#[implement(super::Service)]
161pub fn pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
162	self.db.pduid_pdu.raw_stream().map_ok(at!(1))
163}
164
165#[implement(super::Service)]
166pub fn outlier_pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
167	self.db
168		.eventid_outlierpdu
169		.raw_stream()
170		.map_ok(at!(1))
171}
172
173#[implement(super::Service)]
174fn each_slice((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
175	let pdu_id: RawPduId = pdu_id.into();
176	let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
177
178	Self::each_pdu((pdu_id, pdu), user_id)
179}
180
181#[implement(super::Service)]
182fn each_pdu(
183	(pdu_id, mut pdu): (RawPduId, PduEvent),
184	user_id: Option<&UserId>,
185) -> Result<PdusIterItem> {
186	if Some(pdu.sender.borrow()) != user_id {
187		pdu.remove_transaction_id().log_err().ok();
188	}
189
190	pdu.add_age().log_err().ok();
191
192	Ok((pdu_id.pdu_count(), pdu))
193}