hydrant/control/
firehose.rs

1use std::sync::Arc;
2use std::sync::atomic::Ordering;
3
4use miette::{IntoDiagnostic, Result};
5use tokio::sync::watch;
6use tracing::{error, info};
7use url::Url;
8
9use crate::db::{self, keys};
10use crate::ingest::{BufferTx, firehose::FirehoseIngestor};
11use crate::state::AppState;
12
13pub(super) struct FirehoseIngestorHandle {
14    abort: tokio::task::AbortHandle,
15}
16
17impl Drop for FirehoseIngestorHandle {
18    fn drop(&mut self) {
19        self.abort.abort();
20    }
21}
22
23pub(super) struct FirehoseShared {
24    pub(super) buffer_tx: BufferTx,
25    pub(super) verify_signatures: bool,
26}
27
28/// a snapshot of a single firehose relay's runtime state.
29#[derive(Debug, Clone, serde::Serialize)]
30pub struct FirehoseSourceInfo {
31    pub url: Url,
32    /// true if added via the API and persisted to the database; false for `RELAY_HOSTS` sources.
33    pub persisted: bool,
34}
35
36pub(super) async fn spawn_firehose_ingestor(
37    relay_url: &Url,
38    state: &Arc<AppState>,
39    shared: &FirehoseShared,
40    enabled: watch::Receiver<bool>,
41) -> Result<FirehoseIngestorHandle> {
42    use std::sync::atomic::AtomicI64;
43
44    let start = db::get_firehose_cursor(&state.db, relay_url).await?;
45    // insert into relay_cursors if not already present; existing in-memory cursor takes precedence
46    let _ = state
47        .relay_cursors
48        .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0)))
49        .await;
50
51    info!(relay = %relay_url, cursor = ?start, "starting firehose ingestor");
52
53    let ingestor = FirehoseIngestor::new(
54        state.clone(),
55        shared.buffer_tx.clone(),
56        relay_url.clone(),
57        state.filter.clone(),
58        enabled,
59        shared.verify_signatures,
60    );
61
62    let relay_for_log = relay_url.clone();
63    let abort = tokio::spawn(async move {
64        if let Err(e) = ingestor.run().await {
65            error!(relay = %relay_for_log, err = %e, "firehose ingestor exited with error");
66        }
67    })
68    .abort_handle();
69
70    Ok(FirehoseIngestorHandle { abort })
71}
72
73/// runtime control over the firehose ingestor component.
74#[derive(Clone)]
75pub struct FirehoseHandle {
76    pub(super) state: Arc<AppState>,
77    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
78    pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
79    /// per-relay running tasks, keyed by url.
80    pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
81    /// set of urls persisted in the database (dynamically added sources).
82    pub(super) persisted: Arc<scc::HashSet<Url>>,
83}
84
85impl FirehoseHandle {
86    /// enable the firehose. no-op if already enabled.
87    pub fn enable(&self) {
88        self.state.firehose_enabled.send_replace(true);
89    }
90    /// disable the firehose. the current message finishes processing before the connection closes.
91    pub fn disable(&self) {
92        self.state.firehose_enabled.send_replace(false);
93    }
94    /// returns the current enabled state of the firehose.
95    pub fn is_enabled(&self) -> bool {
96        *self.state.firehose_enabled.borrow()
97    }
98
99    /// reset the stored cursor for the given relay URL.
100    ///
101    /// clears the `firehose_cursor|{url}` entry from the cursors keyspace and zeroes the
102    /// in-memory cursor. the next connection will tail live events from the current head.
103    pub async fn reset_cursor(&self, url: &str) -> Result<()> {
104        let db = self.state.db.clone();
105        let key = keys::firehose_cursor_key(url);
106        tokio::task::spawn_blocking(move || db.cursors.remove(key).into_diagnostic())
107            .await
108            .into_diagnostic()??;
109
110        if let Ok(relay_url) = Url::parse(url) {
111            self.state.relay_cursors.peek_with(&relay_url, |_, c| {
112                c.store(0, Ordering::SeqCst);
113            });
114        }
115        Ok(())
116    }
117
118    /// return info on all currently active firehose sources.
119    pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
120        let mut sources = Vec::new();
121        self.tasks
122            .iter_async(|url, _| {
123                sources.push(FirehoseSourceInfo {
124                    url: url.clone(),
125                    persisted: self.persisted.contains_sync(url),
126                });
127                true
128            })
129            .await;
130        sources
131    }
132
133    /// add a new firehose relay at runtime.
134    ///
135    /// the URL is persisted to the database and will be re-spawned on restart. if a relay with
136    /// the same URL already exists it is replaced: the running task is stopped and a new one
137    /// is started. any cursor state for that URL is preserved.
138    ///
139    /// returns an error if called before [`Hydrant::run`].
140    pub async fn add_source(&self, url: Url) -> Result<()> {
141        let Some(shared) = self.shared.get() else {
142            miette::bail!("firehose not yet started: call Hydrant::run() first");
143        };
144
145        let db = self.state.db.clone();
146        let key = keys::firehose_source_key(url.as_str());
147        tokio::task::spawn_blocking(move || db.crawler.insert(key, b"").into_diagnostic())
148            .await
149            .into_diagnostic()??;
150
151        let enabled_rx = self.state.firehose_enabled.subscribe();
152        let handle = spawn_firehose_ingestor(&url, &self.state, shared, enabled_rx).await?;
153
154        let _ = self.persisted.insert_async(url.clone()).await;
155        match self.tasks.entry_async(url).await {
156            scc::hash_map::Entry::Vacant(e) => {
157                e.insert_entry(handle);
158            }
159            scc::hash_map::Entry::Occupied(mut e) => {
160                *e.get_mut() = handle;
161            }
162        }
163        Ok(())
164    }
165
166    /// remove a firehose relay at runtime by URL.
167    ///
168    /// aborts the running ingestor task. if the source was added via the API it is removed from
169    /// the database and will not reappear on restart. `RELAY_HOSTS` sources are only stopped for
170    /// the current session; they reappear on the next restart.
171    ///
172    /// returns `true` if the relay was found and removed, `false` if it was not running.
173    /// returns an error if called before [`Hydrant::run`].
174    pub async fn remove_source(&self, url: &Url) -> Result<bool> {
175        if self.shared.get().is_none() {
176            miette::bail!("firehose not yet started: call Hydrant::run() first");
177        }
178
179        if self.tasks.remove_async(url).await.is_none() {
180            return Ok(false);
181        }
182
183        // remove from relay_cursors (persist thread will stop tracking it)
184        self.state.relay_cursors.remove_async(url).await;
185
186        if self.persisted.remove_async(url).await.is_some() {
187            let db = self.state.db.clone();
188            let key = keys::firehose_source_key(url.as_str());
189            tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
190                .await
191                .into_diagnostic()??;
192        }
193
194        Ok(true)
195    }
196}