1mod crawler;
2mod filter;
3mod firehose;
4mod repos;
5mod stream;
6
7pub use crawler::{CrawlerHandle, CrawlerSourceInfo};
8pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
9pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
10pub(crate) use repos::repo_state_to_info;
11pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
12
13use std::collections::BTreeMap;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::task::{Context, Poll};
19
20use futures::{FutureExt, Stream};
21use miette::{IntoDiagnostic, Result};
22use tokio::sync::{mpsc, watch};
23use tracing::{debug, error, info};
24
25use crate::backfill::BackfillWorker;
26use crate::config::{Config, SignatureVerification};
27use crate::db::{
28 self, filter as db_filter, load_persisted_crawler_sources, load_persisted_firehose_sources,
29};
30use crate::filter::FilterMode;
31use crate::ingest::worker::FirehoseWorker;
32use crate::state::AppState;
33use crate::types::MarshallableEvt;
34
35use crawler::{CrawlerShared, spawn_crawler_producer};
36use firehose::{FirehoseShared, spawn_firehose_ingestor};
37use stream::event_stream_thread;
38
/// A fully-owned ingestion event, as yielded by [`Hydrant::subscribe`] streams.
pub type Event = MarshallableEvt<'static>;
48
/// Top-level handle to a hydrant instance.
///
/// Cloning is cheap: every field is either an `Arc` or an `Arc`-backed
/// handle, and all clones refer to the same underlying instance (they share
/// the same `started` flag, so [`Hydrant::run`] may be called once across
/// all clones).
#[derive(Clone)]
pub struct Hydrant {
    /// Control handle for crawler ingestion sources.
    pub crawler: CrawlerHandle,
    /// Control handle for firehose (relay) ingestion sources.
    pub firehose: FirehoseHandle,
    /// Enable/disable switch for the backfill worker.
    pub backfill: BackfillHandle,
    /// Runtime control over the event filter.
    pub filter: FilterControl,
    /// Access to repo and record state.
    pub repos: ReposControl,
    /// Maintenance operations on the underlying database.
    pub db: DbControl,
    /// Control over the optional backlinks feature.
    #[cfg(feature = "backlinks")]
    pub backlinks: crate::backlinks::BacklinksControl,
    // Shared application state backing every handle above.
    pub(crate) state: Arc<AppState>,
    // Configuration captured at construction time.
    config: Arc<Config>,
    // Set by the first call to `run()`; guards against starting twice.
    started: Arc<AtomicBool>,
    // Prevents construction outside `Hydrant::new` / `Hydrant::from_env`.
    _priv: (),
}
85
impl Hydrant {
    /// Build a new hydrant from `config`.
    ///
    /// Opens the application state (and database) via [`AppState::new`],
    /// applies any filter settings carried by the config to the persisted
    /// filter, reloads the effective filter into memory, and decides the
    /// initial crawler enablement. No ingestion starts until
    /// [`Hydrant::run`] is called.
    ///
    /// # Errors
    /// Returns an error if the application state cannot be created, the
    /// filter patch cannot be applied or committed, or a blocking task
    /// fails to join.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        let state = AppState::new(&config)?;

        // Only touch the persisted filter when the config actually carries
        // filter-affecting settings; otherwise whatever is already stored
        // remains in effect.
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config
                .filter_signals
                .clone()
                .map(crate::filter::SetUpdate::Set);
            let collections = config
                .filter_collections
                .clone()
                .map(crate::filter::SetUpdate::Set);
            let excludes = config
                .filter_excludes
                .clone()
                .map(crate::filter::SetUpdate::Set);

            // The patch is blocking database work, so run it on the
            // blocking pool and commit it as a single batch.
            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Re-read the patched filter and publish it as the in-memory
            // snapshot used by the rest of the system.
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        // An explicit `enable_crawler` setting wins; otherwise enable the
        // crawler when the (post-patch) filter is in full-network mode or
        // when any crawler sources are configured.
        let post_patch_crawler = match config.enable_crawler {
            Some(b) => b,
            None => {
                state.filter.load().mode == FilterMode::Full || !config.crawler_sources.is_empty()
            }
        };
        state.crawler_enabled.send_replace(post_patch_crawler);

        let state = Arc::new(state);

