// hydrant/control/firehose.rs

1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4use tokio::sync::watch;
5use tracing::{error, info};
6use url::Url;
7
8use crate::config::FirehoseSource;
9use crate::db::{self, keys};
10use crate::ingest::{BufferTx, firehose::FirehoseIngestor};
11use crate::state::AppState;
12
/// owns the background task for a single relay connection; the task is
/// aborted when this handle is dropped, so removing an entry from the task
/// map is enough to stop its connection.
pub(super) struct FirehoseIngestorHandle {
    // abort handle of the spawned ingestor task (see `spawn_firehose_ingestor`).
    abort: tokio::task::AbortHandle,
    // true when this is a direct PDS connection (mirrors `FirehoseSource::is_pds`).
    pub(super) is_pds: bool,
}
17
impl Drop for FirehoseIngestorHandle {
    /// abort the ingestor task when the handle is dropped; this is what makes
    /// `tasks.remove_async(...)` (and upsert replacement) stop the old task.
    fn drop(&mut self) {
        self.abort.abort();
    }
}
23
/// ingest-wide state shared by every relay ingestor; created once when the
/// firehose worker starts and published via the handle's `OnceLock`.
pub(super) struct FirehoseShared {
    // sender into the ingest buffer that all ingestors write events through.
    pub(super) buffer_tx: BufferTx,
    // whether ingestors should verify signatures (passed to each `FirehoseIngestor`).
    pub(super) verify_signatures: bool,
}
28
/// a snapshot of a single firehose relay's runtime state.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FirehoseSourceInfo {
    /// the relay or PDS endpoint this source connects to.
    pub url: Url,
    /// true if added via the API and persisted to the database; false for `RELAY_HOSTS` sources.
    pub persisted: bool,
    /// true when this is a direct PDS connection; enables host authority enforcement.
    pub is_pds: bool,
}
38
/// runtime control over the firehose ingestor component.
#[derive(Clone)]
pub struct FirehoseHandle {
    /// shared application state (db, cursors, enable flag, filter).
    pub(super) state: Arc<AppState>,
    /// set once by [`Hydrant::run`]; `None` means run() has not been called yet.
    pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
    /// per-relay running tasks, keyed by url.
    pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
    /// set of urls persisted in the database (dynamically added sources).
    pub(super) persisted: Arc<scc::HashSet<Url>>,
}
50
51impl FirehoseHandle {
52    pub(super) fn new(state: Arc<AppState>) -> Self {
53        Self {
54            state,
55            shared: Arc::new(std::sync::OnceLock::new()),
56            tasks: Arc::new(scc::HashMap::new()),
57            persisted: Arc::new(scc::HashSet::new()),
58        }
59    }
60
61    pub(super) async fn spawn_firehose_ingestor(
62        &self,
63        source: &FirehoseSource,
64        shared: &FirehoseShared,
65    ) -> Result<()> {
66        use std::sync::atomic::AtomicI64;
67        let state = &self.state;
68
69        let start = db::get_firehose_cursor(&state.db, &source.url).await?;
70        // insert into relay_cursors if not already present; existing in-memory cursor takes precedence
71        let _ = state
72            .firehose_cursors
73            .insert_async(source.url.clone(), AtomicI64::new(start.unwrap_or(0)))
74            .await;
75
76        info!(relay = %source.url, source.is_pds, cursor = ?start, "starting firehose ingestor");
77
78        let enabled = state.firehose_enabled.subscribe();
79        let ingestor = FirehoseIngestor::new(
80            state.clone(),
81            shared.buffer_tx.clone(),
82            source.url.clone(),
83            source.is_pds,
84            state.filter.clone(),
85            enabled,
86            shared.verify_signatures,
87        )
88        .await;
89
90        let abort = tokio::spawn({
91            let relay_url = source.url.clone();
92            let tasks = self.tasks.clone();
93            async move {
94                if let Err(e) = ingestor.run().await {
95                    error!(relay = %relay_url, err = %e, "firehose ingestor exited with error");
96                } else {
97                    // remove from tasks since we shutdown
98                    tasks.remove_async(&relay_url).await;
99                    info!(relay = %relay_url, "firehose shut down!");
100                }
101            }
102        })
103        .abort_handle();
104
105        let handle = FirehoseIngestorHandle {
106            abort,
107            is_pds: source.is_pds,
108        };
109        self.tasks.upsert_async(source.url.clone(), handle).await;
110
111        Ok(())
112    }
113
114    /// enable firehose ingestion, no-op if already enabled.
115    pub fn enable(&self) {
116        self.state.firehose_enabled.send_replace(true);
117    }
118    /// disable firehose ingestion, in-flight messages complete before pausing.
119    pub fn disable(&self) {
120        self.state.firehose_enabled.send_replace(false);
121    }
122    /// returns the current enabled state of firehose ingestion.
123    pub fn is_enabled(&self) -> bool {
124        *self.state.firehose_enabled.borrow()
125    }
126
127    /// list all currently active firehose sources.
128    pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
129        let mut out = Vec::new();
130        self.tasks
131            .any_async(|url, handle| {
132                out.push(FirehoseSourceInfo {
133                    url: url.clone(),
134                    persisted: self.persisted.contains_sync(url),
135                    is_pds: handle.is_pds,
136                });
137                false
138            })
139            .await;
140        out
141    }
142
143    /// add a new firehose source at runtime, persisting it to the database.
144    ///
145    /// if a source with the same URL already exists, it is replaced: the
146    /// running task is stopped and a new one is started with the new `is_pds`
147    /// setting. existing cursor state for the URL is preserved.
148    pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> {
149        let shared = self
150            .shared
151            .get()
152            .ok_or_else(|| miette::miette!("firehose worker not started"))?;
153
154        // persist to db first
155        let key = keys::firehose_source_key(url.as_str());
156        tokio::task::spawn_blocking({
157            let state = self.state.clone();
158            move || {
159                let mut batch = state.db.inner.batch();
160                let value = rmp_serde::to_vec(&db::FirehoseSourceMeta { is_pds }).map_err(|e| {
161                    miette::miette!("failed to serialize firehose source meta: {e}")
162                })?;
163                batch.insert(&state.db.crawler, key, &value);
164                batch.commit().into_diagnostic()?;
165                state.db.persist()
166            }
167        })
168        .await
169        .into_diagnostic()??;
170
171        let _ = self.persisted.insert_async(url.clone()).await;
172
173        self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared)
174            .await?;
175
176        Ok(())
177    }
178
179    /// remove a firehose source at runtime.
180    ///
181    /// returns `true` if the source was found and removed, `false` otherwise.
182    /// if the source was added via the API, it is removed from the database;
183    /// if it came from the static config, only the running task is stopped.
184    pub async fn remove_source(&self, url: &Url) -> Result<bool> {
185        if self.persisted.contains_async(url).await {
186            let url_str = url.to_string();
187            tokio::task::spawn_blocking({
188                let state = self.state.clone();
189                move || {
190                    state
191                        .db
192                        .crawler
193                        .remove(keys::firehose_source_key(&url_str))
194                        .into_diagnostic()?;
195                    state.db.persist()
196                }
197            })
198            .await
199            .into_diagnostic()??;
200            self.persisted.remove_async(url).await;
201        }
202
203        Ok(self.tasks.remove_async(url).await.is_some())
204    }
205
206    /// reset the stored firehose cursor for a given URL.
207    pub async fn reset_cursor(&self, url: &str) -> Result<()> {
208        let url = Url::parse(url).into_diagnostic()?;
209        let key = keys::firehose_cursor_key_from_url(&url);
210        tokio::task::spawn_blocking({
211            let state = self.state.clone();
212            move || {
213                state.db.cursors.remove(key).into_diagnostic()?;
214                state.db.persist()
215            }
216        })
217        .await
218        .into_diagnostic()??;
219
220        self.state.firehose_cursors.remove_async(&url).await;
221
222        Ok(())
223    }
224}