hydrant/control/
filter.rs1use std::sync::Arc;
2use tracing::error;
3
4use miette::{IntoDiagnostic, Result};
5
6use crate::db::filter as db_filter;
7use crate::filter::FilterMode;
8use crate::patch::SetUpdate;
9use crate::state::AppState;
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct FilterSnapshot {
18 pub mode: FilterMode,
19 pub signals: Vec<String>,
20 pub collections: Vec<String>,
21 pub excludes: Vec<String>,
22}
23
24#[derive(Clone)]
42pub struct FilterControl(pub(super) Arc<AppState>);
43
44impl FilterControl {
45 pub async fn get(&self) -> Result<FilterSnapshot> {
47 let filter_ks = self.0.db.filter.clone();
48 tokio::task::spawn_blocking(move || {
49 let hot = db_filter::load(&filter_ks)?;
50 let excludes = db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)?;
51 Ok(FilterSnapshot {
52 mode: hot.mode,
53 signals: hot.signals.iter().map(|s| s.to_string()).collect(),
54 collections: hot.collections.iter().map(|s| s.to_string()).collect(),
55 excludes,
56 })
57 })
58 .await
59 .into_diagnostic()?
60 }
61
62 pub fn set_mode(&self, mode: FilterMode) -> FilterPatch {
64 FilterPatch::new(self).set_mode(mode)
65 }
66
67 pub fn set_signals(&self, signals: impl IntoIterator<Item = impl Into<String>>) -> FilterPatch {
69 FilterPatch::new(self).set_signals(signals)
70 }
71
72 pub fn append_signals(
74 &self,
75 signals: impl IntoIterator<Item = impl Into<String>>,
76 ) -> FilterPatch {
77 FilterPatch::new(self).append_signals(signals)
78 }
79
80 pub fn add_signal(&self, signal: impl Into<String>) -> FilterPatch {
82 FilterPatch::new(self).add_signal(signal)
83 }
84
85 pub fn remove_signal(&self, signal: impl Into<String>) -> FilterPatch {
87 FilterPatch::new(self).remove_signal(signal)
88 }
89
90 pub fn set_collections(
92 &self,
93 collections: impl IntoIterator<Item = impl Into<String>>,
94 ) -> FilterPatch {
95 FilterPatch::new(self).set_collections(collections)
96 }
97
98 pub fn append_collections(
100 &self,
101 collections: impl IntoIterator<Item = impl Into<String>>,
102 ) -> FilterPatch {
103 FilterPatch::new(self).append_collections(collections)
104 }
105
106 pub fn add_collection(&self, collection: impl Into<String>) -> FilterPatch {
108 FilterPatch::new(self).add_collection(collection)
109 }
110
111 pub fn remove_collection(&self, collection: impl Into<String>) -> FilterPatch {
113 FilterPatch::new(self).remove_collection(collection)
114 }
115
116 pub fn set_excludes(
118 &self,
119 excludes: impl IntoIterator<Item = impl Into<String>>,
120 ) -> FilterPatch {
121 FilterPatch::new(self).set_excludes(excludes)
122 }
123
124 pub fn append_excludes(
126 &self,
127 excludes: impl IntoIterator<Item = impl Into<String>>,
128 ) -> FilterPatch {
129 FilterPatch::new(self).append_excludes(excludes)
130 }
131
132 pub fn add_exclude(&self, did: impl Into<String>) -> FilterPatch {
134 FilterPatch::new(self).add_exclude(did)
135 }
136
137 pub fn remove_exclude(&self, did: impl Into<String>) -> FilterPatch {
139 FilterPatch::new(self).remove_exclude(did)
140 }
141}
142
143pub struct FilterPatch {
149 state: Arc<AppState>,
150 pub mode: Option<FilterMode>,
152 pub(crate) signals: Option<SetUpdate>,
154 pub(crate) collections: Option<SetUpdate>,
156 pub(crate) excludes: Option<SetUpdate>,
158}
159
160impl FilterPatch {
161 pub fn new(control: &FilterControl) -> Self {
163 Self {
164 state: control.0.clone(),
165 mode: None,
166 signals: None,
167 collections: None,
168 excludes: None,
169 }
170 }
171
172 pub fn set_mode(mut self, mode: FilterMode) -> Self {
174 self.mode = Some(mode);
175 self
176 }
177
178 pub fn set_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
180 self.signals = Some(SetUpdate::Set(
181 signals.into_iter().map(Into::into).collect(),
182 ));
183 self
184 }
185
186 pub fn append_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
188 self.signals = Some(SetUpdate::Patch(
189 signals.into_iter().map(|s| (s.into(), true)).collect(),
190 ));
191 self
192 }
193
194 pub fn add_signal(mut self, signal: impl Into<String>) -> Self {
196 self.signals = Some(SetUpdate::Patch([(signal.into(), true)].into()));
197 self
198 }
199
200 pub fn remove_signal(mut self, signal: impl Into<String>) -> Self {
202 self.signals = Some(SetUpdate::Patch([(signal.into(), false)].into()));
203 self
204 }
205
206 pub fn set_collections(
208 mut self,
209 collections: impl IntoIterator<Item = impl Into<String>>,
210 ) -> Self {
211 self.collections = Some(SetUpdate::Set(
212 collections.into_iter().map(Into::into).collect(),
213 ));
214 self
215 }
216
217 pub fn append_collections(
219 mut self,
220 collections: impl IntoIterator<Item = impl Into<String>>,
221 ) -> Self {
222 self.collections = Some(SetUpdate::Patch(
223 collections.into_iter().map(|c| (c.into(), true)).collect(),
224 ));
225 self
226 }
227
228 pub fn add_collection(mut self, collection: impl Into<String>) -> Self {
230 self.collections = Some(SetUpdate::Patch([(collection.into(), true)].into()));
231 self
232 }
233
234 pub fn remove_collection(mut self, collection: impl Into<String>) -> Self {
236 self.collections = Some(SetUpdate::Patch([(collection.into(), false)].into()));
237 self
238 }
239
240 pub fn set_excludes(mut self, excludes: impl IntoIterator<Item = impl Into<String>>) -> Self {
242 self.excludes = Some(SetUpdate::Set(
243 excludes.into_iter().map(Into::into).collect(),
244 ));
245 self
246 }
247
248 pub fn append_excludes(
250 mut self,
251 excludes: impl IntoIterator<Item = impl Into<String>>,
252 ) -> Self {
253 self.excludes = Some(SetUpdate::Patch(
254 excludes.into_iter().map(|d| (d.into(), true)).collect(),
255 ));
256 self
257 }
258
259 pub fn add_exclude(mut self, did: impl Into<String>) -> Self {
261 self.excludes = Some(SetUpdate::Patch([(did.into(), true)].into()));
262 self
263 }
264
265 pub fn remove_exclude(mut self, did: impl Into<String>) -> Self {
267 self.excludes = Some(SetUpdate::Patch([(did.into(), false)].into()));
268 self
269 }
270
271 pub async fn apply(self) -> Result<FilterSnapshot> {
274 let filter_ks = self.state.db.filter.clone();
275 let inner = self.state.db.inner.clone();
276 let filter_handle = self.state.filter.clone();
277 let state = self.state.clone();
278 let mode = self.mode;
279 let signals = self.signals;
280 let collections = self.collections;
281 let excludes = self.excludes;
282
283 let new_filter = tokio::task::spawn_blocking(move || {
284 let mut batch = inner.batch();
285 db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?;
286 batch.commit().into_diagnostic()?;
287 state.db.persist()?;
288 db_filter::load(&filter_ks)
289 })
290 .await
291 .into_diagnostic()?
292 .map_err(|e| {
293 error!(err = %e, "failed to apply filter patch");
294 e
295 })?;
296
297 let exclude_list = {
298 let filter_ks = self.state.db.filter.clone();
299 tokio::task::spawn_blocking(move || {
300 db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)
301 })
302 .await
303 .into_diagnostic()??
304 };
305
306 let snapshot = FilterSnapshot {
307 mode: new_filter.mode,
308 signals: new_filter.signals.iter().map(|s| s.to_string()).collect(),
309 collections: new_filter
310 .collections
311 .iter()
312 .map(|s| s.to_string())
313 .collect(),
314 excludes: exclude_list,
315 };
316
317 filter_handle.store(Arc::new(new_filter));
318 Ok(snapshot)
319 }
320}