tuwunel_service/rooms/timeline/
pdus.rs1use std::borrow::Borrow;
2
3use futures::{
4 Stream, TryFutureExt, TryStreamExt,
5 future::Either::{Left, Right},
6};
7use ruma::{MilliSecondsSinceUnixEpoch, RoomId, UInt, UserId, api::Direction};
8use tuwunel_core::{
9 Result, at, err, implement,
10 matrix::pdu::{PduCount, PduEvent},
11 trace,
12 utils::{
13 result::LogErr,
14 stream::{TryIgnore, TryReadyExt, TryWidebandExt},
15 },
16 warn,
17};
18use tuwunel_database::{KeyVal, keyval::Val};
19
20use super::{PduId, RawPduId};
21
22pub type PdusIterItem = (PduCount, PduEvent);
23
24#[must_use]
27pub fn bias_count(count: [u8; 8]) -> u64 {
28 i64::from_be_bytes(count)
29 .wrapping_sub(i64::MIN)
30 .cast_unsigned()
31}
32
33#[implement(super::Service)]
34pub async fn delete_pdus(&self, room_id: &RoomId) -> Result {
35 let current = self
36 .count_to_id(room_id, PduCount::min(), Direction::Forward)
37 .await?;
38
39 let prefix = current.shortroomid();
40 self.db
41 .pduid_pdu
42 .raw_stream_from(¤t)
43 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
44 .ready_try_for_each(move |(key, value)| {
45 let pdu = serde_json::from_slice::<PduEvent>(value)?;
46 let ts: u64 = pdu.origin_server_ts.into();
47 let event_id = &pdu.event_id;
48
49 self.db.pduid_pdu.remove(key);
50 self.db.eventid_pduid.remove(event_id);
51 self.db.eventid_outlierpdu.remove(event_id);
52 self.db.roomid_tscount_pducount.del((
53 room_id,
54 ts,
55 bias_count(RawPduId::from(key).count()),
56 ));
57
58 trace!(?event_id, ?room_id, ?ts, ?key, "Removed");
59
60 Ok(())
61 })
62 .await
63}
64
65#[implement(super::Service)]
66pub fn pdus_near_ts(
67 &self,
68 user_id: Option<&UserId>,
69 room_id: &RoomId,
70 ts: MilliSecondsSinceUnixEpoch,
71 dir: Direction,
72) -> impl Stream<Item = Result<PdusIterItem>> + Send {
73 self.pdu_ids_near_ts(room_id, ts, dir)
74 .map_ok(|(ts, pdu_id)| (ts, pdu_id.into()))
75 .wide_and_then(async |(_, pdu_id): (_, RawPduId)| {
76 self.get_pdu_from_id(&pdu_id)
77 .map_ok(|pdu| (pdu_id, pdu))
78 .await
79 })
80 .ready_and_then(move |item| Self::each_pdu(item, user_id))
81}
82
83#[implement(super::Service)]
84pub fn pdu_ids_near_ts(
85 &self,
86 room_id: &RoomId,
87 ts: MilliSecondsSinceUnixEpoch,
88 dir: Direction,
89) -> impl Stream<Item = Result<(MilliSecondsSinceUnixEpoch, PduId)>> + Send {
90 use Direction::{Backward, Forward};
91
92 type KeyVal<'a> = ((&'a RoomId, UInt, u64), i64);
93
94 let ts: u64 = ts.get().into();
95
96 self.services
97 .short
98 .get_shortroomid(room_id)
99 .map_err(|e| err!(Request(NotFound("Room not found: {e:?}"))))
100 .map_ok(move |shortroomid| {
101 match dir {
102 | Forward => Left(self.db.roomid_tscount_pducount.stream_from(&(
103 room_id,
104 ts,
105 u64::MIN,
106 ))),
107 | Backward => Right(self.db.roomid_tscount_pducount.rev_stream_from(&(
108 room_id,
109 ts,
110 u64::MAX,
111 ))),
112 }
113 .ready_try_take_while(
114 move |((room_id_, ..), _): &KeyVal<'_>| Ok(room_id == *room_id_),
115 )
116 .map_ok(move |((_, ts, _), count)| {
117 (MilliSecondsSinceUnixEpoch(ts), PduId { shortroomid, count: count.into() })
118 })
119 })
120 .try_flatten_stream()
121}
122
123#[implement(super::Service)]
126#[inline]
127pub fn all_pdus<'a>(
128 &'a self,
129 user_id: &'a UserId,
130 room_id: &'a RoomId,
131) -> impl Stream<Item = PdusIterItem> + Send + 'a {
132 self.pdus(Some(user_id), room_id, None)
133 .ignore_err()
134}
135
136#[implement(super::Service)]
139#[tracing::instrument(skip(self), level = "debug")]
140pub fn pdus<'a>(
141 &'a self,
142 user_id: Option<&'a UserId>,
143 room_id: &'a RoomId,
144 from: Option<PduCount>,
145) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
146 let from = from.unwrap_or_else(PduCount::min);
147 self.count_to_id(room_id, from, Direction::Forward)
148 .map_ok(move |current| {
149 let prefix = current.shortroomid();
150 self.db
151 .pduid_pdu
152 .raw_stream_from(¤t)
153 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
154 .ready_and_then(move |item| Self::each_slice(item, user_id))
155 })
156 .try_flatten_stream()
157}
158
159#[implement(super::Service)]
162#[tracing::instrument(skip(self), level = "debug")]
163pub fn pdus_rev<'a>(
164 &'a self,
165 user_id: Option<&'a UserId>,
166 room_id: &'a RoomId,
167 until: Option<PduCount>,
168) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
169 let until = until.unwrap_or_else(PduCount::max);
170 self.count_to_id(room_id, until, Direction::Backward)
171 .map_ok(move |current| {
172 let prefix = current.shortroomid();
173 self.db
174 .pduid_pdu
175 .rev_raw_stream_from(¤t)
176 .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
177 .ready_and_then(move |item| Self::each_slice(item, user_id))
178 })
179 .try_flatten_stream()
180}
181
182#[implement(super::Service)]
183pub fn pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
184 self.db.pduid_pdu.raw_stream().map_ok(at!(1))
185}
186
187#[implement(super::Service)]
188pub fn outlier_pdus_raw(&self) -> impl Stream<Item = Result<Val<'_>>> + Send {
189 self.db
190 .eventid_outlierpdu
191 .raw_stream()
192 .map_ok(at!(1))
193}
194
195#[implement(super::Service)]
196fn each_slice((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
197 let pdu_id: RawPduId = pdu_id.into();
198 let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
199
200 Self::each_pdu((pdu_id, pdu), user_id)
201}
202
203#[implement(super::Service)]
204fn each_pdu(
205 (pdu_id, mut pdu): (RawPduId, PduEvent),
206 user_id: Option<&UserId>,
207) -> Result<PdusIterItem> {
208 if Some(pdu.sender.borrow()) != user_id {
209 pdu.remove_transaction_id().log_err().ok();
210 }
211
212 pdu.add_age().log_err().ok();
213
214 Ok((pdu_id.pdu_count(), pdu))
215}