1use miette::Result;
2use serde::{Deserialize, Serialize};
3use smol_str::ToSmolStr;
4use std::fmt;
5use std::path::PathBuf;
6use std::str::FromStr;
7use std::time::Duration;
8use url::Url;
9
/// Best-effort loader for a `.env` file in the current working directory.
///
/// Does nothing if the file is missing or unreadable. Each line is a
/// `KEY=VALUE` pair; blank lines and `#` comments are skipped. A value
/// wrapped in one matching pair of single or double quotes is unquoted.
/// Variables already present in the process environment are never
/// overwritten.
fn load_dotenv() {
    let Ok(contents) = std::fs::read_to_string(".env") else {
        return;
    };
    for raw in contents.lines() {
        let entry = raw.trim();
        if entry.is_empty() || entry.starts_with('#') {
            continue;
        }
        let Some((key, value)) = entry.split_once('=') else {
            continue;
        };
        let key = key.trim();
        let value = value.trim();
        // Strip exactly one pair of matching surrounding quotes, if present;
        // a lone or mismatched quote leaves the value untouched.
        let strip = |q: char| value.strip_prefix(q).and_then(|v| v.strip_suffix(q));
        let unquoted = strip('"').or_else(|| strip('\'')).unwrap_or(value);
        if std::env::var(key).is_err() {
            // SAFETY: mutating the environment is unsound with concurrent
            // env reads; this assumes it runs during single-threaded startup
            // (NOTE(review): confirm no threads are spawned before this).
            unsafe { std::env::set_var(key, unquoted) };
        }
    }
}
36
/// How the crawler discovers repositories to ingest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrawlerMode {
    /// Crawl by enumerating repos from a relay host.
    Relay,
    /// Crawl via a collection-index source (presumably lists repos per
    /// collection — confirm against the crawler implementation).
    ByCollection,
}

