tuwunel_service/storage/
mod.rs1pub mod provider;
2
3use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
4
5use async_trait::async_trait;
6use derive_more::Debug;
7use futures::TryStreamExt;
8pub use object_store::{CopyMode, GetResult, GetResultPayload, PutPayload, PutResult};
9use tuwunel_core::{
10 Result, at,
11 config::{StorageProvider, StorageProviderLocal},
12 err, implement,
13 utils::{BoolExt, stream::IterStream},
14};
15
16pub use self::provider::Provider;
17
18#[derive(Debug)]
19pub struct Service {
20 providers: Providers,
21
22 #[debug(skip)]
23 services: Arc<crate::services::OnceServices>,
24}
25
26type Providers = BTreeMap<String, Arc<Provider>>;
27
28#[async_trait]
29impl crate::Service for Service {
30 fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
31 Ok(Arc::new(Self {
32 services: args.services.clone(),
33 providers: Self::build_providers(args)?,
34 }))
35 }
36
37 async fn worker(self: Arc<Self>) -> Result {
38 self.start_providers().await?;
39
40 Ok(())
41 }
42
43 fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
44}
45
46#[implement(Service)]
47#[tracing::instrument(
48 level = "info",
49 err(level = "error")
50 skip_all,
51)]
52fn build_providers(args: &crate::Args<'_>) -> Result<Providers> {
53 let default_media_provider = args
54 .server
55 .config
56 .storage_provider
57 .contains_key("media")
58 .is_false()
59 .then(|| {
60 let db_path = args.server.config.database_path.clone();
61 let provider = StorageProviderLocal {
62 create_if_missing: true,
63 base_path: [db_path, "media".into()]
64 .into_iter()
65 .collect::<PathBuf>()
66 .to_string_lossy()
67 .into(),
68
69 ..Default::default()
70 };
71
72 ("media".into(), StorageProvider::local(provider))
73 });
74
75 args.server
76 .config
77 .storage_provider
78 .iter()
79 .chain(
80 default_media_provider
81 .iter()
82 .map(|(name, conf)| (name, conf)),
83 )
84 .filter_map(|(name, conf)| match conf {
85 | StorageProvider::local(conf) => provider::local::new(args, name, conf).transpose(),
86 | StorageProvider::s3(conf) => provider::s3::new(args, name, conf).transpose(),
87 | _ => None,
88 })
89 .collect::<Result<_>>()
90}
91
92#[implement(Service)]
93async fn start_providers(&self) -> Result {
94 self.providers
95 .iter()
96 .map(at!(1))
97 .try_stream()
98 .and_then(Provider::start)
99 .try_collect()
100 .await
101}
102
103#[implement(Service)]
105pub fn provider<'a>(&'a self, id: &'a str) -> Result<&'a Arc<Provider>> {
106 self.providers
107 .get(id)
108 .ok_or_else(|| err!(Request(NotFound(error!("No instance of provider")))))
109}
110
111#[implement(Service)]
113pub fn config<'a>(&'a self, id: &'a str) -> Result<&'a StorageProvider> {
114 self.configs(Some(id))
115 .next()
116 .map(at!(1))
117 .ok_or_else(|| err!(Request(NotFound("No configuration for provider"))))
118}
119
120#[implement(Service)]
122pub fn providers(&self) -> impl Iterator<Item = &Arc<Provider>> + Send + '_ {
123 self.providers.values()
124}
125
126#[implement(Service)]
128pub fn configs<'a, Id>(
129 &'a self,
130 id: Id,
131) -> impl Iterator<Item = (&'a String, &'a StorageProvider)> + Send + 'a
132where
133 Id: Into<Option<&'a str>>,
134{
135 let id = id.into();
136
137 self.services
138 .config
139 .storage_provider
140 .iter()
141 .filter(move |(id_, _)| id.is_none_or(|id| id_.starts_with(id)))
142}