// hydrant/config.rs

1use miette::Result;
2use serde::{Deserialize, Serialize};
3use smol_str::ToSmolStr;
4use std::collections::HashMap;
5use std::fmt;
6use std::path::PathBuf;
7use std::str::FromStr;
8use std::time::Duration;
9use url::Url;
10
/// rate limit parameters for a named tier of PDS connections.
///
/// the effective per-second limit is `max(per_second_base, accounts * per_second_account_mul)`:
/// `per_second_base` acts as a floor, and the limit scales up with the PDS's
/// active account count.
#[derive(Debug, Clone, Copy)]
pub struct RateTier {
    /// floor for the per-second limit, regardless of account count.
    pub per_second_base: u64,
    /// per-second events allowed per active account on this PDS.
    pub per_second_account_mul: f64,
    /// per-hour limit.
    pub per_hour: u64,
    /// per-day limit.
    pub per_day: u64,
}

impl RateTier {
    /// built-in "trusted" tier: high limits for well-behaved PDS operators.
    pub fn trusted() -> Self {
        Self {
            per_hour: 5000 * 3600,
            per_day: 5000 * 86400,
            per_second_base: 5000,
            per_second_account_mul: 10.0,
        }
    }

    /// built-in "default" tier: conservative limits for unknown PDS operators.
    pub fn default_tier() -> Self {
        Self {
            per_hour: 1000 * 3600,
            per_day: 1000 * 86400,
            per_second_base: 50,
            per_second_account_mul: 0.5,
        }
    }

