hydrant/control/
mod.rs

1mod crawler;
2mod filter;
3mod firehose;
4mod repos;
5mod stream;
6
7pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
8pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
9pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
10pub(crate) use repos::repo_state_to_info;
11pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
12
13use std::collections::BTreeMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::task::{Context, Poll};
19
20use futures::{FutureExt, Stream};
21use miette::{IntoDiagnostic, Result};
22use tokio::sync::{mpsc, watch};
23use tracing::{debug, error, info};
24
25use crate::backfill::BackfillWorker;
26use crate::config::{Config, SignatureVerification};
27use crate::db::{
28    self, filter as db_filter, load_persisted_crawler_sources, load_persisted_firehose_sources,
29};
30use crate::filter::FilterMode;
31use crate::ingest::worker::FirehoseWorker;
32use crate::state::AppState;
33use crate::types::MarshallableEvt;
34
35use crawler::{CrawlerShared, spawn_crawler_producer};
36use firehose::{FirehoseShared, spawn_firehose_ingestor};
37use stream::event_stream_thread;
38
/// an event emitted by the hydrant event stream.
///
/// three variants are possible depending on the `type` field:
/// - `"record"`: a repo record was created, updated, or deleted. carries a [`RecordEvt`].
/// - `"identity"`: a DID's handle or PDS changed. carries an [`IdentityEvt`]. ephemeral, not replayable.
/// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable.
///
/// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`].
///
/// the `'static` lifetime means the event owns all of its data (no borrows into
/// ingestion buffers), so it can be sent across tasks and threads freely.
pub type Event = MarshallableEvt<'static>;
48
/// the top-level handle to a hydrant instance.
///
/// `Hydrant` is cheaply cloneable. all sub-handles share the same underlying state.
/// construct it via [`Hydrant::new`] or [`Hydrant::from_env`], configure the filter
/// and repos as needed, then call [`Hydrant::run`] to start all background components.
///
/// # example
///
/// ```rust,no_run
/// use hydrant::control::Hydrant;
///
/// #[tokio::main]
/// async fn main() -> miette::Result<()> {
///     let hydrant = Hydrant::from_env().await?;
///
///     tokio::select! {
///         r = hydrant.run()? => r,
///         r = hydrant.serve(3000) => r,
///     }
/// }
/// ```
#[derive(Clone)]
pub struct Hydrant {
    /// manage crawler ingestion sources at runtime.
    pub crawler: CrawlerHandle,
    /// manage firehose (relay) ingestion sources at runtime.
    pub firehose: FirehoseHandle,
    /// enable/disable the backfill worker.
    pub backfill: BackfillHandle,
    /// inspect and patch the record filter.
    pub filter: FilterControl,
    /// inspect and manage tracked repos and their records.
    pub repos: ReposControl,
    /// database maintenance operations (compaction, dictionary training).
    pub db: DbControl,
    #[cfg(feature = "backlinks")]
    /// backlinks index control (only with the `backlinks` feature).
    pub backlinks: crate::backlinks::BacklinksControl,
    // shared application state backing all of the handles above.
    pub(crate) state: Arc<AppState>,
    config: Arc<Config>,
    // set by `run()`; guards against starting the components twice.
    started: Arc<AtomicBool>,
    // prevents struct-literal construction outside this module.
    _priv: (),
}
85
86impl Hydrant {
    /// open the database and configure hydrant from `config`.
    ///
    /// this sets up the database, applies any filter configuration from `config`, and
    /// initializes all sub-handles. no background tasks are started yet: call
    /// [`run`](Self::run) to start all components and drive the instance.
    ///
    /// # errors
    ///
    /// returns an error if the database cannot be opened or the filter patch
    /// cannot be applied/persisted.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        // 1. open database and construct AppState
        let state = AppState::new(&config)?;

        // 2. apply any filter config from env variables
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            // full_network maps to FilterMode::Full; None leaves the stored mode untouched
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config
                .filter_signals
                .clone()
                .map(crate::filter::SetUpdate::Set);
            let collections = config
                .filter_collections
                .clone()
                .map(crate::filter::SetUpdate::Set);
            let excludes = config
                .filter_excludes
                .clone()
                .map(crate::filter::SetUpdate::Set);

            // the patch does blocking database work, so run it off the async runtime
            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // 3. reload the live filter into the hot-path arc-swap
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        // 4. set crawler enabled state from config, evaluated against the post-patch filter
        let post_patch_crawler = match config.enable_crawler {
            Some(b) => b,
            None => {
                // default: crawl when tracking the full network or explicit sources exist
                state.filter.load().mode == FilterMode::Full || !config.crawler_sources.is_empty()
            }
        };
        state.crawler_enabled.send_replace(post_patch_crawler);

        let state = Arc::new(state);

        Ok(Self {
            // `shared` is an empty OnceLock here; it is populated by `run()` once the
            // ingestion infrastructure (channels, http client, stats) exists.
            crawler: CrawlerHandle {
                state: state.clone(),
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            firehose: FirehoseHandle {
                state: state.clone(),
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            backfill: BackfillHandle(state.clone()),
            filter: FilterControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            #[cfg(feature = "backlinks")]
            backlinks: crate::backlinks::BacklinksControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }
181
182    /// reads config from environment variables and calls [`Hydrant::new`].
183    pub async fn from_env() -> Result<Self> {
184        Self::new(Config::from_env()?).await
185    }
186
    /// start all background components and return a future that resolves when any
    /// fatal component exits.
    ///
    /// starts the backfill worker, firehose ingestors, crawler, and worker thread.
    /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it
    /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve).
    ///
    /// returns an error if called more than once on the same `Hydrant` instance.
    pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
        let state = self.state.clone();
        let config = self.config.clone();
        let crawler = self.crawler.clone();
        let firehose = self.firehose.clone();

        // `swap` returns the previous flag, so a second call observes `true` and
        // errors out before any component is spawned.
        if self.started.swap(true, Ordering::SeqCst) {
            miette::bail!("Hydrant::run() called more than once");
        }

        let fut = async move {
            // internal buffered channel between ingestors / backfill and the firehose worker
            let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();

            // 5. spawn the backfill worker
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    buffer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    // backfill verifies signatures in Full and BackfillOnly modes
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    config.ephemeral,
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            // 6. re-queue any repos that lost their backfill state, then start the retry worker
            if let Err(e) = tokio::task::spawn_blocking({
                let state = state.clone();
                move || crate::backfill::manager::queue_gone_backfills(&state)
            })
            .await
            .into_diagnostic()?
            {
                // non-fatal: log and check whether the db is poisoned
                error!(err = %e, "failed to queue gone backfills");
                db::check_poisoned_report(&e);
            }

            std::thread::spawn({
                let state = state.clone();
                move || crate::backfill::manager::retry_worker(state)
            });

            // 7. ephemeral GC thread
            if config.ephemeral {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // 8. cursor / counts persist thread
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    // save each relay's firehose cursor; `true` keeps the iteration going
                    state.relay_cursors.iter_sync(|relay, cursor| {
                        let seq = cursor.load(Ordering::SeqCst);
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                        true
                    });

                    if let Err(e) = db::persist_counts(&state.db) {
                        error!(err = %e, "failed to persist counts");
                        db::check_poisoned_report(&e);
                    }

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    }
                }
            });

            // 9. events/sec stats ticker
            tokio::spawn({
                let state = state.clone();
                let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            // fatal-task result channel: fatal tasks publish `Some(result)`; when every
            // sender is dropped, the loop at the bottom exits with Ok(()).
            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            info!(
                crawler_enabled = *state.crawler_enabled.borrow(),
                firehose_enabled = *state.firehose_enabled.borrow(),
                filter_mode = ?state.filter.load().mode,
                "starting ingestion"
            );

            // 10. set shared and spawn firehose ingestors
            // (`.ok().expect(..)` because the shared value is not Debug; set() only
            // fails if the OnceLock was already populated)
            firehose
                .shared
                .set(FirehoseShared {
                    buffer_tx: buffer_tx.clone(),
                    verify_signatures: matches!(
                        config.verify_signatures,
                        SignatureVerification::Full
                    ),
                })
                .ok()
                .expect("firehose shared already set");
            let fire_shared = firehose.shared.get().unwrap();

            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for relay_url in &relay_hosts {
                    let enabled_rx = state.firehose_enabled.subscribe();
                    let handle =
                        spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?;
                    let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await;
                }
            }

            // relays persisted in the db from a previous session; skip any that were
            // already started from config above
            let persisted_relay_urls = tokio::task::spawn_blocking({
                let state = state.clone();
                move || load_persisted_firehose_sources(&state.db)
            })
            .await
            .into_diagnostic()??;

            for relay_url in &persisted_relay_urls {
                let _ = firehose.persisted.insert_async(relay_url.clone()).await;
                if firehose.tasks.contains_async(relay_url).await {
                    continue;
                }
                let enabled_rx = state.firehose_enabled.subscribe();
                let handle =
                    spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?;
                let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await;
            }

            // 11. spawn crawler infrastructure (always, to support dynamic source management)
            {
                use crate::crawler::throttle::Throttler;
                use crate::crawler::{
                    CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
                };

                let http = reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .gzip(true)
                    .build()
                    .expect("that reqwest will build");
                let pds_throttler = Throttler::new();
                let in_flight = InFlight::new();
                let stats = CrawlerStats::new(
                    state.clone(),
                    config
                        .crawler_sources
                        .iter()
                        .map(|s| s.url.clone())
                        .collect(),
                    pds_throttler.clone(),
                );
                let checker = SignalChecker {
                    http: http.clone(),
                    state: state.clone(),
                    throttler: pds_throttler,
                };

                info!(
                    max_pending = config.crawler_max_pending_repos,
                    resume_pending = config.crawler_resume_pending_repos,
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler worker"
                );
                let (worker, tx) = CrawlerWorker::new(
                    state.clone(),
                    config.crawler_max_pending_repos,
                    config.crawler_resume_pending_repos,
                    stats.clone(),
                );
                // the crawler worker must never stop on its own; treat an exit as fatal
                tokio::spawn(async move {
                    worker.run().await;
                    error!("crawler worker exited unexpectedly, aborting");
                    std::process::abort();
                });

                let ticker = tokio::spawn(stats.clone().task());
                tokio::spawn(async move {
                    match ticker.await {
                        Err(e) => error!(err = ?e, "stats ticker panicked, aborting"),
                        Ok(()) => error!("stats ticker exited unexpectedly, aborting"),
                    }
                    std::process::abort();
                });

                tokio::spawn(
                    RetryProducer {
                        checker: checker.clone(),
                        in_flight: in_flight.clone(),
                        tx: tx.clone(),
                    }
                    .run(),
                );

                // set shared objects so CrawlerHandle methods can use them
                crawler
                    .shared
                    .set(CrawlerShared {
                        http,
                        checker,
                        in_flight,
                        tx,
                        stats,
                    })
                    .ok()
                    .expect("crawler shared already set");
                let shared = crawler.shared.get().unwrap();

                // spawn initial sources from config
                for source in config.crawler_sources.iter() {
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }

                // then sources persisted from a previous session, skipping duplicates
                let persisted_sources = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || load_persisted_crawler_sources(&state.db)
                })
                .await
                .into_diagnostic()??;

                for source in &persisted_sources {
                    let _ = crawler.persisted.insert_async(source.url.clone()).await;
                    if crawler.tasks.contains_async(&source.url).await {
                        continue;
                    }
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }
            }

            // 12. spawn the firehose worker on a blocking thread (fatal task)
            let handle = tokio::runtime::Handle::current();
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                move || {
                    FirehoseWorker::new(
                        state,
                        buffer_rx,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.ephemeral,
                        config.firehose_workers,
                    )
                    .run(handle)
                }
            });

            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                    })
                    .map(move |r| {
                        // three nested Results: spawn_blocking join, thread join, worker result
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // drop the local fatal_tx so the watch channel is only kept alive by the
            // spawned tasks. when all fatal tasks exit (and drop their tx clones),
            // fatal_rx.changed() returns Err and we return Ok(()).
            drop(fatal_tx);

            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    // all fatal_tx clones dropped: all tasks finished cleanly
                    Err(_) => return Ok(()),
                }
            }
        };
        Ok(fut)
    }
