hydrant/control/
firehose.rs1use std::sync::Arc;
2use std::sync::atomic::Ordering;
3
4use miette::{IntoDiagnostic, Result};
5use tokio::sync::watch;
6use tracing::{error, info};
7use url::Url;
8
9use crate::db::{self, keys};
10use crate::ingest::{BufferTx, firehose::FirehoseIngestor};
11use crate::state::AppState;
12
13pub(super) struct FirehoseIngestorHandle {
14 abort: tokio::task::AbortHandle,
15}
16
17impl Drop for FirehoseIngestorHandle {
18 fn drop(&mut self) {
19 self.abort.abort();
20 }
21}
22
23pub(super) struct FirehoseShared {
24 pub(super) buffer_tx: BufferTx,
25 pub(super) verify_signatures: bool,
26}
27
28#[derive(Debug, Clone, serde::Serialize)]
30pub struct FirehoseSourceInfo {
31 pub url: Url,
32 pub persisted: bool,
34}
35
36pub(super) async fn spawn_firehose_ingestor(
37 relay_url: &Url,
38 state: &Arc<AppState>,
39 shared: &FirehoseShared,
40 enabled: watch::Receiver<bool>,
41) -> Result<FirehoseIngestorHandle> {
42 use std::sync::atomic::AtomicI64;
43
44 let start = db::get_firehose_cursor(&state.db, relay_url).await?;
45 let _ = state
47 .relay_cursors
48 .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0)))
49 .await;
50
51 info!(relay = %relay_url, cursor = ?start, "starting firehose ingestor");
52
53 let ingestor = FirehoseIngestor::new(
54 state.clone(),
55 shared.buffer_tx.clone(),
56 relay_url.clone(),
57 state.filter.clone(),
58 enabled,
59 shared.verify_signatures,
60 );
61
62 let relay_for_log = relay_url.clone();
63 let abort = tokio::spawn(async move {
64 if let Err(e) = ingestor.run().await {
65 error!(relay = %relay_for_log, err = %e, "firehose ingestor exited with error");
66 }
67 })
68 .abort_handle();
69
70 Ok(FirehoseIngestorHandle { abort })
71}
72
73#[derive(Clone)]
75pub struct FirehoseHandle {
76 pub(super) state: Arc<AppState>,
77 pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
79 pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
81 pub(super) persisted: Arc<scc::HashSet<Url>>,
83}
84
85impl FirehoseHandle {
86 pub fn enable(&self) {
88 self.state.firehose_enabled.send_replace(true);
89 }
90 pub fn disable(&self) {
92 self.state.firehose_enabled.send_replace(false);
93 }
94 pub fn is_enabled(&self) -> bool {
96 *self.state.firehose_enabled.borrow()
97 }
98
99 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
104 let db = self.state.db.clone();
105 let key = keys::firehose_cursor_key(url);
106 tokio::task::spawn_blocking(move || db.cursors.remove(key).into_diagnostic())
107 .await
108 .into_diagnostic()??;
109
110 if let Ok(relay_url) = Url::parse(url) {
111 self.state.relay_cursors.peek_with(&relay_url, |_, c| {
112 c.store(0, Ordering::SeqCst);
113 });
114 }
115 Ok(())
116 }
117
118 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
120 let mut sources = Vec::new();
121 self.tasks
122 .iter_async(|url, _| {
123 sources.push(FirehoseSourceInfo {
124 url: url.clone(),
125 persisted: self.persisted.contains_sync(url),
126 });
127 true
128 })
129 .await;
130 sources
131 }
132
133 pub async fn add_source(&self, url: Url) -> Result<()> {
141 let Some(shared) = self.shared.get() else {
142 miette::bail!("firehose not yet started: call Hydrant::run() first");
143 };
144
145 let db = self.state.db.clone();
146 let key = keys::firehose_source_key(url.as_str());
147 tokio::task::spawn_blocking(move || db.crawler.insert(key, b"").into_diagnostic())
148 .await
149 .into_diagnostic()??;
150
151 let enabled_rx = self.state.firehose_enabled.subscribe();
152 let handle = spawn_firehose_ingestor(&url, &self.state, shared, enabled_rx).await?;
153
154 let _ = self.persisted.insert_async(url.clone()).await;
155 match self.tasks.entry_async(url).await {
156 scc::hash_map::Entry::Vacant(e) => {
157 e.insert_entry(handle);
158 }
159 scc::hash_map::Entry::Occupied(mut e) => {
160 *e.get_mut() = handle;
161 }
162 }
163 Ok(())
164 }
165
166 pub async fn remove_source(&self, url: &Url) -> Result<bool> {
175 if self.shared.get().is_none() {
176 miette::bail!("firehose not yet started: call Hydrant::run() first");
177 }
178
179 if self.tasks.remove_async(url).await.is_none() {
180 return Ok(false);
181 }
182
183 self.state.relay_cursors.remove_async(url).await;
185
186 if self.persisted.remove_async(url).await.is_some() {
187 let db = self.state.db.clone();
188 let key = keys::firehose_source_key(url.as_str());
189 tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic())
190 .await
191 .into_diagnostic()??;
192 }
193
194 Ok(true)
195 }
196}