hydrant/control/
mod.rs

1#![allow(unused_imports)]
2
3#[cfg(feature = "indexer")]
4pub(crate) mod crawler;
5pub(crate) mod filter;
6pub(crate) mod firehose;
7pub(crate) mod pds;
8pub(crate) mod repos;
9mod seed;
10pub(crate) mod stream;
11
12#[cfg(feature = "indexer")]
13mod indexer;
14#[cfg(feature = "indexer")]
15pub use indexer::*;
16
17#[cfg(feature = "relay")]
18mod relay;
19#[cfg(feature = "relay")]
20pub use relay::*;
21
22pub use filter::{FilterControl, FilterPatch, FilterSnapshot};
23pub use firehose::{FirehoseHandle, FirehoseSourceInfo};
24pub use pds::{PdsControl, PdsTierAssignment, PdsTierDefinition};
25pub use repos::{ListedRecord, Record, RecordList, RepoHandle, RepoInfo, ReposControl};
26use smol_str::{SmolStr, ToSmolStr};
27
28use std::collections::BTreeMap;
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::task::{Context, Poll};
34
35use futures::{FutureExt, Stream};
36use miette::{IntoDiagnostic, Result, WrapErr};
37use tokio::sync::{mpsc, watch};
38use tracing::{debug, error, info};
39
40#[cfg(feature = "indexer")]
41use crate::backfill::BackfillWorker;
42use crate::config::{Config, SignatureVerification};
43#[cfg(feature = "indexer")]
44use crate::db::load_persisted_crawler_sources;
45use crate::db::{self, filter as db_filter, keys, load_persisted_firehose_sources};
46use crate::filter::FilterMode;
47#[cfg(feature = "indexer")]
48use crate::ingest::indexer::FirehoseWorker;
49use crate::pds_meta::{PdsMeta, PdsMetaHandle};
50use crate::state::AppState;
51use crate::types::MarshallableEvt;
52
53use firehose::FirehoseShared;
54#[cfg(feature = "indexer")]
55use stream::event_stream_thread;
56#[cfg(feature = "relay")]
57use stream::relay_stream_thread;
58
/// information about a host hydrant is consuming from.
pub struct Host {
    /// hostname of the host.
    pub name: SmolStr,
    /// latest seq hydrant has processed from this host.
    pub seq: i64,
    /// the number of accounts hydrant has seen from this host.
    pub account_count: u64,
    /// whether this host is banned or not.
    pub is_banned: bool,
}
70
/// an event emitted by the hydrant event stream.
///
/// three variants are possible depending on the `type` field:
/// - `"record"`: a repo record was created, updated, or deleted. carries a [`RecordEvt`].
/// - `"identity"`: a DID's handle or PDS changed. carries an [`IdentityEvt`]. ephemeral, not replayable.
/// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable.
///
/// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`].
///
/// NOTE(review): the `'static` lifetime suggests this is the owned (non-borrowing)
/// form of [`MarshallableEvt`] — confirm against its definition.
pub type Event = MarshallableEvt<'static>;
80
/// the top-level handle to a hydrant instance.
///
/// `Hydrant` is cheaply cloneable. all sub-handles share the same underlying state.
/// construct it via [`Hydrant::new`] or [`Hydrant::from_env`], configure the filter
/// and repos as needed, then call [`Hydrant::run`] to start all background components.
///
/// # example
///
/// ```rust,no_run
/// use hydrant::control::Hydrant;
///
/// #[tokio::main]
/// async fn main() -> miette::Result<()> {
///     let hydrant = Hydrant::from_env().await?;
///
///     tokio::select! {
///         r = hydrant.run()?        => r,
///         r = hydrant.serve(3000)  => r,
///     }
/// }
/// ```
#[derive(Clone)]
pub struct Hydrant {
    /// handle for managing crawler sources and tasks (indexer builds only).
    #[cfg(feature = "indexer")]
    pub crawler: crawler::CrawlerHandle,
    /// handle for managing firehose ingestor tasks.
    pub firehose: FirehoseHandle,
    /// handle for the backfill worker (indexer builds only).
    #[cfg(feature = "indexer")]
    pub backfill: BackfillHandle,
    /// control over the live filter configuration.
    pub filter: FilterControl,
    /// control over PDS tier definitions/assignments and metadata.
    pub pds: PdsControl,
    /// control over repos and their records.
    pub repos: ReposControl,
    /// control over the underlying database.
    pub db: DbControl,
    /// control over the backlinks index (backlinks builds only).
    #[cfg(feature = "backlinks")]
    pub backlinks: crate::backlinks::BacklinksControl,
    // shared application state backing all sub-handles above.
    pub(crate) state: Arc<AppState>,
    // configuration captured at construction; shared by clones.
    config: Arc<Config>,
    // set by the first call to `run()`; guards against double-start.
    started: Arc<AtomicBool>,
    // private field: prevents struct-literal construction outside this module,
    // forcing callers through `new`/`from_env`.
    _priv: (),
}
120
121impl Hydrant {
    /// open the database and configure hydrant from `config`.
    ///
    /// this sets up the database, applies any filter configuration from `config`, and
    /// initializes all sub-handles. no background tasks are started yet: call
    /// [`run`](Self::run) to start all components and drive the instance.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        // 1. open database and construct AppState
        let state = AppState::new(&config)?;

        // 2. apply any filter config from env variables
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            // `full_network` maps to FilterMode::Full; `None` leaves the stored mode untouched.
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config
                .filter_signals
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let collections = config
                .filter_collections
                .clone()
                .map(crate::patch::SetUpdate::Set);
            let excludes = config
                .filter_excludes
                .clone()
                .map(crate::patch::SetUpdate::Set);

            // apply the whole patch as a single batch, off the async runtime.
            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            // first `?` handles the spawn_blocking join error, second the db error.
            .into_diagnostic()??;

            // 3. reload the live filter into the hot-path arc-swap
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        #[cfg(feature = "indexer")]
        {
            // 4. set crawler enabled state from config, evaluated against the post-patch filter
            let post_patch_crawler = match config.enable_crawler {
                Some(b) => b,
                // default: enable when indexing the full network or when crawler
                // sources were explicitly configured.
                None => {
                    state.filter.load().mode == FilterMode::Full
                        || !config.crawler_sources.is_empty()
                }
            };
            state.crawler_enabled.send_replace(post_patch_crawler);
        }

        let state = Arc::new(state);

        Ok(Self {
            firehose: FirehoseHandle::new(state.clone()),
            filter: FilterControl(state.clone()),
            pds: pds::PdsControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            #[cfg(feature = "indexer")]
            crawler: crawler::CrawlerHandle {
                state: state.clone(),
                // `shared` stays empty until `run()` spawns the crawler infrastructure.
                shared: Arc::new(std::sync::OnceLock::new()),
                tasks: Arc::new(scc::HashMap::new()),
                persisted: Arc::new(scc::HashSet::new()),
            },
            #[cfg(feature = "indexer")]
            backfill: BackfillHandle::new(state.clone()),
            #[cfg(feature = "backlinks")]
            backlinks: crate::backlinks::BacklinksControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }
218
219    /// reads config from environment variables and calls [`Hydrant::new`].
220    pub async fn from_env() -> Result<Self> {
221        Self::new(Config::from_env()?).await
222    }
223
    /// start all background components and return a future that resolves when any
    /// fatal component exits.
    ///
    /// starts the backfill worker, firehose ingestors, crawler, and worker thread.
    /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it
    /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve).
    ///
    /// returns an error if called more than once on the same `Hydrant` instance.
    pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
        let state = self.state.clone();
        let config = self.config.clone();
        #[cfg(feature = "indexer")]
        let crawler = self.crawler.clone();
        let firehose = self.firehose.clone();

        // double-start guard: `swap` returns the previous value, so a second call
        // (on this instance or any clone, since `started` is shared) bails here.
        if self.started.swap(true, Ordering::SeqCst) {
            miette::bail!("Hydrant::run() called more than once");
        }

        let fut = async move {
            // raw firehose events from pds/relay to RelayWorker
            let (buffer_tx, buffer_rx) = mpsc::channel::<crate::ingest::IngestMessage>(500);

            // validated IndexerMessages from RelayWorker/backfill to FirehoseWorker
            #[cfg(feature = "indexer")]
            let (indexer_tx, indexer_rx) =
                mpsc::channel::<crate::ingest::indexer::IndexerMessage>(500);

            // 5. spawn the backfill worker (not used in relay mode)
            #[cfg(feature = "indexer")]
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    indexer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    // signatures are checked during backfill for Full and BackfillOnly modes
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    config.ephemeral,
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            // 6. re-queue any repos that lost their backfill state, then start the retry worker
            #[cfg(feature = "indexer")]
            {
                // failure here is non-fatal: log, check for db poisoning, keep starting up
                if let Err(e) = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || crate::backfill::manager::queue_gone_backfills(&state)
                })
                .await
                .into_diagnostic()?
                {
                    error!(err = %e, "failed to queue gone backfills");
                    db::check_poisoned_report(&e);
                }

                std::thread::spawn({
                    let state = state.clone();
                    move || crate::backfill::manager::retry_worker(state)
                });
            }

            // 7. ephemeral GC thread (not used in relay mode)
            #[cfg(feature = "indexer")]
            if config.ephemeral {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // relay events TTL: relay_events keyspace grows unbounded without pruning
            #[cfg(feature = "relay")]
            {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("relay-events-gc".into())
                    .spawn(move || crate::db::ephemeral::relay_events_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // 8. cursor / counts persist thread
            // dedicated OS thread (not a tokio task) since every iteration does
            // blocking db writes; persist errors are logged but never fatal.
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    state.firehose_cursors.iter_sync(|relay, cursor| {
                        let seq = cursor.load(Ordering::SeqCst);
                        // seq 0 means nothing processed yet; don't overwrite a saved cursor
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                        true
                    });

                    if let Err(e) = db::persist_counts(&state.db) {
                        error!(err = %e, "failed to persist counts");
                        db::check_poisoned_report(&e);
                    }

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    }
                }
            });

            // 9. events/sec stats ticker
            tokio::spawn({
                let state = state.clone();
                // NOTE(review): with both features enabled the second `let id`
                // shadows the first, so the relay seq wins — confirm intended.
                let get_id = |state: &AppState| {
                    #[cfg(feature = "indexer")]
                    let id = state.db.next_event_id.load(Ordering::Relaxed);
                    #[cfg(feature = "relay")]
                    let id = state.db.next_relay_seq.load(Ordering::Relaxed);
                    id
                };
                let mut last_id = get_id(&state);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = get_id(&state);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            // fatal-exit notification channel; each fatal component holds a clone of the tx
            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            // 10. set shared and spawn firehose ingestors
            // set() can only fail if already initialized; the `started` guard above
            // ensures run() executes at most once, so this expect cannot fire.
            firehose
                .shared
                .set(FirehoseShared {
                    buffer_tx: buffer_tx.clone(),
                    verify_signatures: matches!(
                        config.verify_signatures,
                        SignatureVerification::Full
                    ),
                })
                .ok()
                .expect("firehose shared already set");
            let fire_shared = firehose.shared.get().unwrap();

            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.url.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for source in &relay_hosts {
                    firehose
                        .spawn_firehose_ingestor(source, fire_shared)
                        .await?;
                }
            }

            // 10b. restore firehose sources persisted in the db, skipping any
            // already spawned from config above
            let persisted_sources = tokio::task::spawn_blocking({
                let state = state.clone();
                move || load_persisted_firehose_sources(&state.db)
            })
            .await
            .into_diagnostic()??;

            for source in &persisted_sources {
                let _ = firehose.persisted.insert_async(source.url.clone()).await;
                if firehose.tasks.contains_async(&source.url).await {
                    continue;
                }
                firehose
                    .spawn_firehose_ingestor(source, fire_shared)
                    .await?;
            }

            // 10c. seed firehose PDS sources from listHosts on configured seed URLs
            if !config.seed_hosts.is_empty() {
                let seed_urls = config.seed_hosts.clone();
                let firehose = firehose.clone();
                let state = state.clone();
                tokio::spawn(async move {
                    seed::seed_from_list_hosts(&seed_urls, &firehose, &state).await;
                });
            }

            // 11. spawn crawler infrastructure
            #[cfg(feature = "indexer")]
            {
                use crate::crawler::{
                    CrawlerStats, CrawlerWorker, InFlight, RetryProducer, SignalChecker,
                };
                use crate::util::throttle::Throttler;

                let http = reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .gzip(true)
                    .build()
                    .expect("that reqwest will build");
                let pds_throttler = state.throttler.clone();
                let in_flight = InFlight::new();
                let stats = CrawlerStats::new(
                    state.clone(),
                    config
                        .crawler_sources
                        .iter()
                        .map(|s| s.url.clone())
                        .collect(),
                    pds_throttler.clone(),
                );
                let checker = SignalChecker {
                    http: http.clone(),
                    state: state.clone(),
                    throttler: pds_throttler,
                };

                info!(
                    max_pending = config.crawler_max_pending_repos,
                    resume_pending = config.crawler_resume_pending_repos,
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler worker"
                );
                let (worker, tx) = CrawlerWorker::new(
                    state.clone(),
                    config.crawler_max_pending_repos,
                    config.crawler_resume_pending_repos,
                    stats.clone(),
                );
                // the crawler worker must never exit; abort the process if it does
                tokio::spawn(async move {
                    worker.run().await;
                    error!("crawler worker exited unexpectedly, aborting");
                    std::process::abort();
                });

                let ticker = tokio::spawn(stats.clone().task());
                tokio::spawn(async move {
                    match ticker.await {
                        Err(e) => error!(err = ?e, "stats ticker panicked, aborting"),
                        Ok(()) => error!("stats ticker exited unexpectedly, aborting"),
                    }
                    std::process::abort();
                });

                tokio::spawn(
                    RetryProducer {
                        checker: checker.clone(),
                        in_flight: in_flight.clone(),
                        tx: tx.clone(),
                    }
                    .run(),
                );

                // set shared objects so CrawlerHandle methods can use them
                crawler
                    .shared
                    .set(crawler::CrawlerShared {
                        http,
                        checker,
                        in_flight,
                        tx,
                        stats,
                    })
                    .ok()
                    .expect("crawler shared already set");
                let shared = crawler.shared.get().unwrap();

                // spawn initial sources from config
                for source in config.crawler_sources.iter() {
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }

                // restore crawler sources persisted in the db, skipping any
                // already spawned from config above
                let persisted_sources = tokio::task::spawn_blocking({
                    let state = state.clone();
                    move || load_persisted_crawler_sources(&state.db)
                })
                .await
                .into_diagnostic()??;

                for source in &persisted_sources {
                    let _ = crawler.persisted.insert_async(source.url.clone()).await;
                    if crawler.tasks.contains_async(&source.url).await {
                        continue;
                    }
                    let enabled_rx = state.crawler_enabled.subscribe();
                    let handle = crawler::spawn_crawler_producer(
                        source,
                        &shared.http,
                        &state,
                        &shared.checker,
                        &shared.in_flight,
                        &shared.tx,
                        &shared.stats,
                        enabled_rx,
                    );
                    let _ = crawler.tasks.insert_async(source.url.clone(), handle).await;
                }
            }

            // 12. spawn the relay worker
            let relay_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();

                #[cfg(feature = "indexer")]
                let hook = indexer_tx.clone();

                move || {
                    crate::ingest::relay::RelayWorker::new(
                        state,
                        buffer_rx,
                        #[cfg(feature = "indexer")]
                        hook,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.firehose_workers,
                        crate::ingest::validation::ValidationOptions {
                            verify_mst: config.verify_mst,
                            rev_clock_skew_secs: config.rev_clock_skew_secs,
                        },
                    )
                    .run(handle)
                }
            });

            // bridge the relay worker OS thread into the fatal watch channel.
            // spawn_blocking is used because thread::join blocks.
            let tx = Arc::clone(&fatal_tx);
            tokio::spawn(
                tokio::task::spawn_blocking(move || {
                    relay_worker
                        .join()
                        .map_err(|e| miette::miette!("relay worker died: {e:?}"))
                })
                .map(move |r| {
                    // three nested Results (tokio join, thread join, worker result)
                    // collapse to one via into_diagnostic + double flatten.
                    // the error is sent as a String — NOTE(review): presumably
                    // because the watch payload must be Clone; confirm.
                    let result = r.into_diagnostic().flatten().flatten();
                    let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                }),
            );

            // 13. spawn the firehose worker (if enabled)
            #[cfg(feature = "indexer")]
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                let handle = tokio::runtime::Handle::current();
                let config = config.clone();
                move || {
                    FirehoseWorker::new(
                        state,
                        indexer_rx,
                        config.ephemeral,
                        config.firehose_workers,
                    )
                    .run(handle)
                }
            });

            // same join-and-report bridge as the relay worker above
            #[cfg(feature = "indexer")]
            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("firehose worker died: {e:?}"))
                    })
                    .map(move |r| {
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // drop the local fatal_tx so the watch channel is only kept alive by the
            // spawned tasks. when all fatal tasks exit (and drop their tx clones),
            // fatal_rx.changed() returns Err and we return Ok(()).
            drop(fatal_tx);

            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    // all fatal_tx clones dropped: all tasks finished cleanly
                    Err(_) => return Ok(()),
                }
            }
        };
        Ok(fut)
    }
