tuwunel_service/fetcher/
transport.rs1use std::sync::Arc;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use ruma::{
12 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, ServerName, UInt,
13 api::federation::{
14 authorization::get_event_authorization::v1::Request as EventAuthRequest,
15 backfill::get_backfill::v1::Request as BackfillRequest,
16 event::{
17 get_event::v1::Request as EventRequest,
18 get_event_by_timestamp::v1::Request as TimestampRequest,
19 get_missing_events::v1::Request as MissingEventsRequest,
20 get_room_state_ids::v1::Request as StateIdsRequest,
21 },
22 },
23};
24use tuwunel_core::{Result, err, utils::BoolExt};
25
26use super::{Op, Opts};
27use crate::services::OnceServices;
28
29#[async_trait]
32pub(super) trait Transport: Send + Sync {
33 async fn fetch_raw(&self, op: Op, server: &ServerName, opts: &Opts) -> Result<Bytes>;
34}
35
36pub(super) struct FederationTransport {
37 pub(super) services: Arc<OnceServices>,
38}
39
40#[async_trait]
41impl Transport for FederationTransport {
42 #[tracing::instrument(
43 level = "debug",
44 skip(self, opts),
45 fields(
46 %server,
47 ),
48 )]
49 async fn fetch_raw(&self, op: Op, server: &ServerName, opts: &Opts) -> Result<Bytes> {
50 let federation = &self.services.federation;
51
52 match op {
53 | Op::Event | Op::AuthEvent => {
54 let event_id = require_event_id(opts)?;
55 let res = federation
56 .execute(server, EventRequest { event_id })
57 .await?;
58
59 Ok(Bytes::copy_from_slice(res.pdu.get().as_bytes()))
60 },
61 | Op::AuthChain => {
62 let event_id = require_event_id(opts)?;
63 let room_id = require_room_id(opts)?;
64 let res = federation
65 .execute(server, EventAuthRequest { room_id, event_id })
66 .await?;
67
68 to_bytes(&res.auth_chain)
69 },
70 | Op::Backfill => {
71 let event_id = require_event_id(opts)?;
72 let room_id = require_room_id(opts)?;
73 let res = federation
74 .execute(server, BackfillRequest {
75 room_id,
76 v: vec![event_id],
77 limit: batch_limit(opts),
78 })
79 .await?;
80
81 to_bytes(&res.pdus)
82 },
83 | Op::StateIds => {
84 let event_id = require_event_id(opts)?;
85 let room_id = require_room_id(opts)?;
86 let res = federation
87 .execute(server, StateIdsRequest { room_id, event_id })
88 .await?;
89
90 to_bytes(&serde_json::json!({
91 "auth_chain_ids": res.auth_chain_ids,
92 "pdu_ids": res.pdu_ids,
93 }))
94 },
95 | Op::MissingEvents => {
96 require_latest_events(opts)?;
97 let room_id = require_room_id(opts)?;
98 let req = MissingEventsRequest {
99 room_id,
100 earliest_events: opts.earliest_events.to_vec(),
101 latest_events: opts.latest_events.to_vec(),
102 limit: batch_limit(opts),
103 min_depth: UInt::default(),
104 };
105
106 let res = federation.execute(server, req).await?;
107
108 to_bytes(&res.events)
109 },
110 | Op::TimestampToEvent => {
111 let room_id = require_room_id(opts)?;
112 let ts = require_ts(opts)?;
113 let res = federation
114 .execute(
115 server,
116 TimestampRequest::new(room_id, ts, opts.dir.unwrap_or_default()),
117 )
118 .await?;
119
120 to_bytes(&serde_json::json!({
121 "event_id": res.event_id,
122 "origin_server_ts": res.origin_server_ts,
123 }))
124 },
125 }
126 }
127}
128
129fn require_event_id(opts: &Opts) -> Result<OwnedEventId> {
130 opts.event_id
131 .clone()
132 .ok_or_else(|| err!(Request(InvalidParam("event_id is required for op {:?}", opts.op))))
133}
134
135fn require_room_id(opts: &Opts) -> Result<OwnedRoomId> {
136 opts.room_id
137 .clone()
138 .ok_or_else(|| err!(Request(InvalidParam("room_id is required for op {:?}", opts.op))))
139}
140
141fn require_ts(opts: &Opts) -> Result<MilliSecondsSinceUnixEpoch> {
142 opts.ts
143 .ok_or_else(|| err!(Request(InvalidParam("ts is required for op {:?}", opts.op))))
144}
145
146fn require_latest_events(opts: &Opts) -> Result {
147 opts.latest_events
148 .is_empty()
149 .is_false()
150 .then_some(())
151 .ok_or_else(|| {
152 err!(Request(InvalidParam("latest_events is required for op {:?}", opts.op)))
153 })
154}
155
156fn batch_limit(opts: &Opts) -> UInt {
159 opts.backfill_limit.map_or_else(
160 || UInt::from(10_u8),
161 |n| UInt::new_saturating(u64::try_from(n.get()).unwrap_or(u64::MAX)),
162 )
163}
164
165fn to_bytes<T: serde::Serialize>(value: &T) -> Result<Bytes> {
166 serde_json::to_vec(value)
167 .map(Bytes::from)
168 .map_err(|e| err!(BadServerResponse("failed to re-encode federation response: {e}")))
169}