Skip to main content

tuwunel_service/storage/
mod.rs

1pub mod provider;
2
3use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
4
5use async_trait::async_trait;
6use derive_more::Debug;
7use futures::TryStreamExt;
8pub use object_store::{CopyMode, GetResult, GetResultPayload, PutPayload, PutResult};
9use tuwunel_core::{
10	Result, at,
11	config::{StorageProvider, StorageProviderLocal},
12	err, implement,
13	utils::{BoolExt, stream::IterStream},
14};
15
16pub use self::provider::Provider;
17
/// Storage service: owns the storage provider instances built from the
/// server configuration and hands them out by ID.
#[derive(Debug)]
pub struct Service {
	/// Provider instances, populated once in `build()`.
	providers: Providers,

	/// Handle to the other services; used here to read the storage provider
	/// configuration. Omitted from the `Debug` output.
	#[debug(skip)]
	services: Arc<crate::services::OnceServices>,
}
25
/// Ordered map from a provider's string key (looked up as its ID by
/// `Service::provider`) to the shared provider instance.
type Providers = BTreeMap<String, Arc<Provider>>;
27
28#[async_trait]
29impl crate::Service for Service {
30	fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
31		Ok(Arc::new(Self {
32			services: args.services.clone(),
33			providers: Self::build_providers(args)?,
34		}))
35	}
36
37	async fn worker(self: Arc<Self>) -> Result {
38		self.start_providers().await?;
39
40		Ok(())
41	}
42
43	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
44}
45
46#[implement(Service)]
47#[tracing::instrument(
48	level = "info",
49	err(level = "error")
50	skip_all,
51)]
52fn build_providers(args: &crate::Args<'_>) -> Result<Providers> {
53	let default_media_provider = args
54		.server
55		.config
56		.storage_provider
57		.contains_key("media")
58		.is_false()
59		.then(|| {
60			let db_path = args.server.config.database_path.clone();
61			let provider = StorageProviderLocal {
62				create_if_missing: true,
63				base_path: [db_path, "media".into()]
64					.into_iter()
65					.collect::<PathBuf>()
66					.to_string_lossy()
67					.into(),
68
69				..Default::default()
70			};
71
72			("media".into(), StorageProvider::local(provider))
73		});
74
75	args.server
76		.config
77		.storage_provider
78		.iter()
79		.chain(
80			default_media_provider
81				.iter()
82				.map(|(name, conf)| (name, conf)),
83		)
84		.filter_map(|(name, conf)| match conf {
85			| StorageProvider::local(conf) => provider::local::new(args, name, conf).transpose(),
86			| StorageProvider::s3(conf) => provider::s3::new(args, name, conf).transpose(),
87			| _ => None,
88		})
89		.collect::<Result<_>>()
90}
91
92#[implement(Service)]
93async fn start_providers(&self) -> Result {
94	self.providers
95		.iter()
96		.map(at!(1))
97		.try_stream()
98		.and_then(Provider::start)
99		.try_collect()
100		.await
101}
102
103/// Get the specific storage provider's instance by ID.
104#[implement(Service)]
105pub fn provider<'a>(&'a self, id: &'a str) -> Result<&'a Arc<Provider>> {
106	self.providers
107		.get(id)
108		.ok_or_else(|| err!(Request(NotFound(error!("No instance of provider")))))
109}
110
111/// Get the specific storage provider's configuration by ID.
112#[implement(Service)]
113pub fn config<'a>(&'a self, id: &'a str) -> Result<&'a StorageProvider> {
114	self.configs(Some(id))
115		.next()
116		.map(at!(1))
117		.ok_or_else(|| err!(Request(NotFound("No configuration for provider"))))
118}
119
120/// Iterate the storage provider configurations.
121#[implement(Service)]
122pub fn providers(&self) -> impl Iterator<Item = &Arc<Provider>> + Send + '_ {
123	self.providers.values()
124}
125
126/// Iterate the storage provider configurations.
127#[implement(Service)]
128pub fn configs<'a, Id>(
129	&'a self,
130	id: Id,
131) -> impl Iterator<Item = (&'a String, &'a StorageProvider)> + Send + 'a
132where
133	Id: Into<Option<&'a str>>,
134{
135	let id = id.into();
136
137	self.services
138		.config
139		.storage_provider
140		.iter()
141		.filter(move |(id_, _)| id.is_none_or(|id| id_.starts_with(id)))
142}