    /// parse the `base/mul/hourly/daily` format used by `HYDRANT_RATE_TIERS`.
    ///
    /// returns `None` unless the string has exactly four `/`-separated fields
    /// and every field parses cleanly.
    fn parse(s: &str) -> Option<Self> {
        let mut fields = s.split('/');
        let tier = Self {
            per_second_base: fields.next()?.parse().ok()?,
            per_second_account_mul: fields.next()?.parse().ok()?,
            per_hour: fields.next()?.parse().ok()?,
            per_day: fields.next()?.parse().ok()?,
        };
        // a fifth field means the spec is malformed, same as too few.
        fields.next().is_none().then_some(tier)
    }
}
62
/// this is for internal use only, please don't use this macro.
///
/// reads `HYDRANT_<KEY>` from the environment. the `sec` form parses a
/// humantime duration; the plain form parses via `FromStr`, falling back to
/// `$default` when the variable is unset or unparseable.
#[doc(hidden)]
#[macro_export]
macro_rules! __cfg {
    (@val $key:expr) => {
        std::env::var(concat!("HYDRANT_", $key))
    };
    ($key:expr, $default:expr, sec) => {
        // recurse through `$crate::__cfg!` so the internal rule resolves no
        // matter where the macro is invoked. the previous bare `cfg!(@val ...)`
        // only worked because of the local `use crate::__cfg as cfg;` alias —
        // anywhere without that alias it would hit the builtin `cfg!` macro
        // and fail to compile.
        $crate::__cfg!(@val $key)
            .ok()
            .and_then(|s| humantime::parse_duration(&s).ok())
            .unwrap_or($default)
    };
    ($key:expr, $default:expr) => {
        $crate::__cfg!(@val $key)
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or($default.to_owned())
            .into()
    };
}
use crate::__cfg as cfg;
85
/// loads `.env` from the current directory, setting any variables not already in the environment.
fn load_dotenv() {
    let contents = match std::fs::read_to_string(".env") {
        Ok(c) => c,
        // no .env file (or unreadable): nothing to do.
        Err(_) => return,
    };
    for raw in contents.lines() {
        let raw = raw.trim();
        // skip blanks and comment lines.
        if raw.is_empty() || raw.starts_with('#') {
            continue;
        }
        let Some((key, value)) = raw.split_once('=') else {
            continue;
        };
        let key = key.trim();
        let value = value.trim();
        // strip one matching pair of surrounding quotes (double or single), if any.
        let strip = |q: char| value.strip_prefix(q).and_then(|v| v.strip_suffix(q));
        let unquoted = strip('"').or_else(|| strip('\'')).unwrap_or(value);
        // never override variables that are already set in the real environment.
        if std::env::var(key).is_err() {
            // SAFETY: single-threaded at startup; no other threads are reading env yet.
            unsafe { std::env::set_var(key, unquoted) };
        }
    }
}
112
/// strategy the crawler uses to enumerate repositories from a source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrawlerMode {
    /// enumerate via `com.atproto.sync.listRepos`, check signals with `describeRepo`.
    ListRepos,
    /// enumerate via `com.atproto.sync.listReposByCollection` for each configured signal.
    /// note: if no signals are specified, this won't crawl for any repos.
    ByCollection,
}
121
122impl CrawlerMode {
123    fn default_for(full_network: bool) -> Self {
124        full_network
125            .then_some(Self::ListRepos)
126            .unwrap_or(Self::ByCollection)
127    }
128}
129
130impl Serialize for CrawlerMode {
131    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
132    where
133        S: serde::Serializer,
134    {
135        serializer.serialize_str(&self.to_smolstr())
136    }
137}
138
139impl<'de> Deserialize<'de> for CrawlerMode {
140    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
141    where
142        D: serde::Deserializer<'de>,
143    {
144        let s = String::deserialize(deserializer)?;
145        FromStr::from_str(&s).map_err(serde::de::Error::custom)
146    }
147}
148
149impl FromStr for CrawlerMode {
150    type Err = miette::Error;
151    fn from_str(s: &str) -> Result<Self> {
152        match s {
153            "list_repos" | "list-repos" => Ok(Self::ListRepos),
154            "by_collection" | "by-collection" => Ok(Self::ByCollection),
155            _ => Err(miette::miette!(
156                "invalid crawler mode: expected 'list_repos' or 'by_collection'"
157            )),
158        }
159    }
160}
161
162impl fmt::Display for CrawlerMode {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        match self {
165            Self::ListRepos => write!(f, "list_repos"),
166            Self::ByCollection => write!(f, "by_collection"),
167        }
168    }
169}
170
/// a single crawler source: a URL and the mode used to enumerate it.
#[derive(Debug, Clone)]
pub struct CrawlerSource {
    /// base URL of the service to crawl.
    pub url: Url,
    /// discovery mode used against this URL.
    pub mode: CrawlerMode,
}
177
/// a single firehose source: a URL and whether it is a direct PDS connection.
///
/// set via `HYDRANT_RELAY_HOSTS` as a comma-separated list of `[pds::]url` entries.
/// e.g. `wss://bsky.network,pds::wss://pds.example.com`.
/// a bare URL (no `pds::` prefix) is treated as an aggregating relay (`is_pds = false`).
#[derive(Debug, Clone)]
pub struct FirehoseSource {
    /// base URL of the firehose endpoint (e.g. `wss://...`).
    pub url: Url,
    /// true when this is a direct PDS connection; enables host authority enforcement.
    pub is_pds: bool,
}
189
190impl FirehoseSource {
191    /// parse `[pds::]url`. the `pds::` prefix marks the source as a direct PDS connection.
192    pub fn parse(s: &str) -> Option<Self> {
193        if let Some(url_str) = s.strip_prefix("pds::") {
194            let url = Url::parse(url_str).ok()?;
195            Some(Self { url, is_pds: true })
196        } else {
197            let url = Url::parse(s).ok()?;
198            Some(Self { url, is_pds: false })
199        }
200    }
201}
202
203impl CrawlerSource {
204    /// parse `[mode::]url`. mode prefix is optional, falls back to `default_mode`.
205    fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> {
206        if let Some((prefix, rest)) = s.split_once("::") {
207            let mode = prefix.parse().ok()?;
208            let url = Url::parse(rest).ok()?;
209            Some(Self { url, mode })
210        } else {
211            let url = Url::parse(s).ok()?;
212            Some(Self {
213                url,
214                mode: default_mode,
215            })
216        }
217    }
218}
219
/// compression algorithm used by the storage engine (data keyspaces and journal).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// lz4 compression.
    Lz4,
    /// zstd compression.
    Zstd,
    /// no compression.
    None,
}
226
227impl FromStr for Compression {
228    type Err = miette::Error;
229    fn from_str(s: &str) -> Result<Self> {
230        match s {
231            "lz4" => Ok(Self::Lz4),
232            "zstd" => Ok(Self::Zstd),
233            "none" => Ok(Self::None),
234            _ => Err(miette::miette!("invalid compression type")),
235        }
236    }
237}
238
239impl fmt::Display for Compression {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        match self {
242            Self::Lz4 => write!(f, "lz4"),
243            Self::Zstd => write!(f, "zstd"),
244            Self::None => write!(f, "none"),
245        }
246    }
247}
248
/// how much signature verification is applied to incoming commits.
#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// verify all commits, from the firehose and when backfilling a repo from a PDS.
    Full,
    /// only verify commits when backfilling a repo from a PDS.
    BackfillOnly,
    /// don't verify anything.
    None,
}
258
259impl FromStr for SignatureVerification {
260    type Err = miette::Error;
261    fn from_str(s: &str) -> Result<Self> {
262        match s {
263            "full" => Ok(Self::Full),
264            "backfill-only" => Ok(Self::BackfillOnly),
265            "none" => Ok(Self::None),
266            _ => Err(miette::miette!("invalid signature verification level")),
267        }
268    }
269}
270
271impl fmt::Display for SignatureVerification {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        match self {
274            Self::Full => write!(f, "full"),
275            Self::BackfillOnly => write!(f, "backfill-only"),
276            Self::None => write!(f, "none"),
277        }
278    }
279}
280
#[derive(Debug, Clone)]
pub struct Config {
    /// path to the database folder. set via `HYDRANT_DATABASE_PATH`.
    pub database_path: PathBuf,
    /// if `true`, discovers and indexes all repositories in the network.
    /// set via `HYDRANT_FULL_NETWORK`.
    pub full_network: bool,
    /// if `true`, no records are stored; events are deleted after `ephemeral_ttl`.
    /// set via `HYDRANT_EPHEMERAL`.
    pub ephemeral: bool,
    /// how long events are retained in ephemeral mode before deletion.
    /// set via `HYDRANT_EPHEMERAL_TTL` (humantime duration, e.g. `60min`).
    pub ephemeral_ttl: Duration,

