1use rocksdb::{
2 Env,
3 event_listener::{
4 CompactionJobInfo, DBBackgroundErrorReason, DBWriteStallCondition, EventListener,
5 FlushJobInfo, IngestionInfo, MemTableInfo, MutableStatus, SubcompactionJobInfo,
6 WriteStallInfo,
7 },
8};
9use tuwunel_core::{Config, debug, debug::INFO_SPAN_LEVEL, debug_info, error, info, warn};
10
11pub(super) struct Events;
12
13impl Events {
14 pub(super) fn new(_config: &Config, _env: &Env) -> Self { Self {} }
15}
16
17impl EventListener for Events {
18 #[tracing::instrument(name = "error", level = "error", skip_all)]
19 fn on_background_error(&self, reason: DBBackgroundErrorReason, _status: MutableStatus) {
20 error!(error = ?reason, "Critical RocksDB Error");
21 }
22
23 #[tracing::instrument(name = "stall", level = "warn", skip_all)]
24 fn on_stall_conditions_changed(&self, info: &WriteStallInfo) {
25 let col = info.cf_name();
26 let col = col
27 .as_deref()
28 .map(str::from_utf8)
29 .expect("column has a name")
30 .expect("column name is valid utf8");
31
32 let prev = info.prev();
33 match info.cur() {
34 | DBWriteStallCondition::KStopped => {
35 error!(?col, ?prev, "Database Stalled");
36 },
37 | DBWriteStallCondition::KDelayed if prev == DBWriteStallCondition::KStopped => {
38 warn!(?col, ?prev, "Database Stall Recovering");
39 },
40 | DBWriteStallCondition::KDelayed => {
41 warn!(?col, ?prev, "Database Stalling");
42 },
43 | DBWriteStallCondition::KNormal
44 if prev == DBWriteStallCondition::KStopped
45 || prev == DBWriteStallCondition::KDelayed =>
46 {
47 info!(?col, ?prev, "Database Stall Recovered");
48 },
49 | DBWriteStallCondition::KNormal => {
50 debug!(?col, ?prev, "Database Normal");
51 },
52 }
53 }
54
55 #[tracing::instrument(
56 name = "compaction",
57 level = INFO_SPAN_LEVEL,
58 skip_all,
59 )]
60 fn on_compaction_begin(&self, info: &CompactionJobInfo) {
61 let col = info.cf_name();
62 let col = col
63 .as_deref()
64 .map(str::from_utf8)
65 .expect("column has a name")
66 .expect("column name is valid utf8");
67
68 let level = (info.base_input_level(), info.output_level());
69 let records = (info.input_records(), info.output_records());
70 let bytes = (info.total_input_bytes(), info.total_output_bytes());
71 let files = (
72 info.input_file_count(),
73 info.output_file_count(),
74 info.num_input_files_at_output_level(),
75 );
76
77 debug!(
78 status = ?info.status(),
79 ?level,
80 ?files,
81 ?records,
82 ?bytes,
83 micros = info.elapsed_micros(),
84 errs = info.num_corrupt_keys(),
85 reason = ?info.compaction_reason(),
86 ?col,
87 "Compaction Starting",
88 );
89 }
90
91 #[tracing::instrument(
92 name = "compaction",
93 level = INFO_SPAN_LEVEL,
94 skip_all,
95 )]
96 fn on_compaction_completed(&self, info: &CompactionJobInfo) {
97 let col = info.cf_name();
98 let col = col
99 .as_deref()
100 .map(str::from_utf8)
101 .expect("column has a name")
102 .expect("column name is valid utf8");
103
104 let level = (info.base_input_level(), info.output_level());
105 let records = (info.input_records(), info.output_records());
106 let bytes = (info.total_input_bytes(), info.total_output_bytes());
107 let files = (
108 info.input_file_count(),
109 info.output_file_count(),
110 info.num_input_files_at_output_level(),
111 );
112
113 debug_info!(
114 status = ?info.status(),
115 ?level,
116 ?files,
117 ?records,
118 ?bytes,
119 micros = info.elapsed_micros(),
120 errs = info.num_corrupt_keys(),
121 reason = ?info.compaction_reason(),
122 ?col,
123 "Compaction Complete",
124 );
125 }
126
127 #[tracing::instrument(name = "compaction", level = "debug", skip_all)]
128 fn on_subcompaction_begin(&self, info: &SubcompactionJobInfo) {
129 let col = info.cf_name();
130 let col = col
131 .as_deref()
132 .map(str::from_utf8)
133 .expect("column has a name")
134 .expect("column name is valid utf8");
135
136 let level = (info.base_input_level(), info.output_level());
137
138 debug!(
139 status = ?info.status(),
140 ?level,
141 tid = info.thread_id(),
142 reason = ?info.compaction_reason(),
143 ?col,
144 "Compaction Starting",
145 );
146 }
147
148 #[tracing::instrument(name = "compaction", level = "debug", skip_all)]
149 fn on_subcompaction_completed(&self, info: &SubcompactionJobInfo) {
150 let col = info.cf_name();
151 let col = col
152 .as_deref()
153 .map(str::from_utf8)
154 .expect("column has a name")
155 .expect("column name is valid utf8");
156
157 let level = (info.base_input_level(), info.output_level());
158
159 debug!(
160 status = ?info.status(),
161 ?level,
162 tid = info.thread_id(),
163 reason = ?info.compaction_reason(),
164 ?col,
165 "Compaction Complete",
166 );
167 }
168
169 #[tracing::instrument(
170 name = "flush",
171 level = INFO_SPAN_LEVEL,
172 skip_all,
173 )]
174 fn on_flush_begin(&self, info: &FlushJobInfo) {
175 let col = info.cf_name();
176 let col = col
177 .as_deref()
178 .map(str::from_utf8)
179 .expect("column has a name")
180 .expect("column name is valid utf8");
181
182 debug!(
183 seq_start = info.smallest_seqno(),
184 seq_end = info.largest_seqno(),
185 slow = info.triggered_writes_slowdown(),
186 stop = info.triggered_writes_stop(),
187 reason = ?info.flush_reason(),
188 ?col,
189 "Flush Starting",
190 );
191 }
192
193 #[tracing::instrument(
194 name = "flush",
195 level = INFO_SPAN_LEVEL,
196 skip_all,
197 )]
198 fn on_flush_completed(&self, info: &FlushJobInfo) {
199 let col = info.cf_name();
200 let col = col
201 .as_deref()
202 .map(str::from_utf8)
203 .expect("column has a name")
204 .expect("column name is valid utf8");
205
206 debug_info!(
207 seq_start = info.smallest_seqno(),
208 seq_end = info.largest_seqno(),
209 slow = info.triggered_writes_slowdown(),
210 stop = info.triggered_writes_stop(),
211 reason = ?info.flush_reason(),
212 ?col,
213 "Flush Complete",
214 );
215 }
216
217 #[tracing::instrument(
218 name = "memtable",
219 level = INFO_SPAN_LEVEL,
220 skip_all,
221 )]
222 fn on_memtable_sealed(&self, info: &MemTableInfo) {
223 let col = info.cf_name();
224 let col = col
225 .as_deref()
226 .map(str::from_utf8)
227 .expect("column has a name")
228 .expect("column name is valid utf8");
229
230 debug_info!(
231 seq_first = info.first_seqno(),
232 seq_early = info.earliest_seqno(),
233 ents = info.num_entries(),
234 dels = info.num_deletes(),
235 ?col,
236 "Buffer Filled",
237 );
238 }
239
240 fn on_external_file_ingested(&self, _info: &IngestionInfo) {
241 unimplemented!();
242 }
243}