tuwunel_service/fetcher/
mod.rs1mod error;
9mod inflight;
10mod opts;
11mod select;
12mod transport;
13mod validate;
14mod worker;
15
16#[cfg(test)]
17mod tests;
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use loole::{Receiver, Sender, unbounded};
23use tokio::sync::{
24 oneshot::{self, channel},
25 watch,
26};
27use tuwunel_core::{Result, implement};
28
29pub use self::opts::{EventWindow, FanoutGrowth, Op, Opts, Outcome};
30use self::{
31 error::Failure,
32 inflight::{Key, SharedResult, Subscription},
33 select::{RoomCandidates, Select},
34 transport::{FederationTransport, Transport},
35};
36use crate::services::OnceServices;
37
/// Value `build` stores in [`Service::capacity`] for every new service.
///
/// NOTE(review): how the limit is enforced is not visible in this module;
/// presumably the worker caps concurrent requests with it — confirm in `worker`.
const REQUESTS_MAX: usize = 256;
40
41pub struct Service {
42 services: Arc<OnceServices>,
43 channel: (Sender<Msg>, Receiver<Msg>),
44 transport: Arc<dyn Transport>,
45 select: Arc<dyn Select>,
46 capacity: usize,
47}
48
49struct Msg {
52 key: Key,
53 opts: Opts,
54 reply: oneshot::Sender<Subscription>,
55}
56
57#[async_trait]
58impl crate::Service for Service {
59 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
60 let services = args.services.clone();
61 let transport: Arc<dyn Transport> =
62 Arc::new(FederationTransport { services: services.clone() });
63
64 let select: Arc<dyn Select> = Arc::new(RoomCandidates { services: services.clone() });
65
66 Ok(Arc::new(Self {
67 services,
68 channel: unbounded(),
69 transport,
70 select,
71 capacity: REQUESTS_MAX,
72 }))
73 }
74
75 async fn worker(self: Arc<Self>) -> Result {
76 self.run_worker().await;
77 Ok(())
78 }
79
80 async fn interrupt(&self) {
81 let (sender, _) = &self.channel;
82 if !sender.is_closed() {
83 sender.close();
84 }
85 }
86
87 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
88}
89
90#[implement(Service)]
95#[tracing::instrument(
96 level = "debug",
97 skip_all,
98 fields(
99 op = ?opts.op,
100 room_id = ?opts.room_id,
101 event_id = ?opts.event_id,
102 ),
103)]
104pub async fn fetch(&self, opts: Opts) -> Result<Arc<Outcome>> {
105 let key = Key::new(&opts);
106 let (reply, reply_rx) = channel();
107
108 self.channel
109 .0
110 .send(Msg { key, opts, reply })
111 .map_err(|_| Failure::Cancelled)?;
112
113 let (rx, _interest) = reply_rx.await.map_err(|_| Failure::Cancelled)?;
115
116 await_result(rx).await.map_err(Into::into)
117}
118
119async fn await_result(mut rx: watch::Receiver<Option<SharedResult>>) -> SharedResult {
120 rx.wait_for(Option::is_some)
121 .await
122 .map_or(Err(Failure::Cancelled), |value| value.clone().expect("present by predicate"))
123}