impl CrawlerMode {
    /// Mode used when a crawler source does not specify one: full-network
    /// deployments walk a relay, partial deployments go by collection.
    fn default_for(full_network: bool) -> Self {
        // Plain `if` instead of `then_some(..).unwrap_or(..)`
        // (clippy::obfuscated_if_else) — same result, clearer intent.
        if full_network {
            Self::Relay
        } else {
            Self::ByCollection
        }
    }
}
52
53impl Serialize for CrawlerMode {
54 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: serde::Serializer,
57 {
58 serializer.serialize_str(&self.to_smolstr())
59 }
60}
61
62impl<'de> Deserialize<'de> for CrawlerMode {
63 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64 where
65 D: serde::Deserializer<'de>,
66 {
67 let s = String::deserialize(deserializer)?;
68 FromStr::from_str(&s).map_err(serde::de::Error::custom)
69 }
70}
71
72impl FromStr for CrawlerMode {
73 type Err = miette::Error;
74 fn from_str(s: &str) -> Result<Self> {
75 match s {
76 "relay" => Ok(Self::Relay),
77 "by_collection" | "by-collection" => Ok(Self::ByCollection),
78 _ => Err(miette::miette!(
79 "invalid crawler mode: expected 'relay' or 'by_collection'"
80 )),
81 }
82 }
83}
84
85impl fmt::Display for CrawlerMode {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 match self {
88 Self::Relay => write!(f, "relay"),
89 Self::ByCollection => write!(f, "by_collection"),
90 }
91 }
92}
93
/// A single crawl endpoint together with the mode used to crawl it.
#[derive(Debug, Clone)]
pub struct CrawlerSource {
    /// Base URL of the source.
    pub url: Url,
    /// How this source is crawled (relay enumeration vs. by collection).
    pub mode: CrawlerMode,
}
100
101impl CrawlerSource {
102 fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> {
104 if let Some((prefix, rest)) = s.split_once("::") {
105 let mode = prefix.parse().ok()?;
106 let url = Url::parse(rest).ok()?;
107 Some(Self { url, mode })
108 } else {
109 let url = Url::parse(s).ok()?;
110 Some(Self {
111 url,
112 mode: default_mode,
113 })
114 }
115 }
116}
117
/// Compression algorithm selection for database data and journal
/// (see `Config::data_compression` / `Config::journal_compression`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
    /// No compression.
    None,
}
124
125impl FromStr for Compression {
126 type Err = miette::Error;
127 fn from_str(s: &str) -> Result<Self> {
128 match s {
129 "lz4" => Ok(Self::Lz4),
130 "zstd" => Ok(Self::Zstd),
131 "none" => Ok(Self::None),
132 _ => Err(miette::miette!("invalid compression type")),
133 }
134 }
135}
136
137impl fmt::Display for Compression {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 match self {
140 Self::Lz4 => write!(f, "lz4"),
141 Self::Zstd => write!(f, "zstd"),
142 Self::None => write!(f, "none"),
143 }
144 }
145}
146
/// How much signature verification to perform
/// (consumed via `Config::verify_signatures`; exact verification scope is
/// defined by the callers of that setting).
#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// Verify signatures everywhere.
    Full,
    /// Verify signatures only during backfill.
    BackfillOnly,
    /// Skip signature verification entirely.
    None,
}
156
157impl FromStr for SignatureVerification {
158 type Err = miette::Error;
159 fn from_str(s: &str) -> Result<Self> {
160 match s {
161 "full" => Ok(Self::Full),
162 "backfill-only" => Ok(Self::BackfillOnly),
163 "none" => Ok(Self::None),
164 _ => Err(miette::miette!("invalid signature verification level")),
165 }
166 }
167}
168
169impl fmt::Display for SignatureVerification {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 match self {
172 Self::Full => write!(f, "full"),
173 Self::BackfillOnly => write!(f, "backfill-only"),
174 Self::None => write!(f, "none"),
175 }
176 }
177}
178
/// Runtime configuration for hydrant.
///
/// Built from [`Config::default`] or [`Config::full_network`], optionally
/// overlaid with `HYDRANT_*` environment variables via [`Config::from_env`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Filesystem path of the database.
    pub database_path: PathBuf,
    /// Index the entire network instead of a subset; flips most other
    /// defaults (see [`Config::full_network`]).
    pub full_network: bool,
    /// Ephemeral mode — presumably expires stored data after
    /// `ephemeral_ttl` (TODO confirm against the storage layer).
    pub ephemeral: bool,
    /// Retention window applied when `ephemeral` is enabled.
    pub ephemeral_ttl: Duration,

    /// Relay endpoints (websocket URLs) supplying the firehose.
    pub relays: Vec<Url>,
    /// PLC directory endpoints — presumably used for identity resolution;
    /// invalid values are a hard error in `from_env`.
    pub plc_urls: Vec<Url>,
    /// Whether to consume the live firehose.
    pub enable_firehose: bool,
    /// Number of firehose worker tasks.
    pub firehose_workers: usize,
    /// How often the firehose cursor is persisted.
    pub cursor_save_interval: Duration,
    /// Timeout for fetching a single repository.
    pub repo_fetch_timeout: Duration,
    /// Maximum number of concurrent backfill operations.
    pub backfill_concurrency_limit: usize,

    /// Crawler on/off override; `None` means "not set", leaving the
    /// decision to a mode-dependent default elsewhere.
    pub enable_crawler: Option<bool>,
    /// High-water mark for pending repos (presumably pauses the crawler
    /// when reached — confirm against the crawler).
    pub crawler_max_pending_repos: usize,
    /// Low-water mark at which crawling presumably resumes.
    pub crawler_resume_pending_repos: usize,
    /// Endpoints the crawler pulls repositories from.
    pub crawler_sources: Vec<CrawlerSource>,

    /// How strictly signatures are verified.
    pub verify_signatures: SignatureVerification,
    /// Capacity of the identity cache (entries).
    pub identity_cache_size: u64,

    /// Optional signal filter list; `None` disables this filter.
    pub filter_signals: Option<Vec<String>>,
    /// Optional collection filter list; `None` disables this filter.
    pub filter_collections: Option<Vec<String>>,
    /// Optional exclusion list; `None` means no exclusions.
    pub filter_excludes: Option<Vec<String>>,

    /// Whether to build the backlinks index.
    pub enable_backlinks: bool,

    /// Cache size in MB (rendered as "mb" by `Display`).
    pub cache_size: u64,
    /// Compression used for data.
    pub data_compression: Compression,
    /// Compression used for the journal.
    pub journal_compression: Compression,
    /// Worker threads for the database backend.
    pub db_worker_threads: usize,
    /// Journal size cap, in MB.
    pub db_max_journaling_size_mb: u64,
    /// Memtable size (MB) for the blocks partition.
    pub db_blocks_memtable_size_mb: u64,
    /// Memtable size (MB) for the repos partition.
    pub db_repos_memtable_size_mb: u64,
    /// Memtable size (MB) for the events partition.
    pub db_events_memtable_size_mb: u64,
    /// Memtable size (MB) for the records partition.
    pub db_records_memtable_size_mb: u64,
}
300
301impl Default for Config {
302 fn default() -> Self {
303 const BASE_MEMTABLE_MB: u64 = 32;
304 Self {
305 database_path: PathBuf::from("./hydrant.db"),
306 full_network: false,
307 ephemeral: false,
308 ephemeral_ttl: Duration::from_secs(3600),
309 relays: vec![Url::parse("wss://relay.fire.hose.cam/").unwrap()],
310 plc_urls: vec![Url::parse("https://plc.wtf").unwrap()],
311 enable_firehose: true,
312 firehose_workers: 8,
313 cursor_save_interval: Duration::from_secs(3),
314 repo_fetch_timeout: Duration::from_secs(300),
315 backfill_concurrency_limit: 16,
316 enable_crawler: None,
317 crawler_max_pending_repos: 2000,
318 crawler_resume_pending_repos: 1000,
319 crawler_sources: vec![CrawlerSource {
320 url: Url::parse("https://lightrail.microcosm.blue").unwrap(),
321 mode: CrawlerMode::ByCollection,
322 }],
323 verify_signatures: SignatureVerification::Full,
324 identity_cache_size: 1_000_000,
325 filter_signals: None,
326 filter_collections: None,
327 filter_excludes: None,
328 enable_backlinks: false,
329 cache_size: 256,
330 data_compression: Compression::Lz4,
331 journal_compression: Compression::Lz4,
332 db_worker_threads: 4,
333 db_max_journaling_size_mb: 400,
334 db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
335 db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2,
336 db_events_memtable_size_mb: BASE_MEMTABLE_MB,
337 db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
338 }
339 }
340}
341
impl Config {
    /// Defaults tuned for indexing the full network: the plc.directory
    /// endpoint, more workers, larger memtables, and a relay-mode crawler
    /// source. Everything not listed falls back to [`Self::default`].
    pub fn full_network() -> Self {
        const BASE_MEMTABLE_MB: u64 = 192;
        Self {
            full_network: true,
            plc_urls: vec![Url::parse("https://plc.directory").unwrap()],
            firehose_workers: 24,
            backfill_concurrency_limit: 64,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                mode: CrawlerMode::Relay,
            }],
            db_worker_threads: 8,
            db_max_journaling_size_mb: 1024,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            // Integer division first: 192 / 3 * 2 == 128.
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
            ..Self::default()
        }
    }

    /// Builds the configuration from `HYDRANT_*` environment variables.
    ///
    /// Loads `.env` first (never overriding pre-existing variables), picks
    /// the default set based on `HYDRANT_FULL_NETWORK`, then overlays each
    /// individual `HYDRANT_*` override on top of those defaults.
    ///
    /// NOTE(review): a variable that is set but fails to parse silently
    /// falls back to the default — consider warning on parse failure.
    pub fn from_env() -> Result<Self> {
        load_dotenv();

        // Reads one `HYDRANT_<KEY>` variable:
        //   cfg!(KEY, default)      — parse via `FromStr`, convert with
        //                             `.into()`; default on absence/failure.
        //   cfg!(KEY, default, sec) — parse as a humantime duration.
        macro_rules! cfg {
            (@val $key:expr) => {
                std::env::var(concat!("HYDRANT_", $key))
            };
            ($key:expr, $default:expr, sec) => {
                cfg!(@val $key)
                    .ok()
                    .and_then(|s| humantime::parse_duration(&s).ok())
                    .unwrap_or($default)
            };
            ($key:expr, $default:expr) => {
                cfg!(@val $key)
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or($default.to_owned())
                    .into()
            };
        }

        let full_network: bool = cfg!("FULL_NETWORK", false);
        let defaults = full_network
            .then(Self::full_network)
            .unwrap_or_else(Self::default);

        // HYDRANT_RELAY_HOSTS (comma-separated) wins over the singular
        // HYDRANT_RELAY_HOST; invalid entries are warned about and skipped.
        // An explicitly empty HYDRANT_RELAY_HOSTS yields no relays at all.
        let relay_hosts = match std::env::var("HYDRANT_RELAY_HOSTS") {
            Ok(hosts) if !hosts.trim().is_empty() => hosts
                .split(',')
                .filter_map(|s| {
                    let s = s.trim();
                    (!s.is_empty())
                        .then(|| {
                            Url::parse(s)
                                .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}"))
                                .ok()
                        })
                        .flatten()
                })
                .collect(),
            Ok(_) => vec![],
            Err(_) => vec![cfg!("RELAY_HOST", defaults.relays[0].clone())],
        };

        // Unlike relay hosts, an invalid PLC URL is a hard error.
        let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
            })
            .unwrap_or_else(|| Ok(defaults.plc_urls.clone()))?;

        let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", defaults.cursor_save_interval, sec);
        let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", defaults.repo_fetch_timeout, sec);

        let ephemeral: bool = cfg!("EPHEMERAL", defaults.ephemeral);
        let ephemeral_ttl = cfg!("EPHEMERAL_TTL", defaults.ephemeral_ttl, sec);
        let database_path = cfg!("DATABASE_PATH", defaults.database_path);
        let cache_size = cfg!("CACHE_SIZE", defaults.cache_size);
        let data_compression = cfg!("DATA_COMPRESSION", defaults.data_compression);
        let journal_compression = cfg!("JOURNAL_COMPRESSION", defaults.journal_compression);

        let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures);
        let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size);
        let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose);
        // Deliberately tri-state: an unset/unparsable variable stays `None`
        // so downstream code can apply a mode-dependent default.
        let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
            .ok()
            .and_then(|s| s.parse().ok());

        let backfill_concurrency_limit = cfg!(
            "BACKFILL_CONCURRENCY_LIMIT",
            defaults.backfill_concurrency_limit
        );
        let firehose_workers = cfg!("FIREHOSE_WORKERS", defaults.firehose_workers);

        let db_worker_threads = cfg!("DB_WORKER_THREADS", defaults.db_worker_threads);
        let db_max_journaling_size_mb = cfg!(
            "DB_MAX_JOURNALING_SIZE_MB",
            defaults.db_max_journaling_size_mb
        );
        let db_blocks_memtable_size_mb = cfg!(
            "DB_BLOCKS_MEMTABLE_SIZE_MB",
            defaults.db_blocks_memtable_size_mb
        );
        let db_events_memtable_size_mb = cfg!(
            "DB_EVENTS_MEMTABLE_SIZE_MB",
            defaults.db_events_memtable_size_mb
        );
        let db_records_memtable_size_mb = cfg!(
            "DB_RECORDS_MEMTABLE_SIZE_MB",
            defaults.db_records_memtable_size_mb
        );
        let db_repos_memtable_size_mb = cfg!(
            "DB_REPOS_MEMTABLE_SIZE_MB",
            defaults.db_repos_memtable_size_mb
        );

        let crawler_max_pending_repos = cfg!(
            "CRAWLER_MAX_PENDING_REPOS",
            defaults.crawler_max_pending_repos
        );
        let crawler_resume_pending_repos = cfg!(
            "CRAWLER_RESUME_PENDING_REPOS",
            defaults.crawler_resume_pending_repos
        );

        // Filter lists: comma-separated, whitespace-trimmed, empties dropped.
        // Unset means "no filter" (None), distinct from an empty list.
        let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);

        // Crawler sources: explicit HYDRANT_CRAWLER_URLS entries (optionally
        // `mode::url`), otherwise derived from the relay hosts in relay mode
        // or the built-in defaults in by-collection mode.
        let default_mode = CrawlerMode::default_for(full_network);
        let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
            Ok(s) => s
                .split(',')
                .map(|s| s.trim())
                .filter(|s| !s.is_empty())
                .filter_map(|s| CrawlerSource::parse(s, default_mode))
                .collect(),
            Err(_) => match default_mode {
                CrawlerMode::Relay => relay_hosts
                    .iter()
                    .map(|url| CrawlerSource {
                        url: url.clone(),
                        mode: CrawlerMode::Relay,
                    })
                    .collect(),
                CrawlerMode::ByCollection => defaults.crawler_sources.clone(),
            },
        };

        Ok(Self {
            database_path,
            full_network,
            ephemeral,
            ephemeral_ttl,
            relays: relay_hosts,
            plc_urls,
            enable_firehose,
            firehose_workers,
            cursor_save_interval,
            repo_fetch_timeout,
            backfill_concurrency_limit,
            enable_crawler,
            crawler_max_pending_repos,
            crawler_resume_pending_repos,
            crawler_sources,
            verify_signatures,
            identity_cache_size,
            filter_signals,
            filter_collections,
            filter_excludes,
            enable_backlinks,
            cache_size,
            data_compression,
            journal_compression,
            db_worker_threads,
            db_max_journaling_size_mb,
            db_blocks_memtable_size_mb,
            db_repos_memtable_size_mb,
            db_events_memtable_size_mb,
            db_records_memtable_size_mb,
        })
    }
}
555
/// Writes one aligned `label value` row for the `Config` Display impl.
///
/// Relies on a `LABEL_WIDTH` constant being in scope at the call site:
/// `macro_rules!` hygiene applies to `let`-bound locals, not to item paths,
/// so `LABEL_WIDTH` resolves at the expansion site.
macro_rules! config_line {
    ($f:expr, $label:expr, $value:expr) => {
        writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
    };
}
561
impl fmt::Display for Config {
    /// Renders a multi-line, aligned summary of the active configuration
    /// (suitable for startup logging). Optional sections — crawler sources,
    /// filters, backlinks — are printed only when present/enabled.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Label column width picked up by `config_line!` at expansion time.
        const LABEL_WIDTH: usize = 27;

