hydrant/control/indexer.rs
1use super::*;
2
3/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
4///
5/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
6/// `while let Some(evt) = stream.next().await`, `forward`, etc.
7/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
8pub struct EventStream(mpsc::Receiver<Event>);
9
10impl Stream for EventStream {
11 type Item = Event;
12
13 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
14 self.0.poll_recv(cx)
15 }
16}
17
18/// runtime control over the backfill worker component.
19///
20/// the backfill worker fetches full repo CAR files from each repo's PDS for any
21/// repository in the pending queue, parses the MST, and inserts all matching records
22/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
23#[derive(Clone)]
24pub struct BackfillHandle(Arc<AppState>);
25
26impl BackfillHandle {
27 pub(crate) fn new(state: Arc<AppState>) -> Self {
28 Self(state)
29 }
30
31 /// enable the backfill worker, no-op if already enabled.
32 pub fn enable(&self) {
33 self.0.backfill_enabled.send_replace(true);
34 }
35 /// disable the backfill worker, in-flight repos complete before pausing.
36 pub fn disable(&self) {
37 self.0.backfill_enabled.send_replace(false);
38 }
39 /// returns the current enabled state of the backfill worker.
40 pub fn is_enabled(&self) -> bool {
41 *self.0.backfill_enabled.borrow()
42 }
43}
44
45impl Hydrant {
46 /// subscribe to the ordered event stream.
47 ///
48 /// returns an [`EventStream`] that implements [`futures::Stream`].
49 ///
50 /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
51 /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
52 /// replayed first, then the stream will switch to live tailing.
53 ///
54 /// `identity` and `account` events are ephemeral and are never replayed from a cursor,
55 /// only live ones are delivered. use [`ReposControl::info`] to fetch current state for
56 /// a specific repository.
57 ///
58 /// multiple concurrent subscribers each receive a full independent copy of the stream.
59 /// the stream ends when the `EventStream` is dropped.
60 pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
61 let (tx, rx) = mpsc::channel(500);
62 let state = self.state.clone();
63 let runtime = tokio::runtime::Handle::current();
64
65 std::thread::Builder::new()
66 .name("hydrant-stream".into())
67 .spawn(move || {
68 let _g = runtime.enter();
69 event_stream_thread(state, tx, cursor);
70 })
71 .expect("failed to spawn stream thread");
72
73 EventStream(rx)
74 }
75}