1#![allow(unused_imports)]
2
3#[cfg(feature = "indexer")]
4pub(crate) mod crawler;
5pub(crate) mod filter;
6pub(crate) mod firehose;
7pub(crate) mod pds;
8pub(crate) mod repos;
9mod seed;
10pub(crate) mod stream;
11
12#[cfg(feature = "indexer")]
13mod indexer;
14#[cfg(feature = "indexer")]
15pub use indexer::*;
16
17#[cfg(feature = "relay")]
18mod relay;
19#[cfg(feature = "relay")]
20pub use relay::*;
21
22pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
23pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
24pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition};
25pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
26use smol_str::{SmolStr, ToSmolStr};
27
28use std::collections::BTreeMap;
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::task::{Context, Poll};
34
35use futures::{FutureExt, Stream};
36use miette::{IntoDiagnostic, Result, WrapErr};
37use tokio::sync::{mpsc, watch};
38use tracing::{debug, error, info};
39
40#[cfg(feature = "indexer")]
41use crate::backfill::BackfillWorker;
42use crate::config::{Config, SignatureVerification};
43#[cfg(feature = "indexer")]
44use crate::db::load_persisted_crawler_sources;
45use crate::db::{self, filter as db_filter, keys, load_persisted_firehose_sources};
46use crate::filter::FilterMode;
47#[cfg(feature = "indexer")]
48use crate::ingest::indexer::FirehoseWorker;
49use crate::pds_meta::{PdsMeta, PdsMetaHandle};
50use crate::state::AppState;
51use crate::types::MarshallableEvt;
52
53use firehose::FirehoseShared;
54#[cfg(feature = "indexer")]
55use stream::event_stream_thread;
56#[cfg(feature = "relay")]
57use stream::relay_stream_thread;
58
/// Snapshot of one upstream host (relay/PDS) as seen by this instance,
/// assembled from the persisted firehose cursor, per-PDS account counts,
/// and the ban list in `PdsMeta`.
pub struct Host {
    /// Hostname of the upstream (the key under the firehose-cursor prefix).
    pub name: SmolStr,
    /// Last firehose sequence number persisted for this host
    /// (stored big-endian as 8 bytes in the `cursors` keyspace).
    pub seq: i64,
    /// Number of accounts attributed to this host per the counts keyspace.
    pub account_count: u64,
    /// Whether `PdsMeta` currently marks this host as banned.
    pub is_banned: bool,
}
70
/// Owned (fully `'static`) event as surfaced to library consumers.
pub type Event = MarshallableEvt<'static>;
80
/// Top-level handle bundling every control surface of the service.
///
/// Cheap to clone: every field is (or wraps) an `Arc` over the shared
/// [`AppState`]. Construct via [`Hydrant::new`] / [`Hydrant::from_env`],
/// then call [`Hydrant::run`] exactly once to start the workers.
#[derive(Clone)]
pub struct Hydrant {
    // Crawler task management (indexer builds only).
    #[cfg(feature = "indexer")]
    pub crawler: crawler::CrawlerHandle,
    // Firehose ingestor task management.
    pub firehose: FirehoseHandle,
    // Backfill queue control (indexer builds only).
    #[cfg(feature = "indexer")]
    pub backfill: BackfillHandle,
    // Runtime filter (mode/signals/collections/excludes) control.
    pub filter: FilterControl,
    // Per-PDS tier/ban administration.
    pub pds: PdsControl,
    // Repo/record read access.
    pub repos: ReposControl,
    // Database maintenance (compaction, dictionary training).
    pub db: DbControl,
    #[cfg(feature = "backlinks")]
    pub backlinks: crate::backlinks::BacklinksControl,
    pub(crate) state: Arc<AppState>,
    // Immutable configuration captured at construction time.
    config: Arc<Config>,
    // Guards against `run()` being called more than once across clones.
    started: Arc<AtomicBool>,
    // Prevents construction outside this module.
    _priv: (),
}
120
impl Hydrant {
    /// Build a `Hydrant` from `config`: create the [`AppState`], apply any
    /// config-supplied filter patch to the database, decide the initial
    /// crawler-enabled state (indexer builds), and wire up all control
    /// handles. Does not start any workers — see [`Self::run`].
    ///
    /// # Errors
    /// Fails if state construction, the filter patch write, or the filter
    /// reload fails.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        let state = AppState::new(&config)?;

