tuwunel_service/rooms/timeline/
backfill.rs1use std::{collections::HashSet, iter::once, num::NonZeroUsize};
2
3use futures::{
4 FutureExt, StreamExt, TryFutureExt,
5 future::{join, try_join, try_join4},
6};
7use rand::seq::SliceRandom;
8use ruma::{
9 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
10 api::Direction, events::TimelineEventType,
11};
12use serde::Deserialize;
13use serde_json::value::RawValue as RawJsonValue;
14use tuwunel_core::{
15 Result, at, debug, debug_warn, implement, is_false,
16 matrix::{
17 event::Event,
18 pdu::{PduCount, PduId, RawPduId},
19 },
20 utils::{
21 BoolExt, IterStream, ReadyExt,
22 future::{BoolExt as FutureBoolExt, TryExtExt},
23 },
24 validated, warn,
25};
26use tuwunel_database::Json;
27
28use super::{ExtractBody, bias_count};
29use crate::{
30 federation::Candidates,
31 fetcher::{Op, Opts},
32 rooms::state_accessor::plain_text_topic,
33};
34
/// Upper bound on events requested per backfill call; passed to the fetcher
/// as its `backfill_limit`.
const BACKFILL_LIMIT: NonZeroUsize = NonZeroUsize::new(100).expect("100 is non-zero");
37
38#[derive(Deserialize)]
41struct TimestampHit {
42 event_id: OwnedEventId,
43 origin_server_ts: MilliSecondsSinceUnixEpoch,
44}
45
46#[implement(super::Service)]
47#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
48pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
49 let (first_pdu_count, first_pdu) = self
50 .first_item_in_room(room_id)
51 .await
52 .expect("Room is not empty");
53
54 if first_pdu_count < from {
56 return Ok(());
57 }
58
59 if *first_pdu.event_type() == TimelineEventType::RoomCreate {
61 return Ok(());
62 }
63
64 let empty_room = self
65 .services
66 .state_cache
67 .room_joined_count(room_id)
68 .map_ok_or(true, |count| count <= 1);
69
70 let not_world_readable = self
71 .services
72 .state_accessor
73 .is_world_readable(room_id)
74 .map(is_false!());
75
76 if empty_room.and(not_world_readable).await {
78 return Ok(());
79 }
80
81 let eligible = self.backfill_candidates(room_id).await;
82
83 let no_backfill = || {
84 warn!(%room_id, "No servers could backfill, but backfill was needed");
85 Ok(())
86 };
87
88 if eligible.is_empty() {
92 return no_backfill();
93 }
94
95 let opts = Opts::new(Op::Backfill, room_id.to_owned())
96 .event_id(first_pdu.event_id().to_owned())
97 .candidates(eligible)
98 .backfill_limit(BACKFILL_LIMIT);
99
100 let Ok(outcome) = self
101 .services
102 .fetcher
103 .fetch(opts)
104 .inspect_err(|e| warn!(%room_id, "Backfilling failed: {e}"))
105 .await
106 else {
107 return no_backfill();
108 };
109
110 let pdus: Vec<Box<RawJsonValue>> = serde_json::from_slice(&outcome.bytes)?;
111
112 pdus.into_iter()
113 .stream()
114 .for_each(async |pdu| {
115 self.backfill_pdu(room_id, &outcome.origin, pdu)
116 .await
117 .inspect_err(|e| debug_warn!(%room_id, "Failed to add backfilled pdu: {e}"))
118 .ok();
119 })
120 .await;
121
122 Ok(())
123}
124
125#[implement(super::Service)]
126async fn backfill_candidates(&self, room_id: &RoomId) -> Candidates {
127 let canonical_alias = self
128 .services
129 .state_accessor
130 .get_canonical_alias(room_id);
131
132 let power_levels = self
133 .services
134 .state_accessor
135 .get_power_levels(room_id);
136
137 let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await;
138
139 let power_servers = power_levels
140 .iter()
141 .flat_map(|power| {
142 power
143 .rules
144 .privileged_creators
145 .iter()
146 .flat_map(|creators| creators.iter())
147 })
148 .chain(power_levels.iter().flat_map(|power| {
149 power
150 .users
151 .iter()
152 .filter_map(|(user_id, level)| level.gt(&power.users_default).then_some(user_id))
153 }))
154 .filter_map(|user_id| {
155 self.services
156 .globals
157 .user_is_local(user_id)
158 .is_false()
159 .then_some(user_id.server_name())
160 })
161 .collect::<HashSet<_>>();
162
163 let power_servers = {
164 let mut vec: Vec<_> = power_servers
165 .into_iter()
166 .map(ToOwned::to_owned)
167 .collect();
168
169 vec.shuffle(&mut rand::rng());
170 vec.into_iter().stream()
171 };
172
173 let canonical_room_alias_server = once(canonical_alias)
174 .filter_map(Result::ok)
175 .map(|alias| alias.server_name().to_owned())
176 .stream();
177
178 let trusted_servers = self
179 .services
180 .server
181 .config
182 .trusted_servers
183 .iter()
184 .map(ToOwned::to_owned)
185 .stream();
186
187 power_servers
188 .chain(canonical_room_alias_server)
189 .chain(trusted_servers)
190 .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
191 .filter_map(async |server_name| {
192 self.services
193 .state_cache
194 .server_in_room(&server_name, room_id)
195 .await
196 .then_some(server_name)
197 })
198 .collect()
199 .await
200}
201
202#[implement(super::Service)]
203pub async fn get_event_id_near_ts_with_fallback(
204 &self,
205 room_id: &RoomId,
206 ts: MilliSecondsSinceUnixEpoch,
207 dir: Direction,
208) -> Result<(MilliSecondsSinceUnixEpoch, OwnedEventId)> {
209 let local = self.get_event_id_near_ts(room_id, ts, dir).await;
210
211 let federate = match &local {
213 | Err(_) => true,
214 | Ok((_, event_id)) =>
215 dir == Direction::Forward && self.is_start_edge_hit(room_id, event_id).await,
216 };
217
218 if !federate {
219 return local;
220 }
221
222 let candidates = self.backfill_candidates(room_id).await;
223 if candidates.is_empty() {
224 return local;
225 }
226
227 let opts = Opts::new(Op::TimestampToEvent, room_id.to_owned())
228 .ts(ts)
229 .dir(dir)
230 .candidates(candidates)
231 .checks(false);
232
233 let Ok(outcome) = self.services.fetcher.fetch(opts).await else {
234 return local;
235 };
236
237 let Ok(TimestampHit { event_id, origin_server_ts }) = serde_json::from_slice(&outcome.bytes)
238 else {
239 return local;
240 };
241
242 if let Ok((local_ts, local_id)) = &local
244 && !nearer(dir, origin_server_ts, *local_ts)
245 {
246 return Ok((*local_ts, local_id.clone()));
247 }
248
249 let Ok(()) = self
251 .backfill_event(room_id, &event_id, &outcome.origin)
252 .await
253 .inspect_err(|e| debug_warn!(%room_id, "timestamp fallback backfill failed: {e}"))
254 else {
255 return local;
256 };
257
258 Ok((origin_server_ts, event_id))
259}
260
261#[implement(super::Service)]
262async fn is_start_edge_hit(&self, room_id: &RoomId, event_id: &EventId) -> bool {
263 self.first_item_in_room(room_id)
264 .await
265 .is_ok_and(|(_, first)| {
266 *first.event_type() != TimelineEventType::RoomCreate && first.event_id() == event_id
267 })
268}
269
270fn nearer(dir: Direction, a: MilliSecondsSinceUnixEpoch, b: MilliSecondsSinceUnixEpoch) -> bool {
272 match dir {
273 | Direction::Forward => a < b,
274 | Direction::Backward => a > b,
275 }
276}
277
278#[implement(super::Service)]
279async fn backfill_event(
280 &self,
281 room_id: &RoomId,
282 event_id: &EventId,
283 origin: &ServerName,
284) -> Result {
285 let opts = Opts::new(Op::Backfill, room_id.to_owned())
286 .event_id(event_id.to_owned())
287 .candidates([origin.to_owned()])
288 .backfill_limit(BACKFILL_LIMIT);
289
290 let outcome = self.services.fetcher.fetch(opts).await?;
291
292 let pdus: Vec<Box<RawJsonValue>> = serde_json::from_slice(&outcome.bytes)?;
293
294 pdus.into_iter()
295 .stream()
296 .for_each(async |pdu| {
297 self.backfill_pdu(room_id, &outcome.origin, pdu)
298 .await
299 .inspect_err(|e| debug_warn!(%room_id, "Failed to add backfilled pdu: {e}"))
300 .ok();
301 })
302 .await;
303
304 Ok(())
305}
306
307#[implement(super::Service)]
311#[tracing::instrument(skip(self), level = "debug")]
312pub async fn fetch_remote_event(&self, room_id: &RoomId, event_id: &EventId) -> Result {
313 let opts = Opts::new(Op::Event, room_id.to_owned())
314 .event_id(event_id.to_owned())
315 .checks(false);
316
317 let outcome = self.services.fetcher.fetch(opts).await?;
318
319 let pdu: Box<RawJsonValue> = serde_json::from_slice(&outcome.bytes)?;
320
321 self.backfill_pdu(room_id, &outcome.origin, pdu)
322 .await
323}
324
325#[implement(super::Service)]
326#[tracing::instrument(skip(self, pdu), level = "debug")]
327pub async fn backfill_pdu(
328 &self,
329 room_id: &RoomId,
330 origin: &ServerName,
331 pdu: Box<RawJsonValue>,
332) -> Result {
333 let parsed = self
334 .services
335 .event_handler
336 .parse_incoming_pdu(&pdu);
337
338 let mutex_lock = self
340 .services
341 .event_handler
342 .mutex_federation
343 .lock(room_id)
344 .map(Ok);
345
346 let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?;
347
348 let existed = self
349 .services
350 .event_handler
351 .handle_incoming_pdu(origin, room_id, &event_id, value, false)
352 .boxed()
353 .await?
354 .map(at!(1))
355 .is_some_and(is_false!());
356
357 if existed {
359 return Ok(());
360 }
361
362 let pdu = self.get_pdu(&event_id);
363
364 let value = self.get_pdu_json(&event_id);
365
366 let shortroomid = self.services.short.get_shortroomid(room_id);
367
368 let insert_lock = self.mutex_insert.lock(room_id).map(Ok);
369
370 let (pdu, value, shortroomid, insert_lock) =
371 try_join4(pdu, value, shortroomid, insert_lock).await?;
372
373 let count = self.services.globals.next_count();
376 let count: i64 = (*count).try_into()?;
377 let pdu_id: RawPduId = PduId {
378 shortroomid,
379 count: PduCount::Backfilled(validated!(0 - count)),
380 }
381 .into();
382
383 self.prepend_backfill_pdu(
385 &pdu_id,
386 room_id,
387 &event_id,
388 u64::from(pdu.origin_server_ts),
389 &value,
390 );
391 drop(insert_lock);
392
393 match pdu.kind {
394 | TimelineEventType::RoomMessage => {
395 let content: ExtractBody = pdu.get_content()?;
396 if let Some(body) = content.body {
397 self.services
398 .search
399 .index_pdu(shortroomid, &pdu_id, &body);
400 }
401 },
402 | TimelineEventType::RoomTopic =>
403 if let Some(topic) = pdu.get_content().ok().and_then(plain_text_topic) {
404 self.services
405 .search
406 .index_pdu(shortroomid, &pdu_id, &topic);
407 },
408 | _ => {},
409 }
410
411 drop(mutex_lock);
412
413 debug!("Prepended backfill pdu");
414 Ok(())
415}
416
417#[implement(super::Service)]
418fn prepend_backfill_pdu(
419 &self,
420 pdu_id: &RawPduId,
421 room_id: &RoomId,
422 event_id: &EventId,
423 origin_server_ts: u64,
424 json: &CanonicalJsonObject,
425) {
426 self.db.pduid_pdu.raw_put(pdu_id, Json(json));
427
428 self.db.eventid_pduid.insert(event_id, pdu_id);
429
430 self.db.eventid_outlierpdu.remove(event_id);
431
432 let count_key = bias_count(pdu_id.count());
433
434 self.db
435 .roomid_tscount_pducount
436 .put_raw((room_id, origin_server_ts, count_key), pdu_id.count());
437}