Skip to main content

tuwunel_database/engine/
events.rs

1use rocksdb::{
2	Env,
3	event_listener::{
4		CompactionJobInfo, DBBackgroundErrorReason, DBWriteStallCondition, EventListener,
5		FlushJobInfo, IngestionInfo, MemTableInfo, MutableStatus, SubcompactionJobInfo,
6		WriteStallInfo,
7	},
8};
9use tuwunel_core::{Config, debug, debug::INFO_SPAN_LEVEL, debug_info, error, info, warn};
10
11pub(super) struct Events;
12
13impl Events {
14	pub(super) fn new(_config: &Config, _env: &Env) -> Self { Self {} }
15}
16
17impl EventListener for Events {
18	#[tracing::instrument(name = "error", level = "error", skip_all)]
19	fn on_background_error(&self, reason: DBBackgroundErrorReason, _status: MutableStatus) {
20		error!(error = ?reason, "Critical RocksDB Error");
21	}
22
23	#[tracing::instrument(name = "stall", level = "warn", skip_all)]
24	fn on_stall_conditions_changed(&self, info: &WriteStallInfo) {
25		let col = info.cf_name();
26		let col = col
27			.as_deref()
28			.map(str::from_utf8)
29			.expect("column has a name")
30			.expect("column name is valid utf8");
31
32		let prev = info.prev();
33		match info.cur() {
34			| DBWriteStallCondition::KStopped => {
35				error!(?col, ?prev, "Database Stalled");
36			},
37			| DBWriteStallCondition::KDelayed if prev == DBWriteStallCondition::KStopped => {
38				warn!(?col, ?prev, "Database Stall Recovering");
39			},
40			| DBWriteStallCondition::KDelayed => {
41				warn!(?col, ?prev, "Database Stalling");
42			},
43			| DBWriteStallCondition::KNormal
44				if prev == DBWriteStallCondition::KStopped
45					|| prev == DBWriteStallCondition::KDelayed =>
46			{
47				info!(?col, ?prev, "Database Stall Recovered");
48			},
49			| DBWriteStallCondition::KNormal => {
50				debug!(?col, ?prev, "Database Normal");
51			},
52		}
53	}
54
55	#[tracing::instrument(
56		name = "compaction",
57		level = INFO_SPAN_LEVEL,
58		skip_all,
59	)]
60	fn on_compaction_begin(&self, info: &CompactionJobInfo) {
61		let col = info.cf_name();
62		let col = col
63			.as_deref()
64			.map(str::from_utf8)
65			.expect("column has a name")
66			.expect("column name is valid utf8");
67
68		let level = (info.base_input_level(), info.output_level());
69		let records = (info.input_records(), info.output_records());
70		let bytes = (info.total_input_bytes(), info.total_output_bytes());
71		let files = (
72			info.input_file_count(),
73			info.output_file_count(),
74			info.num_input_files_at_output_level(),
75		);
76
77		debug!(
78			status = ?info.status(),
79			?level,
80			?files,
81			?records,
82			?bytes,
83			micros = info.elapsed_micros(),
84			errs = info.num_corrupt_keys(),
85			reason = ?info.compaction_reason(),
86			?col,
87			"Compaction Starting",
88		);
89	}
90
91	#[tracing::instrument(
92		name = "compaction",
93		level = INFO_SPAN_LEVEL,
94		skip_all,
95	)]
96	fn on_compaction_completed(&self, info: &CompactionJobInfo) {
97		let col = info.cf_name();
98		let col = col
99			.as_deref()
100			.map(str::from_utf8)
101			.expect("column has a name")
102			.expect("column name is valid utf8");
103
104		let level = (info.base_input_level(), info.output_level());
105		let records = (info.input_records(), info.output_records());
106		let bytes = (info.total_input_bytes(), info.total_output_bytes());
107		let files = (
108			info.input_file_count(),
109			info.output_file_count(),
110			info.num_input_files_at_output_level(),
111		);
112
113		debug_info!(
114			status = ?info.status(),
115			?level,
116			?files,
117			?records,
118			?bytes,
119			micros = info.elapsed_micros(),
120			errs = info.num_corrupt_keys(),
121			reason = ?info.compaction_reason(),
122			?col,
123			"Compaction Complete",
124		);
125	}
126
127	#[tracing::instrument(name = "compaction", level = "debug", skip_all)]
128	fn on_subcompaction_begin(&self, info: &SubcompactionJobInfo) {
129		let col = info.cf_name();
130		let col = col
131			.as_deref()
132			.map(str::from_utf8)
133			.expect("column has a name")
134			.expect("column name is valid utf8");
135
136		let level = (info.base_input_level(), info.output_level());
137
138		debug!(
139			status = ?info.status(),
140			?level,
141			tid = info.thread_id(),
142			reason = ?info.compaction_reason(),
143			?col,
144			"Compaction Starting",
145		);
146	}
147
148	#[tracing::instrument(name = "compaction", level = "debug", skip_all)]
149	fn on_subcompaction_completed(&self, info: &SubcompactionJobInfo) {
150		let col = info.cf_name();
151		let col = col
152			.as_deref()
153			.map(str::from_utf8)
154			.expect("column has a name")
155			.expect("column name is valid utf8");
156
157		let level = (info.base_input_level(), info.output_level());
158
159		debug!(
160			status = ?info.status(),
161			?level,
162			tid = info.thread_id(),
163			reason = ?info.compaction_reason(),
164			?col,
165			"Compaction Complete",
166		);
167	}
168
169	#[tracing::instrument(
170		name = "flush",
171		level = INFO_SPAN_LEVEL,
172		skip_all,
173	)]
174	fn on_flush_begin(&self, info: &FlushJobInfo) {
175		let col = info.cf_name();
176		let col = col
177			.as_deref()
178			.map(str::from_utf8)
179			.expect("column has a name")
180			.expect("column name is valid utf8");
181
182		debug!(
183			seq_start = info.smallest_seqno(),
184			seq_end = info.largest_seqno(),
185			slow = info.triggered_writes_slowdown(),
186			stop = info.triggered_writes_stop(),
187			reason = ?info.flush_reason(),
188			?col,
189			"Flush Starting",
190		);
191	}
192
193	#[tracing::instrument(
194		name = "flush",
195		level = INFO_SPAN_LEVEL,
196		skip_all,
197	)]
198	fn on_flush_completed(&self, info: &FlushJobInfo) {
199		let col = info.cf_name();
200		let col = col
201			.as_deref()
202			.map(str::from_utf8)
203			.expect("column has a name")
204			.expect("column name is valid utf8");
205
206		debug_info!(
207			seq_start = info.smallest_seqno(),
208			seq_end = info.largest_seqno(),
209			slow = info.triggered_writes_slowdown(),
210			stop = info.triggered_writes_stop(),
211			reason = ?info.flush_reason(),
212			?col,
213			"Flush Complete",
214		);
215	}
216
217	#[tracing::instrument(
218		name = "memtable",
219		level = INFO_SPAN_LEVEL,
220		skip_all,
221	)]
222	fn on_memtable_sealed(&self, info: &MemTableInfo) {
223		let col = info.cf_name();
224		let col = col
225			.as_deref()
226			.map(str::from_utf8)
227			.expect("column has a name")
228			.expect("column name is valid utf8");
229
230		debug_info!(
231			seq_first = info.first_seqno(),
232			seq_early = info.earliest_seqno(),
233			ents = info.num_entries(),
234			dels = info.num_deletes(),
235			?col,
236			"Buffer Filled",
237		);
238	}
239
240	fn on_external_file_ingested(&self, _info: &IngestionInfo) {
241		unimplemented!();
242	}
243}