tuwunel_api/server/
backfill.rs1use std::cmp;
2
3use axum::extract::State;
4use futures::{FutureExt, StreamExt, TryStreamExt};
5use ruma::{MilliSecondsSinceUnixEpoch, api::federation::backfill::get_backfill};
6use tuwunel_core::{
7 PduCount, Result,
8 utils::{IterStream, ReadyExt},
9};
10
11use super::AccessCheck;
12use crate::Ruma;
13
/// Ceiling for the number of PDUs returned by one backfill response; the
/// limit resolved from the request is clamped to this value.
const LIMIT_MAX: usize = 150;

/// Page size used when the limit carried by the request cannot be converted
/// to `usize`.
const LIMIT_DEFAULT: usize = 50;
19
20pub(crate) async fn get_backfill_route(
25 State(services): State<crate::State>,
26 ref body: Ruma<get_backfill::v1::Request>,
27) -> Result<get_backfill::v1::Response> {
28 AccessCheck {
29 services: &services,
30 origin: body.origin(),
31 room_id: &body.room_id,
32 event_id: None,
33 }
34 .check()
35 .await?;
36
37 let limit = body
38 .limit
39 .try_into()
40 .unwrap_or(LIMIT_DEFAULT)
41 .min(LIMIT_MAX);
42
43 let from = body
44 .v
45 .iter()
46 .stream()
47 .filter_map(|event_id| {
48 services
49 .timeline
50 .get_pdu_count(event_id)
51 .map(Result::ok)
52 })
53 .ready_fold(PduCount::min(), cmp::max)
54 .await;
55
56 let room_version = services
57 .state
58 .get_room_version(&body.room_id)
59 .await
60 .ok();
61
62 Ok(get_backfill::v1::Response {
63 origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
64
65 origin: services.globals.server_name().to_owned(),
66
67 pdus: services
68 .timeline
69 .pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
70 .try_filter_map(async |(_, pdu)| {
71 Ok(services
72 .state_accessor
73 .server_can_see_event(body.origin(), &pdu.room_id, &pdu.event_id)
74 .await
75 .then_some(pdu))
76 })
77 .try_filter_map(async |pdu| {
78 Ok(services
79 .timeline
80 .get_pdu_json(&pdu.event_id)
81 .await
82 .ok())
83 })
84 .take(limit)
85 .and_then(|pdu| {
86 services
87 .federation
88 .format_pdu_into(pdu, room_version.as_ref())
89 .map(Ok)
90 })
91 .try_collect()
92 .await?,
93 })
94}