        // Config can seed/override the persisted filter; only touch the DB
        // when at least one filter-related option is actually set.
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config
                .filter_signals
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let collections = config
                .filter_collections
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let excludes = config
                .filter_excludes
                .clone()
                .map(crate::patch::SetUpdate::Set);

            // Blocking DB write: apply the patch atomically in one batch.
            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // Re-read the merged filter from disk so the in-memory snapshot
            // reflects persisted state + patch, not just the patch.
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        #[cfg(feature = "indexer")]
        {
            // Explicit config wins; otherwise enable crawling when running
            // full-network or when crawler sources are configured.
            let post_patch_crawler = match config.enable_crawler {
                Some(b) => b,
                None => {
                    state.filter.load().mode == FilterMode::Full
                        || !config.crawler_sources.is_empty()
                }
            };
            state.crawler_enabled.send_replace(post_patch_crawler);
        }

        let state = Arc::new(state);

        Ok(Self {
            firehose: FirehoseHandle::new(state.clone()),
            filter: FilterControl(state.clone()),
            pds: pds::PdsControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            #[cfg(feature = "indexer")]
            crawler: crawler::CrawlerHandle {
                state: state.clone(),
                // Shared worker plumbing is filled in later by `run()`.
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            #[cfg(feature = "indexer")]
            backfill: BackfillHandle::new(state.clone()),
            #[cfg(feature = "backlinks")]
            backlinks: crate::backlinks::BacklinksControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }

    /// Convenience constructor: read [`Config`] from the environment and
    /// delegate to [`Self::new`].
    pub async fn from_env() -> Result<Self> {
        Self::new(Config::from_env()?).await
    }

    /// Start every background worker and return a future that resolves when
    /// the service stops.
    ///
    /// The returned future spawns (depending on enabled features): the
    /// backfill worker, gone-backfill queueing + retry thread, TTL GC
    /// threads, a cursor/count persistence thread, an events-per-second
    /// logger, firehose ingestors (configured + persisted + seeded), the
    /// crawler workers, and the relay/firehose processing threads. It then
    /// parks on a `watch` channel and resolves with the first fatal worker
    /// result (or `Ok(())` if all fatal-result senders are dropped).
    ///
    /// # Errors
    /// Returns an error immediately (before any work starts) if called more
    /// than once on any clone of this `Hydrant`.
    pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
        let state = self.state.clone();
        let config = self.config.clone();
        #[cfg(feature = "indexer")]
        let crawler = self.crawler.clone();
        let firehose = self.firehose.clone();

        // One-shot guard shared across clones: swap returns the old value.
        if self.started.swap(true, Ordering::SeqCst) {
            miette::bail!("Hydrant::run() called more than once");
        }

        let fut = async move {
            // Ingest pipeline: firehose ingestors -> buffer_tx -> relay worker.
            let (buffer_tx, buffer_rx) = mpsc::channel::<crate::ingest::IngestMessage>(500);

            // Indexer pipeline: relay worker / backfill -> indexer_tx -> firehose worker.
            #[cfg(feature = "indexer")]
            let (indexer_tx, indexer_rx) =
                mpsc::channel::<crate::ingest::indexer::IndexerMessage>(500);

            #[cfg(feature = "indexer")]
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    indexer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    // Backfill verifies signatures in Full or BackfillOnly mode.
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    config.ephemeral,
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            #[cfg(feature = "indexer")]
            {
                // Re-queue backfills for repos that went missing; a failure
                // here is logged (and checked for DB poisoning) but not fatal.
                if let Err(e) = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || crate::backfill::manager::queue_gone_backfills(&state)
                })
                .await
                .into_diagnostic()?
                {
                    error!(err = %e, "failed to queue gone backfills");
                    db::check_poisoned_report(&e);
                }

                std::thread::spawn({
                    let state = state.clone();
                    move || crate::backfill::manager::retry_worker(state)
                });
            }

            #[cfg(feature = "indexer")]
            if config.ephemeral {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            #[cfg(feature = "relay")]
            {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("relay-events-gc".into())
                    .spawn(move || crate::db::ephemeral::relay_events_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // Persistence thread: periodically flush firehose cursors, counts,
            // and the DB itself. Errors are logged, never fatal.
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    state.firehose_cursors.iter_sync(|relay, cursor| {
                        let seq = cursor.load(Ordering::SeqCst);
                        // seq == 0 means "no cursor yet"; don't persist it.
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                        true
                    });

                    if let Err(e) = db::persist_counts(&state.db) {
                        error!(err = %e, "failed to persist counts");
                        db::check_poisoned_report(&e);
                    }

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    }
                }
            });

