Skip to main content

tuwunel_service/rooms/timeline/
backfill.rs

1use std::{collections::HashSet, iter::once, num::NonZeroUsize};
2
3use futures::{
4	FutureExt, StreamExt, TryFutureExt,
5	future::{join, try_join, try_join4},
6};
7use rand::seq::SliceRandom;
8use ruma::{
9	CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
10	api::Direction, events::TimelineEventType,
11};
12use serde::Deserialize;
13use serde_json::value::RawValue as RawJsonValue;
14use tuwunel_core::{
15	Result, at, debug, debug_warn, implement, is_false,
16	matrix::{
17		event::Event,
18		pdu::{PduCount, PduId, RawPduId},
19	},
20	utils::{
21		BoolExt, IterStream, ReadyExt,
22		future::{BoolExt as FutureBoolExt, TryExtExt},
23	},
24	validated, warn,
25};
26use tuwunel_database::Json;
27
28use super::{ExtractBody, bias_count};
29use crate::{
30	federation::Candidates,
31	fetcher::{Op, Opts},
32	rooms::state_accessor::plain_text_topic,
33};
34
/// Number of events asked for in one backfill request; handed to the fetcher
/// through `Opts::backfill_limit`.
const BACKFILL_LIMIT: NonZeroUsize =
	NonZeroUsize::new(100).expect("backfill batch size is a non-zero literal");