        writeln!(f, "hydrant configuration:")?;
        config_line!(f, "relay hosts", format_args!("{:?}", self.relays))?;
        config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?;
        config_line!(f, "full network indexing", self.full_network)?;
        config_line!(f, "verify signatures", self.verify_signatures)?;
        config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?;
        config_line!(f, "identity cache size", self.identity_cache_size)?;
        config_line!(
            f,
            "cursor save interval",
            format_args!("{}sec", self.cursor_save_interval.as_secs())
        )?;
        config_line!(
            f,
            "repo fetch timeout",
            format_args!("{}sec", self.repo_fetch_timeout.as_secs())
        )?;
        config_line!(f, "ephemeral", self.ephemeral)?;
        config_line!(f, "database path", self.database_path.to_string_lossy())?;
        config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
        config_line!(f, "data compression", self.data_compression)?;
        config_line!(f, "journal compression", self.journal_compression)?;
        config_line!(f, "firehose workers", self.firehose_workers)?;
        config_line!(f, "db worker threads", self.db_worker_threads)?;
        config_line!(
            f,
            "db journal size",
            format_args!("{} mb", self.db_max_journaling_size_mb)
        )?;
        config_line!(
            f,
            "db blocks memtable",
            format_args!("{} mb", self.db_blocks_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db repos memtable",
            format_args!("{} mb", self.db_repos_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db events memtable",
            format_args!("{} mb", self.db_events_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db records memtable",
            format_args!("{} mb", self.db_records_memtable_size_mb)
        )?;
        config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?;
        config_line!(
            f,
            "crawler resume pending",
            self.crawler_resume_pending_repos
        )?;
        // Sources are echoed back in the same `mode::url` syntax that
        // `CrawlerSource::parse` accepts.
        if !self.crawler_sources.is_empty() {
            let sources: Vec<_> = self
                .crawler_sources
                .iter()
                .map(|s| format!("{}::{}", s.mode, s.url))
                .collect();
            config_line!(f, "crawler sources", sources.join(", "))?;
        }
        if let Some(signals) = &self.filter_signals {
            config_line!(f, "filter signals", format_args!("{:?}", signals))?;
        }
        if let Some(collections) = &self.filter_collections {
            config_line!(f, "filter collections", format_args!("{:?}", collections))?;
        }
        if let Some(excludes) = &self.filter_excludes {
            config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
        }
        if self.enable_backlinks {
            config_line!(f, "backlinks", "enabled")?;
        }
        Ok(())
    }
}