    /// firehose sources for ingestion. set via `HYDRANT_RELAY_HOST` (single)
    /// or `HYDRANT_RELAY_HOSTS` (comma-separated; takes precedence).
    /// prefix a URL with `pds::` to mark it as a direct PDS connection.
    pub relays: Vec<FirehoseSource>,
    /// base URL(s) of the PLC directory (comma-separated for multiple).
    /// defaults to `https://plc.wtf`, or `https://plc.directory` in full-network mode.
    /// set via `HYDRANT_PLC_URL`.
    pub plc_urls: Vec<Url>,
    /// whether to ingest events from relay firehose subscriptions.
    /// set via `HYDRANT_ENABLE_FIREHOSE`.
    pub enable_firehose: bool,
    /// number of concurrent workers processing firehose events.
    /// set via `HYDRANT_FIREHOSE_WORKERS`.
    pub firehose_workers: usize,
    /// how often the firehose cursor is persisted to disk.
    /// set via `HYDRANT_CURSOR_SAVE_INTERVAL` (humantime duration, e.g. `3sec`).
    pub cursor_save_interval: Duration,
    /// timeout for fetching a full repository CAR during backfill.
    /// set via `HYDRANT_REPO_FETCH_TIMEOUT` (humantime duration, e.g. `5min`).
    pub repo_fetch_timeout: Duration,
    /// maximum number of concurrent backfill tasks.
    /// set via `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
    pub backfill_concurrency_limit: usize,

