hydrant/control/
filter.rs1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4
5use crate::db::filter as db_filter;
6use crate::filter::{FilterMode, SetUpdate};
7use crate::state::AppState;
8
9#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct FilterSnapshot {
16 pub mode: FilterMode,
17 pub signals: Vec<String>,
18 pub collections: Vec<String>,
19 pub excludes: Vec<String>,
20}
21
22#[derive(Clone)]
40pub struct FilterControl(pub(super) Arc<AppState>);
41
42impl FilterControl {
43 pub async fn get(&self) -> Result<FilterSnapshot> {
45 let filter_ks = self.0.db.filter.clone();
46 tokio::task::spawn_blocking(move || {
47 let hot = db_filter::load(&filter_ks)?;
48 let excludes = db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)?;
49 Ok(FilterSnapshot {
50 mode: hot.mode,
51 signals: hot.signals.iter().map(|s| s.to_string()).collect(),
52 collections: hot.collections.iter().map(|s| s.to_string()).collect(),
53 excludes,
54 })
55 })
56 .await
57 .into_diagnostic()?
58 }
59
60 pub fn set_mode(&self, mode: FilterMode) -> FilterPatch {
62 FilterPatch::new(self).set_mode(mode)
63 }
64
65 pub fn set_signals(&self, signals: impl IntoIterator<Item = impl Into<String>>) -> FilterPatch {
67 FilterPatch::new(self).set_signals(signals)
68 }
69
70 pub fn append_signals(
72 &self,
73 signals: impl IntoIterator<Item = impl Into<String>>,
74 ) -> FilterPatch {
75 FilterPatch::new(self).append_signals(signals)
76 }
77
78 pub fn add_signal(&self, signal: impl Into<String>) -> FilterPatch {
80 FilterPatch::new(self).add_signal(signal)
81 }
82
83 pub fn remove_signal(&self, signal: impl Into<String>) -> FilterPatch {
85 FilterPatch::new(self).remove_signal(signal)
86 }
87
88 pub fn set_collections(
90 &self,
91 collections: impl IntoIterator<Item = impl Into<String>>,
92 ) -> FilterPatch {
93 FilterPatch::new(self).set_collections(collections)
94 }
95
96 pub fn append_collections(
98 &self,
99 collections: impl IntoIterator<Item = impl Into<String>>,
100 ) -> FilterPatch {
101 FilterPatch::new(self).append_collections(collections)
102 }
103
104 pub fn add_collection(&self, collection: impl Into<String>) -> FilterPatch {
106 FilterPatch::new(self).add_collection(collection)
107 }
108
109 pub fn remove_collection(&self, collection: impl Into<String>) -> FilterPatch {
111 FilterPatch::new(self).remove_collection(collection)
112 }
113
114 pub fn set_excludes(
116 &self,
117 excludes: impl IntoIterator<Item = impl Into<String>>,
118 ) -> FilterPatch {
119 FilterPatch::new(self).set_excludes(excludes)
120 }
121
122 pub fn append_excludes(
124 &self,
125 excludes: impl IntoIterator<Item = impl Into<String>>,
126 ) -> FilterPatch {
127 FilterPatch::new(self).append_excludes(excludes)
128 }
129
130 pub fn add_exclude(&self, did: impl Into<String>) -> FilterPatch {
132 FilterPatch::new(self).add_exclude(did)
133 }
134
135 pub fn remove_exclude(&self, did: impl Into<String>) -> FilterPatch {
137 FilterPatch::new(self).remove_exclude(did)
138 }
139}
140
141pub struct FilterPatch {
147 state: Arc<AppState>,
148 pub mode: Option<FilterMode>,
150 pub(crate) signals: Option<SetUpdate>,
152 pub(crate) collections: Option<SetUpdate>,
154 pub(crate) excludes: Option<SetUpdate>,
156}
157
158impl FilterPatch {
159 pub fn new(control: &FilterControl) -> Self {
161 Self {
162 state: control.0.clone(),
163 mode: None,
164 signals: None,
165 collections: None,
166 excludes: None,
167 }
168 }
169
170 pub fn set_mode(mut self, mode: FilterMode) -> Self {
172 self.mode = Some(mode);
173 self
174 }
175
176 pub fn set_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
178 self.signals = Some(SetUpdate::Set(
179 signals.into_iter().map(Into::into).collect(),
180 ));
181 self
182 }
183
184 pub fn append_signals(mut self, signals: impl IntoIterator<Item = impl Into<String>>) -> Self {
186 self.signals = Some(SetUpdate::Patch(
187 signals.into_iter().map(|s| (s.into(), true)).collect(),
188 ));
189 self
190 }
191
192 pub fn add_signal(mut self, signal: impl Into<String>) -> Self {
194 self.signals = Some(SetUpdate::Patch([(signal.into(), true)].into()));
195 self
196 }
197
198 pub fn remove_signal(mut self, signal: impl Into<String>) -> Self {
200 self.signals = Some(SetUpdate::Patch([(signal.into(), false)].into()));
201 self
202 }
203
204 pub fn set_collections(
206 mut self,
207 collections: impl IntoIterator<Item = impl Into<String>>,
208 ) -> Self {
209 self.collections = Some(SetUpdate::Set(
210 collections.into_iter().map(Into::into).collect(),
211 ));
212 self
213 }
214
215 pub fn append_collections(
217 mut self,
218 collections: impl IntoIterator<Item = impl Into<String>>,
219 ) -> Self {
220 self.collections = Some(SetUpdate::Patch(
221 collections.into_iter().map(|c| (c.into(), true)).collect(),
222 ));
223 self
224 }
225
226 pub fn add_collection(mut self, collection: impl Into<String>) -> Self {
228 self.collections = Some(SetUpdate::Patch([(collection.into(), true)].into()));
229 self
230 }
231
232 pub fn remove_collection(mut self, collection: impl Into<String>) -> Self {
234 self.collections = Some(SetUpdate::Patch([(collection.into(), false)].into()));
235 self
236 }
237
238 pub fn set_excludes(mut self, excludes: impl IntoIterator<Item = impl Into<String>>) -> Self {
240 self.excludes = Some(SetUpdate::Set(
241 excludes.into_iter().map(Into::into).collect(),
242 ));
243 self
244 }
245
246 pub fn append_excludes(
248 mut self,
249 excludes: impl IntoIterator<Item = impl Into<String>>,
250 ) -> Self {
251 self.excludes = Some(SetUpdate::Patch(
252 excludes.into_iter().map(|d| (d.into(), true)).collect(),
253 ));
254 self
255 }
256
257 pub fn add_exclude(mut self, did: impl Into<String>) -> Self {
259 self.excludes = Some(SetUpdate::Patch([(did.into(), true)].into()));
260 self
261 }
262
263 pub fn remove_exclude(mut self, did: impl Into<String>) -> Self {
265 self.excludes = Some(SetUpdate::Patch([(did.into(), false)].into()));
266 self
267 }
268
269 pub async fn apply(self) -> Result<FilterSnapshot> {
272 let filter_ks = self.state.db.filter.clone();
273 let inner = self.state.db.inner.clone();
274 let filter_handle = self.state.filter.clone();
275 let mode = self.mode;
276 let signals = self.signals;
277 let collections = self.collections;
278 let excludes = self.excludes;
279
280 let new_filter = tokio::task::spawn_blocking(move || {
281 let mut batch = inner.batch();
282 db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?;
283 batch.commit().into_diagnostic()?;
284 db_filter::load(&filter_ks)
285 })
286 .await
287 .into_diagnostic()??;
288
289 let exclude_list = {
290 let filter_ks = self.state.db.filter.clone();
291 tokio::task::spawn_blocking(move || {
292 db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)
293 })
294 .await
295 .into_diagnostic()??
296 };
297
298 let snapshot = FilterSnapshot {
299 mode: new_filter.mode,
300 signals: new_filter.signals.iter().map(|s| s.to_string()).collect(),
301 collections: new_filter
302 .collections
303 .iter()
304 .map(|s| s.to_string())
305 .collect(),
306 excludes: exclude_list,
307 };
308
309 filter_handle.store(Arc::new(new_filter));
310 Ok(snapshot)
311 }
312}