1use futures::{FutureExt, TryFutureExt};
2use rand::Rng;
3
4use super::*;
5
6impl ReposControl {
7 #[allow(dead_code)]
9 pub(crate) fn iter_pending(
10 &self,
11 cursor: Option<u64>,
12 ) -> impl Iterator<Item = Result<(u64, RepoInfo)>> {
13 let start_bound = if let Some(cursor) = cursor {
14 std::ops::Bound::Excluded(cursor.to_be_bytes().to_vec())
15 } else {
16 std::ops::Bound::Unbounded
17 };
18
19 let repos = self.0.db.repos.clone();
20 let state = self.0.clone();
21 self.0
22 .db
23 .pending
24 .range((start_bound, std::ops::Bound::Unbounded))
25 .map(move |g| {
26 let (id_raw, did_key) = g.into_inner().into_diagnostic()?;
27 let id = u64::from_be_bytes(
28 id_raw
29 .as_ref()
30 .try_into()
31 .into_diagnostic()
32 .wrap_err("can't parse pending key")?,
33 );
34 let Some(bytes) = repos.get(&did_key).into_diagnostic()? else {
35 tracing::warn!(id, did = ?did_key, "stale pending???");
37 return Ok(None);
38 };
39 let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
40 let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
41 let metadata_key = keys::repo_metadata_key(&did);
42 let metadata = state
43 .db
44 .repo_metadata
45 .get(&metadata_key)
46 .into_diagnostic()?
47 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
48 let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
49 Ok(Some((
50 id,
51 repo_state_to_info(did, repo_state.into_static(), metadata.tracked),
52 )))
53 })
54 .map(|b| b.transpose())
55 .flatten()
56 }
57
58 #[allow(dead_code)]
59 pub(crate) fn iter_resync(
60 &self,
61 cursor: Option<&Did<'_>>,
62 ) -> impl Iterator<Item = Result<RepoInfo>> {
63 let start_bound = if let Some(cursor) = cursor {
64 let did_key = keys::repo_key(cursor);
65 std::ops::Bound::Excluded(did_key)
66 } else {
67 std::ops::Bound::Unbounded
68 };
69
70 let repos = self.0.db.repos.clone();
71 let state = self.0.clone();
72 self.0
73 .db
74 .resync
75 .range((start_bound, std::ops::Bound::Unbounded))
76 .map(move |g| {
77 let did_key = g.key().into_diagnostic()?;
78 let Some(bytes) = repos.get(&did_key).into_diagnostic()? else {
79 tracing::warn!(did = ?did_key, "stale resync???");
81 return Ok(None);
82 };
83 let repo_state = crate::db::deser_repo_state(bytes.as_ref())?;
84 let did = TrimmedDid::try_from(did_key.as_ref())?.to_did();
85 let metadata_key = keys::repo_metadata_key(&did);
86 let metadata = state
87 .db
88 .repo_metadata
89 .get(&metadata_key)
90 .into_diagnostic()?
91 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
92 let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
93 Ok(Some(repo_state_to_info(
94 did,
95 repo_state.into_static(),
96 metadata.tracked,
97 )))
98 })
99 .map(|b| b.transpose())
100 .flatten()
101 }
102
103 pub(crate) fn _resync(
104 db: &Db,
105 did: &Did<'_>,
106 batch: &mut fjall::OwnedWriteBatch,
107 transitions: &mut Vec<(GaugeState, GaugeState)>,
108 ) -> Result<bool> {
109 let did_key = keys::repo_key(did);
110 let metadata_key = keys::repo_metadata_key(did);
111
112 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
113 let existing = repo_bytes
114 .as_deref()
115 .map(db::deser_repo_state)
116 .transpose()?;
117
118 if let Some(repo_state) = existing {
119 let metadata_bytes = db
120 .repo_metadata
121 .get(&metadata_key)
122 .into_diagnostic()?
123 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
124 let mut metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
125
126 let is_pending = db
128 .pending
129 .get(keys::pending_key(metadata.index_id))
130 .into_diagnostic()?
131 .is_some();
132 if !is_pending {
133 let resync = db.resync.get(&did_key).into_diagnostic()?;
134 let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
135 metadata.tracked = true;
136 let old_pending = keys::pending_key(metadata.index_id);
138 batch.remove(&db.pending, &old_pending);
139 metadata.index_id = rand::Rng::next_u64(&mut rand::rng());
140 batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
141 batch.remove(&db.resync, &did_key);
142 batch.insert(
143 &db.repo_metadata,
144 &metadata_key,
145 crate::db::ser_repo_meta(&metadata)?,
146 );
147 transitions.push((old, GaugeState::Pending));
148 return Ok(true);
149 }
150 }
151
152 Ok(false)
153 }
154
155 pub async fn resync(
164 &self,
165 dids: impl IntoIterator<Item = Did<'_>>,
166 ) -> Result<Vec<Did<'static>>> {
167 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
168 let state = self.0.clone();
169
170 let (queued, transitions) = tokio::task::spawn_blocking(move || {
171 let db = &state.db;
172 let mut batch = db.inner.batch();
173 let mut queued: Vec<Did<'static>> = Vec::new();
174 let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
175
176 for did in dids {
177 if Self::_resync(db, &did, &mut batch, &mut transitions)? {
178 queued.push(did);
179 }
180 }
181
182 batch.commit().into_diagnostic()?;
183 state.db.persist()?;
184 Ok::<_, miette::Report>((queued, transitions))
185 })
186 .await
187 .into_diagnostic()??;
188
189 for (old, new) in transitions {
190 self.0.db.update_gauge_diff_async(&old, &new).await;
191 }
192 if !queued.is_empty() {
193 self.0.notify_backfill();
194 }
195
196 Ok(queued)
197 }
198
199 pub async fn track(
205 &self,
206 dids: impl IntoIterator<Item = Did<'_>>,
207 ) -> Result<Vec<Did<'static>>> {
208 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
209 let state = self.0.clone();
210
211 let (new_count, queued, transitions) = tokio::task::spawn_blocking(move || {
212 let db = &state.db;
213 let mut batch = db.inner.batch();
214 let mut added = 0i64;
215 let mut queued: Vec<Did<'static>> = Vec::new();
216 let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
217
218 for did in dids {
219 let did_key = keys::repo_key(&did);
220 let metadata_key = keys::repo_metadata_key(&did);
221
222 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
223 let existing_metadata = metadata_bytes
224 .map(|b| crate::db::deser_repo_meta(&b))
225 .transpose()?;
226
227 if let Some(metadata) = existing_metadata {
228 if !metadata.tracked && Self::_resync(db, &did, &mut batch, &mut transitions)? {
229 queued.push(did);
230 }
231 } else {
232 let repo_state = RepoState::backfilling();
233 let metadata = RepoMetadata::backfilling(rand::random());
234 batch.insert(&db.repos, &did_key, crate::db::ser_repo_state(&repo_state)?);
235 batch.insert(
236 &db.repo_metadata,
237 &metadata_key,
238 crate::db::ser_repo_meta(&metadata)?,
239 );
240 batch.insert(&db.pending, keys::pending_key(metadata.index_id), &did_key);
241 added += 1;
242 queued.push(did);
243 transitions.push((GaugeState::Synced, GaugeState::Pending));
244 }
245 }
246
247 batch.commit().into_diagnostic()?;
248 state.db.persist()?;
249 Ok::<_, miette::Report>((added, queued, transitions))
250 })
251 .await
252 .into_diagnostic()??;
253
254 if new_count > 0 {
255 self.0.db.update_count_async("repos", new_count).await;
256 }
257 for (old, new) in transitions {
258 self.0.db.update_gauge_diff_async(&old, &new).await;
259 }
260 self.0.notify_backfill();
261 Ok(queued)
262 }
263
264 pub async fn untrack(
268 &self,
269 dids: impl IntoIterator<Item = Did<'_>>,
270 ) -> Result<Vec<Did<'static>>> {
271 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
272 let state = self.0.clone();
273
274 let (untracked, gauge_decrements) = tokio::task::spawn_blocking(move || {
275 let db = &state.db;
276 let mut batch = db.inner.batch();
277 let mut untracked: Vec<Did<'static>> = Vec::new();
278 let mut gauge_decrements = Vec::new();
279
280 for did in dids {
281 let did_key = keys::repo_key(&did);
282 let metadata_key = keys::repo_metadata_key(&did);
283
284 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
285 let existing = repo_bytes
286 .as_deref()
287 .map(db::deser_repo_state)
288 .transpose()?;
289
290 if let Some(repo_state) = existing {
291 let metadata_bytes = db.repo_metadata.get(&metadata_key).into_diagnostic()?;
292 let existing_metadata = metadata_bytes
293 .map(|b| crate::db::deser_repo_meta(&b))
294 .transpose()?;
295
296 if let Some(mut metadata) = existing_metadata {
297 if metadata.tracked {
298 let resync = db.resync.get(&did_key).into_diagnostic()?;
299 let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
300 metadata.tracked = false;
301 batch.insert(
302 &db.repo_metadata,
303 &metadata_key,
304 crate::db::ser_repo_meta(&metadata)?,
305 );
306 batch.remove(&db.pending, keys::pending_key(metadata.index_id));
307 batch.remove(&db.resync, &did_key);
308 if old != GaugeState::Synced {
309 gauge_decrements.push(old);
310 }
311 untracked.push(did);
312 }
313 }
314 }
315 }
316
317 batch.commit().into_diagnostic()?;
318 state.db.persist()?;
319 Ok::<_, miette::Report>((untracked, gauge_decrements))
320 })
321 .await
322 .into_diagnostic()??;
323
324 for gauge in gauge_decrements {
325 self.0
326 .db
327 .update_gauge_diff_async(&gauge, &GaugeState::Synced)
328 .await;
329 }
330 Ok(untracked)
331 }
332}
333
334impl<'i> RepoHandle<'i> {
335 pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
337 let did = self.did.clone().into_static();
338 let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey));
339
340 let collection = collection.to_smolstr();
341 let state = self.state.clone();
342 tokio::task::spawn_blocking(move || {
343 use miette::WrapErr;
344
345 let cid_bytes = state.db.records.get(db_key).into_diagnostic()?;
346 let Some(cid_bytes) = cid_bytes else {
347 return Ok(None);
348 };
349
350 let block_key = keys::block_key(&collection, &cid_bytes);
352 let Some(block_bytes) = state.db.blocks.get(block_key).into_diagnostic()? else {
353 miette::bail!("block {cid_bytes:?} not found, this is a bug!!");
354 };
355
356 let value = serde_ipld_dagcbor::from_slice::<Data>(&block_bytes)
357 .into_diagnostic()
358 .wrap_err("cant parse block")?
359 .into_static();
360 let cid = Cid::new(&cid_bytes)
361 .into_diagnostic()
362 .wrap_err("cant parse block cid")?;
363 let cid = Cid::Str(cid.to_cowstr().into_static());
364
365 Ok(Some(Record { did, cid, value }))
366 })
367 .await
368 .into_diagnostic()?
369 }
370
371 pub async fn list_records(
373 &self,
374 collection: &str,
375 limit: usize,
376 reverse: bool,
377 cursor: Option<&str>,
378 ) -> Result<RecordList> {
379 let did = self.did.clone().into_static();
380
381 let state = self.state.clone();
382 let prefix = keys::record_prefix_collection(&did, collection);
383 let collection = collection.to_smolstr();
384 let cursor = cursor.map(|c| c.to_smolstr());
385
386 tokio::task::spawn_blocking(move || {
387 let mut results = Vec::new();
388 let mut next_cursor = None;
389
390 let iter: Box<dyn Iterator<Item = _>> = if !reverse {
391 let mut end_prefix = prefix.clone();
392 if let Some(last) = end_prefix.last_mut() {
393 *last += 1;
394 }
395
396 let end_key = if let Some(cursor) = &cursor {
397 let mut k = prefix.clone();
398 k.extend_from_slice(cursor.as_bytes());
399 k
400 } else {
401 end_prefix
402 };
403
404 Box::new(
405 state
406 .db
407 .records
408 .range(prefix.as_slice()..end_key.as_slice())
409 .rev(),
410 )
411 } else {
412 let start_key = if let Some(cursor) = &cursor {
413 let mut k = prefix.clone();
414 k.extend_from_slice(cursor.as_bytes());
415 k.push(0);
416 k
417 } else {
418 prefix.clone()
419 };
420
421 Box::new(state.db.records.range(start_key.as_slice()..))
422 };
423
424 for item in iter {
425 let (key, cid_bytes) = item.into_inner().into_diagnostic()?;
426
427 if !key.starts_with(prefix.as_slice()) {
428 break;
429 }
430
431 let rkey = keys::parse_rkey(&key[prefix.len()..])?;
432 if results.len() >= limit {
433 next_cursor = Some(rkey);
434 break;
435 }
436
437 if let Ok(Some(block_bytes)) = state
439 .db
440 .blocks
441 .get(&keys::block_key(collection.as_str(), &cid_bytes))
442 {
443 let value: Data =
444 serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null);
445 let cid = Cid::new(&cid_bytes).into_diagnostic()?;
446 let cid = Cid::Str(cid.to_cowstr().into_static());
447 results.push(ListedRecord {
448 rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr()))
449 .expect("that rkey is validated"),
450 cid,
451 value: value.into_static(),
452 });
453 }
454 }
455 Result::<_, miette::Report>::Ok((results, next_cursor))
456 })
457 .await
458 .into_diagnostic()?
459 .map(|(records, next_cursor)| RecordList {
460 records,
461 cursor: next_cursor.map(|rkey| {
462 Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated")
463 }),
464 })
465 }
466
467 pub async fn generate_car(
479 &self,
480 ) -> Result<Option<impl futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static>>
481 {
482 use iroh_car::{CarHeader, CarWriter};
483 use jacquard_repo::{BlockStore, MemoryBlockStore, Mst};
484 use miette::WrapErr;
485 use std::sync::Arc;
486
487 let commit = match self.state().await? {
488 Some(state) => match state.root {
489 Some(c) => c,
490 None => return Ok(None),
491 },
492 None => return Ok(None),
493 };
494
495 let atp_commit = match commit.into_atp_commit(self.did.clone().into_static()) {
496 Some(c) => c,
497 None => return Ok(None),
498 };
499 let commit_cid = atp_commit.to_cid().into_diagnostic()?;
500 let commit_cbor = atp_commit.to_cbor().into_diagnostic()?;
501
502 let did = self.did.clone().into_static();
503 let app_state = self.state.clone();
504
505 let store = Arc::new(MemoryBlockStore::new());
507 let mst = Mst::new(store.clone());
508 let handle = tokio::runtime::Handle::current();
509
510 let mst = tokio::task::spawn_blocking(move || -> Result<_> {
511 let mut mst = mst;
512 let prefix = keys::record_prefix_did(&did);
513
514 for guard in app_state.db.records.prefix(&prefix) {
515 let (key, cid_bytes) = guard.into_inner().into_diagnostic()?;
516
517 let rest = &key[prefix.len()..];
518 let mut parts = rest.splitn(2, |b: &u8| *b == keys::SEP);
519 let collection_raw = parts
520 .next()
521 .ok_or_else(|| miette::miette!("missing collection in record key"))?;
522 let rkey_raw = parts
523 .next()
524 .ok_or_else(|| miette::miette!("missing rkey in record key"))?;
525
526 let collection = std::str::from_utf8(collection_raw)
527 .into_diagnostic()
528 .wrap_err("collection is not valid utf8")?;
529 let rkey = keys::parse_rkey(rkey_raw)?;
530 let mst_key = format!("{collection}/{rkey}");
531
532 let ipld_cid = cid::Cid::read_bytes(cid_bytes.as_ref())
533 .into_diagnostic()
534 .wrap_err_with(|| format!("invalid cid bytes for record {mst_key}"))?;
535
536 let block_key = keys::block_key(collection, cid_bytes.as_ref());
537 let block_bytes = app_state
538 .db
539 .blocks
540 .get(&block_key)
541 .into_diagnostic()?
542 .ok_or_else(|| miette::miette!("block missing for record {mst_key}"))?;
543
544 handle
545 .block_on(mst.add_mut(&mst_key, ipld_cid))
546 .into_diagnostic()?;
547 handle
549 .block_on(mst.storage().put_many([(
550 ipld_cid,
551 bytes::Bytes::copy_from_slice(block_bytes.as_ref()),
552 )]))
553 .into_diagnostic()?;
554 }
555
556 handle.block_on(mst.persist()).into_diagnostic()?;
557
558 Result::<_>::Ok(mst)
559 })
560 .await
561 .into_diagnostic()??;
562
563 let computed_root = mst.get_pointer().await.into_diagnostic()?;
565 if computed_root != atp_commit.data {
566 tracing::warn!(
567 computed = %computed_root,
568 stored = %atp_commit.data,
569 did = %self.did,
570 "mst root mismatch (expected in filter mode)",
571 );
572 }
573
574 store
575 .put_many([(commit_cid, bytes::Bytes::from(commit_cbor))])
576 .await
577 .into_diagnostic()?;
578
579 let (reader, writer) = tokio::io::duplex(64 * 1024);
581 tokio::spawn(
582 async move {
583 let header = CarHeader::new_v1(vec![commit_cid]);
584 let mut car_writer = CarWriter::new(header, writer);
585
586 let commit_data = store.get(&commit_cid).await?;
588 if let Some(data) = commit_data {
589 car_writer
590 .write(commit_cid, &data)
591 .await
592 .into_diagnostic()?;
593 }
594 mst.write_blocks_to_car(&mut car_writer).await?;
595 car_writer.finish().await.into_diagnostic()?;
596
597 Result::<_, miette::Report>::Ok(())
598 }
599 .inspect_err(|e| tracing::error!("can't generate car: {e}")),
600 );
601
602 Ok(Some(tokio_util::io::ReaderStream::new(reader)))
603 }
604
605 pub async fn count_records(&self, collection: &str) -> Result<u64> {
607 let did = self.did.clone().into_static();
608 let state = self.state.clone();
609 let collection = collection.to_string();
610 tokio::task::spawn_blocking(move || db::get_record_count(&state.db, &did, &collection))
611 .await
612 .into_diagnostic()?
613 }
614}