Skip to main content

tuwunel_service/rooms/timeline/
backfill.rs

1use std::{collections::HashSet, iter::once};
2
3use futures::{
4	FutureExt, StreamExt, TryFutureExt,
5	future::{join, try_join, try_join4},
6};
7use rand::seq::SliceRandom;
8use ruma::{
9	CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::TimelineEventType,
10	uint,
11};
12use serde_json::value::RawValue as RawJsonValue;
13use tuwunel_core::{
14	Result, at, debug, debug_info, debug_warn, implement, is_false,
15	matrix::{
16		event::Event,
17		pdu::{PduCount, PduId, RawPduId},
18	},
19	utils::{
20		BoolExt, IterStream, ReadyExt,
21		future::{BoolExt as FutureBoolExt, TryExtExt},
22	},
23	validated, warn,
24};
25use tuwunel_database::Json;
26
27use super::ExtractBody;
28
29#[implement(super::Service)]
30#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
31pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
32	let (first_pdu_count, first_pdu) = self
33		.first_item_in_room(room_id)
34		.await
35		.expect("Room is not empty");
36
37	// No backfill required, there are still events between them
38	if first_pdu_count < from {
39		return Ok(());
40	}
41
42	// No backfill required, reached the end.
43	if *first_pdu.event_type() == TimelineEventType::RoomCreate {
44		return Ok(());
45	}
46
47	let empty_room = self
48		.services
49		.state_cache
50		.room_joined_count(room_id)
51		.map_ok_or(true, |count| count <= 1);
52
53	let not_world_readable = self
54		.services
55		.state_accessor
56		.is_world_readable(room_id)
57		.map(is_false!());
58
59	// Room is empty (1 user or none), there is no one that can backfill
60	if empty_room.and(not_world_readable).await {
61		return Ok(());
62	}
63
64	let canonical_alias = self
65		.services
66		.state_accessor
67		.get_canonical_alias(room_id);
68
69	let power_levels = self
70		.services
71		.state_accessor
72		.get_power_levels(room_id);
73
74	let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await;
75
76	let power_servers = power_levels
77		.iter()
78		.flat_map(|power| {
79			power
80				.rules
81				.privileged_creators
82				.iter()
83				.flat_map(|creators| creators.iter())
84		})
85		.chain(power_levels.iter().flat_map(|power| {
86			power
87				.users
88				.iter()
89				.filter_map(|(user_id, level)| level.gt(&power.users_default).then_some(user_id))
90		}))
91		.filter_map(|user_id| {
92			self.services
93				.globals
94				.user_is_local(user_id)
95				.is_false()
96				.then_some(user_id.server_name())
97		})
98		.collect::<HashSet<_>>();
99
100	let power_servers = {
101		let mut vec: Vec<_> = power_servers
102			.into_iter()
103			.map(ToOwned::to_owned)
104			.collect();
105
106		vec.shuffle(&mut rand::rng());
107		vec.into_iter().stream()
108	};
109
110	let canonical_room_alias_server = once(canonical_alias)
111		.filter_map(Result::ok)
112		.map(|alias| alias.server_name().to_owned())
113		.stream();
114
115	let trusted_servers = self
116		.services
117		.server
118		.config
119		.trusted_servers
120		.iter()
121		.map(ToOwned::to_owned)
122		.stream();
123
124	let mut servers = power_servers
125		.chain(canonical_room_alias_server)
126		.chain(trusted_servers)
127		.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
128		.filter_map(async |server_name| {
129			self.services
130				.state_cache
131				.server_in_room(&server_name, room_id)
132				.await
133				.then_some(server_name)
134		})
135		.boxed();
136
137	while let Some(ref backfill_server) = servers.next().await {
138		let request = federation::backfill::get_backfill::v1::Request {
139			room_id: room_id.to_owned(),
140			v: vec![first_pdu.event_id().to_owned()],
141			limit: uint!(100),
142		};
143
144		debug_info!("Asking {backfill_server} for backfill");
145		if let Ok(response) = self
146			.services
147			.federation
148			.execute(backfill_server, request)
149			.inspect_err(|e| {
150				warn!("{backfill_server} failed backfilling for room {room_id}: {e}");
151			})
152			.await
153		{
154			return response
155				.pdus
156				.into_iter()
157				.stream()
158				.for_each(async |pdu| {
159					if let Err(e) = self
160						.backfill_pdu(room_id, backfill_server, pdu)
161						.await
162					{
163						debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
164					}
165				})
166				.map(Ok)
167				.await;
168		}
169	}
170
171	warn!("No servers could backfill, but backfill was needed in room {room_id}");
172
173	Ok(())
174}
175
176#[implement(super::Service)]
177#[tracing::instrument(skip(self, pdu), level = "debug")]
178pub async fn backfill_pdu(
179	&self,
180	room_id: &RoomId,
181	origin: &ServerName,
182	pdu: Box<RawJsonValue>,
183) -> Result {
184	let parsed = self
185		.services
186		.event_handler
187		.parse_incoming_pdu(&pdu);
188
189	// Lock so we cannot backfill the same pdu twice at the same time
190	let mutex_lock = self
191		.services
192		.event_handler
193		.mutex_federation
194		.lock(room_id)
195		.map(Ok);
196
197	let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?;
198
199	let existed = self
200		.services
201		.event_handler
202		.handle_incoming_pdu(origin, room_id, &event_id, value, false)
203		.boxed()
204		.await?
205		.map(at!(1))
206		.is_some_and(is_false!());
207
208	// Bail if the PDU already exists; a duplicate insertion is not good.
209	if existed {
210		return Ok(());
211	}
212
213	let pdu = self.get_pdu(&event_id);
214
215	let value = self.get_pdu_json(&event_id);
216
217	let shortroomid = self.services.short.get_shortroomid(room_id);
218
219	let insert_lock = self.mutex_insert.lock(room_id).map(Ok);
220
221	let (pdu, value, shortroomid, insert_lock) =
222		try_join4(pdu, value, shortroomid, insert_lock).await?;
223
224	// A pdu_id is not returned from handle_incoming_pdu() when accepting a new
225	// event on this codepath. The pdu_id is instead created here in ℤ−
226	let count = self.services.globals.next_count();
227	let count: i64 = (*count).try_into()?;
228	let pdu_id: RawPduId = PduId {
229		shortroomid,
230		count: PduCount::Backfilled(validated!(0 - count)),
231	}
232	.into();
233
234	// Insert pdu
235	self.prepend_backfill_pdu(
236		&pdu_id,
237		room_id,
238		&event_id,
239		u64::from(pdu.origin_server_ts),
240		&value,
241	);
242	drop(insert_lock);
243
244	if pdu.kind == TimelineEventType::RoomMessage {
245		let content: ExtractBody = pdu.get_content()?;
246		if let Some(body) = content.body {
247			self.services
248				.search
249				.index_pdu(shortroomid, &pdu_id, &body);
250		}
251	}
252	drop(mutex_lock);
253
254	debug!("Prepended backfill pdu");
255	Ok(())
256}
257
258#[implement(super::Service)]
259fn prepend_backfill_pdu(
260	&self,
261	pdu_id: &RawPduId,
262	room_id: &RoomId,
263	event_id: &EventId,
264	origin_server_ts: u64,
265	json: &CanonicalJsonObject,
266) {
267	self.db.pduid_pdu.raw_put(pdu_id, Json(json));
268
269	self.db.eventid_pduid.insert(event_id, pdu_id);
270
271	self.db.eventid_outlierpdu.remove(event_id);
272
273	self.db
274		.roomid_ts_pducount
275		.put_raw((room_id, origin_server_ts), pdu_id.count());
276}