    /// whether to run the network crawler. `None` defers to the default for the current mode.
    /// set via `HYDRANT_ENABLE_CRAWLER`.
    pub enable_crawler: Option<bool>,
    /// maximum number of repos allowed in the backfill pending queue before the crawler pauses.
    /// set via `HYDRANT_CRAWLER_MAX_PENDING_REPOS`.
    pub crawler_max_pending_repos: usize,
    /// pending queue size at which the crawler resumes after being paused.
    /// set via `HYDRANT_CRAWLER_RESUME_PENDING_REPOS`.
    pub crawler_resume_pending_repos: usize,
    /// crawler sources: each entry pairs a URL with a discovery mode.
    ///
    /// set via `HYDRANT_CRAWLER_URLS` as a comma-separated list of `[mode::]url` entries,
    /// e.g. `list_repos::wss://bsky.network,by_collection::https://lightrail.microcosm.blue`.
    /// a bare URL without a `mode::` prefix uses the default mode (`list_repos` for
    /// full-network, `by_collection` otherwise). defaults to the relay hosts with the
    /// default mode. set to an empty string to disable crawling entirely.
    pub crawler_sources: Vec<CrawlerSource>,

    /// signature verification level for incoming commits.
    /// set via `HYDRANT_VERIFY_SIGNATURES` (`full`, `backfill-only`, or `none`).
    pub verify_signatures: SignatureVerification,
    /// number of resolved identities to keep in the in-memory LRU cache.
    /// set via `HYDRANT_IDENTITY_CACHE_SIZE`.
    pub identity_cache_size: u64,
    /// enable MST inversion validation on incoming commits (expensive).
    /// set via `HYDRANT_VERIFY_MST`.
    pub verify_mst: bool,
    /// clock drift window for future-rev rejection, in seconds.
    /// commits with a rev timestamp more than this many seconds in the future are rejected.
    /// set via `HYDRANT_REV_CLOCK_SKEW`. default: 300 (5 minutes).
    pub rev_clock_skew_secs: i64,

    /// NSID patterns that trigger auto-discovery in filter mode (e.g. `app.bsky.feed.post`).
    /// set via `HYDRANT_FILTER_SIGNALS` as a comma-separated list.
    pub filter_signals: Option<Vec<String>>,
    /// NSID patterns used to filter which record collections are stored.
    /// if `None`, all collections are stored. set via `HYDRANT_FILTER_COLLECTIONS`.
    pub filter_collections: Option<Vec<String>>,
    /// DIDs that are always skipped, regardless of mode.
    /// set via `HYDRANT_FILTER_EXCLUDES` as a comma-separated list.
    pub filter_excludes: Option<Vec<String>>,

    /// enable backlinks indexing (only meaningful in non-ephemeral mode).
    /// set via `HYDRANT_ENABLE_BACKLINKS=true`.
    pub enable_backlinks: bool,

    /// base URL(s) of relay or aggregator services to seed firehose PDS sources from at startup.
    ///
    /// hydrant calls `com.atproto.sync.listHosts` on each URL and adds the returned PDSes
    /// as firehose sources (with `is_pds = true`). account counts from the response are
    /// applied to newly-seen hosts to initialise rate-limiting immediately.
    ///
    /// set via `HYDRANT_SEED_HOSTS` as a comma-separated list of base URLs.
    pub seed_hosts: Vec<Url>,
    /// list of trusted PDS/relay hosts to pre-assign to the "trusted" rate tier at startup.
    /// set via `HYDRANT_TRUSTED_HOSTS` as a comma-separated list of hostnames.
    /// hosts not present in this list use the "default" tier unless assigned via the API.
    pub trusted_hosts: Vec<String>,
    /// named rate tier definitions for PDS rate limiting.
    ///
    /// built-in tiers ("default" and "trusted") are always present and may be overridden.
    /// set via `HYDRANT_RATE_TIERS` as a comma-separated list of `name:base/mul/hourly/daily` entries,
    /// e.g. `trusted:5000/10.0/18000000/432000000,custom:100/1.0/7200000/172800000`.
    pub rate_tiers: HashMap<String, RateTier>,