            // Throughput logger: report events/s once a minute.
            tokio::spawn({
                let state = state.clone();
                // With both features on, the later (relay) binding shadows the
                // indexer one — relay sequence is the progress metric then.
                let get_id = |state: &AppState| {
                    #[cfg(feature = "indexer")]
                    let id = state.db.next_event_id.load(Ordering::Relaxed);
                    #[cfg(feature = "relay")]
                    let id = state.db.next_relay_seq.load(Ordering::Relaxed);
                    id
                };
                let mut last_id = get_id(&state);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = get_id(&state);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            // Fatal-result channel: worker-join tasks publish here; the tail
            // loop below waits for the first published result.
            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            // Publish shared plumbing for dynamically-added firehose sources.
            // `.ok()` discards the non-Debug payload so `.expect` can be used
            // as a set-at-most-once assertion.
            firehose
                .shared
                .set(FirehoseShared {
                    buffer_tx: buffer_tx.clone(),
                    verify_signatures: matches!(
                        config.verify_signatures,
                        SignatureVerification::Full
                    ),
                })
                .ok()
                .expect("firehose shared already set");
            let fire_shared = firehose.shared.get().unwrap();

            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.url.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for source in &relay_hosts {
                    firehose
                        .spawn_firehose_ingestor(source, fire_shared)
                        .await?;
                }
            }

            // Restore firehose sources that were added at runtime previously.
            let persisted_sources = tokio::task::spawn_blocking({
                let state = state.clone();
                move || load_persisted_firehose_sources(&state.db)
            })
            .await
            .into_diagnostic()??;

            for source in &persisted_sources {
                let _ = firehose.persisted.insert_async(source.url.clone()).await;
                // Skip URLs already started from the config above.
                if firehose.tasks.contains_async(&source.url).await {
                    continue;
                }
                firehose
                    .spawn_firehose_ingestor(source, fire_shared)
                    .await?;
            }

            if !config.seed_hosts.is_empty() {
                let seed_urls = config.seed_hosts.clone();
                let firehose = firehose.clone();
                let state = state.clone();
                tokio::spawn(async move {
                    seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await;
                });
            }

            #[cfg(feature = "indexer")]
            {
                use crate::crawler::{
                    CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
                };
                use crate::util::throttle::Throttler;

                let http = reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .gzip(true)
                    .build()
                    .expect("that reqwest will build");
                let pds_throttler = state.throttler.clone();
                let in_flight = InFlight::new();
                let stats = CrawlerStats::new(
                    state.clone(),
                    config
                        .crawler_sources
                        .iter()
                        .map(|s| s.url.clone())
                        .collect(),
                    pds_throttler.clone(),
                );
                let checker = SignalChecker {
                    http: http.clone(),
                    state: state.clone(),
                    throttler: pds_throttler,
                };

                info!(
                    max_pending = config.crawler_max_pending_repos,
                    resume_pending = config.crawler_resume_pending_repos,
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler worker"
                );
                let (worker, tx) = CrawlerWorker::new(
                    state.clone(),
                    config.crawler_max_pending_repos,
                    config.crawler_resume_pending_repos,
                    stats.clone(),
                );
                // The crawler worker must never exit; treat exit as fatal.
                tokio::spawn(async move {
                    worker.run().await;
                    error!("crawler worker exited unexpectedly, aborting");
                    std::process::abort();
                });

                // Same policy for the stats ticker: exit or panic aborts.
                let ticker = tokio::spawn(stats.clone().task());
                tokio::spawn(async move {
                    match ticker.await {
                        Err(e) => error!(err = ?e, "stats ticker panicked, aborting"),
                        Ok(()) => error!("stats ticker exited unexpectedly, aborting"),
                    }
                    std::process::abort();
                });

                tokio::spawn(
                    RetryProducer {
                        checker: checker.clone(),
                        in_flight: in_flight.clone(),
                        tx: tx.clone(),
                    }
                    .run(),
                );

                // Publish shared crawler plumbing (same OnceLock pattern as
                // the firehose shared state above).
                crawler
                    .shared
                    .set(crawler::CrawlerShared {
                        http,
                        checker,
                        in_flight,
                        tx,
                        stats,
                    })
                    .ok()
                    .expect("crawler shared already set");
                let shared = crawler.shared.get().unwrap();

                for source in config.crawler_sources.iter() {
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }

                // Restore crawler sources added at runtime previously,
                // skipping ones already started from the config.
                let persisted_sources = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || load_persisted_crawler_sources(&state.db)
                })
                .await
                .into_diagnostic()??;