37
/// The `event_id` and timestamp parsed back out of an [`Op::TimestampToEvent`]
/// fetch outcome.
///
/// Deserialized directly from the raw response bytes of the fetch. Only the
/// two fields below are read from the body.
#[derive(Deserialize)]
struct TimestampHit {
	/// Event ID carried in the remote response.
	event_id: OwnedEventId,
	/// Timestamp carried in the remote response; the caller compares it with
	/// the timestamp of the local hit to decide which answer is nearer.
	origin_server_ts: MilliSecondsSinceUnixEpoch,
}
45
46#[implement(super::Service)]
47#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
48pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
49	let (first_pdu_count, first_pdu) = self
50		.first_item_in_room(room_id)
51		.await
52		.expect("Room is not empty");
53
54	// No backfill required, there are still events between them
55	if first_pdu_count < from {
56		return Ok(());
57	}
58
59	// No backfill required, reached the end.
60	if *first_pdu.event_type() == TimelineEventType::RoomCreate {
61		return Ok(());
62	}
63
64	let empty_room = self
65		.services
66		.state_cache
67		.room_joined_count(room_id)
68		.map_ok_or(true, |count| count <= 1);
69
70	let not_world_readable = self
71		.services
72		.state_accessor
73		.is_world_readable(room_id)
74		.map(is_false!());
75
76	// Room is empty (1 user or none), there is no one that can backfill
77	if empty_room.and(not_world_readable).await {
78		return Ok(());
79	}
80
81	let eligible = self.backfill_candidates(room_id).await;
82
83	let no_backfill = || {
84		warn!(%room_id, "No servers could backfill, but backfill was needed");
85		Ok(())
86	};
87
88	// Empty here, rather than deferring to the fetcher, keeps backfill scoped to
89	// the authoritative servers; the fetcher would otherwise fall back to the
90	// room's whole population.
91	if eligible.is_empty() {
92		return no_backfill();
93	}
94
95	let opts = Opts::new(Op::Backfill, room_id.to_owned())
96		.event_id(first_pdu.event_id().to_owned())
97		.candidates(eligible)
98		.backfill_limit(BACKFILL_LIMIT);
99
100	let Ok(outcome) = self
101		.services
102		.fetcher
103		.fetch(opts)
104		.inspect_err(|e| warn!(%room_id, "Backfilling failed: {e}"))
105		.await
106	else {
107		return no_backfill();
108	};
109
110	let pdus: Vec<Box<RawJsonValue>> = serde_json::from_slice(&outcome.bytes)?;
111
112	pdus.into_iter()
113		.stream()
114		.for_each(async |pdu| {
115			self.backfill_pdu(room_id, &outcome.origin, pdu)
116				.await
117				.inspect_err(|e| debug_warn!(%room_id, "Failed to add backfilled pdu: {e}"))
118				.ok();
119		})
120		.await;
121
122	Ok(())
123}
124
125#[implement(super::Service)]
126async fn backfill_candidates(&self, room_id: &RoomId) -> Candidates {
127	let canonical_alias = self
128		.services
129		.state_accessor
130		.get_canonical_alias(room_id);
131
132	let power_levels = self
133		.services
134		.state_accessor
135		.get_power_levels(room_id);
136
137	let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await;
138
139	let power_servers = power_levels
140		.iter()
141		.flat_map(|power| {
142			power
143				.rules
144				.privileged_creators
145				.iter()
146				.flat_map(|creators| creators.iter())
147		})
148		.chain(power_levels.iter().flat_map(|power| {
149			power
150				.users
151				.iter()
152				.filter_map(|(user_id, level)| level.gt(&power.users_default).then_some(user_id))
153		}))
154		.filter_map(|user_id| {
155			self.services
156				.globals
157				.user_is_local(user_id)
158				.is_false()
159				.then_some(user_id.server_name())
160		})
161		.collect::<HashSet<_>>();
162
163	let power_servers = {
164		let mut vec: Vec<_> = power_servers
165			.into_iter()
166			.map(ToOwned::to_owned)
167			.collect();
168
169		vec.shuffle(&mut rand::rng());
170		vec.into_iter().stream()
171	};
172
173	let canonical_room_alias_server = once(canonical_alias)
174		.filter_map(Result::ok)
175		.map(|alias| alias.server_name().to_owned())
176		.stream();
177
178	let trusted_servers = self
179		.services
180		.server
181		.config
182		.trusted_servers
183		.iter()
184		.map(ToOwned::to_owned)
185		.stream();
186
187	power_servers
188		.chain(canonical_room_alias_server)
189		.chain(trusted_servers)
190		.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
191		.filter_map(async |server_name| {
192			self.services
193				.state_cache
194				.server_in_room(&server_name, room_id)
195				.await
196				.then_some(server_name)
197		})
198		.collect()
199		.await
200}
201
202#[implement(super::Service)]
203pub async fn get_event_id_near_ts_with_fallback(
204	&self,
205	room_id: &RoomId,
206	ts: MilliSecondsSinceUnixEpoch,
207	dir: Direction,
208) -> Result<(MilliSecondsSinceUnixEpoch, OwnedEventId)> {
209	let local = self.get_event_id_near_ts(room_id, ts, dir).await;
210
211	// Federate on a local miss, or a forward hit at the start edge of our history.
212	let federate = match &local {
213		| Err(_) => true,
214		| Ok((_, event_id)) =>
215			dir == Direction::Forward && self.is_start_edge_hit(room_id, event_id).await,
216	};
217
218	if !federate {
219		return local;
220	}
221
222	let candidates = self.backfill_candidates(room_id).await;
223	if candidates.is_empty() {
224		return local;
225	}
226
227	let opts = Opts::new(Op::TimestampToEvent, room_id.to_owned())
228		.ts(ts)
229		.dir(dir)
230		.candidates(candidates)
231		.checks(false);
232
233	let Ok(outcome) = self.services.fetcher.fetch(opts).await else {
234		return local;
235	};
236
237	let Ok(TimestampHit { event_id, origin_server_ts }) = serde_json::from_slice(&outcome.bytes)
238	else {
239		return local;
240	};
241
242	// Keep the local hit when it is no farther from the timestamp than the remote.
243	if let Ok((local_ts, local_id)) = &local
244		&& !nearer(dir, origin_server_ts, *local_ts)
245	{
246		return Ok((*local_ts, local_id.clone()));
247	}
248
249	// Fail closed: an un-ingested event can't be visibility-checked, so keep local.
250	let Ok(()) = self
251		.backfill_event(room_id, &event_id, &outcome.origin)
252		.await
253		.inspect_err(|e| debug_warn!(%room_id, "timestamp fallback backfill failed: {e}"))
254	else {
255		return local;
256	};
257
258	Ok((origin_server_ts, event_id))
259}
260
261#[implement(super::Service)]
262async fn is_start_edge_hit(&self, room_id: &RoomId, event_id: &EventId) -> bool {
263	self.first_item_in_room(room_id)
264		.await
265		.is_ok_and(|(_, first)| {
266			*first.event_type() != TimelineEventType::RoomCreate && first.event_id() == event_id
267		})
268}
269
270/// Whether `a` is nearer the queried timestamp than `b` for a search in `dir`.
271fn nearer(dir: Direction, a: MilliSecondsSinceUnixEpoch, b: MilliSecondsSinceUnixEpoch) -> bool {
272	match dir {
273		| Direction::Forward => a < b,
274		| Direction::Backward => a > b,
275	}
276}
277
278#[implement(super::Service)]
279async fn backfill_event(
280	&self,
281	room_id: &RoomId,
282	event_id: &EventId,
283	origin: &ServerName,
284) -> Result {
285	let opts = Opts::new(Op::Backfill, room_id.to_owned())
286		.event_id(event_id.to_owned())
287		.candidates([origin.to_owned()])
288		.backfill_limit(BACKFILL_LIMIT);
289
290	let outcome = self.services.fetcher.fetch(opts).await?;
291
292	let pdus: Vec<Box<RawJsonValue>> = serde_json::from_slice(&outcome.bytes)?;
293
294	pdus.into_iter()
295		.stream()
296		.for_each(async |pdu| {
297			self.backfill_pdu(room_id, &outcome.origin, pdu)
298				.await
299				.inspect_err(|e| debug_warn!(%room_id, "Failed to add backfilled pdu: {e}"))
300				.ok();
301		})
302		.await;
303
304	Ok(())
305}
306
307/// Fetch a single event we have not received over federation and persist it via
308/// the backfill path, so a subsequent local lookup resolves it. Checks are off:
309/// `backfill_pdu` performs full signature, hash, and auth validation itself.
310#[implement(super::Service)]
311#[tracing::instrument(skip(self), level = "debug")]
312pub async fn fetch_remote_event(&self, room_id: &RoomId, event_id: &EventId) -> Result {
313	let opts = Opts::new(Op::Event, room_id.to_owned())
314		.event_id(event_id.to_owned())
315		.checks(false);
316
317	let outcome = self.services.fetcher.fetch(opts).await?;
318
319	let pdu: Box<RawJsonValue> = serde_json::from_slice(&outcome.bytes)?;
320
321	self.backfill_pdu(room_id, &outcome.origin, pdu)
322		.await
323}
324
/// Run one raw federation PDU through the event handler and, when it is
/// accepted as new, store it in the room's timeline under a freshly allocated
/// negative (`PduCount::Backfilled`) position, then index its text for search.
///
/// The room's federation mutex is held for the whole operation and the room's
/// insert mutex only around the timeline write.
///
/// # Errors
///
/// Fails when the PDU cannot be parsed, when `handle_incoming_pdu` rejects it,
/// when the stored PDU, its JSON or the short room id cannot be read back, when
/// the allocated count does not fit in an `i64`, or when a message event's
/// content cannot be parsed for indexing.
#[implement(super::Service)]
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn backfill_pdu(
	&self,
	room_id: &RoomId,
	origin: &ServerName,
	pdu: Box<RawJsonValue>,
) -> Result {
	let parsed = self
		.services
		.event_handler
		.parse_incoming_pdu(&pdu);

	// Lock so we cannot backfill the same pdu twice at the same time
	let mutex_lock = self
		.services
		.event_handler
		.mutex_federation
		.lock(room_id)
		.map(Ok);

	// Parse and take the federation lock together; a parse error aborts here.
	let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?;

	// NOTE(review): `existed` is true when the handler returns `Some` and the
	// second element of its tuple is `false` — confirm the meaning of that flag
	// against `handle_incoming_pdu`.
	let existed = self
		.services
		.event_handler
		.handle_incoming_pdu(origin, room_id, &event_id, value, false)
		.boxed()
		.await?
		.map(at!(1))
		.is_some_and(is_false!());

	// Bail if the PDU already exists; a duplicate insertion is not good.
	if existed {
		return Ok(());
	}

	// Read back what the handler stored, and take the insert lock, concurrently.
	let pdu = self.get_pdu(&event_id);

	let value = self.get_pdu_json(&event_id);

	let shortroomid = self.services.short.get_shortroomid(room_id);

	let insert_lock = self.mutex_insert.lock(room_id).map(Ok);

	let (pdu, value, shortroomid, insert_lock) =
		try_join4(pdu, value, shortroomid, insert_lock).await?;

	// A pdu_id is not returned from handle_incoming_pdu() when accepting a new
	// event on this codepath. The pdu_id is instead created here in ℤ−
	// The value from `next_count()` is only shadowed by the `i64` below, not
	// dropped, so it stays alive until the end of this function.
	let count = self.services.globals.next_count();
	let count: i64 = (*count).try_into()?;
	let pdu_id: RawPduId = PduId {
		shortroomid,
		count: PduCount::Backfilled(validated!(0 - count)),
	}
	.into();

	// Insert pdu
	self.prepend_backfill_pdu(
		&pdu_id,
		room_id,
		&event_id,
		u64::from(pdu.origin_server_ts),
		&value,
	);
	drop(insert_lock);

	// Search indexing happens after the insert lock is released but still under
	// the federation lock.
	// NOTE(review): the `?` in the message arm returns an error although the PDU
	// has already been written above; the topic arm tolerates a parse failure
	// with `.ok()` instead. Confirm which behaviour is intended.
	match pdu.kind {
		| TimelineEventType::RoomMessage => {
			let content: ExtractBody = pdu.get_content()?;
			if let Some(body) = content.body {
				self.services
					.search
					.index_pdu(shortroomid, &pdu_id, &body);
			}
		},
		| TimelineEventType::RoomTopic =>
			if let Some(topic) = pdu.get_content().ok().and_then(plain_text_topic) {
				self.services
					.search
					.index_pdu(shortroomid, &pdu_id, &topic);
			},
		| _ => {},
	}

	drop(mutex_lock);

	debug!("Prepended backfill pdu");
	Ok(())
}
416
417#[implement(super::Service)]
418fn prepend_backfill_pdu(
419	&self,
420	pdu_id: &RawPduId,
421	room_id: &RoomId,
422	event_id: &EventId,
423	origin_server_ts: u64,
424	json: &CanonicalJsonObject,
425) {
426	self.db.pduid_pdu.raw_put(pdu_id, Json(json));
427
428	self.db.eventid_pduid.insert(event_id, pdu_id);
429
430	self.db.eventid_outlierpdu.remove(event_id);
431
432	let count_key = bias_count(pdu_id.count());
433
434	self.db
435		.roomid_tscount_pducount
436		.put_raw((room_id, origin_server_ts, count_key), pdu_id.count());
437}