// hydrant/control/crawler.rs

use std::sync::Arc;

use miette::{IntoDiagnostic, Result};
use tokio::sync::{mpsc, watch};
use tracing::{error, info};
use url::Url;

use crate::db::keys;
use crate::state::AppState;
/// Handle to one spawned crawler producer task.
///
/// Dropping the handle aborts the underlying tokio task (see the `Drop`
/// impl below), so removing or replacing an entry in the task map is
/// enough to stop a crawler.
pub(super) struct ProducerHandle {
    /// Crawl mode this producer was started with; reported by
    /// `CrawlerHandle::list_sources`.
    mode: crate::config::CrawlerMode,
    /// Abort handle for the spawned tokio task.
    abort: tokio::task::AbortHandle,
}
15
16impl Drop for ProducerHandle {
17 fn drop(&mut self) {
18 self.abort.abort();
19 }
20}
21
/// Dependencies shared by all crawler producers.
///
/// Stored once in the `OnceLock` on `CrawlerHandle` when the crawler
/// subsystem starts; `add_source` fails until it is populated.
pub(super) struct CrawlerShared {
    /// HTTP client handed to by-collection producers.
    pub(super) http: reqwest::Client,
    pub(super) checker: crate::crawler::SignalChecker,
    pub(super) in_flight: crate::crawler::InFlight,
    /// Channel on which producers emit crawled batches.
    pub(super) tx: mpsc::Sender<crate::crawler::CrawlerBatch>,
    pub(super) stats: crate::crawler::CrawlerStats,
}
29
/// Serializable description of a configured crawler source, as returned by
/// `CrawlerHandle::list_sources`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CrawlerSourceInfo {
    /// Relay or index URL being crawled.
    pub url: Url,
    /// Crawl mode (`Relay` or `ByCollection`).
    pub mode: crate::config::CrawlerMode,
    /// Whether a record for this source exists in the crawler database
    /// (tracked via the `persisted` set on `CrawlerHandle`).
    pub persisted: bool,
}
/// Spawn the background producer task for `source` and return a handle
/// that aborts the task when dropped.
///
/// `Relay` sources spawn a `RelayProducer` directly.
/// `ByCollection` sources spawn a supervision loop that rebuilds and reruns
/// a `ByCollectionProducer`, sleeping 30s after a fatal error.
/// The `enabled` watch receiver is handed to the producer so it can
/// pause/resume without being respawned.
///
/// NOTE(review): the Relay branch has no restart loop, unlike ByCollection —
/// presumably `RelayProducer::run` handles reconnection internally; confirm.
/// NOTE(review): in the ByCollection loop an `Ok` return restarts the
/// producer immediately with no backoff — if `run()` can complete normally
/// this busy-loops; confirm.
pub(super) fn spawn_crawler_producer(
    source: &crate::config::CrawlerSource,
    http: &reqwest::Client,
    state: &Arc<AppState>,
    checker: &crate::crawler::SignalChecker,
    in_flight: &crate::crawler::InFlight,
    tx: &mpsc::Sender<crate::crawler::CrawlerBatch>,
    stats: &crate::crawler::CrawlerStats,
    enabled: watch::Receiver<bool>,
) -> ProducerHandle {
    use crate::config::CrawlerMode;
    use crate::crawler::{ByCollectionProducer, RelayProducer};
    use std::time::Duration;
    use tracing::Instrument;

    let abort = match source.mode {
        CrawlerMode::Relay => {
            info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler");
            let span = tracing::info_span!("crawl", url = %source.url);
            tokio::spawn(
                RelayProducer {
                    relay_url: source.url.clone(),
                    checker: checker.clone(),
                    in_flight: in_flight.clone(),
                    tx: tx.clone(),
                    enabled,
                    stats: stats.clone(),
                }
                .run()
                .instrument(span),
            )
            .abort_handle()
        }
        CrawlerMode::ByCollection => {
            info!(
                host = source.url.host_str(),
                enabled = *state.crawler_enabled.borrow(),
                "starting by-collection crawler"
            );
            let span = tracing::info_span!("by_collection", host = source.url.host_str());
            // Clone everything the 'static spawned task needs before moving in.
            let http = http.clone();
            let state = state.clone();
            let in_flight = in_flight.clone();
            let tx = tx.clone();
            let stats = stats.clone();
            let url = source.url.clone();
            tokio::spawn(
                async move {
                    // Supervision loop: a fresh producer is built for every
                    // (re)start so no stale state survives a failure.
                    loop {
                        let producer = ByCollectionProducer {
                            index_url: url.clone(),
                            http: http.clone(),
                            state: state.clone(),
                            in_flight: in_flight.clone(),
                            tx: tx.clone(),
                            enabled: enabled.clone(),
                            stats: stats.clone(),
                        };
                        if let Err(e) = producer.run().await {
                            error!(err = ?e, "by-collection crawler fatal error, restarting in 30s");
                            tokio::time::sleep(Duration::from_secs(30)).await;
                        }
                    }
                }
                .instrument(span),
            )
            .abort_handle()
        }
    };
    ProducerHandle {
        mode: source.mode,
        abort,
    }
}
114
/// Cloneable control-plane handle for the crawler subsystem.
#[derive(Clone)]
pub struct CrawlerHandle {
    pub(super) state: Arc<AppState>,
    /// Shared producer dependencies; unset until `Hydrant::run()` starts
    /// the crawler (mutating methods bail while it is empty).
    pub(super) shared: Arc<std::sync::OnceLock<CrawlerShared>>,
    /// Live producer tasks keyed by source URL; dropping a value aborts
    /// its task.
    pub(super) tasks: Arc<scc::HashMap<Url, ProducerHandle>>,
    /// URLs that have a persisted source record in the database.
    pub(super) persisted: Arc<scc::HashSet<Url>>,
}
134
135impl CrawlerHandle {
136 pub fn enable(&self) {
138 self.state.crawler_enabled.send_replace(true);
139 }
140 pub fn disable(&self) {
143 self.state.crawler_enabled.send_replace(false);
144 }
145 pub fn is_enabled(&self) -> bool {
147 *self.state.crawler_enabled.borrow()
148 }
149
150 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
152 let db = self.state.db.clone();
153 let point_keys = [keys::crawler_cursor_key(url)];
154 let by_collection_prefix = keys::by_collection_cursor_prefix(url);
155 tokio::task::spawn_blocking(move || {
156 let mut batch = db.inner.batch();
157 for k in point_keys {
158 batch.remove(&db.cursors, k);
159 }
160 for entry in db.cursors.prefix(&by_collection_prefix) {
161 let k = entry.key().into_diagnostic()?;
162 batch.remove(&db.cursors, k);
163 }
164 batch.commit().into_diagnostic()
165 })
166 .await
167 .into_diagnostic()??;
168 Ok(())
169 }
170
171 pub async fn list_sources(&self) -> Vec<CrawlerSourceInfo> {
175 let mut sources = Vec::new();
176 self.tasks
177 .iter_async(|url, h| {
178 sources.push(CrawlerSourceInfo {
179 url: url.clone(),
180 mode: h.mode,
181 persisted: self.persisted.contains_sync(url),
182 });
183 true
184 })
185 .await;
186 sources
187 }
188
189 pub async fn add_source(&self, source: crate::config::CrawlerSource) -> Result<()> {
197 let Some(shared) = self.shared.get() else {
198 miette::bail!("crawler not yet started: call Hydrant::run() first");
199 };
200
201 let db = self.state.db.clone();
202 let key = keys::crawler_source_key(source.url.as_str());
203 let val = rmp_serde::to_vec(&source.mode).into_diagnostic()?;
204 tokio::task::spawn_blocking(move || db.crawler.insert(key, val).into_diagnostic())
205 .await
206 .into_diagnostic()??;
207
208 let enabled_rx = self.state.crawler_enabled.subscribe();
209 let handle = spawn_crawler_producer(
210 &source,
211 &shared.http,
212 &self.state,
213 &shared.checker,
214 &shared.in_flight,
215 &shared.tx,
216 &shared.stats,
217 enabled_rx,
218 );
219
220 let _ = self.persisted.insert_async(source.url.clone()).await;
221 match self.tasks.entry_async(source.url).await {
222 scc::hash_map::Entry::Vacant(e) => {
223 e.insert_entry(handle);
224 }
225 scc::hash_map::Entry::Occupied(mut e) => {
226 *e.get_mut() = handle;
227 }
228 }
229 Ok(())
230 }
231
232 pub async fn remove_source(&self, url: &Url) -> Result<bool> {
241 if self.shared.get().is_none() {
242 miette::bail!("crawler not yet started: call Hydrant::run() first");
243 }
244
245 if self.tasks.remove_async(url).await.is_none() {
247 return Ok(false);
248 }
249
250 if self.persisted.remove_async(url).await.is_some() {
252 let db = self.state.db.clone();
253 let key = keys::crawler_source_key(url.as_str());
254 tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
255 .await
256 .into_diagnostic()??;
257 }
258
259 Ok(true)
260 }
261}