                for source in &persisted_sources {
                    let _ = crawler.persisted.insert_async(source.url.clone()).await;
                    if crawler.tasks.contains_async(&source.url).await {
                        continue;
                    }
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }
            }

            // Relay worker runs on a dedicated OS thread; it drains buffer_rx
            // and (indexer builds) forwards into the indexer channel.
            let relay_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();

                #[cfg(feature = "indexer")]
                let hook = indexer_tx.clone();

                move || {
                    crate::ingest::relay::RelayWorker::new(
                        state,
                        buffer_rx,
                        #[cfg(feature = "indexer")]
                        hook,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.firehose_workers,
                        crate::ingest::validation::ValidationOptions {
                            verify_mst: config.verify_mst,
                            rev_clock_skew_secs: config.rev_clock_skew_secs,
                        },
                    )
                    .run(handle)
                }
            });

            // Bridge the blocking thread join into the fatal channel.
            let tx = Arc::clone(&fatal_tx);
            tokio::spawn(
                tokio::task::spawn_blocking(move || {
                    relay_worker
                        .join()
                        .map_err(|e| miette::miette!("relay worker died: {e:?}"))
                })
                .map(move |r| {
                    // Collapse JoinError -> join panic -> worker result; the
                    // error is stringified because Report is not Clone.
                    let result = r.into_diagnostic().flatten().flatten();
                    let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                }),
            );

            #[cfg(feature = "indexer")]
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();
                move || {
                    FirehoseWorker::new(
                        state,
                        indexer_rx,
                        config.ephemeral,
                        config.firehose_workers,
                    )
                    .run(handle)
                }
            });

            #[cfg(feature = "indexer")]
            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("firehose worker died: {e:?}"))
                    })
                    .map(move |r| {
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // Drop our sender so `changed()` errors (-> clean Ok(())) once all
            // worker-join tasks have finished and dropped their clones.
            drop(fatal_tx);

            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    // All senders gone without a fatal result: normal shutdown.
                    Err(_) => return Ok(()),
                }
            }
        };
        Ok(fut)
    }

    /// Collect counters and on-disk sizes for every keyspace into a
    /// [`StatsResponse`]. Counter reads run concurrently; size probes run on
    /// a blocking task.
    ///
    /// # Errors
    /// Fails if the blocking size-probe task cannot be joined.
    pub async fn stats(&self) -> Result<StatsResponse> {
        let state = self.state.clone();

        // `mut` is only needed when the indexer feature adds extra keys.
        #[allow(unused_mut)]
        let mut count_keys = vec![
            "repos",
            "error_ratelimited",
            "error_transport",
            "error_generic",
        ];

        #[cfg(feature = "indexer")]
        {
            count_keys.push("pending");
            count_keys.push("records");
            count_keys.push("blocks");
            count_keys.push("resync");
        }

        // Fetch all counters concurrently, then gather into a sorted map.
        let mut counts: BTreeMap<&'static str, u64> =
            futures::future::join_all(count_keys.into_iter().map(|name| {
                let state = state.clone();
                async move { (name, state.db.get_count(name).await) }
            }))
            .await
            .into_iter()
            .collect();

        #[cfg(feature = "indexer")]
        counts.insert("events", state.db.events.approximate_len() as u64);

        #[cfg(feature = "relay")]
        counts.insert(
            "relay_events",
            state.db.relay_events.approximate_len() as u64,
        );

        // disk_space() is assumed to be a blocking call — probe off-runtime.
        let sizes = tokio::task::spawn_blocking(move || {
            let mut s = BTreeMap::new();
            s.insert("repos", state.db.repos.disk_space());
            s.insert("cursors", state.db.cursors.disk_space());
            s.insert("counts", state.db.counts.disk_space());
            s.insert("filter", state.db.filter.disk_space());
            s.insert("crawler", state.db.crawler.disk_space());

            #[cfg(feature = "indexer")]
            {
                s.insert("records", state.db.records.disk_space());
                s.insert("blocks", state.db.blocks.disk_space());
                s.insert("pending", state.db.pending.disk_space());
                s.insert("resync", state.db.resync.disk_space());
                s.insert("resync_buffer", state.db.resync_buffer.disk_space());
                s.insert("events", state.db.events.disk_space());
            }

            #[cfg(feature = "relay")]
            s.insert("relay_events", state.db.relay_events.disk_space());

            #[cfg(feature = "backlinks")]
            s.insert("backlinks", state.db.backlinks.disk_space());

            s
        })
        .await
        .into_diagnostic()?;

        Ok(StatsResponse { counts, sizes })
    }

    /// Return a future serving the public API on `port` with a clone of this
    /// `Hydrant` as router state.
    pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
        let hydrant = self.clone();
        async move { crate::api::serve(hydrant, port).await }
    }

    /// Return a future serving the debug API on `port`; takes only the raw
    /// `AppState`, not the full handle.
    pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
        let state = self.state.clone();
        async move { crate::api::serve_debug(state, port).await }
    }

    /// Look up a single host by name. Returns `Ok(None)` when no firehose
    /// cursor is stored for it (i.e. we have never tracked this host).
    ///
    /// # Errors
    /// Fails on a DB read error, a malformed (non-8-byte) cursor value, or a
    /// join failure of the blocking task.
    pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> {
        let state = self.state.clone();
        let hostname = hostname.to_smolstr();

        tokio::task::spawn_blocking(move || {
            let key = keys::firehose_cursor_key(&hostname);
            let Some(seq) = state.db.cursors.get(&key).into_diagnostic()? else {
                return Ok(None);
            };
            // Cursor values are stored as 8 big-endian bytes.
            let seq = i64::from_be_bytes(
                seq.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("cursor value is not 8 bytes")?,
            );
            let account_count = state
                .db
                .get_count_sync(&keys::pds_account_count_key(&hostname));
            let is_banned = state.pds_meta.load().is_banned(&hostname);

            Ok(Some(Host {
                name: hostname.into(),
                seq,
                account_count,
                is_banned,
            }))
        })
        .await
        .into_diagnostic()?
    }

    /// Page through all known hosts in key order.
    ///
    /// `cursor` is the last hostname of the previous page (exclusive);
    /// `None` starts from the beginning. Returns up to `limit` hosts plus a
    /// cursor for the next page, or `None` when this is the last page.
    ///
    /// # Errors
    /// Fails on DB iteration errors, non-UTF-8 hostnames or malformed cursor
    /// values in the keyspace, or a join failure of the blocking task.
    pub async fn list_hosts(
        &self,
        cursor: Option<&str>,
        limit: usize,
    ) -> Result<(Vec<Host>, Option<SmolStr>)> {
        let state = self.state.clone();
        let cursor = cursor.map(str::to_string);

        tokio::task::spawn_blocking(move || {
            // Exclusive upper bound: the prefix with its last byte bumped.
            // (Assumes the static prefix's last byte is < 0xFF.)
            let prefix_end = {
                let mut end = keys::FIREHOSE_CURSOR_PREFIX.to_vec();
                *end.last_mut().unwrap() += 1;
                end
            };
            let start_bound = match cursor.as_deref() {
                // Resume strictly after the cursor host's key.
                Some(host) => std::ops::Bound::Excluded(keys::firehose_cursor_key(host)),
                None => std::ops::Bound::Included(keys::FIREHOSE_CURSOR_PREFIX.to_vec()),
            };

            // Fetch one extra row to detect whether another page exists.
            let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1);
            for item in state
                .db
                .cursors
                .range((start_bound, std::ops::Bound::Excluded(prefix_end)))
                .take(limit + 1)
            {
                let (k, v) = item.into_inner().into_diagnostic()?;
                let hostname = std::str::from_utf8(&k[keys::FIREHOSE_CURSOR_PREFIX.len()..])
                    .into_diagnostic()
                    .wrap_err("firehose cursor key contains non-utf8 hostname")?;
                let seq = i64::from_be_bytes(
                    v.as_ref()
                        .try_into()
                        .into_diagnostic()
                        .wrap_err("cursor value is not 8 bytes")?,
                );
                let account_count = state
                    .db
                    .get_count_sync(&keys::pds_account_count_key(hostname));
                let is_banned = state.pds_meta.load().is_banned(&hostname);
                hosts.push(Host {
                    name: hostname.into(),
                    seq,
                    account_count,
                    is_banned,
                });
            }

            // If we got limit+1 rows, drop the sentinel and point the next
            // cursor at the last host actually returned.
            let next_cursor = if hosts.len() > limit {
                hosts.pop();
                hosts.last().map(|h| h.name.clone())
            } else {
                None
            };

            Ok((hosts, next_cursor))
        })
        .await
        .into_diagnostic()?
    }
}
860
861impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
862 fn from_ref(h: &Hydrant) -> Self {
863 h.state.clone()
864 }
865}
866
/// Serializable snapshot returned by [`Hydrant::stats`].
#[derive(serde::Serialize)]
pub struct StatsResponse {
    // Named counters (repos, errors, events, ...), sorted by name.
    pub counts: BTreeMap<&'static str, u64>,
    // On-disk size in bytes per keyspace, sorted by name.
    pub sizes: BTreeMap<&'static str, u64>,
}
875
/// Handle for database maintenance operations (compaction, compression
/// dictionary training); cheap to clone.
#[derive(Clone)]
pub struct DbControl(Arc<AppState>);
883
impl DbControl {
    /// Compact the database while ingestion is paused for the duration.
    ///
    /// # Errors
    /// Propagates failures from the compaction itself.
    pub async fn compact(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || state.db.compact().await)
            .await
    }

    /// Retrain the compression dictionaries for the "repos", "blocks", and
    /// "events" keyspaces, concurrently, while ingestion is paused.
    ///
    /// # Errors
    /// Fails if any training task fails or cannot be joined; `try_join!`
    /// short-circuits on the first error.
    pub async fn train_dicts(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || {
                // Each keyspace trains on its own blocking thread; the join
                // error and the training error are collapsed via flatten().
                let train = |name: &'static str| {
                    let state = state.clone();
                    tokio::task::spawn_blocking(move || state.db.train_dict(name))
                        .map(|res: Result<_, _>| res.into_diagnostic().flatten())
                };
                tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
            })
            .await
    }
}
914}