hydrant/control/repos/
indexer.rs

1use futures::{FutureExt, TryFutureExt};
2use rand::Rng;
3
4use super::*;
5
6impl ReposControl {
7    /// iterates through pending repositories, returning their state.
8    #[allow(dead_code)]
9    pub(crate) fn iter_pending(
10        &self,
11        cursor: Option<u64>,
12    ) -> impl Iterator<Item = Result<(u64, RepoInfo)>> {
13        let start_bound = if let Some(cursor) = cursor {
14            std::ops::Bound::Excluded(cursor.to_be_bytes().to_vec())
15        } else {
16            std::ops::Bound::Unbounded
17        };
18
19        let repos = self.0.db.repos.clone();
20        let state = self.0.clone();
21        self.0
22            .db
23            .pending
24            .range((start_bound, std::ops::Bound::Unbounded))
25            .map(move |g| {
26                let (id_raw, did_key) = g.into_inner().into_diagnostic()?;
27                let id = u64::from_be_bytes(
28                    id_raw
29                        .as_ref()
30                        .try_into()
31                        .into_diagnostic()
32                        .wrap_err("can't parse pending key")?,
33                );
34                let Some(bytes) = repos.get(&did_key).into_diagnostic()? else {
35                    // stale pending that we forgot to delete? shouldn't happen though
36                    tracing::warn!(id, did = ?did_key, "stale pending???");
37                    return Ok(None);
38                };
39                let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
40                let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
41                let metadata_key = keys::repo_metadata_key(&did);
42                let metadata = state
43                    .db
44                    .repo_metadata
45                    .get(&metadata_key)
46                    .into_diagnostic()?
47                    .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
48                let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
49                Ok(Some((
50                    id,
51                    repo_state_to_info(did, repo_state.into_static(), metadata.tracked),
52                )))
53            })
54            .map(|b| b.transpose())
55            .flatten()
56    }
57
58    #[allow(dead_code)]
59    pub(crate) fn iter_resync(
60        &self,
61        cursor: Option<&Did<'_>>,
62    ) -> impl Iterator<Item = Result<RepoInfo>> {
63        let start_bound = if let Some(cursor) = cursor {
64            let did_key = keys::repo_key(cursor);
65            std::ops::Bound::Excluded(did_key)
66        } else {
67            std::ops::Bound::Unbounded
68        };
69
70        let repos = self.0.db.repos.clone();
71        let state = self.0.clone();
72        self.0
73            .db
74            .resync
75            .range((start_bound, std::ops::Bound::Unbounded))
76            .map(move |g| {
77                let did_key = g.key().into_diagnostic()?;
78                let Some(bytes) = repos.get(&did_key).into_diagnostic()? else {
79                    // stale pending that we forgot to delete? shouldn't happen though
80                    tracing::warn!(did = ?did_key, "stale resync???");
81                    return Ok(None);
82                };
83                let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
84                let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
85                let metadata_key = keys::repo_metadata_key(&did);
86                let metadata = state
87                    .db
88                    .repo_metadata
89                    .get(&metadata_key)
90                    .into_diagnostic()?
91                    .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
92                let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
93                Ok(Some(repo_state_to_info(
94                    did,
95                    repo_state.into_static(),
96                    metadata.tracked,
97                )))
98            })
99            .map(|b| b.transpose())
100            .flatten()
101    }
102
    /// core resync step shared by [`Self::resync`] and [`Self::track`].
    ///
    /// if `did` has a stored repo state and is not already in the pending
    /// queue, this stages (into `batch`) the moves needed to re-enqueue it:
    /// marks it tracked, rotates its `index_id`, swaps its pending-queue entry,
    /// and removes any resync marker. the gauge transition (old state ->
    /// `Pending`) is appended to `transitions` for the caller to emit after the
    /// batch commits.
    ///
    /// returns `Ok(true)` if the repo was (re-)queued, `Ok(false)` if it is
    /// unknown or already pending. nothing is written directly — all mutations
    /// go through `batch`, so the caller decides when they become visible.
    pub(crate) fn _resync(
        db: &Db,
        did: &Did<'_>,
        batch: &mut fjall::OwnedWriteBatch,
        transitions: &mut Vec<(GaugeState, GaugeState)>,
    ) -> Result<bool> {
        let did_key = keys::repo_key(did);
        let metadata_key = keys::repo_metadata_key(did);

        // unknown repos (no stored state) fall through to Ok(false).
        let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
        let existing = repo_bytes
            .as_deref()
            .map(db::deser_repo_state)
            .transpose()?;

        if let Some(repo_state) = existing {
            // a repo with state but no metadata is treated as corruption, not
            // silently skipped.
            let metadata_bytes = db
                .repo_metadata
                .get(&metadata_key)
                .into_diagnostic()?
                .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
            let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;

            // skip if already in pending queue
            let is_pending = db
                .pending
                .get(keys::pending_key(metadata.index_id))
                .into_diagnostic()?
                .is_some();
            if !is_pending {
                // capture the current gauge state (which also depends on
                // whether a resync marker exists) before we mutate anything.
                let resync = db.resync.get(&did_key).into_diagnostic()?;
                let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
                metadata.tracked = true;
                // insert into pending with new index_id
                // NB: the old pending key must be removed *before* index_id is
                // rotated, since the key is derived from the old id.
                let old_pending = keys::pending_key(metadata.index_id);
                batch.remove(&db.pending, &old_pending);
                metadata.index_id = rand::Rng::next_u64(&mut rand::rng());
                batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
                batch.remove(&db.resync, &did_key);
                batch.insert(
                    &db.repo_metadata,
                    &metadata_key,
                    crate::db::ser_repo_meta(&metadata)?,
                );
                transitions.push((old, GaugeState::Pending));
                return Ok(true);
            }
        }

        Ok(false)
    }
154
155    /// request one or more repositories to be resynced.
156    ///
157    /// note that they may not immediately start backfilling if:
158    /// - other repos already filled the backfill concurrency limit,
159    /// - or there are many repos pending already.
160    ///
161    /// this will also clear any error state the repo may have been in,
162    /// allowing it to resync again.
163    pub async fn resync(
164        &self,
165        dids: impl IntoIterator<Item = Did<'_>>,
166    ) -> Result<Vec<Did<'static>>> {
167        let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
168        let state = self.0.clone();
169
170        let (queued, transitions) = tokio::task::spawn_blocking(move || {
171            let db = &state.db;
172            let mut batch = db.inner.batch();
173            let mut queued: Vec<Did<'static>> = Vec::new();
174            let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
175
176            for did in dids {
177                if Self::_resync(db, &did, &mut batch, &mut transitions)? {
178                    queued.push(did);
179                }
180            }
181
182            batch.commit().into_diagnostic()?;
183            state.db.persist()?;
184            Ok::<_, miette::Report>((queued, transitions))
185        })
186        .await
187        .into_diagnostic()??;
188
189        for (old, new) in transitions {
190            self.0.db.update_gauge_diff_async(&old, &new).await;
191        }
192        if !queued.is_empty() {
193            self.0.notify_backfill();
194        }
195
196        Ok(queued)
197    }
198
199    /// explicitly track one or more repositories, enqueuing them for backfill if needed.
200    ///
201    /// - if a repo is new, a fresh [`RepoState`] is created and backfill is queued.
202    /// - if a repo is already known but untracked, it is marked tracked and re-enqueued.
203    /// - if a repo is already tracked, this is a no-op.
204    pub async fn track(
205        &self,
206        dids: impl IntoIterator<Item = Did<'_>>,
207    ) -> Result<Vec<Did<'static>>> {
208        let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
209        let state = self.0.clone();
210
211        let (new_count, queued, transitions) = tokio::task::spawn_blocking(move || {
212            let db = &state.db;
213            let mut batch = db.inner.batch();
214            let mut added = 0i64;
215            let mut queued: Vec<Did<'static>> = Vec::new();
216            let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
217
218            for did in dids {
219                let did_key = keys::repo_key(&did);
220                let metadata_key = keys::repo_metadata_key(&did);
221
222                let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
223                let existing_metadata = metadata_bytes
224                    .map(|b| crate::db::deser_repo_meta(&b))
225                    .transpose()?;
226
227                if let Some(metadata) = existing_metadata {
228                    if !metadata.tracked && Self::_resync(db, &did, &mut batch, &mut transitions)? {
229                        queued.push(did);
230                    }
231                } else {
232                    let repo_state = RepoState::backfilling();
233                    let metadata = RepoMetadata::backfilling(rand::random());
234                    batch.insert(&db.repos, &did_key, crate::db::ser_repo_state(&repo_state)?);
235                    batch.insert(
236                        &db.repo_metadata,
237                        &metadata_key,
238                        crate::db::ser_repo_meta(&metadata)?,
239                    );
240                    batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
241                    added += 1;
242                    queued.push(did);
243                    transitions.push((GaugeState::Synced, GaugeState::Pending));
244                }
245            }
246
247            batch.commit().into_diagnostic()?;
248            state.db.persist()?;
249            Ok::<_, miette::Report>((added, queued, transitions))
250        })
251        .await
252        .into_diagnostic()??;
253
254        if new_count > 0 {
255            self.0.db.update_count_async("repos", new_count).await;
256        }
257        for (old, new) in transitions {
258            self.0.db.update_gauge_diff_async(&old, &new).await;
259        }
260        self.0.notify_backfill();
261        Ok(queued)
262    }
263
264    /// stop tracking one or more repositories. hydrant will stop processing new events
265    /// for them and remove them from the pending/resync queues, but existing indexed
266    /// records are **not** deleted.
267    pub async fn untrack(
268        &self,
269        dids: impl IntoIterator<Item = Did<'_>>,
270    ) -> Result<Vec<Did<'static>>> {
271        let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
272        let state = self.0.clone();
273
274        let (untracked, gauge_decrements) = tokio::task::spawn_blocking(move || {
275            let db = &state.db;
276            let mut batch = db.inner.batch();
277            let mut untracked: Vec<Did<'static>> = Vec::new();
278            let mut gauge_decrements = Vec::new();
279
280            for did in dids {
281                let did_key = keys::repo_key(&did);
282                let metadata_key = keys::repo_metadata_key(&did);
283
284                let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
285                let existing = repo_bytes
286                    .as_deref()
287                    .map(db::deser_repo_state)
288                    .transpose()?;
289
290                if let Some(repo_state) = existing {
291                    let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
292                    let existing_metadata = metadata_bytes
293                        .map(|b| crate::db::deser_repo_meta(&b))
294                        .transpose()?;
295
296                    if let Some(mut metadata) = existing_metadata {
297                        if metadata.tracked {
298                            let resync = db.resync.get(&did_key).into_diagnostic()?;
299                            let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
300                            metadata.tracked = false;
301                            batch.insert(
302                                &db.repo_metadata,
303                                &metadata_key,
304                                crate::db::ser_repo_meta(&metadata)?,
305                            );
306                            batch.remove(&db.pending, keys::pending_key(metadata.index_id));
307                            batch.remove(&db.resync, &did_key);
308                            if old != GaugeState::Synced {
309                                gauge_decrements.push(old);
310                            }
311                            untracked.push(did);
312                        }
313                    }
314                }
315            }
316
317            batch.commit().into_diagnostic()?;
318            state.db.persist()?;
319            Ok::<_, miette::Report>((untracked, gauge_decrements))
320        })
321        .await
322        .into_diagnostic()??;
323
324        for gauge in gauge_decrements {
325            self.0
326                .db
327                .update_gauge_diff_async(&gauge, &GaugeState::Synced)
328                .await;
329        }
330        Ok(untracked)
331    }
332}
333
334impl<'i> RepoHandle<'i> {
335    /// gets a record from this repository.
336    pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
337        let did = self.did.clone().into_static();
338        let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey));
339
340        let collection = collection.to_smolstr();
341        let state = self.state.clone();
342        tokio::task::spawn_blocking(move || {
343            use miette::WrapErr;
344
345            let cid_bytes = state.db.records.get(db_key).into_diagnostic()?;
346            let Some(cid_bytes) = cid_bytes else {
347                return Ok(None);
348            };
349
350            // lookup block using col|cid key
351            let block_key = keys::block_key(&collection, &cid_bytes);
352            let Some(block_bytes) = state.db.blocks.get(block_key).into_diagnostic()? else {
353                miette::bail!("block {cid_bytes:?} not found, this is a bug!!");
354            };
355
356            let value = serde_ipld_dagcbor::from_slice::<Data>(&block_bytes)
357                .into_diagnostic()
358                .wrap_err("cant parse block")?
359                .into_static();
360            let cid = Cid::new(&cid_bytes)
361                .into_diagnostic()
362                .wrap_err("cant parse block cid")?;
363            let cid = Cid::Str(cid.to_cowstr().into_static());
364
365            Ok(Some(Record { did, cid, value }))
366        })
367        .await
368        .into_diagnostic()?
369    }
370
    /// lists records from this repository.
    ///
    /// returns up to `limit` records of `collection`, plus a cursor (the next
    /// rkey) when more rows remain. pass a previous cursor to resume.
    ///
    /// NOTE(review): with `reverse == false` the range is iterated with
    /// `.rev()`, i.e. descending key order is the *default* direction and
    /// `reverse == true` walks ascending — confirm this matches the intended
    /// API ordering.
    pub async fn list_records(
        &self,
        collection: &str,
        limit: usize,
        reverse: bool,
        cursor: Option<&str>,
    ) -> Result<RecordList> {
        let did = self.did.clone().into_static();

        let state = self.state.clone();
        let prefix = keys::record_prefix_collection(&did, collection);
        let collection = collection.to_smolstr();
        let cursor = cursor.map(|c| c.to_smolstr());

        tokio::task::spawn_blocking(move || {
            let mut results = Vec::new();
            let mut next_cursor = None;

            let iter: Box<dyn Iterator<Item = _>> = if !reverse {
                // descending walk: iterate [prefix, end) backwards. the end
                // bound is either just past the whole prefix, or the cursor key
                // (exclusive, since `..` excludes its end — so iteration
                // resumes strictly below the cursor).
                let mut end_prefix = prefix.clone();
                if let Some(last) = end_prefix.last_mut() {
                    // bump the final prefix byte to form an exclusive upper
                    // bound. assumes that byte is never 0xFF — TODO confirm.
                    *last += 1;
                }

                let end_key = if let Some(cursor) = &cursor {
                    let mut k = prefix.clone();
                    k.extend_from_slice(cursor.as_bytes());
                    k
                } else {
                    end_prefix
                };

                Box::new(
                    state
                        .db
                        .records
                        .range(prefix.as_slice()..end_key.as_slice())
                        .rev(),
                )
            } else {
                // ascending walk: start at the prefix, or just after the
                // cursor key (the trailing 0 byte excludes the cursor itself).
                let start_key = if let Some(cursor) = &cursor {
                    let mut k = prefix.clone();
                    k.extend_from_slice(cursor.as_bytes());
                    k.push(0);
                    k
                } else {
                    prefix.clone()
                };

                Box::new(state.db.records.range(start_key.as_slice()..))
            };

            for item in iter {
                let (key, cid_bytes) = item.into_inner().into_diagnostic()?;

                // the ascending branch has no end bound, so stop as soon as we
                // leave this collection's key prefix.
                if !key.starts_with(prefix.as_slice()) {
                    break;
                }

                let rkey = keys::parse_rkey(&key[prefix.len()..])?;
                if results.len() >= limit {
                    // we read one row past the page; remember its rkey as the
                    // continuation cursor and stop.
                    next_cursor = Some(rkey);
                    break;
                }

                // look up using col|cid key built from collection and binary cid bytes
                if let Ok(Some(block_bytes)) = state
                    .db
                    .blocks
                    .get(&keys::block_key(collection.as_str(), &cid_bytes))
                {
                    // rows whose block is missing are skipped entirely; blocks
                    // that fail to decode degrade to Data::Null rather than
                    // failing the whole listing.
                    let value: Data =
                        serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null);
                    let cid = Cid::new(&cid_bytes).into_diagnostic()?;
                    let cid = Cid::Str(cid.to_cowstr().into_static());
                    results.push(ListedRecord {
                        rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr()))
                            .expect("that rkey is validated"),
                        cid,
                        value: value.into_static(),
                    });
                }
            }
            Result::<_, miette::Report>::Ok((results, next_cursor))
        })
        .await
        .into_diagnostic()?
        .map(|(records, next_cursor)| RecordList {
            records,
            cursor: next_cursor.map(|rkey| {
                Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated")
            }),
        })
    }