        Ok(Self {
            crawler: CrawlerHandle {
                state: state.clone(),
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            firehose: FirehoseHandle {
                state: state.clone(),
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            backfill: BackfillHandle(state.clone()),
            filter: FilterControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            #[cfg(feature = "backlinks")]
            backlinks: crate::backlinks::BacklinksControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }

    /// Convenience constructor: build the [`Config`] from the environment
    /// and delegate to [`Hydrant::new`].
    pub async fn from_env() -> Result<Self> {
        Self::new(Config::from_env()?).await
    }

    /// Start all background machinery and return the driver future.
    ///
    /// Spawns: the backfill worker, the backfill retry thread, the
    /// ephemeral GC thread (ephemeral mode only), the periodic
    /// cursor/count/db persistence thread, an event-rate logger, firehose
    /// ingestors (configured plus persisted), the crawler worker and its
    /// producers, and the firehose buffer-processor thread. The returned
    /// future resolves when the buffer processor reports a result on the
    /// internal fatal channel.
    ///
    /// # Errors
    /// Fails immediately if `run()` was already called on this instance
    /// (or any clone of it).
    pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
        let state = self.state.clone();
        let config = self.config.clone();
        let crawler = self.crawler.clone();
        let firehose = self.firehose.clone();

        // `started` is shared by all clones, so this guards the instance
        // as a whole, not just this handle.
        if self.started.swap(true, Ordering::SeqCst) {
            miette::bail!("Hydrant::run() called more than once");
        }

        let fut = async move {
            // All ingestion sources feed events into this buffer; the
            // firehose worker thread (spawned near the end) drains it.
            let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();

            // Backfill worker: fetches repos and feeds results into the
            // shared buffer.
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    buffer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    config.ephemeral,
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            // Re-queue "gone" backfills left over from a previous run.
            // Failure here is logged (and checked for db poisoning) but
            // deliberately not fatal.
            if let Err(e) = tokio::task::spawn_blocking({
                let state = state.clone();
                move || crate::backfill::manager::queue_gone_backfills(&state)
            })
            .await
            .into_diagnostic()?
            {
                error!(err = %e, "failed to queue gone backfills");
                db::check_poisoned_report(&e);
            }

            // Dedicated OS thread for backfill retries.
            std::thread::spawn({
                let state = state.clone();
                move || crate::backfill::manager::retry_worker(state)
            });

            if config.ephemeral {
                // TTL garbage-collection thread, only in ephemeral mode.
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // Periodic persistence loop: saves each relay's firehose
            // cursor, the counters, and flushes the database. Errors are
            // logged per item so one failure doesn't stop the others.
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    state.relay_cursors.iter_sync(|relay, cursor| {
                        let seq = cursor.load(Ordering::SeqCst);
                        // seq == 0 means no progress yet; skip to avoid
                        // clobbering a previously persisted cursor.
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                        true
                    });

                    if let Err(e) = db::persist_counts(&state.db) {
                        error!(err = %e, "failed to persist counts");
                        db::check_poisoned_report(&e);
                    }

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    }
                }
            });

            // Once a minute, log observed event throughput derived from the
            // monotonically increasing event-id counter.
            tokio::spawn({
                let state = state.clone();
                let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            // Watch channel used to surface the buffer processor's exit
            // result (initially `None` = still running).
            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            info!(
                crawler_enabled = *state.crawler_enabled.borrow(),
                firehose_enabled = *state.firehose_enabled.borrow(),
                filter_mode = ?state.filter.load().mode,
                "starting ingestion"
            );

            // Publish the shared firehose context exactly once; a second
            // set would indicate a bug, so panic with the expect message.
            firehose
                .shared
                .set(FirehoseShared {
                    buffer_tx: buffer_tx.clone(),
                    verify_signatures: matches!(
                        config.verify_signatures,
                        SignatureVerification::Full
                    ),
                })
                .ok()
                .expect("firehose shared already set");
            let fire_shared = firehose.shared.get().unwrap();

            // Spawn one firehose ingestor per relay configured up front...
            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for relay_url in &relay_hosts {
                    let enabled_rx = state.firehose_enabled.subscribe();
                    let handle =
                        spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?;
                    let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await;
                }
            }

            // ...then for relays persisted in the database by previous
            // runs, skipping any already started above.
            let persisted_relay_urls = tokio::task::spawn_blocking({
                let state = state.clone();
                move || load_persisted_firehose_sources(&state.db)
            })
            .await
            .into_diagnostic()??;

            for relay_url in &persisted_relay_urls {
                let _ = firehose.persisted.insert_async(relay_url.clone()).await;
                if firehose.tasks.contains_async(relay_url).await {
                    continue;
                }
                let enabled_rx = state.firehose_enabled.subscribe();
                let handle =
                    spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?;
                let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await;
            }

            // Crawler setup: shared HTTP client, throttling, stats, the
            // central worker, the retry producer, and one producer per
            // source (configured and persisted).
            {
                use crate::crawler::throttle::Throttler;
                use crate::crawler::{
                    CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
                };

                let http = reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .gzip(true)
                    .build()
                    .expect("that reqwest will build");
                let pds_throttler = Throttler::new();
                let in_flight = InFlight::new();
                let stats = CrawlerStats::new(
                    state.clone(),
                    config
                        .crawler_sources
                        .iter()
                        .map(|s| s.url.clone())
                        .collect(),
                    pds_throttler.clone(),
                );
                let checker = SignalChecker {
                    http: http.clone(),
                    state: state.clone(),
                    throttler: pds_throttler,
                };

                info!(
                    max_pending = config.crawler_max_pending_repos,
                    resume_pending = config.crawler_resume_pending_repos,
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler worker"
                );
                let (worker, tx) = CrawlerWorker::new(
                    state.clone(),
                    config.crawler_max_pending_repos,
                    config.crawler_resume_pending_repos,
                    stats.clone(),
                );
                // The crawler worker is expected to run forever; treat its
                // exit as unrecoverable and abort the whole process.
                tokio::spawn(async move {
                    worker.run().await;
                    error!("crawler worker exited unexpectedly, aborting");
                    std::process::abort();
                });

                // Same abort-on-exit policy for the stats ticker.
                let ticker = tokio::spawn(stats.clone().task());
                tokio::spawn(async move {
                    match ticker.await {
                        Err(e) => error!(err = ?e, "stats ticker panicked, aborting"),
                        Ok(()) => error!("stats ticker exited unexpectedly, aborting"),
                    }
                    std::process::abort();
                });

                tokio::spawn(
                    RetryProducer {
                        checker: checker.clone(),
                        in_flight: in_flight.clone(),
                        tx: tx.clone(),
                    }
                    .run(),
                );

                // Publish the shared crawler context exactly once.
                crawler
                    .shared
                    .set(CrawlerShared {
                        http,
                        checker,
                        in_flight,
                        tx,
                        stats,
                    })
                    .ok()
                    .expect("crawler shared already set");
                let shared = crawler.shared.get().unwrap();

                // Producers for sources configured up front...
                for source in config.crawler_sources.iter() {
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }

                // ...and for sources persisted by previous runs, skipping
                // duplicates already running.
                let persisted_sources = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || load_persisted_crawler_sources(&state.db)
                })
                .await
                .into_diagnostic()??;

                for source in &persisted_sources {
                    let _ = crawler.persisted.insert_async(source.url.clone()).await;
                    if crawler.tasks.contains_async(&source.url).await {
                        continue;
                    }
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }
            }

