hydrant/control/repos/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::{DateTime, Utc};
5use jacquard_common::cowstr::ToCowStr;
6use jacquard_common::types::cid::{Cid, IpldCid};
7use jacquard_common::types::ident::AtIdentifier;
8use jacquard_common::types::nsid::Nsid;
9use jacquard_common::types::string::{Did, Handle, Rkey};
10use jacquard_common::types::tid::Tid;
11use jacquard_common::{CowStr, Data, IntoStatic};
12use miette::{Context, IntoDiagnostic, Result, WrapErr};
13use smol_str::ToSmolStr;
14use url::Url;
15
16use crate::db::types::{DbRkey, DidKey, TrimmedDid};
17use crate::db::{self, Db, keys};
18use crate::state::AppState;
19#[cfg(feature = "indexer")]
20use crate::types::GaugeState;
21use crate::types::{RepoMetadata, RepoState, RepoStatus};
22use crate::util::invalid_handle;
23
24#[cfg(feature = "indexer")]
25mod indexer;
26
27#[cfg(feature = "indexer")]
28pub use indexer::*;
29
30/// information about a tracked or known repository. returned by [`ReposControl`] methods.
31#[derive(Debug, Clone, serde::Serialize)]
32pub struct RepoInfo {
33    /// the DID of the repository.
34    pub did: Did<'static>,
35    /// the status of the repository.
36    #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
37    pub status: RepoStatus,
38    /// whether this repository is tracked or not.
39    /// untracked repositories are not updated and they stay frozen.
40    pub tracked: bool,
41    /// the revision of the root commit of this repository.
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub rev: Option<Tid>,
44    /// the CID of the MST root of this repository.
45    #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub data: Option<IpldCid>,
48    /// the handle for the DID of this repository.
49    ///
50    /// note that this handle is not bi-directionally verified.
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub handle: Option<Handle<'static>>,
53    /// the URL for the PDS in which this repository is hosted on.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub pds: Option<Url>,
56    /// ATProto signing key of this repository.
57    #[serde(serialize_with = "crate::util::opt_did_key_serialize_str")]
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub signing_key: Option<DidKey<'static>>,
60    /// when this repository was last touched (status update, commit ingested, etc.).
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub last_updated_at: Option<DateTime<Utc>>,
63    /// the time of the last message gotten from the firehose for this repository.
64    /// this is equal to the `time` field.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub last_message_at: Option<DateTime<Utc>>,
67}
68
69/// control over which repositories are tracked and access to their state.
70///
71/// in `filter` mode, a repo is only indexed if it either matches a signal or is
72/// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are
73/// indexed and tracking is implicit.
74///
75/// tracking a DID that hydrant has never seen enqueues an immediate backfill.
76/// tracking a DID that hydrant already knows about (but has marked untracked)
77/// re-enqueues it for backfill.
78#[derive(Clone)]
79pub struct ReposControl(pub(super) Arc<AppState>);
80
81impl ReposControl {
82    pub(crate) fn iter_states(
83        &self,
84        cursor: Option<&Did<'_>>,
85    ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>, crate::types::RepoMetadata)>>
86    {
87        let start_bound = if let Some(cursor) = cursor {
88            let did_key = keys::repo_key(cursor);
89            std::ops::Bound::Excluded(did_key)
90        } else {
91            std::ops::Bound::Unbounded
92        };
93
94        let state = self.0.clone();
95        self.0
96            .db
97            .repos
98            .range((start_bound, std::ops::Bound::Unbounded))
99            .map(move |g| {
100                let (k, v) = g.into_inner().into_diagnostic()?;
101                let repo_state = crate::db::deser_repo_state(&v)?.into_static();
102                let did = TrimmedDid::try_from(k.as_ref())?.to_did();
103                let metadata_key = keys::repo_metadata_key(&did);
104                let metadata = state
105                    .db
106                    .repo_metadata
107                    .get(&metadata_key)
108                    .into_diagnostic()?
109                    .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
110                let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
111                Ok((did, repo_state, metadata))
112            })
113    }
114
115    /// iterates through all repositories, returning their state.
116    pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> {
117        self.iter_states(cursor)
118            .map(|r| r.map(|(did, s, m)| repo_state_to_info(did, s, m.tracked)))
119    }
120
121    /// gets a handle for a repository to read from it.
122    pub fn get<'i>(&self, did: &Did<'i>) -> RepoHandle<'i> {
123        RepoHandle {
124            state: self.0.clone(),
125            did: did.clone(),
126        }
127    }
128
129    /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be
130    /// either a handle or a DID.
131    pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
132        let did = self.0.resolver.resolve_did(repo).await?;
133        Ok(RepoHandle {
134            state: self.0.clone(),
135            did,
136        })
137    }
138
139    /// fetch the current state of a repository.
140    /// returns `None` if hydrant has never seen this repository.
141    pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
142        self.get(did).info().await
143    }
144}
145
146pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>, tracked: bool) -> RepoInfo {
147    let (rev, data) = s
148        .root
149        .map(|c| (Some(c.rev.to_tid()), Some(c.data)))
150        .unwrap_or_default();
151    RepoInfo {
152        did,
153        status: s.status,
154        tracked,
155        rev,
156        data,
157        handle: s.handle.map(|h| h.into_static()),
158        pds: s.pds.and_then(|p| p.parse().ok()),
159        signing_key: s.signing_key.map(|k| k.into_static()),
160        last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
161        last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
162    }
163}
164
165pub struct Record {
166    pub did: Did<'static>,
167    pub cid: Cid<'static>,
168    pub value: Data<'static>,
169}
170
171pub struct ListedRecord {
172    pub rkey: Rkey<'static>,
173    pub cid: Cid<'static>,
174    pub value: Data<'static>,
175}
176
177pub struct RecordList {
178    pub records: Vec<ListedRecord>,
179    pub cursor: Option<Rkey<'static>>,
180}
181
182#[derive(Debug, thiserror::Error)]
183pub enum MiniDocError {
184    #[error("repo is not synced yet")]
185    NotSynced,
186    #[error("repo not found")]
187    RepoNotFound,
188    #[error("could not resolve identity")]
189    CouldNotResolveIdentity,
190    #[error("{0}")]
191    Other(miette::Error),
192}
193
194/// a mini doc with a bi-directionally verified handle.
195pub struct MiniDoc<'i> {
196    /// the did.
197    pub did: Did<'i>,
198    /// the handle. if verification fails or no handle is found,
199    /// this will be "handle.invalid".
200    pub handle: Handle<'i>,
201    /// the url of the PDS of this repo.
202    pub pds: Url,
203    /// the atproto signing key of this repo.
204    pub signing_key: DidKey<'i>,
205}
206
207/// handle to access data related to this repository.
208#[derive(Clone)]
209pub struct RepoHandle<'i> {
210    state: Arc<AppState>,
211    pub did: Did<'i>,
212}
213
214impl<'i> RepoHandle<'i> {
215    pub(crate) async fn state(&self) -> Result<Option<RepoState<'static>>> {
216        let did_key = keys::repo_key(&self.did);
217        let app_state = self.state.clone();
218
219        tokio::task::spawn_blocking(move || {
220            let bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
221            bytes
222                .as_deref()
223                .map(db::deser_repo_state)
224                .transpose()
225                .map(|opt| opt.map(IntoStatic::into_static))
226        })
227        .await
228        .into_diagnostic()?
229    }
230
231    /// fetch the current state of this repository.
232    /// returns `None` if hydrant has never seen this repository.
233    pub async fn info(&self) -> Result<Option<RepoInfo>> {
234        let did = self.did.clone().into_static();
235        let did_key = keys::repo_key(&did);
236        let metadata_key = keys::repo_metadata_key(&did);
237        let app_state = self.state.clone();
238
239        tokio::task::spawn_blocking(move || {
240            let state_bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
241            let Some(state_bytes) = state_bytes else {
242                return Ok(None);
243            };
244            let repo_state = crate::db::deser_repo_state(&state_bytes)?;
245
246            let metadata_bytes = app_state
247                .db
248                .repo_metadata
249                .get(&metadata_key)
250                .into_diagnostic()?
251                .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
252            let metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
253
254            Ok(Some(repo_state_to_info(did, repo_state, metadata.tracked)))
255        })
256        .await
257        .into_diagnostic()?
258    }
259
260    /// returns the collections of this repository and the number of records it has in each.
261    pub async fn collections(&self) -> Result<HashMap<Nsid<'static>, u64>> {
262        let did = self.did.clone().into_static();
263        let state = self.state.clone();
264
265        tokio::task::spawn_blocking(move || {
266            let prefix = keys::did_collection_prefix(&did);
267            let mut res = HashMap::new();
268            for item in state.db.counts.prefix(&prefix) {
269                let (k, v) = item.into_inner().into_diagnostic()?;
270                let col = k
271                    .strip_prefix(prefix.as_slice())
272                    .ok_or_else(|| miette::miette!("invalid collection count key: {k:?}"))
273                    .and_then(|r| std::str::from_utf8(r).into_diagnostic())
274                    .and_then(|n| Nsid::new(n).into_diagnostic())?
275                    .into_static();
276                let count = u64::from_be_bytes(
277                    v.as_ref()
278                        .try_into()
279                        .into_diagnostic()
280                        .wrap_err("expected to be count (8 bytes)")?,
281                );
282                res.insert(col, count);
283            }
284            Ok(res)
285        })
286        .await
287        .into_diagnostic()?
288    }
289
290    /// returns a bi-directionally validated mini doc.
291    pub async fn mini_doc(&self) -> Result<MiniDoc<'static>, MiniDocError> {
292        let Some(info) = self.info().await.map_err(MiniDocError::Other)? else {
293            return Err(MiniDocError::RepoNotFound);
294        };
295
296        // check if repo is still backfilling (in pending)
297        #[cfg(feature = "indexer")]
298        let is_pending = {
299            let metadata_key = keys::repo_metadata_key(&self.did);
300            let app_state = self.state.clone();
301            tokio::task::spawn_blocking(move || {
302                let metadata_bytes = app_state
303                    .db
304                    .repo_metadata
305                    .get(&metadata_key)
306                    .into_diagnostic()?;
307                let Some(metadata_bytes) = metadata_bytes else {
308                    return Ok::<_, miette::Report>(false);
309                };
310
311                let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;
312                return Ok(app_state
313                    .db
314                    .pending
315                    .get(crate::db::keys::pending_key(metadata.index_id))
316                    .into_diagnostic()?
317                    .is_some());
318            })
319            .await
320            .map_err(|e| MiniDocError::Other(miette::miette!(e)))?
321            .map_err(MiniDocError::Other)?
322        };
323        #[cfg(feature = "relay")]
324        let is_pending = false;
325
326        if is_pending {
327            return Err(MiniDocError::NotSynced);
328        }
329
330        let pds = info
331            .pds
332            .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?;
333        let signing_key = info
334            .signing_key
335            .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?
336            .into_static();
337
338        let handle = if let Some(h) = info.handle {
339            let is_valid = self
340                .state
341                .resolver
342                .verify_handle(&self.did, &h)
343                .await
344                .into_diagnostic()
345                .map_err(MiniDocError::Other)?;
346            is_valid.then_some(h).unwrap_or_else(invalid_handle)
347        } else {
348            invalid_handle()
349        };
350
351        Ok(MiniDoc {
352            did: self.did.clone().into_static(),
353            handle,
354            pds,
355            signing_key,
356        })
357    }
358}