hydrant/control/
crawler.rs

1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4use tokio::sync::{mpsc, watch};
5use tracing::{error, info};
6use url::Url;
7
8use crate::db::keys;
9use crate::state::AppState;
10
/// owner of one spawned producer task; dropping it aborts the task.
pub(super) struct ProducerHandle {
    /// the mode this producer was spawned with; surfaced via
    /// [`CrawlerSourceInfo`] when listing sources.
    mode: crate::config::CrawlerMode,
    /// abort handle for the spawned tokio task, fired from `Drop`.
    abort: tokio::task::AbortHandle,
}
15
impl Drop for ProducerHandle {
    fn drop(&mut self) {
        // stop the producer task as soon as its handle is dropped — this is
        // how remove/replace in `CrawlerHandle::tasks` tears down old tasks.
        self.abort.abort();
    }
}
21
/// dependencies shared by all spawned producers; set once by [`Hydrant::run`]
/// (see [`CrawlerHandle::shared`]) so dynamically added sources can reuse them.
pub(super) struct CrawlerShared {
    /// HTTP client cloned into by-collection producers.
    pub(super) http: reqwest::Client,
    /// cloned into relay producers; presumably the signal-collection check
    /// described on [`CrawlerHandle`] — confirm in `crate::crawler`.
    pub(super) checker: crate::crawler::SignalChecker,
    /// in-flight tracker cloned into every producer.
    pub(super) in_flight: crate::crawler::InFlight,
    /// sender for crawler batches, cloned into every producer; the consumer
    /// lives elsewhere.
    pub(super) tx: mpsc::Sender<crate::crawler::CrawlerBatch>,
    /// shared stats cloned into every producer.
    pub(super) stats: crate::crawler::CrawlerStats,
}
29
/// a snapshot of a single crawler source's runtime state.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CrawlerSourceInfo {
    /// the source URL (a relay, or a by-collection index host).
    pub url: Url,
    /// which producer variant is running for this source.
    pub mode: crate::config::CrawlerMode,
    /// whether this source is persisted in the database (i.e. it was dynamically added
    /// and will survive restarts). config-sourced entries have `persisted: false`.
    pub persisted: bool,
}
39
/// spawn the background task for one crawler source and return a
/// [`ProducerHandle`] that aborts the task when dropped.
///
/// behavior depends on `source.mode`:
/// - [`CrawlerMode::Relay`]: runs [`RelayProducer::run`] once inside a `crawl`
///   span; this function adds no restart loop around it.
/// - [`CrawlerMode::ByCollection`]: runs [`ByCollectionProducer::run`] in an
///   endless loop inside a `by_collection` span, sleeping 30s after an `Err`
///   before retrying.
///
/// `enabled` is the watch receiver the producer uses to observe runtime
/// enable/disable toggles (see [`CrawlerHandle`]).
pub(super) fn spawn_crawler_producer(
    source: &crate::config::CrawlerSource,
    http: &reqwest::Client,
    state: &Arc<AppState>,
    checker: &crate::crawler::SignalChecker,
    in_flight: &crate::crawler::InFlight,
    tx: &mpsc::Sender<crate::crawler::CrawlerBatch>,
    stats: &crate::crawler::CrawlerStats,
    enabled: watch::Receiver<bool>,
) -> ProducerHandle {
    use crate::config::CrawlerMode;
    use crate::crawler::{ByCollectionProducer, RelayProducer};
    use std::time::Duration;
    use tracing::Instrument;

    let abort = match source.mode {
        CrawlerMode::Relay => {
            info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
            let span = tracing::info_span!("crawl", url = %source.url);
            tokio::spawn(
                RelayProducer {
                    relay_url: source.url.clone(),
                    checker: checker.clone(),
                    in_flight: in_flight.clone(),
                    tx: tx.clone(),
                    enabled,
                    stats: stats.clone(),
                }
                .run()
                .instrument(span),
            )
            .abort_handle()
        }
        CrawlerMode::ByCollection => {
            info!(
                host = source.url.host_str(),
                enabled = *state.crawler_enabled.borrow(),
                "starting by-collection crawler"
            );
            let span = tracing::info_span!("by_collection", host = source.url.host_str());
            // clone everything the restart loop needs to own before moving it
            // into the spawned task.
            let http = http.clone();
            let state = state.clone();
            let in_flight = in_flight.clone();
            let tx = tx.clone();
            let stats = stats.clone();
            let url = source.url.clone();
            tokio::spawn(
                async move {
                    loop {
                        // a fresh producer is built each iteration so a failed
                        // run restarts from a clean struct.
                        let producer = ByCollectionProducer {
                            index_url: url.clone(),
                            http: http.clone(),
                            state: state.clone(),
                            in_flight: in_flight.clone(),
                            tx: tx.clone(),
                            enabled: enabled.clone(),
                            stats: stats.clone(),
                        };
                        if let Err(e) = producer.run().await {
                            error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
                            tokio::time::sleep(Duration::from_secs(30)).await;
                        }
                        // NOTE(review): an Ok return loops again immediately
                        // with no backoff — confirm run() only returns Ok when
                        // an instant restart is intended, otherwise this can
                        // spin hot.
                    }
                }
                .instrument(span),
            )
            .abort_handle()
        }
    };
    ProducerHandle {
        mode: source.mode,
        abort,
    }
}
114
/// runtime control over the crawler component.
///
/// the crawler walks `com.atproto.sync.listRepos` on each configured relay to discover
/// repositories that have never emitted a firehose event. in `filter` mode it also
/// checks each discovered repo against the configured signal collections before
/// enqueuing it for backfill.
///
/// disabling the crawler does not affect in-progress repo checks. each one completes
/// its current PDS request before pausing.
#[derive(Clone)]
pub struct CrawlerHandle {
    /// shared app state; provides the `crawler_enabled` watch channel and the db.
    pub(super) state: Arc<AppState>,
    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
    pub(super) shared: Arc<std::sync::OnceLock<CrawlerShared>>,
    /// per-source running tasks, keyed by url.
    pub(super) tasks: Arc<scc::HashMap<Url, ProducerHandle>>,
    /// set of urls persisted in the database (dynamically added sources).
    pub(super) persisted: Arc<scc::HashSet<Url>>,
}
134
135impl CrawlerHandle {
136    /// enable the crawler (enables all configured producers). no-op if already enabled.
137    pub fn enable(&self) {
138        self.state.crawler_enabled.send_replace(true);
139    }
140    /// disable the crawler (disables all configured producers).
141    /// in-progress repo checks finish before the crawler pauses.
142    pub fn disable(&self) {
143        self.state.crawler_enabled.send_replace(false);
144    }
145    /// returns the current enabled state of the crawler.
146    pub fn is_enabled(&self) -> bool {
147        *self.state.crawler_enabled.borrow()
148    }
149
150    /// delete all cursor entries associated with the given URL.
151    pub async fn reset_cursor(&self, url: &str) -> Result<()> {
152        let db = self.state.db.clone();
153        let point_keys = [keys::crawler_cursor_key(url)];
154        let by_collection_prefix = keys::by_collection_cursor_prefix(url);
155        tokio::task::spawn_blocking(move || {
156            let mut batch = db.inner.batch();
157            for k in point_keys {
158                batch.remove(&db.cursors, k);
159            }
160            for entry in db.cursors.prefix(&by_collection_prefix) {
161                let k = entry.key().into_diagnostic()?;
162                batch.remove(&db.cursors, k);
163            }
164            batch.commit().into_diagnostic()
165        })
166        .await
167        .into_diagnostic()??;
168        Ok(())
169    }
170
171    /// return info on all currently active crawler sources.
172    ///
173    /// returns an empty list if called before [`Hydrant::run`].
174    pub async fn list_sources(&self) -> Vec<CrawlerSourceInfo> {
175        let mut sources = Vec::new();
176        self.tasks
177            .iter_async(|url, h| {
178                sources.push(CrawlerSourceInfo {
179                    url: url.clone(),
180                    mode: h.mode,
181                    persisted: self.persisted.contains_sync(url),
182                });
183                true
184            })
185            .await;
186        sources
187    }
188
189    /// add a new crawler source at runtime.
190    ///
191    /// the source is persisted to the database and will be re-spawned on restart.
192    /// if a source with the same URL already exists, it is replaced (the old task is
193    /// aborted and a new one is started with the new mode).
194    ///
195    /// returns an error if called before [`Hydrant::run`].
196    pub async fn add_source(&self, source: crate::config::CrawlerSource) -> Result<()> {
197        let Some(shared) = self.shared.get() else {
198            miette::bail!("crawler not yet started: call Hydrant::run() first");
199        };
200
201        let db = self.state.db.clone();
202        let key = keys::crawler_source_key(source.url.as_str());
203        let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
204        tokio::task::spawn_blocking(move || db.crawler.insert(key, val).into_diagnostic())
205            .await
206            .into_diagnostic()??;
207
208        let enabled_rx = self.state.crawler_enabled.subscribe();
209        let handle = spawn_crawler_producer(
210            &source,
211            &shared.http,
212            &self.state,
213            &shared.checker,
214            &shared.in_flight,
215            &shared.tx,
216            &shared.stats,
217            enabled_rx,
218        );
219
220        let _ = self.persisted.insert_async(source.url.clone()).await;
221        match self.tasks.entry_async(source.url).await {
222            scc::hash_map::Entry::Vacant(e) => {
223                e.insert_entry(handle);
224            }
225            scc::hash_map::Entry::Occupied(mut e) => {
226                *e.get_mut() = handle;
227            }
228        }
229        Ok(())
230    }
231
232    /// remove a crawler source at runtime by URL.
233    ///
234    /// aborts the running producer task and removes the source from the database if it
235    /// was dynamically added. config-sourced entries are aborted but not persisted, so
236    /// they will reappear on restart.
237    ///
238    /// returns `true` if a source with the given URL was found and removed.
239    /// returns an error if called before [`Hydrant::run`].
240    pub async fn remove_source(&self, url: &Url) -> Result<bool> {
241        if self.shared.get().is_none() {
242            miette::bail!("crawler not yet started: call Hydrant::run() first");
243        }
244
245        // dropping the ProducerHandle aborts the task via Drop
246        if self.tasks.remove_async(url).await.is_none() {
247            return Ok(false);
248        }
249
250        // remove from DB if it was a persisted source
251        if self.persisted.remove_async(url).await.is_some() {
252            let db = self.state.db.clone();
253            let key = keys::crawler_source_key(url.as_str());
254            tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
255                .await
256                .into_diagnostic()??;
257        }
258
259        Ok(true)
260    }
261}