            // The buffer processor runs on its own OS thread but gets a
            // runtime handle for any async work it needs to dispatch.
            let handle = tokio::runtime::Handle::current();
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                move || {
                    FirehoseWorker::new(
                        state,
                        buffer_rx,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.ephemeral,
                        config.firehose_workers,
                    )
                    .run(handle)
                }
            });

            // Bridge the worker thread's exit result into the fatal
            // channel. `join()` is blocking, so it runs on the blocking
            // pool; the error is stringified so the value can be cloned
            // out of the watch channel below.
            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                    })
                    .map(move |r| {
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // Drop our sender so the channel can close once the reporter
            // task above is finished.
            drop(fatal_tx);

            // Park here until a result arrives. A closed channel with no
            // pending value is treated as a clean shutdown.
            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    Err(_) => return Ok(()),
                }
            }
        };
        Ok(fut)
    }

    /// Subscribe to the event stream, optionally resuming from `cursor`.
    ///
    /// Spawns a dedicated OS thread that feeds events into a bounded
    /// channel (capacity 500); the returned [`EventStream`] yields them as
    /// a `futures::Stream`.
    ///
    /// # Panics
    /// Panics if called outside of a tokio runtime context, or if the
    /// stream thread cannot be spawned.
    pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
        let (tx, rx) = mpsc::channel(500);
        let state = self.state.clone();
        let runtime = tokio::runtime::Handle::current();

        std::thread::Builder::new()
            .name("hydrant-stream".into())
            .spawn(move || {
                // Enter the runtime so the streaming thread can use
                // runtime-bound facilities while it works.
                let _g = runtime.enter();
                event_stream_thread(state, tx, cursor);
            })
            .expect("failed to spawn stream thread");

        EventStream(rx)
    }

    /// Collect item counts and per-partition disk usage for reporting.
    ///
    /// Named counters are fetched concurrently; the event count comes from
    /// the keyspace's approximate length instead of a maintained counter.
    /// Disk sizes are gathered on the blocking pool.
    ///
    /// # Errors
    /// Returns an error if the blocking size-collection task fails to join.
    pub async fn stats(&self) -> Result<StatsResponse> {
        let db = self.state.db.clone();

        let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
            [
                "repos",
                "pending",
                "resync",
                "records",
                "blocks",
                "error_ratelimited",
                "error_transport",
                "error_generic",
            ]
            .into_iter()
            .map(|name| {
                let db = db.clone();
                async move { (name, db.get_count(name).await) }
            }),
        )
        .await
        .into_iter()
        .collect();

        counts.insert("events", db.events.approximate_len() as u64);

        let sizes = tokio::task::spawn_blocking(move || {
            let mut s = BTreeMap::new();
            s.insert("repos", db.repos.disk_space());
            s.insert("records", db.records.disk_space());
            s.insert("blocks", db.blocks.disk_space());
            s.insert("cursors", db.cursors.disk_space());
            s.insert("pending", db.pending.disk_space());
            s.insert("resync", db.resync.disk_space());
            s.insert("resync_buffer", db.resync_buffer.disk_space());
            s.insert("events", db.events.disk_space());
            s.insert("counts", db.counts.disk_space());
            s.insert("filter", db.filter.disk_space());
            s.insert("crawler", db.crawler.disk_space());
            s
        })
        .await
        .into_diagnostic()?;

        Ok(StatsResponse { counts, sizes })
    }

    /// Return a future serving the public API on `port` for this hydrant.
    pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
        let hydrant = self.clone();
        async move { crate::api::serve(hydrant, port).await }
    }

    /// Return a future serving the debug API on `port`; it only needs the
    /// shared state, not the full hydrant handle.
    pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
        let state = self.state.clone();
        async move { crate::api::serve_debug(state, port).await }
    }
}
661
662impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
663 fn from_ref(h: &Hydrant) -> Self {
664 h.state.clone()
665 }
666}
667
/// Stream of [`Event`]s returned by [`Hydrant::subscribe`]; wraps the
/// receiving half of the channel fed by the dedicated stream thread.
pub struct EventStream(mpsc::Receiver<Event>);
674
675impl Stream for EventStream {
676 type Item = Event;
677
678 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
679 self.0.poll_recv(cx)
680 }
681}
682
/// Snapshot of database statistics returned by [`Hydrant::stats`].
#[derive(serde::Serialize)]
pub struct StatsResponse {
    /// Item counts keyed by category (repos, records, events, errors, ...).
    pub counts: BTreeMap<&'static str, u64>,
    /// Per-partition disk usage, as reported by each keyspace's
    /// `disk_space()`.
    pub sizes: BTreeMap<&'static str, u64>,
}
691
/// Enable/disable switch for the backfill worker; wraps the shared
/// [`AppState`] and toggles its `backfill_enabled` watch channel.
#[derive(Clone)]
pub struct BackfillHandle(Arc<AppState>);
699
700impl BackfillHandle {
701 pub fn enable(&self) {
703 self.0.backfill_enabled.send_replace(true);
704 }
705 pub fn disable(&self) {
707 self.0.backfill_enabled.send_replace(false);
708 }
709 pub fn is_enabled(&self) -> bool {
711 *self.0.backfill_enabled.borrow()
712 }
713}
714
/// Maintenance operations (compaction, dictionary training) on the
/// underlying database, run with ingestion paused.
#[derive(Clone)]
pub struct DbControl(Arc<AppState>);
722
impl DbControl {
    /// Compact the database while ingestion is paused.
    ///
    /// # Errors
    /// Propagates any error from pausing ingestion or from the compaction
    /// itself.
    pub async fn compact(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || state.db.compact().await)
            .await
    }

    /// Train the database dictionaries for the `repos`, `blocks`, and
    /// `events` partitions, with ingestion paused.
    ///
    /// The three trainings run concurrently; `try_join!` aborts the rest
    /// on the first failure.
    ///
    /// # Errors
    /// Propagates blocking-task join errors and any error from
    /// `train_dict`.
    pub async fn train_dicts(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || {
                // Each training is blocking work: run it off the async
                // executor and flatten JoinHandle result + inner Result.
                let train = |name: &'static str| {
                    let db = state.db.clone();
                    tokio::task::spawn_blocking(move || db.train_dict(name))
                        .map(|res| res.into_diagnostic().flatten())
                };
                tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
            })
            .await
    }
}