661
662    /// return database counts and on-disk sizes for all keyspaces.
663    ///
664    /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`,
665    /// `error_ratelimited`, `error_transport`, `error_generic`.
666    ///
667    /// sizes are in bytes, reported per keyspace.
668    pub async fn stats(&self) -> Result<StatsResponse> {
669        let state = self.state.clone();
670
671        #[allow(unused_mut)]
672        let mut count_keys = vec![
673            "repos",
674            "error_ratelimited",
675            "error_transport",
676            "error_generic",
677        ];
678
679        #[cfg(feature = "indexer")]
680        {
681            count_keys.push("pending");
682            count_keys.push("records");
683            count_keys.push("blocks");
684            count_keys.push("resync");
685        }
686
687        let mut counts: BTreeMap<&'static str, u64> =
688            futures::future::join_all(count_keys.into_iter().map(|name| {
689                let state = state.clone();
690                async move { (name, state.db.get_count(name).await) }
691            }))
692            .await
693            .into_iter()
694            .collect();
695
696        #[cfg(feature = "indexer")]
697        counts.insert("events", state.db.events.approximate_len() as u64);
698
699        #[cfg(feature = "relay")]
700        counts.insert(
701            "relay_events",
702            state.db.relay_events.approximate_len() as u64,
703        );
704
705        let sizes = tokio::task::spawn_blocking(move || {
706            let mut s = BTreeMap::new();
707            s.insert("repos", state.db.repos.disk_space());
708            s.insert("cursors", state.db.cursors.disk_space());
709            s.insert("counts", state.db.counts.disk_space());
710            s.insert("filter", state.db.filter.disk_space());
711            s.insert("crawler", state.db.crawler.disk_space());
712
713            #[cfg(feature = "indexer")]
714            {
715                s.insert("records", state.db.records.disk_space());
716                s.insert("blocks", state.db.blocks.disk_space());
717                s.insert("pending", state.db.pending.disk_space());
718                s.insert("resync", state.db.resync.disk_space());
719                s.insert("resync_buffer", state.db.resync_buffer.disk_space());
720                s.insert("events", state.db.events.disk_space());
721            }
722
723            #[cfg(feature = "relay")]
724            s.insert("relay_events", state.db.relay_events.disk_space());
725
726            #[cfg(feature = "backlinks")]
727            s.insert("backlinks", state.db.backlinks.disk_space());
728
729            s
730        })
731        .await
732        .into_diagnostic()?;
733
734        Ok(StatsResponse { counts, sizes })
735    }
736
737    /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`.
738    ///
739    /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`,
740    /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves
741    /// only on error.
742    ///
743    /// intended for `tokio::spawn` or inclusion in a `select!` / task list. the clone
744    /// of `self` is deferred until the future is first polled.
745    ///
746    /// to disable the HTTP API entirely, simply don't call this method.
747    pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
748        let hydrant = self.clone();
749        async move { crate::api::serve(hydrant, port).await }
750    }
751
752    /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
753    ///
754    /// exposes internal inspection endpoints (`/debug/get`, `/debug/iter`, etc.).
755    /// binds only to loopback.
756    pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
757        let state = self.state.clone();
758        async move { crate::api::serve_debug(state, port).await }
759    }
760
761    /// get the status of a (firehose) host we are consuming from.
762    ///
763    /// returns the seq we are on for this host.
764    pub async fn get_host_status(&self, hostname: &str) -> Result<Option<Host>> {
765        let state = self.state.clone();
766        let hostname = hostname.to_smolstr();
767
768        tokio::task::spawn_blocking(move || {
769            let key = keys::firehose_cursor_key(&hostname);
770            let Some(seq) = state.db.cursors.get(&key).into_diagnostic()? else {
771                return Ok(None);
772            };
773            let seq = i64::from_be_bytes(
774                seq.as_ref()
775                    .try_into()
776                    .into_diagnostic()
777                    .wrap_err("cursor value is not 8 bytes")?,
778            );
779            let account_count = state
780                .db
781                .get_count_sync(&keys::pds_account_count_key(&hostname));
782            let is_banned = state.pds_meta.load().is_banned(&hostname);
783
784            Ok(Some(Host {
785                name: hostname.into(),
786                seq,
787                account_count,
788                is_banned,
789            }))
790        })
791        .await
792        .into_diagnostic()?
793    }
794
795    /// enumerates all hosts hydrant is consuming from.
796    ///
797    /// returns hosts enumerated in this pagination and the cursor to paginate from.
798    pub async fn list_hosts(
799        &self,
800        cursor: Option<&str>,
801        limit: usize,
802    ) -> Result<(Vec<Host>, Option<SmolStr>)> {
803        let state = self.state.clone();
804        let cursor = cursor.map(str::to_string);
805
806        tokio::task::spawn_blocking(move || {
807            let prefix_end = {
808                let mut end = keys::FIREHOSE_CURSOR_PREFIX.to_vec();
809                *end.last_mut().unwrap() += 1;
810                end
811            };
812            let start_bound = match cursor.as_deref() {
813                Some(host) => std::ops::Bound::Excluded(keys::firehose_cursor_key(host)),
814                None => std::ops::Bound::Included(keys::FIREHOSE_CURSOR_PREFIX.to_vec()),
815            };
816
817            // fetch one extra item to detect whether there is a next page
818            let mut hosts: Vec<Host> = Vec::with_capacity(limit + 1);
819            for item in state
820                .db
821                .cursors
822                .range((start_bound, std::ops::Bound::Excluded(prefix_end)))
823                .take(limit + 1)
824            {
825                let (k, v) = item.into_inner().into_diagnostic()?;
826                let hostname = std::str::from_utf8(&k[keys::FIREHOSE_CURSOR_PREFIX.len()..])
827                    .into_diagnostic()
828                    .wrap_err("firehose cursor key contains non-utf8 hostname")?;
829                let seq = i64::from_be_bytes(
830                    v.as_ref()
831                        .try_into()
832                        .into_diagnostic()
833                        .wrap_err("cursor value is not 8 bytes")?,
834                );
835                let account_count = state
836                    .db
837                    .get_count_sync(&keys::pds_account_count_key(hostname));
838                let is_banned = state.pds_meta.load().is_banned(&hostname);
839                hosts.push(Host {
840                    name: hostname.into(),
841                    seq,
842                    account_count,
843                    is_banned,
844                });
845            }
846
847            let next_cursor = if hosts.len() > limit {
848                hosts.pop();
849                hosts.last().map(|h| h.name.clone())
850            } else {
851                None
852            };
853
854            Ok((hosts, next_cursor))
855        })
856        .await
857        .into_diagnostic()?
858    }
859}
860
861impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
862    fn from_ref(h: &Hydrant) -> Self {
863        h.state.clone()
864    }
865}
866
/// database statistics returned by [`Hydrant::stats`].
///
/// NOTE(review): derives `serde::Serialize`, so this is presumably the JSON
/// payload of the `/stats` endpoint mentioned in the serve docs — confirm
/// against `crate::api`.
#[derive(serde::Serialize)]
pub struct StatsResponse {
    /// record counts per logical category (repos, records, events, error kinds, etc.)
    pub counts: BTreeMap<&'static str, u64>,
    /// on-disk size in bytes per keyspace
    pub sizes: BTreeMap<&'static str, u64>,
}
875
/// control over database maintenance operations.
///
/// cheap to clone: a thin wrapper around the shared [`AppState`] handle.
///
/// all methods pause the crawler, firehose, and backfill worker for the duration
/// of the operation and restore their prior state on completion, whether or not
/// the operation succeeds.
#[derive(Clone)]
pub struct DbControl(Arc<AppState>);
883
impl DbControl {
    /// trigger a major compaction of all keyspaces in parallel.
    ///
    /// compaction reclaims disk space from deleted/updated keys and improves
    /// read performance. can take several minutes on large datasets.
    pub async fn compact(&self) -> Result<()> {
        let state = self.0.clone();
        // ingestion (crawler/firehose/backfill) is paused for the duration and
        // its prior state restored afterwards, per the struct-level contract
        state
            .with_ingestion_paused(async || state.db.compact().await)
            .await
    }

    /// train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces.
    ///
    /// dictionaries are written to `dict_{name}.bin` files inside the database folder.
    /// a restart is required to apply them. training samples data blocks from the
    /// existing database, so the database must have a reasonable amount of data first.
    pub async fn train_dicts(&self) -> Result<()> {
        let state = self.0.clone();
        state
            .with_ingestion_paused(async || {
                // runs one training job on the blocking thread pool; `spawn_blocking`
                // needs an owned 'static state, hence the inner clone. `flatten`
                // merges the JoinError layer and the training error into one Result.
                let train = |name: &'static str| {
                    let state = state.clone();
                    tokio::task::spawn_blocking(move || state.db.train_dict(name))
                        .map(|res: Result<_, _>| res.into_diagnostic().flatten())
                };
                // train all three dictionaries concurrently; abort on the first error
                tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
            })
            .await
    }
}