tuwunel_service/rooms/timeline/
backfill.rs1use std::{collections::HashSet, iter::once};
2
3use futures::{
4 FutureExt, StreamExt, TryFutureExt,
5 future::{join, try_join, try_join4},
6};
7use rand::seq::SliceRandom;
8use ruma::{
9 CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::TimelineEventType,
10 uint,
11};
12use serde_json::value::RawValue as RawJsonValue;
13use tuwunel_core::{
14 Result, at, debug, debug_info, debug_warn, implement, is_false,
15 matrix::{
16 event::Event,
17 pdu::{PduCount, PduId, RawPduId},
18 },
19 utils::{
20 BoolExt, IterStream, ReadyExt,
21 future::{BoolExt as FutureBoolExt, TryExtExt},
22 },
23 validated, warn,
24};
25use tuwunel_database::Json;
26
27use super::ExtractBody;
28
29#[implement(super::Service)]
30#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
31pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
32 let (first_pdu_count, first_pdu) = self
33 .first_item_in_room(room_id)
34 .await
35 .expect("Room is not empty");
36
37 if first_pdu_count < from {
39 return Ok(());
40 }
41
42 if *first_pdu.event_type() == TimelineEventType::RoomCreate {
44 return Ok(());
45 }
46
47 let empty_room = self
48 .services
49 .state_cache
50 .room_joined_count(room_id)
51 .map_ok_or(true, |count| count <= 1);
52
53 let not_world_readable = self
54 .services
55 .state_accessor
56 .is_world_readable(room_id)
57 .map(is_false!());
58
59 if empty_room.and(not_world_readable).await {
61 return Ok(());
62 }
63
64 let canonical_alias = self
65 .services
66 .state_accessor
67 .get_canonical_alias(room_id);
68
69 let power_levels = self
70 .services
71 .state_accessor
72 .get_power_levels(room_id);
73
74 let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await;
75
76 let power_servers = power_levels
77 .iter()
78 .flat_map(|power| {
79 power
80 .rules
81 .privileged_creators
82 .iter()
83 .flat_map(|creators| creators.iter())
84 })
85 .chain(power_levels.iter().flat_map(|power| {
86 power
87 .users
88 .iter()
89 .filter_map(|(user_id, level)| level.gt(&power.users_default).then_some(user_id))
90 }))
91 .filter_map(|user_id| {
92 self.services
93 .globals
94 .user_is_local(user_id)
95 .is_false()
96 .then_some(user_id.server_name())
97 })
98 .collect::<HashSet<_>>();
99
100 let power_servers = {
101 let mut vec: Vec<_> = power_servers
102 .into_iter()
103 .map(ToOwned::to_owned)
104 .collect();
105
106 vec.shuffle(&mut rand::rng());
107 vec.into_iter().stream()
108 };
109
110 let canonical_room_alias_server = once(canonical_alias)
111 .filter_map(Result::ok)
112 .map(|alias| alias.server_name().to_owned())
113 .stream();
114
115 let trusted_servers = self
116 .services
117 .server
118 .config
119 .trusted_servers
120 .iter()
121 .map(ToOwned::to_owned)
122 .stream();
123
124 let mut servers = power_servers
125 .chain(canonical_room_alias_server)
126 .chain(trusted_servers)
127 .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
128 .filter_map(async |server_name| {
129 self.services
130 .state_cache
131 .server_in_room(&server_name, room_id)
132 .await
133 .then_some(server_name)
134 })
135 .boxed();
136
137 while let Some(ref backfill_server) = servers.next().await {
138 let request = federation::backfill::get_backfill::v1::Request {
139 room_id: room_id.to_owned(),
140 v: vec![first_pdu.event_id().to_owned()],
141 limit: uint!(100),
142 };
143
144 debug_info!("Asking {backfill_server} for backfill");
145 if let Ok(response) = self
146 .services
147 .federation
148 .execute(backfill_server, request)
149 .inspect_err(|e| {
150 warn!("{backfill_server} failed backfilling for room {room_id}: {e}");
151 })
152 .await
153 {
154 return response
155 .pdus
156 .into_iter()
157 .stream()
158 .for_each(async |pdu| {
159 if let Err(e) = self
160 .backfill_pdu(room_id, backfill_server, pdu)
161 .await
162 {
163 debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
164 }
165 })
166 .map(Ok)
167 .await;
168 }
169 }
170
171 warn!("No servers could backfill, but backfill was needed in room {room_id}");
172
173 Ok(())
174}
175
176#[implement(super::Service)]
177#[tracing::instrument(skip(self, pdu), level = "debug")]
178pub async fn backfill_pdu(
179 &self,
180 room_id: &RoomId,
181 origin: &ServerName,
182 pdu: Box<RawJsonValue>,
183) -> Result {
184 let parsed = self
185 .services
186 .event_handler
187 .parse_incoming_pdu(&pdu);
188
189 let mutex_lock = self
191 .services
192 .event_handler
193 .mutex_federation
194 .lock(room_id)
195 .map(Ok);
196
197 let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?;
198
199 let existed = self
200 .services
201 .event_handler
202 .handle_incoming_pdu(origin, room_id, &event_id, value, false)
203 .boxed()
204 .await?
205 .map(at!(1))
206 .is_some_and(is_false!());
207
208 if existed {
210 return Ok(());
211 }
212
213 let pdu = self.get_pdu(&event_id);
214
215 let value = self.get_pdu_json(&event_id);
216
217 let shortroomid = self.services.short.get_shortroomid(room_id);
218
219 let insert_lock = self.mutex_insert.lock(room_id).map(Ok);
220
221 let (pdu, value, shortroomid, insert_lock) =
222 try_join4(pdu, value, shortroomid, insert_lock).await?;
223
224 let count = self.services.globals.next_count();
227 let count: i64 = (*count).try_into()?;
228 let pdu_id: RawPduId = PduId {
229 shortroomid,
230 count: PduCount::Backfilled(validated!(0 - count)),
231 }
232 .into();
233
234 self.prepend_backfill_pdu(
236 &pdu_id,
237 room_id,
238 &event_id,
239 u64::from(pdu.origin_server_ts),
240 &value,
241 );
242 drop(insert_lock);
243
244 if pdu.kind == TimelineEventType::RoomMessage {
245 let content: ExtractBody = pdu.get_content()?;
246 if let Some(body) = content.body {
247 self.services
248 .search
249 .index_pdu(shortroomid, &pdu_id, &body);
250 }
251 }
252 drop(mutex_lock);
253
254 debug!("Prepended backfill pdu");
255 Ok(())
256}
257
258#[implement(super::Service)]
259fn prepend_backfill_pdu(
260 &self,
261 pdu_id: &RawPduId,
262 room_id: &RoomId,
263 event_id: &EventId,
264 origin_server_ts: u64,
265 json: &CanonicalJsonObject,
266) {
267 self.db.pduid_pdu.raw_put(pdu_id, Json(json));
268
269 self.db.eventid_pduid.insert(event_id, pdu_id);
270
271 self.db.eventid_outlierpdu.remove(event_id);
272
273 self.db
274 .roomid_ts_pducount
275 .put_raw((room_id, origin_server_ts), pdu_id.count());
276}