hydrant/control/
indexer.rs

1use super::*;
2
3/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
4///
5/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
6/// `while let Some(evt) = stream.next().await`, `forward`, etc.
7/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
8pub struct EventStream(mpsc::Receiver<Event>);
9
10impl Stream for EventStream {
11    type Item = Event;
12
13    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
14        self.0.poll_recv(cx)
15    }
16}
17
18/// runtime control over the backfill worker component.
19///
20/// the backfill worker fetches full repo CAR files from each repo's PDS for any
21/// repository in the pending queue, parses the MST, and inserts all matching records
22/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
23#[derive(Clone)]
24pub struct BackfillHandle(Arc<AppState>);
25
26impl BackfillHandle {
27    pub(crate) fn new(state: Arc<AppState>) -> Self {
28        Self(state)
29    }
30
31    /// enable the backfill worker, no-op if already enabled.
32    pub fn enable(&self) {
33        self.0.backfill_enabled.send_replace(true);
34    }
35    /// disable the backfill worker, in-flight repos complete before pausing.
36    pub fn disable(&self) {
37        self.0.backfill_enabled.send_replace(false);
38    }
39    /// returns the current enabled state of the backfill worker.
40    pub fn is_enabled(&self) -> bool {
41        *self.0.backfill_enabled.borrow()
42    }
43}
44
45impl Hydrant {
46    /// subscribe to the ordered event stream.
47    ///
48    /// returns an [`EventStream`] that implements [`futures::Stream`].
49    ///
50    /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
51    /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
52    ///   replayed first, then the stream will switch to live tailing.
53    ///
54    /// `identity` and `account` events are ephemeral and are never replayed from a cursor,
55    /// only live ones are delivered. use [`ReposControl::info`] to fetch current state for
56    /// a specific repository.
57    ///
58    /// multiple concurrent subscribers each receive a full independent copy of the stream.
59    /// the stream ends when the `EventStream` is dropped.
60    pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
61        let (tx, rx) = mpsc::channel(500);
62        let state = self.state.clone();
63        let runtime = tokio::runtime::Handle::current();
64
65        std::thread::Builder::new()
66            .name("hydrant-stream".into())
67            .spawn(move || {
68                let _g = runtime.enter();
69                event_stream_thread(state, tx, cursor);
70            })
71            .expect("failed to spawn stream thread");
72
73        EventStream(rx)
74    }
75}