tuwunel_service/rooms/timeline/
pdus.rs1use std::borrow::Borrow;
2
3use futures::{
4 Stream, TryFutureExt, TryStreamExt,
5 future::Either::{Left, Right},
6};
7use ruma::{MilliSecondsSinceUnixEpoch, RoomId, UInt, UserId, api::Direction};
8use tuwunel_core::{
9 Result, at, err, implement,
10 matrix::pdu::{PduCount, PduEvent},
11 trace,
12 utils::{
13 result::LogErr,
14 stream::{TryIgnore, TryReadyExt, TryWidebandExt},
15 },
16 warn,
17};
18use tuwunel_database::{KeyVal, keyval::Val};
19
20use super::{PduId, RawPduId};
21
22pub type PdusIterItem = (PduCount, PduEvent);
23
24#[implement(super::Service)]
25pub async fn delete_pdus(&self, room_id: &RoomId) -> Result {
26 let current = self
27 .count_to_id(room_id, PduCount::min(), Direction::Forward)
28 .await?;
29
30 let prefix = current.shortroomid();
31 self.db
32 .pduid_pdu
33 .raw_stream_from(¤t)
34 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
35 .ready_try_for_each(move |(key, value)| {
36 let pdu = serde_json::from_slice::<PduEvent>(value)?;
37 let ts: u64 = pdu.origin_server_ts.into();
38 let event_id = &pdu.event_id;
39
40 self.db.pduid_pdu.remove(key);
41 self.db.eventid_pduid.remove(event_id);
42 self.db.eventid_outlierpdu.remove(event_id);
43 self.db.roomid_ts_pducount.del((room_id, ts));
44
45 trace!(?event_id, ?room_id, ?ts, ?key, "Removed");
46
47 Ok(())
48 })
49 .await
50}
51
52#[implement(super::Service)]
53pub fn pdus_near_ts(
54 &self,
55 user_id: Option<&UserId>,
56 room_id: &RoomId,
57 ts: MilliSecondsSinceUnixEpoch,
58 dir: Direction,
59) -> impl Stream<Item = Result<PdusIterItem>> + Send {
60 self.pdu_ids_near_ts(room_id, ts, dir)
61 .map_ok(|(ts, pdu_id)| (ts, pdu_id.into()))
62 .wide_and_then(async |(_, pdu_id): (_, RawPduId)| {
63 self.get_pdu_from_id(&pdu_id)
64 .map_ok(|pdu| (pdu_id, pdu))
65 .await
66 })
67 .ready_and_then(move |item| Self::each_pdu(item, user_id))
68}
69
70#[implement(super::Service)]
71pub fn pdu_ids_near_ts(
72 &self,
73 room_id: &RoomId,
74 ts: MilliSecondsSinceUnixEpoch,
75 dir: Direction,
76) -> impl Stream<Item = Result<(MilliSecondsSinceUnixEpoch, PduId)>> + Send {
77 use Direction::{Backward, Forward};
78
79 type KeyVal<'a> = ((&'a RoomId, UInt), i64);
80
81 let ts: u64 = ts.get().into();
82 let key = (room_id, ts);
83
84 self.services
85 .short
86 .get_shortroomid(room_id)
87 .map_err(|e| err!(Request(NotFound("Room not found: {e:?}"))))
88 .map_ok(move |shortroomid| {
89 match dir {
90 | Forward => Left(self.db.roomid_ts_pducount.stream_from(&key)),
91 | Backward => Right(self.db.roomid_ts_pducount.rev_stream_from(&key)),
92 }
93 .ready_try_take_while(move |((room_id_, _), _): &KeyVal<'_>| Ok(room_id == *room_id_))
94 .map_ok(move |((_, ts), count)| {
95 (MilliSecondsSinceUnixEpoch(ts), PduId { shortroomid, count: count.into() })
96 })
97 })
98 .try_flatten_stream()
99}
100
101#[implement(super::Service)]
104#[inline]
105pub fn all_pdus<'a>(
106 &'a self,
107 user_id: &'a UserId,
108 room_id: &'a RoomId,
109) -> impl Stream<Item = PdusIterItem> + Send + 'a {
110 self.pdus(Some(user_id), room_id, None)
111 .ignore_err()
112}
113
114#[implement(super::Service)]
117#[tracing::instrument(skip(self), level = "debug")]
118pub fn pdus<'a>(
119 &'a self,
120 user_id: Option<&'a UserId>,
121 room_id: &'a RoomId,
122 from: Option<PduCount>,
123) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
124 let from = from.unwrap_or_else(PduCount::min);
125 self.count_to_id(room_id, from, Direction::Forward)
126 .map_ok(move |current| {
127 let prefix = current.shortroomid();
128 self.db
129 .pduid_pdu
130 .raw_stream_from(¤t)
131 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
132 .ready_and_then(move |item| Self::each_slice(item, user_id))
133 })
134 .try_flatten_stream()
135}
136
137#[implement(super::Service)]
140#[tracing::instrument(skip(self), level = "debug")]
141pub fn pdus_rev<'a>(
142 &'a self,
143 user_id: Option<&'a UserId>,
144 room_id: &'a RoomId,
145 until: Option<PduCount>,
146) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
147 let until = until.unwrap_or_else(PduCount::max);
148 self.count_to_id(room_id, until, Direction::Backward)
149 .map_ok(move |current| {
150 let prefix = current.shortroomid();
151 self.db
152 .pduid_pdu
153 .rev_raw_stream_from(¤t)
154 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
155 .ready_and_then(move |item| Self::each_slice(item, user_id))
156 })
157 .try_flatten_stream()
158}
159
160#[implement(super::Service)]
161pub fn pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
162 self.db.pduid_pdu.raw_stream().map_ok(at!(1))
163}
164
165#[implement(super::Service)]
166pub fn outlier_pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
167 self.db
168 .eventid_outlierpdu
169 .raw_stream()
170 .map_ok(at!(1))
171}
172
173#[implement(super::Service)]
174fn each_slice((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
175 let pdu_id: RawPduId = pdu_id.into();
176 let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
177
178 Self::each_pdu((pdu_id, pdu), user_id)
179}
180
181#[implement(super::Service)]
182fn each_pdu(
183 (pdu_id, mut pdu): (RawPduId, PduEvent),
184 user_id: Option<&UserId>,
185) -> Result<PdusIterItem> {
186 if Some(pdu.sender.borrow()) != user_id {
187 pdu.remove_transaction_id().log_err().ok();
188 }
189
190 pdu.add_age().log_err().ok();
191
192 Ok((pdu_id.pdu_count(), pdu))
193}