    /// db internals, tune only if you know what you're doing.
    ///
    /// size of the fjall block cache in MB. set via `HYDRANT_CACHE_SIZE`.
    pub cache_size: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for data keyspaces (blocks, records, repos, events).
    /// set via `HYDRANT_DATA_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub data_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// compression algorithm for the fjall journal.
    /// set via `HYDRANT_JOURNAL_COMPRESSION` (`lz4`, `zstd`, or `none`).
    pub journal_compression: Compression,
    /// db internals, tune only if you know what you're doing.
    ///
    /// number of background threads used by the fjall storage engine.
    /// set via `HYDRANT_DB_WORKER_THREADS`.
    pub db_worker_threads: usize,
    /// db internals, tune only if you know what you're doing.
    ///
    /// maximum total size of the fjall journal in MB before a flush is forced.
    /// set via `HYDRANT_DB_MAX_JOURNALING_SIZE_MB`.
    pub db_max_journaling_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the blocks keyspace in MB.
    /// set via `HYDRANT_DB_BLOCKS_MEMTABLE_SIZE_MB`.
    pub db_blocks_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the repos keyspace in MB.
    /// set via `HYDRANT_DB_REPOS_MEMTABLE_SIZE_MB`.
    pub db_repos_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the events keyspace in MB.
    /// set via `HYDRANT_DB_EVENTS_MEMTABLE_SIZE_MB`.
    pub db_events_memtable_size_mb: u64,
    /// db internals, tune only if you know what you're doing.
    ///
    /// in-memory write buffer (memtable) size for the records keyspace in MB.
    /// set via `HYDRANT_DB_RECORDS_MEMTABLE_SIZE_MB`.
    pub db_records_memtable_size_mb: u64,
}
429
impl Default for Config {
    /// baseline (non-full-network) defaults; `Config::full_network()` layers its
    /// overrides on top of these via struct update syntax.
    fn default() -> Self {
        const BASE_MEMTABLE_MB: u64 = 32;
        Self {
            database_path: PathBuf::from("./hydrant.db"),
            ephemeral: false,
            // NOTE(review): `ephemeral_ttl` is only supplied under the `indexer`
            // or `relay` features — a build with neither (or with both) fails to
            // compile this literal. confirm the features are mutually exclusive
            // and one is always enabled.
            #[cfg(feature = "indexer")]
            ephemeral_ttl: Duration::from_secs(3600), // 1 hour
            #[cfg(feature = "relay")]
            ephemeral_ttl: Duration::from_secs(3600 * 24 * 3), // 3 days
            #[cfg(not(feature = "relay"))]
            full_network: false,
            #[cfg(feature = "relay")]
            full_network: true,
            #[cfg(not(feature = "relay"))]
            relays: vec![FirehoseSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                is_pds: false,
            }],
            // relay builds discover PDS hosts from seed_hosts instead of a fixed relay.
            #[cfg(feature = "relay")]
            relays: vec![],
            #[cfg(not(feature = "relay"))]
            seed_hosts: vec![],
            #[cfg(feature = "relay")]
            seed_hosts: vec![Url::parse("https://bsky.network").unwrap()],
            plc_urls: vec![Url::parse("https://plc.wtf").unwrap()],
            enable_firehose: true,
            firehose_workers: 8,
            cursor_save_interval: Duration::from_secs(3),
            repo_fetch_timeout: Duration::from_secs(300),
            backfill_concurrency_limit: 16,
            enable_crawler: None,
            crawler_max_pending_repos: 2000,
            crawler_resume_pending_repos: 1000,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("https://lightrail.microcosm.blue").unwrap(),
                mode: CrawlerMode::ByCollection,
            }],
            verify_signatures: SignatureVerification::Full,
            identity_cache_size: 1_000_000,
            verify_mst: false,
            rev_clock_skew_secs: 300,
            filter_signals: None,
            filter_collections: None,
            filter_excludes: None,
            enable_backlinks: false,
            trusted_hosts: vec![],
            // built-in tiers; env overrides are layered on in `from_env`.
            rate_tiers: {
                let mut m = HashMap::new();
                m.insert("default".to_string(), RateTier::default_tier());
                m.insert("trusted".to_string(), RateTier::trusted());
                m
            },
            cache_size: 256,
            data_compression: Compression::Zstd,
            journal_compression: Compression::Lz4,
            db_worker_threads: 4,
            db_max_journaling_size_mb: 400,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            // records memtable gets 2/3 of the base size.
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
        }
    }
}
495
impl Config {
    /// returns the default config for full network usage.
    pub fn full_network() -> Self {
        const BASE_MEMTABLE_MB: u64 = 192;
        Self {
            full_network: true,
            plc_urls: vec![Url::parse("https://plc.directory").unwrap()],
            firehose_workers: 24,
            backfill_concurrency_limit: 64,
            crawler_sources: vec![CrawlerSource {
                url: Url::parse("wss://relay.fire.hose.cam/").unwrap(),
                mode: CrawlerMode::ListRepos,
            }],
            db_worker_threads: 8,
            db_max_journaling_size_mb: 1024,
            db_blocks_memtable_size_mb: BASE_MEMTABLE_MB,
            db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 4,
            db_events_memtable_size_mb: BASE_MEMTABLE_MB,
            db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2,
            ..Self::default()
        }
    }

