hydrant/control/
filter.rs

1use std::sync::Arc;
2use tracing::error;
3
4use miette::{IntoDiagnostic, Result};
5
6use crate::db::filter as db_filter;
7use crate::filter::FilterMode;
8use crate::patch::SetUpdate;
9use crate::state::AppState;
10
/// a point-in-time snapshot of the filter configuration. returned by
/// [`FilterControl::get`] and [`FilterPatch::apply`].
///
/// because the filter is stored in the database and loaded on demand, this snapshot
/// may be stale if another caller modifies the filter concurrently. call
/// [`FilterControl::get`] again to re-read the current config from the database.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FilterSnapshot {
    /// the indexing mode in effect when the snapshot was taken.
    pub mode: FilterMode,
    /// the configured signal patterns, rendered as strings.
    pub signals: Vec<String>,
    /// the configured collection patterns, rendered as strings.
    pub collections: Vec<String>,
    /// the excludes set, as read from the filter keyspace under `EXCLUDE_PREFIX`.
    pub excludes: Vec<String>,
}
23
/// runtime control over the indexing filter.
///
/// the filter has two orthogonal axes:
///
/// **mode** controls discovery:
/// - [`FilterMode::Filter`]: only indexes repos whose firehose commits touch a collection
///   matching a configured `signal`. explicit [`ReposControl::track`] always works regardless.
/// - [`FilterMode::Full`]: indexes the entire network. `signals` are ignored for discovery
///   but `collections` and `excludes` still apply.
///
/// **sets** are each independently configurable:
/// - `signals`: NSID patterns that trigger auto-discovery in `filter` mode (e.g. `app.bsky.feed.post`, `app.bsky.graph.*`)
/// - `collections`: NSID patterns that filter which records are *stored*. empty means store all.
/// - `excludes`: DIDs that are always skipped regardless of mode.
///
/// NSID patterns support an optional `.*` suffix to match an entire namespace.
///
/// every mutation method returns a [`FilterPatch`] and changes nothing by itself.
/// the staged changes are written to the database and published to the in-memory
/// filter only when [`FilterPatch::apply`] is awaited.
///
/// the wrapped field is the shared application state; cloning a `FilterControl`
/// only clones the `Arc`.
#[derive(Clone)]
pub struct FilterControl(pub(super) Arc<AppState>);
43
44impl FilterControl {
45    /// return the current filter configuration from the database.
46    pub async fn get(&self) -> Result<FilterSnapshot> {
47        let filter_ks = self.0.db.filter.clone();
48        tokio::task::spawn_blocking(move || {
49            let hot = db_filter::load(&filter_ks)?;
50            let excludes = db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)?;
51            Ok(FilterSnapshot {
52                mode: hot.mode,
53                signals: hot.signals.iter().map(|s| s.to_string()).collect(),
54                collections: hot.collections.iter().map(|s| s.to_string()).collect(),
55                excludes,
56            })
57        })
58        .await
59        .into_diagnostic()?
60    }
61
62    /// set the indexing mode. see [`FilterControl`] for mode semantics.
63    pub fn set_mode(&self, mode: FilterMode) -> FilterPatch {
64        FilterPatch::new(self).set_mode(mode)
65    }
66
67    /// replace the entire signals set. existing signals are removed.
68    pub fn set_signals(&self, signals: impl IntoIterator<Item = impl Into<String>>) -> FilterPatch {
69        FilterPatch::new(self).set_signals(signals)
70    }
71
72    /// add multiple signals without disturbing existing ones.
73    pub fn append_signals(
74        &self,
75        signals: impl IntoIterator<Item = impl Into<String>>,
76    ) -> FilterPatch {
77        FilterPatch::new(self).append_signals(signals)
78    }
79
80    /// add a single signal. no-op if already present.
81    pub fn add_signal(&self, signal: impl Into<String>) -> FilterPatch {
82        FilterPatch::new(self).add_signal(signal)
83    }
84
85    /// remove a single signal. no-op if not present.
86    pub fn remove_signal(&self, signal: impl Into<String>) -> FilterPatch {
87        FilterPatch::new(self).remove_signal(signal)
88    }
89
90    /// replace the entire collections set. pass an empty iterator to store all collections.
91    pub fn set_collections(
92        &self,
93        collections: impl IntoIterator<Item = impl Into<String>>,
94    ) -> FilterPatch {
95        FilterPatch::new(self).set_collections(collections)
96    }
97
98    /// add multiple collections without disturbing existing ones.
99    pub fn append_collections(
100        &self,
101        collections: impl IntoIterator<Item = impl Into<String>>,
102    ) -> FilterPatch {
103        FilterPatch::new(self).append_collections(collections)
104    }
105
106    /// add a single collection filter. no-op if already present.
107    pub fn add_collection(&self, collection: impl Into<String>) -> FilterPatch {
108        FilterPatch::new(self).add_collection(collection)
109    }
110
111    /// remove a single collection filter. no-op if not present.
112    pub fn remove_collection(&self, collection: impl Into<String>) -> FilterPatch {
113        FilterPatch::new(self).remove_collection(collection)
114    }
115
116    /// replace the entire excludes set.
117    pub fn set_excludes(
118        &self,
119        excludes: impl IntoIterator<Item = impl Into<String>>,
120    ) -> FilterPatch {
121        FilterPatch::new(self).set_excludes(excludes)
122    }
123
124    /// add multiple DIDs to the excludes set without disturbing existing ones.
125    pub fn append_excludes(
126        &self,
127        excludes: impl IntoIterator<Item = impl Into<String>>,
128    ) -> FilterPatch {
129        FilterPatch::new(self).append_excludes(excludes)
130    }
131
132    /// add a single DID to the excludes set. no-op if already excluded.
133    pub fn add_exclude(&self, did: impl Into<String>) -> FilterPatch {
134        FilterPatch::new(self).add_exclude(did)
135    }
136
137    /// remove a single DID from the excludes set. no-op if not present.
138    pub fn remove_exclude(&self, did: impl Into<String>) -> FilterPatch {
139        FilterPatch::new(self).remove_exclude(did)
140    }
141}
142
143/// a staged set of filter mutations. all methods accumulate changes without touching
144/// the database. call [`FilterPatch::apply`] to commit the entire patch atomically.
145///
146/// obtain an instance by calling any mutation method on [`FilterControl`], or via
147/// [`FilterPatch::new`] to start from a blank patch.
148pub struct FilterPatch {
149    state: Arc<AppState>,
150    /// if set, replaces the current indexing mode.
151    pub mode: Option<FilterMode>,
152    /// if set, replaces or patches the signals set.
153    pub(crate) signals: Option<SetUpdate>,
154    /// if set, replaces or patches the collections set.
155    pub(crate) collections: Option<SetUpdate>,
156    /// if set, replaces or patches the excludes set.
157    pub(crate) excludes: Option<SetUpdate>,
158}
159
160impl FilterPatch {
161    /// create a new blank patch associated with the given [`FilterControl`].
162    pub fn new(control: &FilterControl) -> Self {
163        Self {
164            state: control.0.clone(),
165            mode: None,
166            signals: None,
167            collections: None,
168            excludes: None,
169        }
170    }
171
172    /// set the indexing mode. see [`FilterControl`] for mode semantics.
173    pub fn set_mode(mut self, mode: FilterMode) -> Self {
174        self.mode = Some(mode);
175        self
176    }
177
178    /// replace the entire signals set. existing signals are removed.
179    pub fn set_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
180        self.signals = Some(SetUpdate::Set(
181            signals.into_iter().map(Into::into).collect(),
182        ));
183        self
184    }
185
186    /// add multiple signals without disturbing existing ones.
187    pub fn append_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
188        self.signals = Some(SetUpdate::Patch(
189            signals.into_iter().map(|s| (s.into(), true)).collect(),
190        ));
191        self
192    }
193
194    /// add a single signal. no-op if already present.
195    pub fn add_signal(mut self, signal: impl Into<String>) -> Self {
196        self.signals = Some(SetUpdate::Patch([(signal.into(), true)].into()));
197        self
198    }
199
200    /// remove a single signal. no-op if not present.
201    pub fn remove_signal(mut self, signal: impl Into<String>) -> Self {
202        self.signals = Some(SetUpdate::Patch([(signal.into(), false)].into()));
203        self
204    }
205
206    /// replace the entire collections set. pass an empty iterator to store all collections.
207    pub fn set_collections(
208        mut self,
209        collections: impl IntoIterator<Item = impl Into<String>>,
210    ) -> Self {
211        self.collections = Some(SetUpdate::Set(
212            collections.into_iter().map(Into::into).collect(),
213        ));
214        self
215    }
216
217    /// add multiple collections without disturbing existing ones.
218    pub fn append_collections(
219        mut self,
220        collections: impl IntoIterator<Item = impl Into<String>>,
221    ) -> Self {
222        self.collections = Some(SetUpdate::Patch(
223            collections.into_iter().map(|c| (c.into(), true)).collect(),
224        ));
225        self
226    }
227
228    /// add a single collection filter. no-op if already present.
229    pub fn add_collection(mut self, collection: impl Into<String>) -> Self {
230        self.collections = Some(SetUpdate::Patch([(collection.into(), true)].into()));
231        self
232    }
233
234    /// remove a single collection filter. no-op if not present.
235    pub fn remove_collection(mut self, collection: impl Into<String>) -> Self {
236        self.collections = Some(SetUpdate::Patch([(collection.into(), false)].into()));
237        self
238    }
239
240    /// replace the entire excludes set.
241    pub fn set_excludes(mut self, excludes: impl IntoIterator<Item = impl Into<String>>) -> Self {
242        self.excludes = Some(SetUpdate::Set(
243            excludes.into_iter().map(Into::into).collect(),
244        ));
245        self
246    }
247
248    /// add multiple DIDs to the excludes set without disturbing existing ones.
249    pub fn append_excludes(
250        mut self,
251        excludes: impl IntoIterator<Item = impl Into<String>>,
252    ) -> Self {
253        self.excludes = Some(SetUpdate::Patch(
254            excludes.into_iter().map(|d| (d.into(), true)).collect(),
255        ));
256        self
257    }
258
259    /// add a single DID to the excludes set. no-op if already excluded.
260    pub fn add_exclude(mut self, did: impl Into<String>) -> Self {
261        self.excludes = Some(SetUpdate::Patch([(did.into(), true)].into()));
262        self
263    }
264
265    /// remove a single DID from the excludes set. no-op if not present.
266    pub fn remove_exclude(mut self, did: impl Into<String>) -> Self {
267        self.excludes = Some(SetUpdate::Patch([(did.into(), false)].into()));
268        self
269    }
270
271    /// commit the patch atomically to the database and update the in-memory filter.
272    /// returns the updated [`FilterSnapshot`].
273    pub async fn apply(self) -> Result<FilterSnapshot> {
274        let filter_ks = self.state.db.filter.clone();
275        let inner = self.state.db.inner.clone();
276        let filter_handle = self.state.filter.clone();
277        let state = self.state.clone();
278        let mode = self.mode;
279        let signals = self.signals;
280        let collections = self.collections;
281        let excludes = self.excludes;
282
283        let new_filter = tokio::task::spawn_blocking(move || {
284            let mut batch = inner.batch();
285            db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?;
286            batch.commit().into_diagnostic()?;
287            state.db.persist()?;
288            db_filter::load(&filter_ks)
289        })
290        .await
291        .into_diagnostic()?
292        .map_err(|e| {
293            error!(err = %e, "failed to apply filter patch");
294            e
295        })?;
296
297        let exclude_list = {
298            let filter_ks = self.state.db.filter.clone();
299            tokio::task::spawn_blocking(move || {
300                db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)
301            })
302            .await
303            .into_diagnostic()??
304        };
305
306        let snapshot = FilterSnapshot {
307            mode: new_filter.mode,
308            signals: new_filter.signals.iter().map(|s| s.to_string()).collect(),
309            collections: new_filter
310                .collections
311                .iter()
312                .map(|s| s.to_string())
313                .collect(),
314            excludes: exclude_list,
315        };
316
317        filter_handle.store(Arc::new(new_filter));
318        Ok(snapshot)
319    }
320}