hydrant/control/
filter.rs

1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4
5use crate::db::filter as db_filter;
6use crate::filter::{FilterMode, SetUpdate};
7use crate::state::AppState;
8
9/// a point-in-time snapshot of the filter configuration. returned by all [`FilterControl`] methods.
10///
11/// because the filter is stored in the database and loaded on demand, this snapshot
12/// may be stale if another caller modifies the filter concurrently. for the authoritative
13/// live config use [`FilterControl::get`].
14#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct FilterSnapshot {
16    pub mode: FilterMode,
17    pub signals: Vec<String>,
18    pub collections: Vec<String>,
19    pub excludes: Vec<String>,
20}
21
22/// runtime control over the indexing filter.
23///
24/// the filter has two orthogonal axes:
25///
26/// **mode** controls discovery:
27/// - [`FilterMode::Filter`]: only indexes repos whose firehose commits touch a collection
28///   matching a configured `signal`. explicit [`ReposControl::track`] always works regardless.
29/// - [`FilterMode::Full`]: indexes the entire network. `signals` are ignored for discovery
30///   but `collections` and `excludes` still apply.
31///
32/// **sets** are each independently configurable:
33/// - `signals`: NSID patterns that trigger auto-discovery in `filter` mode (e.g. `app.bsky.feed.post`, `app.bsky.graph.*`)
34/// - `collections`: NSID patterns that filter which records are *stored*. empty means store all.
35/// - `excludes`: DIDs that are always skipped regardless of mode.
36///
37/// NSID patterns support an optional `.*` suffix to match an entire namespace.
38/// all mutations are persisted to the database and take effect immediately.
39#[derive(Clone)]
40pub struct FilterControl(pub(super) Arc<AppState>);
41
42impl FilterControl {
43    /// return the current filter configuration from the database.
44    pub async fn get(&self) -> Result<FilterSnapshot> {
45        let filter_ks = self.0.db.filter.clone();
46        tokio::task::spawn_blocking(move || {
47            let hot = db_filter::load(&filter_ks)?;
48            let excludes = db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)?;
49            Ok(FilterSnapshot {
50                mode: hot.mode,
51                signals: hot.signals.iter().map(|s| s.to_string()).collect(),
52                collections: hot.collections.iter().map(|s| s.to_string()).collect(),
53                excludes,
54            })
55        })
56        .await
57        .into_diagnostic()?
58    }
59
60    /// set the indexing mode. see [`FilterControl`] for mode semantics.
61    pub fn set_mode(&self, mode: FilterMode) -> FilterPatch {
62        FilterPatch::new(self).set_mode(mode)
63    }
64
65    /// replace the entire signals set. existing signals are removed.
66    pub fn set_signals(&self, signals: impl IntoIterator<Item = impl Into<String>>) -> FilterPatch {
67        FilterPatch::new(self).set_signals(signals)
68    }
69
70    /// add multiple signals without disturbing existing ones.
71    pub fn append_signals(
72        &self,
73        signals: impl IntoIterator<Item = impl Into<String>>,
74    ) -> FilterPatch {
75        FilterPatch::new(self).append_signals(signals)
76    }
77
78    /// add a single signal. no-op if already present.
79    pub fn add_signal(&self, signal: impl Into<String>) -> FilterPatch {
80        FilterPatch::new(self).add_signal(signal)
81    }
82
83    /// remove a single signal. no-op if not present.
84    pub fn remove_signal(&self, signal: impl Into<String>) -> FilterPatch {
85        FilterPatch::new(self).remove_signal(signal)
86    }
87
88    /// replace the entire collections set. pass an empty iterator to store all collections.
89    pub fn set_collections(
90        &self,
91        collections: impl IntoIterator<Item = impl Into<String>>,
92    ) -> FilterPatch {
93        FilterPatch::new(self).set_collections(collections)
94    }
95
96    /// add multiple collections without disturbing existing ones.
97    pub fn append_collections(
98        &self,
99        collections: impl IntoIterator<Item = impl Into<String>>,
100    ) -> FilterPatch {
101        FilterPatch::new(self).append_collections(collections)
102    }
103
104    /// add a single collection filter. no-op if already present.
105    pub fn add_collection(&self, collection: impl Into<String>) -> FilterPatch {
106        FilterPatch::new(self).add_collection(collection)
107    }
108
109    /// remove a single collection filter. no-op if not present.
110    pub fn remove_collection(&self, collection: impl Into<String>) -> FilterPatch {
111        FilterPatch::new(self).remove_collection(collection)
112    }
113
114    /// replace the entire excludes set.
115    pub fn set_excludes(
116        &self,
117        excludes: impl IntoIterator<Item = impl Into<String>>,
118    ) -> FilterPatch {
119        FilterPatch::new(self).set_excludes(excludes)
120    }
121
122    /// add multiple DIDs to the excludes set without disturbing existing ones.
123    pub fn append_excludes(
124        &self,
125        excludes: impl IntoIterator<Item = impl Into<String>>,
126    ) -> FilterPatch {
127        FilterPatch::new(self).append_excludes(excludes)
128    }
129
130    /// add a single DID to the excludes set. no-op if already excluded.
131    pub fn add_exclude(&self, did: impl Into<String>) -> FilterPatch {
132        FilterPatch::new(self).add_exclude(did)
133    }
134
135    /// remove a single DID from the excludes set. no-op if not present.
136    pub fn remove_exclude(&self, did: impl Into<String>) -> FilterPatch {
137        FilterPatch::new(self).remove_exclude(did)
138    }
139}
140
141/// a staged set of filter mutations. all methods accumulate changes without touching
142/// the database. call [`FilterPatch::apply`] to commit the entire patch atomically.
143///
144/// obtain an instance by calling any mutation method on [`FilterControl`], or via
145/// [`FilterPatch::new`] to start from a blank patch.
146pub struct FilterPatch {
147    state: Arc<AppState>,
148    /// if set, replaces the current indexing mode.
149    pub mode: Option<FilterMode>,
150    /// if set, replaces or patches the signals set.
151    pub(crate) signals: Option<SetUpdate>,
152    /// if set, replaces or patches the collections set.
153    pub(crate) collections: Option<SetUpdate>,
154    /// if set, replaces or patches the excludes set.
155    pub(crate) excludes: Option<SetUpdate>,
156}
157
158impl FilterPatch {
159    /// create a new blank patch associated with the given [`FilterControl`].
160    pub fn new(control: &FilterControl) -> Self {
161        Self {
162            state: control.0.clone(),
163            mode: None,
164            signals: None,
165            collections: None,
166            excludes: None,
167        }
168    }
169
170    /// set the indexing mode. see [`FilterControl`] for mode semantics.
171    pub fn set_mode(mut self, mode: FilterMode) -> Self {
172        self.mode = Some(mode);
173        self
174    }
175
176    /// replace the entire signals set. existing signals are removed.
177    pub fn set_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
178        self.signals = Some(SetUpdate::Set(
179            signals.into_iter().map(Into::into).collect(),
180        ));
181        self
182    }
183
184    /// add multiple signals without disturbing existing ones.
185    pub fn append_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
186        self.signals = Some(SetUpdate::Patch(
187            signals.into_iter().map(|s| (s.into(), true)).collect(),
188        ));
189        self
190    }
191
192    /// add a single signal. no-op if already present.
193    pub fn add_signal(mut self, signal: impl Into<String>) -> Self {
194        self.signals = Some(SetUpdate::Patch([(signal.into(), true)].into()));
195        self
196    }
197
198    /// remove a single signal. no-op if not present.
199    pub fn remove_signal(mut self, signal: impl Into<String>) -> Self {
200        self.signals = Some(SetUpdate::Patch([(signal.into(), false)].into()));
201        self
202    }
203
204    /// replace the entire collections set. pass an empty iterator to store all collections.
205    pub fn set_collections(
206        mut self,
207        collections: impl IntoIterator<Item = impl Into<String>>,
208    ) -> Self {
209        self.collections = Some(SetUpdate::Set(
210            collections.into_iter().map(Into::into).collect(),
211        ));
212        self
213    }
214
215    /// add multiple collections without disturbing existing ones.
216    pub fn append_collections(
217        mut self,
218        collections: impl IntoIterator<Item = impl Into<String>>,
219    ) -> Self {
220        self.collections = Some(SetUpdate::Patch(
221            collections.into_iter().map(|c| (c.into(), true)).collect(),
222        ));
223        self
224    }
225
226    /// add a single collection filter. no-op if already present.
227    pub fn add_collection(mut self, collection: impl Into<String>) -> Self {
228        self.collections = Some(SetUpdate::Patch([(collection.into(), true)].into()));
229        self
230    }
231
232    /// remove a single collection filter. no-op if not present.
233    pub fn remove_collection(mut self, collection: impl Into<String>) -> Self {
234        self.collections = Some(SetUpdate::Patch([(collection.into(), false)].into()));
235        self
236    }
237
238    /// replace the entire excludes set.
239    pub fn set_excludes(mut self, excludes: impl IntoIterator<Item = impl Into<String>>) -> Self {
240        self.excludes = Some(SetUpdate::Set(
241            excludes.into_iter().map(Into::into).collect(),
242        ));
243        self
244    }
245
246    /// add multiple DIDs to the excludes set without disturbing existing ones.
247    pub fn append_excludes(
248        mut self,
249        excludes: impl IntoIterator<Item = impl Into<String>>,
250    ) -> Self {
251        self.excludes = Some(SetUpdate::Patch(
252            excludes.into_iter().map(|d| (d.into(), true)).collect(),
253        ));
254        self
255    }
256
257    /// add a single DID to the excludes set. no-op if already excluded.
258    pub fn add_exclude(mut self, did: impl Into<String>) -> Self {
259        self.excludes = Some(SetUpdate::Patch([(did.into(), true)].into()));
260        self
261    }
262
263    /// remove a single DID from the excludes set. no-op if not present.
264    pub fn remove_exclude(mut self, did: impl Into<String>) -> Self {
265        self.excludes = Some(SetUpdate::Patch([(did.into(), false)].into()));
266        self
267    }
268
269    /// commit the patch atomically to the database and update the in-memory filter.
270    /// returns the updated [`FilterSnapshot`].
271    pub async fn apply(self) -> Result<FilterSnapshot> {
272        let filter_ks = self.state.db.filter.clone();
273        let inner = self.state.db.inner.clone();
274        let filter_handle = self.state.filter.clone();
275        let mode = self.mode;
276        let signals = self.signals;
277        let collections = self.collections;
278        let excludes = self.excludes;
279
280        let new_filter = tokio::task::spawn_blocking(move || {
281            let mut batch = inner.batch();
282            db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?;
283            batch.commit().into_diagnostic()?;
284            db_filter::load(&filter_ks)
285        })
286        .await
287        .into_diagnostic()??;
288
289        let exclude_list = {
290            let filter_ks = self.state.db.filter.clone();
291            tokio::task::spawn_blocking(move || {
292                db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)
293            })
294            .await
295            .into_diagnostic()??
296        };
297
298        let snapshot = FilterSnapshot {
299            mode: new_filter.mode,
300            signals: new_filter.signals.iter().map(|s| s.to_string()).collect(),
301            collections: new_filter
302                .collections
303                .iter()
304                .map(|s| s.to_string())
305                .collect(),
306            excludes: exclude_list,
307        };
308
309        filter_handle.store(Arc::new(new_filter));
310        Ok(snapshot)
311    }
312}