    /// reads and builds the config from environment variables, loading `.env` first if present.
    ///
    /// invalid list entries (relay hosts, seed hosts, crawler URLs, rate tiers)
    /// are warned about and skipped rather than failing the whole load; only a
    /// malformed `HYDRANT_PLC_URL` is a hard error.
    pub fn from_env() -> Result<Self> {
        load_dotenv();

        // full_network is read first since it determines which defaults to use.
        // relay mode defaults to true so that the network is indexed by default.
        #[cfg(feature = "relay")]
        let default_full_network = true;
        #[cfg(not(feature = "relay"))]
        let default_full_network = false;
        let full_network: bool = cfg!("FULL_NETWORK", default_full_network);
        let defaults = full_network
            .then(Self::full_network)
            .unwrap_or_else(Self::default);

        let relay_hosts = match std::env::var("HYDRANT_RELAY_HOSTS") {
            Ok(hosts) if !hosts.trim().is_empty() => hosts
                .split(',')
                .filter_map(|s| {
                    let s = s.trim();
                    (!s.is_empty())
                        .then(|| {
                            // unparseable entries are dropped with a warning.
                            FirehoseSource::parse(s).or_else(|| {
                                tracing::warn!("invalid relay host URL: {s}");
                                None
                            })
                        })
                        .flatten()
                })
                .collect(),
            // HYDRANT_RELAY_HOSTS explicitly set to ""
            Ok(_) => vec![],
            // not set at all: fall back to the single HYDRANT_RELAY_HOST
            // (parsed with the same `[pds::]url` syntax as HYDRANT_RELAY_HOSTS)
            Err(_) => match std::env::var("HYDRANT_RELAY_HOST") {
                Ok(s) if !s.trim().is_empty() => {
                    FirehoseSource::parse(s.trim()).into_iter().collect()
                }
                _ => defaults.relays.clone(),
            },
        };

        // unlike the other list vars, a bad PLC URL aborts startup.
        let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
            })
            .unwrap_or_else(|| Ok(defaults.plc_urls.clone()))?;

        let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", defaults.cursor_save_interval, sec);
        let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", defaults.repo_fetch_timeout, sec);

        let ephemeral: bool = cfg!("EPHEMERAL", defaults.ephemeral);
        let ephemeral_ttl = cfg!("EPHEMERAL_TTL", defaults.ephemeral_ttl, sec);
        let database_path = cfg!("DATABASE_PATH", defaults.database_path);
        let cache_size = cfg!("CACHE_SIZE", defaults.cache_size);
        let data_compression = cfg!("DATA_COMPRESSION", defaults.data_compression);
        let journal_compression = cfg!("JOURNAL_COMPRESSION", defaults.journal_compression);

        let verify_signatures = cfg!("VERIFY_SIGNATURES", defaults.verify_signatures);
        let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", defaults.identity_cache_size);
        let verify_mst: bool = cfg!("VERIFY_MST", defaults.verify_mst);
        let rev_clock_skew_secs: i64 = cfg!("REV_CLOCK_SKEW", defaults.rev_clock_skew_secs);
        let enable_firehose = cfg!("ENABLE_FIREHOSE", defaults.enable_firehose);
        // tri-state: unset (or unparseable) stays `None`, deferring to the mode default.
        let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
            .ok()
            .and_then(|s| s.parse().ok());

        let backfill_concurrency_limit = cfg!(
            "BACKFILL_CONCURRENCY_LIMIT",
            defaults.backfill_concurrency_limit
        );
        let firehose_workers = cfg!("FIREHOSE_WORKERS", defaults.firehose_workers);

        let db_worker_threads = cfg!("DB_WORKER_THREADS", defaults.db_worker_threads);
        let db_max_journaling_size_mb = cfg!(
            "DB_MAX_JOURNALING_SIZE_MB",
            defaults.db_max_journaling_size_mb
        );
        let db_blocks_memtable_size_mb = cfg!(
            "DB_BLOCKS_MEMTABLE_SIZE_MB",
            defaults.db_blocks_memtable_size_mb
        );
        let db_events_memtable_size_mb = cfg!(
            "DB_EVENTS_MEMTABLE_SIZE_MB",
            defaults.db_events_memtable_size_mb
        );
        let db_records_memtable_size_mb = cfg!(
            "DB_RECORDS_MEMTABLE_SIZE_MB",
            defaults.db_records_memtable_size_mb
        );
        let db_repos_memtable_size_mb = cfg!(
            "DB_REPOS_MEMTABLE_SIZE_MB",
            defaults.db_repos_memtable_size_mb
        );

        let crawler_max_pending_repos = cfg!(
            "CRAWLER_MAX_PENDING_REPOS",
            defaults.crawler_max_pending_repos
        );
        let crawler_resume_pending_repos = cfg!(
            "CRAWLER_RESUME_PENDING_REPOS",
            defaults.crawler_resume_pending_repos
        );

        let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", defaults.enable_backlinks);

        // start with built-in tiers, then layer in any env-defined overrides.
        // format: HYDRANT_RATE_TIERS=name:base/mul/hourly/daily,...
        let mut rate_tiers = defaults.rate_tiers.clone();
        if let Ok(s) = std::env::var("HYDRANT_RATE_TIERS") {
            for entry in s.split(',') {
                let entry = entry.trim();
                if let Some((name, spec)) = entry.split_once(':') {
                    match RateTier::parse(spec) {
                        Some(tier) => {
                            rate_tiers.insert(name.trim().to_string(), tier);
                        }
                        None => tracing::warn!(
                            "ignoring invalid rate tier '{name}': expected base/mul/hourly/daily format"
                        ),
                    }
                }
            }
        }

        let seed_hosts: Vec<Url> = std::env::var("HYDRANT_SEED_HOSTS")
            .ok()
            .map(|s| {
                s.split(',')
                    .filter_map(|u| {
                        let u = u.trim();
                        if u.is_empty() {
                            return None;
                        }
                        Url::parse(u).ok().or_else(|| {
                            tracing::warn!("invalid seed host URL: {u}");
                            None
                        })
                    })
                    .collect()
            })
            .unwrap_or_else(|| defaults.seed_hosts.clone());

        let trusted_hosts = std::env::var("HYDRANT_TRUSTED_HOSTS")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| s.trim().to_string())
                    .filter(|s| !s.is_empty())
                    .collect()
            })
            .unwrap_or_else(|| defaults.trusted_hosts.clone());

        // crawler sources: explicit env list wins; otherwise full-network mode
        // crawls the relay hosts, and filter mode uses the built-in defaults.
        let default_mode = CrawlerMode::default_for(full_network);
        let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") {
            Ok(s) => s
                .split(',')
                .map(|s| s.trim())
                .filter(|s| !s.is_empty())
                .filter_map(|s| CrawlerSource::parse(s, default_mode))
                .collect(),
            Err(_) => match default_mode {
                CrawlerMode::ListRepos => relay_hosts
                    .iter()
                    .map(|source| CrawlerSource {
                        url: source.url.clone(),
                        mode: CrawlerMode::ListRepos,
                    })
                    .collect(),
                CrawlerMode::ByCollection => defaults.crawler_sources.clone(),
            },
        };

        Ok(Self {
            database_path,
            full_network,
            ephemeral,
            seed_hosts,
            ephemeral_ttl,
            relays: relay_hosts,
            plc_urls,
            enable_firehose,
            firehose_workers,
            cursor_save_interval,
            repo_fetch_timeout,
            backfill_concurrency_limit,
            enable_crawler,
            crawler_max_pending_repos,
            crawler_resume_pending_repos,
            crawler_sources,
            verify_signatures,
            identity_cache_size,
            verify_mst,
            rev_clock_skew_secs,
            filter_signals,
            filter_collections,
            filter_excludes,
            enable_backlinks,
            trusted_hosts,
            rate_tiers,
            cache_size,
            data_compression,
            journal_compression,
            db_worker_threads,
            db_max_journaling_size_mb,
            db_blocks_memtable_size_mb,
            db_repos_memtable_size_mb,
            db_events_memtable_size_mb,
            db_records_memtable_size_mb,
        })
    }
}
755
/// renders one aligned `label value` row of the config dump.
///
/// NOTE: deliberately unhygienic — it references a `LABEL_WIDTH` constant that
/// must be in scope at the call site (see the `Display` impl for `Config`).
macro_rules! config_line {
    ($f:expr, $label:expr, $value:expr) => {
        writeln!($f, "  {:<width$}{}", $label, $value, width = LABEL_WIDTH)
    };
}
761
impl fmt::Display for Config {
    /// multi-line, human-readable summary of the active configuration.
    /// optional sections (filters, backlinks, seed hosts, crawler sources)
    /// are only printed when set.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // label column width used by the `config_line!` macro below.
        const LABEL_WIDTH: usize = 27;

