// tuwunel_database/engine/open.rs

1use std::{
2	collections::BTreeSet,
3	path::Path,
4	sync::{Arc, atomic::AtomicU32},
5};
6
7use rocksdb::{ColumnFamilyDescriptor, Options};
8use tuwunel_core::{
9	Result, debug, debug_warn, implement, info, itertools::Itertools, trace, warn,
10};
11
12use super::{
13	Db, Engine, cf_opts::cf_options, context, db_opts::db_options, descriptor,
14	descriptor::Descriptor, repair::repair,
15};
16use crate::{Context, or_else};
17
18#[implement(Engine)]
19#[tracing::instrument(skip_all)]
20pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<Self>> {
21	let server = &ctx.server;
22	let config = &server.config;
23	let path = &config.database_path;
24
25	context::before_open(&ctx, path)?;
26	let db_opts = db_options(
27		config,
28		&ctx.env.lock().expect("environment locked"),
29		&ctx.row_cache.lock().expect("row cache locked"),
30	)?;
31
32	let (cfds, dropped) = Self::configure_cfds(&ctx, &db_opts, desc)?;
33	let num_cfds = cfds.len();
34	debug!("Configured {num_cfds} column descriptors...");
35
36	let load_time = std::time::Instant::now();
37	if config.rocksdb_repair {
38		repair(&db_opts, &config.database_path)?;
39	}
40
41	debug!("Opening database...");
42	let db = if config.rocksdb_read_only {
43		Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
44	} else if config.rocksdb_secondary {
45		Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
46	} else {
47		Db::open_cf_descriptors(&db_opts, path, cfds)
48	}
49	.or_else(or_else)?;
50
51	if !config.rocksdb_read_only && !config.rocksdb_secondary {
52		for name in &dropped {
53			debug!("Deleting dropped column {name:?} ...");
54			db.drop_cf(name).or_else(or_else)?;
55		}
56	}
57
58	info!(
59		columns = num_cfds,
60		sequence = %db.latest_sequence_number(),
61		time = ?load_time.elapsed(),
62		"Opened database."
63	);
64
65	Ok(Arc::new(Self {
66		db,
67		pool: ctx.pool.clone(),
68		ctx: ctx.clone(),
69		read_only: config.rocksdb_read_only,
70		secondary: config.rocksdb_secondary,
71		checksums: config.rocksdb_checksums,
72		corks: AtomicU32::new(0),
73	}))
74}
75
76#[implement(Engine)]
77#[tracing::instrument(name = "configure", skip_all)]
78fn configure_cfds(
79	ctx: &Arc<Context>,
80	db_opts: &Options,
81	desc: &[Descriptor],
82) -> Result<(Vec<ColumnFamilyDescriptor>, Vec<String>)> {
83	let server = &ctx.server;
84	let config = &server.config;
85	let path = &config.database_path;
86	let existing = Self::discover_cfs(path, db_opts);
87
88	// Found columns which are not described.
89	let missing = existing
90		.iter()
91		.map(String::as_str)
92		.filter(|&name| name != "default")
93		.filter(|&name| !desc.iter().any(|desc| desc.name == name));
94
95	// Described columns which are not found.
96	let creating = desc
97		.iter()
98		.filter(|desc| !desc.dropped)
99		.filter(|desc| !existing.contains(desc.name));
100
101	// Found columns which are described as dropped.
102	let dropping = desc
103		.iter()
104		.filter(|desc| desc.dropped)
105		.filter(|desc| existing.contains(desc.name))
106		.filter(|_| !config.rocksdb_never_drop_columns);
107
108	// Described dropped columns which are no longer found.
109	let dropped = desc
110		.iter()
111		.filter(|desc| desc.dropped)
112		.filter(|desc| !existing.contains(desc.name));
113
114	debug!(
115		existing = existing.len(),
116		described = desc.len(),
117		missing = missing.clone().count(),
118		dropped = dropped.clone().count(),
119		creating = creating.clone().count(),
120		dropping = dropping.clone().count(),
121		"Discovered database columns"
122	);
123
124	missing.clone().for_each(|name| {
125		debug_warn!("Found undescribed column {name:?} in existing database.");
126	});
127
128	dropped
129		.clone()
130		.map(|desc| desc.name)
131		.for_each(|name| {
132			debug!("Previously dropped column {name:?} no longer found in database.");
133		});
134
135	creating
136		.clone()
137		.map(|desc| desc.name)
138		.for_each(|name| {
139			debug!("Creating new column {name:?} not previously found in existing database.");
140		});
141
142	dropping
143		.clone()
144		.map(|desc| desc.name)
145		.for_each(|name| {
146			warn!(
147				"Column {name:?} has been scheduled for deletion. Storage may not appear \
148				 reclaimed until further restart or compaction."
149			);
150		});
151
152	let not_dropped = |desc: &&Descriptor| {
153		!dropped
154			.clone()
155			.any(|dropped| desc.name == dropped.name)
156	};
157
158	let dropping = dropping
159		.map(|desc| desc.name)
160		.map(ToOwned::to_owned)
161		.collect();
162
163	let cfnames = desc
164		.iter()
165		.filter(not_dropped)
166		.map(|desc| desc.name)
167		.chain(missing.clone());
168
169	let cfds: Vec<_> = desc
170		.iter()
171		.filter(not_dropped)
172		.copied()
173		.chain(missing.map(|_| descriptor::IGNORED))
174		.zip(cfnames)
175		.inspect(|&(desc, name)| {
176			assert!(
177				desc.ignored || desc.name == name,
178				"{name:?} does not match descriptor {:?}",
179				desc.name
180			);
181		})
182		.inspect(|&(_, name)| debug!(name, "Described column"))
183		.map(|(desc, name)| (desc, name.to_owned()))
184		.map(|(desc, name)| Ok((name, cf_options(ctx, db_opts.clone(), &desc)?)))
185		.map_ok(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
186		.collect::<Result<_>>()?;
187
188	trace!(?dropping);
189	Ok((cfds, dropping))
190}
191
192#[implement(Engine)]
193#[tracing::instrument(name = "discover", skip_all)]
194fn discover_cfs(path: &Path, opts: &Options) -> BTreeSet<String> {
195	Db::list_cf(opts, path)
196		.unwrap_or_default()
197		.into_iter()
198		.collect::<BTreeSet<_>>()
199}