tuwunel_database/engine/
open.rs1use std::{
2 collections::BTreeSet,
3 path::Path,
4 sync::{Arc, atomic::AtomicU32},
5};
6
7use rocksdb::{ColumnFamilyDescriptor, Options};
8use tuwunel_core::{
9 Result, debug, debug_warn, implement, info, itertools::Itertools, trace, warn,
10};
11
12use super::{
13 Db, Engine, cf_opts::cf_options, context, db_opts::db_options, descriptor,
14 descriptor::Descriptor, repair::repair,
15};
16use crate::{Context, or_else};
17
18#[implement(Engine)]
19#[tracing::instrument(skip_all)]
20pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<Self>> {
21 let server = &ctx.server;
22 let config = &server.config;
23 let path = &config.database_path;
24
25 context::before_open(&ctx, path)?;
26 let db_opts = db_options(
27 config,
28 &ctx.env.lock().expect("environment locked"),
29 &ctx.row_cache.lock().expect("row cache locked"),
30 )?;
31
32 let (cfds, dropped) = Self::configure_cfds(&ctx, &db_opts, desc)?;
33 let num_cfds = cfds.len();
34 debug!("Configured {num_cfds} column descriptors...");
35
36 let load_time = std::time::Instant::now();
37 if config.rocksdb_repair {
38 repair(&db_opts, &config.database_path)?;
39 }
40
41 debug!("Opening database...");
42 let db = if config.rocksdb_read_only {
43 Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
44 } else if config.rocksdb_secondary {
45 Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
46 } else {
47 Db::open_cf_descriptors(&db_opts, path, cfds)
48 }
49 .or_else(or_else)?;
50
51 if !config.rocksdb_read_only && !config.rocksdb_secondary {
52 for name in &dropped {
53 debug!("Deleting dropped column {name:?} ...");
54 db.drop_cf(name).or_else(or_else)?;
55 }
56 }
57
58 info!(
59 columns = num_cfds,
60 sequence = %db.latest_sequence_number(),
61 time = ?load_time.elapsed(),
62 "Opened database."
63 );
64
65 Ok(Arc::new(Self {
66 db,
67 pool: ctx.pool.clone(),
68 ctx: ctx.clone(),
69 read_only: config.rocksdb_read_only,
70 secondary: config.rocksdb_secondary,
71 checksums: config.rocksdb_checksums,
72 corks: AtomicU32::new(0),
73 }))
74}
75
76#[implement(Engine)]
77#[tracing::instrument(name = "configure", skip_all)]
78fn configure_cfds(
79 ctx: &Arc<Context>,
80 db_opts: &Options,
81 desc: &[Descriptor],
82) -> Result<(Vec<ColumnFamilyDescriptor>, Vec<String>)> {
83 let server = &ctx.server;
84 let config = &server.config;
85 let path = &config.database_path;
86 let existing = Self::discover_cfs(path, db_opts);
87
88 let missing = existing
90 .iter()
91 .map(String::as_str)
92 .filter(|&name| name != "default")
93 .filter(|&name| !desc.iter().any(|desc| desc.name == name));
94
95 let creating = desc
97 .iter()
98 .filter(|desc| !desc.dropped)
99 .filter(|desc| !existing.contains(desc.name));
100
101 let dropping = desc
103 .iter()
104 .filter(|desc| desc.dropped)
105 .filter(|desc| existing.contains(desc.name))
106 .filter(|_| !config.rocksdb_never_drop_columns);
107
108 let dropped = desc
110 .iter()
111 .filter(|desc| desc.dropped)
112 .filter(|desc| !existing.contains(desc.name));
113
114 debug!(
115 existing = existing.len(),
116 described = desc.len(),
117 missing = missing.clone().count(),
118 dropped = dropped.clone().count(),
119 creating = creating.clone().count(),
120 dropping = dropping.clone().count(),
121 "Discovered database columns"
122 );
123
124 missing.clone().for_each(|name| {
125 debug_warn!("Found undescribed column {name:?} in existing database.");
126 });
127
128 dropped
129 .clone()
130 .map(|desc| desc.name)
131 .for_each(|name| {
132 debug!("Previously dropped column {name:?} no longer found in database.");
133 });
134
135 creating
136 .clone()
137 .map(|desc| desc.name)
138 .for_each(|name| {
139 debug!("Creating new column {name:?} not previously found in existing database.");
140 });
141
142 dropping
143 .clone()
144 .map(|desc| desc.name)
145 .for_each(|name| {
146 warn!(
147 "Column {name:?} has been scheduled for deletion. Storage may not appear \
148 reclaimed until further restart or compaction."
149 );
150 });
151
152 let not_dropped = |desc: &&Descriptor| {
153 !dropped
154 .clone()
155 .any(|dropped| desc.name == dropped.name)
156 };
157
158 let dropping = dropping
159 .map(|desc| desc.name)
160 .map(ToOwned::to_owned)
161 .collect();
162
163 let cfnames = desc
164 .iter()
165 .filter(not_dropped)
166 .map(|desc| desc.name)
167 .chain(missing.clone());
168
169 let cfds: Vec<_> = desc
170 .iter()
171 .filter(not_dropped)
172 .copied()
173 .chain(missing.map(|_| descriptor::IGNORED))
174 .zip(cfnames)
175 .inspect(|&(desc, name)| {
176 assert!(
177 desc.ignored || desc.name == name,
178 "{name:?} does not match descriptor {:?}",
179 desc.name
180 );
181 })
182 .inspect(|&(_, name)| debug!(name, "Described column"))
183 .map(|(desc, name)| (desc, name.to_owned()))
184 .map(|(desc, name)| Ok((name, cf_options(ctx, db_opts.clone(), &desc)?)))
185 .map_ok(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
186 .collect::<Result<_>>()?;
187
188 trace!(?dropping);
189 Ok((cfds, dropping))
190}
191
192#[implement(Engine)]
193#[tracing::instrument(name = "discover", skip_all)]
194fn discover_cfs(path: &Path, opts: &Options) -> BTreeSet<String> {
195 Db::list_cf(opts, path)
196 .unwrap_or_default()
197 .into_iter()
198 .collect::<BTreeSet<_>>()
199}