553
554    /// subscribe to the ordered event stream.
555    ///
556    /// returns an [`EventStream`] that implements [`futures::Stream`].
557    ///
558    /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
559    /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
560    ///   replayed first, then live events follow seamlessly.
561    ///
562    /// `identity` and `account` events are ephemeral and are never replayed from a cursor -
563    /// only live occurrences are delivered. use [`ReposControl::get`] to fetch current
564    /// identity/account state for a specific DID.
565    ///
566    /// multiple concurrent subscribers each receive a full independent copy of the stream.
567    /// the stream ends when the `EventStream` is dropped.
568    pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
569        let (tx, rx) = mpsc::channel(500);
570        let state = self.state.clone();
571        let runtime = tokio::runtime::Handle::current();
572
573        std::thread::Builder::new()
574            .name("hydrant-stream".into())
575            .spawn(move || {
576                let _g = runtime.enter();
577                event_stream_thread(state, tx, cursor);
578            })
579            .expect("failed to spawn stream thread");
580
581        EventStream(rx)
582    }
583
584    /// return database counts and on-disk sizes for all keyspaces.
585    ///
586    /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`,
587    /// `error_ratelimited`, `error_transport`, `error_generic`.
588    ///
589    /// sizes are in bytes, reported per keyspace.
590    pub async fn stats(&self) -> Result<StatsResponse> {
591        let db = self.state.db.clone();
592
593        let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
594            [
595                "repos",
596                "pending",
597                "resync",
598                "records",
599                "blocks",
600                "error_ratelimited",
601                "error_transport",
602                "error_generic",
603            ]
604            .into_iter()
605            .map(|name| {
606                let db = db.clone();
607                async move { (name, db.get_count(name).await) }
608            }),
609        )
610        .await
611        .into_iter()
612        .collect();
613
614        counts.insert("events", db.events.approximate_len() as u64);
615
616        let sizes = tokio::task::spawn_blocking(move || {
617            let mut s = BTreeMap::new();
618            s.insert("repos", db.repos.disk_space());
619            s.insert("records", db.records.disk_space());
620            s.insert("blocks", db.blocks.disk_space());
621            s.insert("cursors", db.cursors.disk_space());
622            s.insert("pending", db.pending.disk_space());
623            s.insert("resync", db.resync.disk_space());
624            s.insert("resync_buffer", db.resync_buffer.disk_space());
625            s.insert("events", db.events.disk_space());
626            s.insert("counts", db.counts.disk_space());
627            s.insert("filter", db.filter.disk_space());
628            s.insert("crawler", db.crawler.disk_space());
629            s
630        })
631        .await
632        .into_diagnostic()?;
633
634        Ok(StatsResponse { counts, sizes })
635    }
636
    /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`.
    ///
    /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`,
    /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves
    /// only on error.
    ///
    /// intended for `tokio::spawn` or inclusion in a `select!` / task list. a cheap
    /// clone of `self` (all `Arc`-backed fields) is taken immediately when this method
    /// is called; the server itself does no work until the returned future is polled.
    ///
    /// to disable the HTTP API entirely, simply don't call this method.
    pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
        let hydrant = self.clone();
        async move { crate::api::serve(hydrant, port).await }
    }
651
652    /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
653    ///
654    /// exposes internal inspection endpoints (`/debug/get`, `/debug/iter`, etc.)
655    /// that are not safe to expose publicly. binds only to loopback.
656    pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
657        let state = self.state.clone();
658        async move { crate::api::serve_debug(state, port).await }
659    }
660}
661
662impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
663    fn from_ref(h: &Hydrant) -> Self {
664        h.state.clone()
665    }
666}
667
/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
///
/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
/// `while let Some(evt) = stream.next().await`, `forward`, etc.
/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
// wraps the receiving half of the bounded channel fed by the thread spawned in
// `Hydrant::subscribe`.
pub struct EventStream(mpsc::Receiver<Event>);
674
675impl Stream for EventStream {
676    type Item = Event;
677
678    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
679        self.0.poll_recv(cx)
680    }
681}
682
/// database statistics returned by [`Hydrant::stats`].
///
/// serializable with `serde` (derived), so it can be returned directly from API handlers.
#[derive(serde::Serialize)]
pub struct StatsResponse {
    /// record counts per logical category (repos, records, events, error kinds, etc.)
    pub counts: BTreeMap<&'static str, u64>,
    /// on-disk size in bytes per keyspace
    pub sizes: BTreeMap<&'static str, u64>,
}
691
/// runtime control over the backfill worker component.
///
/// the backfill worker fetches full repo CAR files from each repo's PDS for any
/// repository in the pending queue, parses the MST, and inserts all matching records
/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
// newtype over the shared state; the enabled flag lives in a watch channel on AppState.
#[derive(Clone)]
pub struct BackfillHandle(Arc<AppState>);
699
700impl BackfillHandle {
701    /// enable the backfill worker, no-op if already enabled.
702    pub fn enable(&self) {
703        self.0.backfill_enabled.send_replace(true);
704    }
705    /// disable the backfill worker, in-flight repos complete before pausing.
706    pub fn disable(&self) {
707        self.0.backfill_enabled.send_replace(false);
708    }
709    /// returns the current enabled state of the backfill worker.
710    pub fn is_enabled(&self) -> bool {
711        *self.0.backfill_enabled.borrow()
712    }
713}
714
/// control over database maintenance operations.
///
/// all methods pause the crawler, firehose, and backfill worker for the duration
/// of the operation and restore their prior state on completion, whether or not
/// the operation succeeds.
// newtype over the shared state; maintenance runs through AppState::with_ingestion_paused.
#[derive(Clone)]
pub struct DbControl(Arc<AppState>);
722
impl DbControl {
    /// trigger a major compaction of all keyspaces in parallel.
    ///
    /// compaction reclaims disk space from deleted/updated keys and improves
    /// read performance. can take several minutes on large datasets.
    pub async fn compact(&self) -> Result<()> {
        let state = self.0.clone();
        // ingestion is paused for the duration and restored afterwards
        state
            .with_ingestion_paused(async || state.db.compact().await)
            .await
    }

    /// train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces.
    ///
    /// dictionaries are written to `dict_{name}.bin` files next to the database.
    /// a restart is required to apply them. training samples data blocks from the
    /// existing database, so the database must have a reasonable amount of data first.
    pub async fn train_dicts(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || {
                // each keyspace trains on its own blocking thread; `flatten` merges
                // the JoinError layer with the training result itself
                let train = |name: &'static str| {
                    let db = state.db.clone();
                    tokio::task::spawn_blocking(move || db.train_dict(name))
                        .map(|res| res.into_diagnostic().flatten())
                };
                // run all three trainings concurrently, failing fast on the first error
                tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
            })
            .await
    }
}
753}