Skip to main content

tuwunel_service/rooms/timeline/
pdus.rs

1use std::borrow::Borrow;
2
3use futures::{
4	Stream, TryFutureExt, TryStreamExt,
5	future::Either::{Left, Right},
6};
7use ruma::{MilliSecondsSinceUnixEpoch, RoomId, UInt, UserId, api::Direction};
8use tuwunel_core::{
9	Result, at, err, implement,
10	matrix::pdu::{PduCount, PduEvent},
11	trace,
12	utils::{
13		result::LogErr,
14		stream::{TryIgnore, TryReadyExt, TryWidebandExt},
15	},
16	warn,
17};
18use tuwunel_database::{KeyVal, keyval::Val};
19
20use super::{PduId, RawPduId};
21
/// Item produced by the PDU streams in this module: the event's
/// [`PduCount`] paired with the [`PduEvent`] itself.
pub type PdusIterItem = (PduCount, PduEvent);
23
/// Converts the big-endian bytes of a signed PDU count into its offset-binary
/// `u64` form.
///
/// Unsigned ordering of the result matches signed ordering of the count:
/// `i64::MIN` maps to `0`, zero maps to `1 << 63` and `i64::MAX` maps to
/// `u64::MAX`, so backfilled negatives sort below normal positives.
#[must_use]
pub fn bias_count(count: [u8; 8]) -> u64 {
	// Subtracting `i64::MIN` modulo 2^64 is exactly a flip of the sign bit.
	const SIGN_BIT: u64 = 1 << 63;

	u64::from_be_bytes(count) ^ SIGN_BIT
}
32
33#[implement(super::Service)]
34pub async fn delete_pdus(&self, room_id: &RoomId) -> Result {
35	let current = self
36		.count_to_id(room_id, PduCount::min(), Direction::Forward)
37		.await?;
38
39	let prefix = current.shortroomid();
40	self.db
41		.pduid_pdu
42		.raw_stream_from(&current)
43		.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
44		.ready_try_for_each(move |(key, value)| {
45			let pdu = serde_json::from_slice::<PduEvent>(value)?;
46			let ts: u64 = pdu.origin_server_ts.into();
47			let event_id = &pdu.event_id;
48
49			self.db.pduid_pdu.remove(key);
50			self.db.eventid_pduid.remove(event_id);
51			self.db.eventid_outlierpdu.remove(event_id);
52			self.db.roomid_tscount_pducount.del((
53				room_id,
54				ts,
55				bias_count(RawPduId::from(key).count()),
56			));
57
58			trace!(?event_id, ?room_id, ?ts, ?key, "Removed");
59
60			Ok(())
61		})
62		.await
63}
64
65#[implement(super::Service)]
66pub fn pdus_near_ts(
67	&self,
68	user_id: Option<&UserId>,
69	room_id: &RoomId,
70	ts: MilliSecondsSinceUnixEpoch,
71	dir: Direction,
72) -> impl Stream<Item = Result<PdusIterItem>> + Send {
73	self.pdu_ids_near_ts(room_id, ts, dir)
74		.map_ok(|(ts, pdu_id)| (ts, pdu_id.into()))
75		.wide_and_then(async |(_, pdu_id): (_, RawPduId)| {
76			self.get_pdu_from_id(&pdu_id)
77				.map_ok(|pdu| (pdu_id, pdu))
78				.await
79		})
80		.ready_and_then(move |item| Self::each_pdu(item, user_id))
81}
82
83#[implement(super::Service)]
84pub fn pdu_ids_near_ts(
85	&self,
86	room_id: &RoomId,
87	ts: MilliSecondsSinceUnixEpoch,
88	dir: Direction,
89) -> impl Stream<Item = Result<(MilliSecondsSinceUnixEpoch, PduId)>> + Send {
90	use Direction::{Backward, Forward};
91
92	type KeyVal<'a> = ((&'a RoomId, UInt, u64), i64);
93
94	let ts: u64 = ts.get().into();
95
96	self.services
97		.short
98		.get_shortroomid(room_id)
99		.map_err(|e| err!(Request(NotFound("Room not found: {e:?}"))))
100		.map_ok(move |shortroomid| {
101			match dir {
102				| Forward => Left(self.db.roomid_tscount_pducount.stream_from(&(
103					room_id,
104					ts,
105					u64::MIN,
106				))),
107				| Backward => Right(self.db.roomid_tscount_pducount.rev_stream_from(&(
108					room_id,
109					ts,
110					u64::MAX,
111				))),
112			}
113			.ready_try_take_while(
114				move |((room_id_, ..), _): &KeyVal<'_>| Ok(room_id == *room_id_),
115			)
116			.map_ok(move |((_, ts, _), count)| {
117				(MilliSecondsSinceUnixEpoch(ts), PduId { shortroomid, count: count.into() })
118			})
119		})
120		.try_flatten_stream()
121}
122
123/// Returns an iterator over all PDUs in a room. Unknown rooms produce no
124/// items.
125#[implement(super::Service)]
126#[inline]
127pub fn all_pdus<'a>(
128	&'a self,
129	user_id: &'a UserId,
130	room_id: &'a RoomId,
131) -> impl Stream<Item = PdusIterItem> + Send + 'a {
132	self.pdus(Some(user_id), room_id, None)
133		.ignore_err()
134}
135
136/// Returns an iterator over all events and their tokens in a room that
137/// happened after the event with id `from` in order.
138#[implement(super::Service)]
139#[tracing::instrument(skip(self), level = "debug")]
140pub fn pdus<'a>(
141	&'a self,
142	user_id: Option<&'a UserId>,
143	room_id: &'a RoomId,
144	from: Option<PduCount>,
145) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
146	let from = from.unwrap_or_else(PduCount::min);
147	self.count_to_id(room_id, from, Direction::Forward)
148		.map_ok(move |current| {
149			let prefix = current.shortroomid();
150			self.db
151				.pduid_pdu
152				.raw_stream_from(&current)
153				.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
154				.ready_and_then(move |item| Self::each_slice(item, user_id))
155		})
156		.try_flatten_stream()
157}
158
159/// Returns an iterator over all events and their tokens in a room that
160/// happened before the event with id `until` in reverse-order.
161#[implement(super::Service)]
162#[tracing::instrument(skip(self), level = "debug")]
163pub fn pdus_rev<'a>(
164	&'a self,
165	user_id: Option<&'a UserId>,
166	room_id: &'a RoomId,
167	until: Option<PduCount>,
168) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
169	let until = until.unwrap_or_else(PduCount::max);
170	self.count_to_id(room_id, until, Direction::Backward)
171		.map_ok(move |current| {
172			let prefix = current.shortroomid();
173			self.db
174				.pduid_pdu
175				.rev_raw_stream_from(&current)
176				.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
177				.ready_and_then(move |item| Self::each_slice(item, user_id))
178		})
179		.try_flatten_stream()
180}
181
182#[implement(super::Service)]
183pub fn pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
184	self.db.pduid_pdu.raw_stream().map_ok(at!(1))
185}
186
187#[implement(super::Service)]
188pub fn outlier_pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
189	self.db
190		.eventid_outlierpdu
191		.raw_stream()
192		.map_ok(at!(1))
193}
194
195#[implement(super::Service)]
196fn each_slice((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
197	let pdu_id: RawPduId = pdu_id.into();
198	let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
199
200	Self::each_pdu((pdu_id, pdu), user_id)
201}
202
203#[implement(super::Service)]
204fn each_pdu(
205	(pdu_id, mut pdu): (RawPduId, PduEvent),
206	user_id: Option<&UserId>,
207) -> Result<PdusIterItem> {
208	if Some(pdu.sender.borrow()) != user_id {
209		pdu.remove_transaction_id().log_err().ok();
210	}
211
212	pdu.add_age().log_err().ok();
213
214	Ok((pdu_id.pdu_count(), pdu))
215}