hydrant/control/
firehose.rs1use std::sync::Arc;
2
3use miette::{IntoDiagnostic, Result};
4use tokio::sync::watch;
5use tracing::{error, info};
6use url::Url;
7
8use crate::config::FirehoseSource;
9use crate::db::{self, keys};
10use crate::ingest::{BufferTx, firehose::FirehoseIngestor};
11use crate::state::AppState;
12
13pub(super) struct FirehoseIngestorHandle {
14 abort: tokio::task::AbortHandle,
15 pub(super) is_pds: bool,
16}
17
18impl Drop for FirehoseIngestorHandle {
19 fn drop(&mut self) {
20 self.abort.abort();
21 }
22}
23
24pub(super) struct FirehoseShared {
25 pub(super) buffer_tx: BufferTx,
26 pub(super) verify_signatures: bool,
27}
28
29#[derive(Debug, Clone, serde::Serialize)]
31pub struct FirehoseSourceInfo {
32 pub url: Url,
33 pub persisted: bool,
35 pub is_pds: bool,
37}
38
39#[derive(Clone)]
41pub struct FirehoseHandle {
42 pub(super) state: Arc<AppState>,
43 pub(super) shared: Arc<std::sync::OnceLock<FirehoseShared>>,
45 pub(super) tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>,
47 pub(super) persisted: Arc<scc::HashSet<Url>>,
49}
50
51impl FirehoseHandle {
52 pub(super) fn new(state: Arc<AppState>) -> Self {
53 Self {
54 state,
55 shared: Arc::new(std::sync::OnceLock::new()),
56 tasks: Arc::new(scc::HashMap::new()),
57 persisted: Arc::new(scc::HashSet::new()),
58 }
59 }
60
61 pub(super) async fn spawn_firehose_ingestor(
62 &self,
63 source: &FirehoseSource,
64 shared: &FirehoseShared,
65 ) -> Result<()> {
66 use std::sync::atomic::AtomicI64;
67 let state = &self.state;
68
69 let start = db::get_firehose_cursor(&state.db, &source.url).await?;
70 let _ = state
72 .firehose_cursors
73 .insert_async(source.url.clone(), AtomicI64::new(start.unwrap_or(0)))
74 .await;
75
76 info!(relay = %source.url, source.is_pds, cursor = ?start, "starting firehose ingestor");
77
78 let enabled = state.firehose_enabled.subscribe();
79 let ingestor = FirehoseIngestor::new(
80 state.clone(),
81 shared.buffer_tx.clone(),
82 source.url.clone(),
83 source.is_pds,
84 state.filter.clone(),
85 enabled,
86 shared.verify_signatures,
87 )
88 .await;
89
90 let abort = tokio::spawn({
91 let relay_url = source.url.clone();
92 let tasks = self.tasks.clone();
93 async move {
94 if let Err(e) = ingestor.run().await {
95 error!(relay = %relay_url, err = %e, "firehose ingestor exited with error");
96 } else {
97 tasks.remove_async(&relay_url).await;
99 info!(relay = %relay_url, "firehose shut down!");
100 }
101 }
102 })
103 .abort_handle();
104
105 let handle = FirehoseIngestorHandle {
106 abort,
107 is_pds: source.is_pds,
108 };
109 self.tasks.upsert_async(source.url.clone(), handle).await;
110
111 Ok(())
112 }
113
114 pub fn enable(&self) {
116 self.state.firehose_enabled.send_replace(true);
117 }
118 pub fn disable(&self) {
120 self.state.firehose_enabled.send_replace(false);
121 }
122 pub fn is_enabled(&self) -> bool {
124 *self.state.firehose_enabled.borrow()
125 }
126
127 pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> {
129 let mut out = Vec::new();
130 self.tasks
131 .any_async(|url, handle| {
132 out.push(FirehoseSourceInfo {
133 url: url.clone(),
134 persisted: self.persisted.contains_sync(url),
135 is_pds: handle.is_pds,
136 });
137 false
138 })
139 .await;
140 out
141 }
142
143 pub async fn add_source(&self, url: Url, is_pds: bool) -> Result<()> {
149 let shared = self
150 .shared
151 .get()
152 .ok_or_else(|| miette::miette!("firehose worker not started"))?;
153
154 let key = keys::firehose_source_key(url.as_str());
156 tokio::task::spawn_blocking({
157 let state = self.state.clone();
158 move || {
159 let mut batch = state.db.inner.batch();
160 let value = rmp_serde::to_vec(&db::FirehoseSourceMeta { is_pds }).map_err(|e| {
161 miette::miette!("failed to serialize firehose source meta: {e}")
162 })?;
163 batch.insert(&state.db.crawler, key, &value);
164 batch.commit().into_diagnostic()?;
165 state.db.persist()
166 }
167 })
168 .await
169 .into_diagnostic()??;
170
171 let _ = self.persisted.insert_async(url.clone()).await;
172
173 self.spawn_firehose_ingestor(&FirehoseSource { url, is_pds }, shared)
174 .await?;
175
176 Ok(())
177 }
178
179 pub async fn remove_source(&self, url: &Url) -> Result<bool> {
185 if self.persisted.contains_async(url).await {
186 let url_str = url.to_string();
187 tokio::task::spawn_blocking({
188 let state = self.state.clone();
189 move || {
190 state
191 .db
192 .crawler
193 .remove(keys::firehose_source_key(&url_str))
194 .into_diagnostic()?;
195 state.db.persist()
196 }
197 })
198 .await
199 .into_diagnostic()??;
200 self.persisted.remove_async(url).await;
201 }
202
203 Ok(self.tasks.remove_async(url).await.is_some())
204 }
205
206 pub async fn reset_cursor(&self, url: &str) -> Result<()> {
208 let url = Url::parse(url).into_diagnostic()?;
209 let key = keys::firehose_cursor_key_from_url(&url);
210 tokio::task::spawn_blocking({
211 let state = self.state.clone();
212 move || {
213 state.db.cursors.remove(key).into_diagnostic()?;
214 state.db.persist()
215 }
216 })
217 .await
218 .into_diagnostic()??;
219
220 self.state.firehose_cursors.remove_async(&url).await;
221
222 Ok(())
223 }
224}