466
    /// generates a streaming CAR v1 response body for this repository.
    ///
    /// returns `None` if the repo has no commit yet (still backfilling) or is an
    /// unmigrated repo that does not have the necessary data to reconstruct the
    /// root commit from.
    ///
    /// ## notes
    /// - calling this if you are using collection allowlist will always result
    /// in an error since the commit root won't match the reconstructed CID.
    /// - calling this for big repositories will incur more resource cost due to
    /// hydrant's structure, the whole MST is always reconstructed.
    pub async fn generate_car(
        &self,
    ) -> Result<Option<impl futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static>>
    {
        use iroh_car::{CarHeader, CarWriter};
        use jacquard_repo::{BlockStore, MemoryBlockStore, Mst};
        use miette::WrapErr;
        use std::sync::Arc;

        // bail with Ok(None) unless the repo has both a state and a root commit.
        let commit = match self.state().await? {
            Some(state) => match state.root {
                Some(c) => c,
                None => return Ok(None),
            },
            None => return Ok(None),
        };

        // unmigrated repos can't produce an atproto commit -> Ok(None).
        let atp_commit = match commit.into_atp_commit(self.did.clone().into_static()) {
            Some(c) => c,
            None => return Ok(None),
        };
        let commit_cid = atp_commit.to_cid().into_diagnostic()?;
        let commit_cbor = atp_commit.to_cbor().into_diagnostic()?;

        let did = self.did.clone().into_static();
        let app_state = self.state.clone();

        // build mst and populate the block store in a single blocking pass
        let store = Arc::new(MemoryBlockStore::new());
        let mst = Mst::new(store.clone());
        let handle = tokio::runtime::Handle::current();

        let mst = tokio::task::spawn_blocking(move || -> Result<_> {
            let mut mst = mst;
            let prefix = keys::record_prefix_did(&did);

            // walk every record key under this DID: keys look like
            // prefix|collection|SEP|rkey and map to the record's binary cid.
            for guard in app_state.db.records.prefix(&prefix) {
                let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;

                // split "collection SEP rkey" out of the remainder of the key.
                let rest = &key[prefix.len()..];
                let mut parts = rest.splitn(2, |b: &u8| *b == keys::SEP);
                let collection_raw = parts
                    .next()
                    .ok_or_else(|| miette::miette!("missing collection in record key"))?;
                let rkey_raw = parts
                    .next()
                    .ok_or_else(|| miette::miette!("missing rkey in record key"))?;

                let collection = std::str::from_utf8(collection_raw)
                    .into_diagnostic()
                    .wrap_err("collection is not valid utf8")?;
                let rkey = keys::parse_rkey(rkey_raw)?;
                let mst_key = format!("{collection}/{rkey}");

                let ipld_cid = cid::Cid::read_bytes(cid_bytes.as_ref())
                    .into_diagnostic()
                    .wrap_err_with(|| format!("invalid cid bytes for record {mst_key}"))?;

                let block_key = keys::block_key(collection, cid_bytes.as_ref());
                let block_bytes = app_state
                    .db
                    .blocks
                    .get(&block_key)
                    .into_diagnostic()?
                    .ok_or_else(|| miette::miette!("block missing for record {mst_key}"))?;

                // `Handle::block_on` is used because we're inside
                // spawn_blocking (a dedicated blocking thread), not an async
                // worker, so driving these futures synchronously is fine.
                handle
                    .block_on(mst.add_mut(&mst_key, ipld_cid))
                    .into_diagnostic()?;
                // we use put_many here to skip calculating the CID again
                handle
                    .block_on(mst.storage().put_many([(
                        ipld_cid,
                        bytes::Bytes::copy_from_slice(block_bytes.as_ref()),
                    )]))
                    .into_diagnostic()?;
            }

            handle.block_on(mst.persist()).into_diagnostic()?;

            Result::<_>::Ok(mst)
        })
        .await
        .into_diagnostic()??;

        // sanity check: rebuilt root should match stored commit data in full-index mode
        let computed_root = mst.get_pointer().await.into_diagnostic()?;
        if computed_root != atp_commit.data {
            // warn only — a mismatch is expected when a collection filter is
            // active (see doc comment), so we still emit the CAR.
            tracing::warn!(
                computed = %computed_root,
                stored = %atp_commit.data,
                did = %self.did,
                "mst root mismatch (expected in filter mode)",
            );
        }

        store
            .put_many([(commit_cid, bytes::Bytes::from(commit_cbor))])
            .await
            .into_diagnostic()?;

        // stream the car directly to the response
        // the duplex pipe lets the writer task run ahead of the consumer by up
        // to 64 KiB; the ReaderStream side is what we hand back to the caller.
        let (reader, writer) = tokio::io::duplex(64 * 1024);
        tokio::spawn(
            async move {
                let header = CarHeader::new_v1(vec![commit_cid]);
                let mut car_writer = CarWriter::new(header, writer);

                // write commit first, then mst nodes + leaf blocks
                let commit_data = store.get(&commit_cid).await?;
                if let Some(data) = commit_data {
                    car_writer
                        .write(commit_cid, &data)
                        .await
                        .into_diagnostic()?;
                }
                mst.write_blocks_to_car(&mut car_writer).await?;
                car_writer.finish().await.into_diagnostic()?;

                Result::<_, miette::Report>::Ok(())
            }
            // errors in the detached writer task are logged, not propagated;
            // the consumer sees them as a truncated stream.
            .inspect_err(|e| tracing::error!("can't generate car: {e}")),
        );

        Ok(Some(tokio_util::io::ReaderStream::new(reader)))
    }
604
605    /// gets how many records of a collection this repository has.
606    pub async fn count_records(&self, collection: &str) -> Result<u64> {
607        let did = self.did.clone().into_static();
608        let state = self.state.clone();
609        let collection = collection.to_string();
610        tokio::task::spawn_blocking(move || db::get_record_count(&state.db, &did, &collection))
611            .await
612            .into_diagnostic()?
613    }
614}