Skip to main content

tuwunel_service/fetcher/
transport.rs

1//! Federation transport: an [`Op`] and target server in, raw response bytes
2//! out.
3//!
4//! The [`Transport`] seam isolates the network behind a trait the tests mock;
5//! [`FederationTransport`] is the production impl.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use ruma::{
12	MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, ServerName, UInt,
13	api::federation::{
14		authorization::get_event_authorization::v1::Request as EventAuthRequest,
15		backfill::get_backfill::v1::Request as BackfillRequest,
16		event::{
17			get_event::v1::Request as EventRequest,
18			get_event_by_timestamp::v1::Request as TimestampRequest,
19			get_missing_events::v1::Request as MissingEventsRequest,
20			get_room_state_ids::v1::Request as StateIdsRequest,
21		},
22	},
23};
24use tuwunel_core::{Result, err, utils::BoolExt};
25
26use super::{Op, Opts};
27use crate::services::OnceServices;
28
29/// Injection seam between the fetcher and the network. The production impl
30/// routes through `federation::execute`; tests substitute a scripted mock.
31#[async_trait]
32pub(super) trait Transport: Send + Sync {
33	async fn fetch_raw(&self, op: Op, server: &ServerName, opts: &Opts) -> Result<Bytes>;
34}
35
36pub(super) struct FederationTransport {
37	pub(super) services: Arc<OnceServices>,
38}
39
40#[async_trait]
41impl Transport for FederationTransport {
42	#[tracing::instrument(
43		level = "debug",
44		skip(self, opts),
45		fields(
46			%server,
47		),
48	)]
49	async fn fetch_raw(&self, op: Op, server: &ServerName, opts: &Opts) -> Result<Bytes> {
50		let federation = &self.services.federation;
51
52		match op {
53			| Op::Event | Op::AuthEvent => {
54				let event_id = require_event_id(opts)?;
55				let res = federation
56					.execute(server, EventRequest { event_id })
57					.await?;
58
59				Ok(Bytes::copy_from_slice(res.pdu.get().as_bytes()))
60			},
61			| Op::AuthChain => {
62				let event_id = require_event_id(opts)?;
63				let room_id = require_room_id(opts)?;
64				let res = federation
65					.execute(server, EventAuthRequest { room_id, event_id })
66					.await?;
67
68				to_bytes(&res.auth_chain)
69			},
70			| Op::Backfill => {
71				let event_id = require_event_id(opts)?;
72				let room_id = require_room_id(opts)?;
73				let res = federation
74					.execute(server, BackfillRequest {
75						room_id,
76						v: vec![event_id],
77						limit: batch_limit(opts),
78					})
79					.await?;
80
81				to_bytes(&res.pdus)
82			},
83			| Op::StateIds => {
84				let event_id = require_event_id(opts)?;
85				let room_id = require_room_id(opts)?;
86				let res = federation
87					.execute(server, StateIdsRequest { room_id, event_id })
88					.await?;
89
90				to_bytes(&serde_json::json!({
91					"auth_chain_ids": res.auth_chain_ids,
92					"pdu_ids": res.pdu_ids,
93				}))
94			},
95			| Op::MissingEvents => {
96				require_latest_events(opts)?;
97				let room_id = require_room_id(opts)?;
98				let req = MissingEventsRequest {
99					room_id,
100					earliest_events: opts.earliest_events.to_vec(),
101					latest_events: opts.latest_events.to_vec(),
102					limit: batch_limit(opts),
103					min_depth: UInt::default(),
104				};
105
106				let res = federation.execute(server, req).await?;
107
108				to_bytes(&res.events)
109			},
110			| Op::TimestampToEvent => {
111				let room_id = require_room_id(opts)?;
112				let ts = require_ts(opts)?;
113				let res = federation
114					.execute(
115						server,
116						TimestampRequest::new(room_id, ts, opts.dir.unwrap_or_default()),
117					)
118					.await?;
119
120				to_bytes(&serde_json::json!({
121					"event_id": res.event_id,
122					"origin_server_ts": res.origin_server_ts,
123				}))
124			},
125		}
126	}
127}
128
129fn require_event_id(opts: &Opts) -> Result<OwnedEventId> {
130	opts.event_id
131		.clone()
132		.ok_or_else(|| err!(Request(InvalidParam("event_id is required for op {:?}", opts.op))))
133}
134
135fn require_room_id(opts: &Opts) -> Result<OwnedRoomId> {
136	opts.room_id
137		.clone()
138		.ok_or_else(|| err!(Request(InvalidParam("room_id is required for op {:?}", opts.op))))
139}
140
141fn require_ts(opts: &Opts) -> Result<MilliSecondsSinceUnixEpoch> {
142	opts.ts
143		.ok_or_else(|| err!(Request(InvalidParam("ts is required for op {:?}", opts.op))))
144}
145
146fn require_latest_events(opts: &Opts) -> Result {
147	opts.latest_events
148		.is_empty()
149		.is_false()
150		.then_some(())
151		.ok_or_else(|| {
152			err!(Request(InvalidParam("latest_events is required for op {:?}", opts.op)))
153		})
154}
155
156/// Event count requested per batch op, defaulting to the federation default of
157/// 10 and saturating an oversized cap to the wire `UInt`.
158fn batch_limit(opts: &Opts) -> UInt {
159	opts.backfill_limit.map_or_else(
160		|| UInt::from(10_u8),
161		|n| UInt::new_saturating(u64::try_from(n.get()).unwrap_or(u64::MAX)),
162	)
163}
164
165fn to_bytes<T: serde::Serialize>(value: &T) -> Result<Bytes> {
166	serde_json::to_vec(value)
167		.map(Bytes::from)
168		.map_err(|e| err!(BadServerResponse("failed to re-encode federation response: {e}")))
169}