        writeln!(f, "hydrant configuration:")?;
        config_line!(
            f,
            "relay hosts",
            format_args!(
                "{:?}",
                self.relays
                    .iter()
                    // round-trip PDS sources back to their `pds::` input syntax.
                    .map(|s| if s.is_pds {
                        format!("pds::{}", s.url)
                    } else {
                        s.url.to_string()
                    })
                    .collect::<Vec<_>>()
            )
        )?;
        config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?;
        config_line!(f, "full network indexing", self.full_network)?;
        config_line!(f, "verify signatures", self.verify_signatures)?;
        config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?;
        config_line!(f, "identity cache size", self.identity_cache_size)?;
        config_line!(
            f,
            "cursor save interval",
            format_args!("{}sec", self.cursor_save_interval.as_secs())
        )?;
        config_line!(
            f,
            "repo fetch timeout",
            format_args!("{}sec", self.repo_fetch_timeout.as_secs())
        )?;
        config_line!(f, "ephemeral", self.ephemeral)?;
        config_line!(f, "database path", self.database_path.to_string_lossy())?;
        config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
        config_line!(f, "data compression", self.data_compression)?;
        config_line!(f, "journal compression", self.journal_compression)?;
        config_line!(f, "firehose workers", self.firehose_workers)?;
        config_line!(f, "db worker threads", self.db_worker_threads)?;
        config_line!(
            f,
            "db journal size",
            format_args!("{} mb", self.db_max_journaling_size_mb)
        )?;
        config_line!(
            f,
            "db blocks memtable",
            format_args!("{} mb", self.db_blocks_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db repos memtable",
            format_args!("{} mb", self.db_repos_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db events memtable",
            format_args!("{} mb", self.db_events_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db records memtable",
            format_args!("{} mb", self.db_records_memtable_size_mb)
        )?;
        config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?;
        config_line!(
            f,
            "crawler resume pending",
            self.crawler_resume_pending_repos
        )?;
        if !self.crawler_sources.is_empty() {
            // printed in the same `mode::url` syntax accepted by HYDRANT_CRAWLER_URLS.
            let sources: Vec<_> = self
                .crawler_sources
                .iter()
                .map(|s| format!("{}::{}", s.mode, s.url))
                .collect();
            config_line!(f, "crawler sources", sources.join(", "))?;
        }
        if let Some(signals) = &self.filter_signals {
            config_line!(f, "filter signals", format_args!("{:?}", signals))?;
        }
        if let Some(collections) = &self.filter_collections {
            config_line!(f, "filter collections", format_args!("{:?}", collections))?;
        }
        if let Some(excludes) = &self.filter_excludes {
            config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
        }
        if self.enable_backlinks {
            config_line!(f, "backlinks", "enabled")?;
        }
        if !self.seed_hosts.is_empty() {
            config_line!(
                f,
                "seed hosts",
                format_args!(
                    "{:?}",
                    self.seed_hosts
                        .iter()
                        .map(|u| u.as_str())
                        .collect::<Vec<_>>()
                )
            )?;
        }
        Ok(())
    }
}