1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::{DateTime, Utc};
5use jacquard_common::cowstr::ToCowStr;
6use jacquard_common::types::cid::{Cid, IpldCid};
7use jacquard_common::types::ident::AtIdentifier;
8use jacquard_common::types::nsid::Nsid;
9use jacquard_common::types::string::{Did, Handle, Rkey};
10use jacquard_common::types::tid::Tid;
11use jacquard_common::{CowStr, Data, IntoStatic};
12use miette::{Context, IntoDiagnostic, Result, WrapErr};
13use smol_str::ToSmolStr;
14use url::Url;
15
16use crate::db::types::{DbRkey, DidKey, TrimmedDid};
17use crate::db::{self, Db, keys};
18use crate::state::AppState;
19#[cfg(feature = "indexer")]
20use crate::types::GaugeState;
21use crate::types::{RepoMetadata, RepoState, RepoStatus};
22use crate::util::invalid_handle;
23
24#[cfg(feature = "indexer")]
25mod indexer;
26
27#[cfg(feature = "indexer")]
28pub use indexer::*;
29
30#[derive(Debug, Clone, serde::Serialize)]
32pub struct RepoInfo {
33 pub did: Did<'static>,
35 #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
37 pub status: RepoStatus,
38 pub tracked: bool,
41 #[serde(skip_serializing_if = "Option::is_none")]
43 pub rev: Option<Tid>,
44 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
46 #[serde(skip_serializing_if = "Option::is_none")]
47 pub data: Option<IpldCid>,
48 #[serde(skip_serializing_if = "Option::is_none")]
52 pub handle: Option<Handle<'static>>,
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub pds: Option<Url>,
56 #[serde(serialize_with = "crate::util::opt_did_key_serialize_str")]
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub signing_key: Option<DidKey<'static>>,
60 #[serde(skip_serializing_if = "Option::is_none")]
62 pub last_updated_at: Option<DateTime<Utc>>,
63 #[serde(skip_serializing_if = "Option::is_none")]
66 pub last_message_at: Option<DateTime<Utc>>,
67}
68
69#[derive(Clone)]
79pub struct ReposControl(pub(super) Arc<AppState>);
80
81impl ReposControl {
82 pub(crate) fn iter_states(
83 &self,
84 cursor: Option<&Did<'_>>,
85 ) -> impl Iterator<Item = Result<(Did<'static>, RepoState<'static>, crate::types::RepoMetadata)>>
86 {
87 let start_bound = if let Some(cursor) = cursor {
88 let did_key = keys::repo_key(cursor);
89 std::ops::Bound::Excluded(did_key)
90 } else {
91 std::ops::Bound::Unbounded
92 };
93
94 let state = self.0.clone();
95 self.0
96 .db
97 .repos
98 .range((start_bound, std::ops::Bound::Unbounded))
99 .map(move |g| {
100 let (k, v) = g.into_inner().into_diagnostic()?;
101 let repo_state = crate::db::deser_repo_state(&v)?.into_static();
102 let did = TrimmedDid::try_from(k.as_ref())?.to_did();
103 let metadata_key = keys::repo_metadata_key(&did);
104 let metadata = state
105 .db
106 .repo_metadata
107 .get(&metadata_key)
108 .into_diagnostic()?
109 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
110 let metadata = crate::db::deser_repo_meta(metadata.as_ref())?;
111 Ok((did, repo_state, metadata))
112 })
113 }
114
115 pub fn iter(&self, cursor: Option<&Did<'_>>) -> impl Iterator<Item = Result<RepoInfo>> {
117 self.iter_states(cursor)
118 .map(|r| r.map(|(did, s, m)| repo_state_to_info(did, s, m.tracked)))
119 }
120
121 pub fn get<'i>(&self, did: &Did<'i>) -> RepoHandle<'i> {
123 RepoHandle {
124 state: self.0.clone(),
125 did: did.clone(),
126 }
127 }
128
129 pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
132 let did = self.0.resolver.resolve_did(repo).await?;
133 Ok(RepoHandle {
134 state: self.0.clone(),
135 did,
136 })
137 }
138
139 pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
142 self.get(did).info().await
143 }
144}
145
146pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>, tracked: bool) -> RepoInfo {
147 let (rev, data) = s
148 .root
149 .map(|c| (Some(c.rev.to_tid()), Some(c.data)))
150 .unwrap_or_default();
151 RepoInfo {
152 did,
153 status: s.status,
154 tracked,
155 rev,
156 data,
157 handle: s.handle.map(|h| h.into_static()),
158 pds: s.pds.and_then(|p| p.parse().ok()),
159 signing_key: s.signing_key.map(|k| k.into_static()),
160 last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
161 last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
162 }
163}
164
165pub struct Record {
166 pub did: Did<'static>,
167 pub cid: Cid<'static>,
168 pub value: Data<'static>,
169}
170
171pub struct ListedRecord {
172 pub rkey: Rkey<'static>,
173 pub cid: Cid<'static>,
174 pub value: Data<'static>,
175}
176
177pub struct RecordList {
178 pub records: Vec<ListedRecord>,
179 pub cursor: Option<Rkey<'static>>,
180}
181
182#[derive(Debug, thiserror::Error)]
183pub enum MiniDocError {
184 #[error("repo is not synced yet")]
185 NotSynced,
186 #[error("repo not found")]
187 RepoNotFound,
188 #[error("could not resolve identity")]
189 CouldNotResolveIdentity,
190 #[error("{0}")]
191 Other(miette::Error),
192}
193
194pub struct MiniDoc<'i> {
196 pub did: Did<'i>,
198 pub handle: Handle<'i>,
201 pub pds: Url,
203 pub signing_key: DidKey<'i>,
205}
206
207#[derive(Clone)]
209pub struct RepoHandle<'i> {
210 state: Arc<AppState>,
211 pub did: Did<'i>,
212}
213
214impl<'i> RepoHandle<'i> {
215 pub(crate) async fn state(&self) -> Result<Option<RepoState<'static>>> {
216 let did_key = keys::repo_key(&self.did);
217 let app_state = self.state.clone();
218
219 tokio::task::spawn_blocking(move || {
220 let bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
221 bytes
222 .as_deref()
223 .map(db::deser_repo_state)
224 .transpose()
225 .map(|opt| opt.map(IntoStatic::into_static))
226 })
227 .await
228 .into_diagnostic()?
229 }
230
231 pub async fn info(&self) -> Result<Option<RepoInfo>> {
234 let did = self.did.clone().into_static();
235 let did_key = keys::repo_key(&did);
236 let metadata_key = keys::repo_metadata_key(&did);
237 let app_state = self.state.clone();
238
239 tokio::task::spawn_blocking(move || {
240 let state_bytes = app_state.db.repos.get(&did_key).into_diagnostic()?;
241 let Some(state_bytes) = state_bytes else {
242 return Ok(None);
243 };
244 let repo_state = crate::db::deser_repo_state(&state_bytes)?;
245
246 let metadata_bytes = app_state
247 .db
248 .repo_metadata
249 .get(&metadata_key)
250 .into_diagnostic()?
251 .ok_or_else(|| miette::miette!("repo metadata not found for {}", did))?;
252 let metadata = crate::db::deser_repo_meta(&metadata_bytes)?;
253
254 Ok(Some(repo_state_to_info(did, repo_state, metadata.tracked)))
255 })
256 .await
257 .into_diagnostic()?
258 }
259
260 pub async fn collections(&self) -> Result<HashMap<Nsid<'static>, u64>> {
262 let did = self.did.clone().into_static();
263 let state = self.state.clone();
264
265 tokio::task::spawn_blocking(move || {
266 let prefix = keys::did_collection_prefix(&did);
267 let mut res = HashMap::new();
268 for item in state.db.counts.prefix(&prefix) {
269 let (k, v) = item.into_inner().into_diagnostic()?;
270 let col = k
271 .strip_prefix(prefix.as_slice())
272 .ok_or_else(|| miette::miette!("invalid collection count key: {k:?}"))
273 .and_then(|r| std::str::from_utf8(r).into_diagnostic())
274 .and_then(|n| Nsid::new(n).into_diagnostic())?
275 .into_static();
276 let count = u64::from_be_bytes(
277 v.as_ref()
278 .try_into()
279 .into_diagnostic()
280 .wrap_err("expected to be count (8 bytes)")?,
281 );
282 res.insert(col, count);
283 }
284 Ok(res)
285 })
286 .await
287 .into_diagnostic()?
288 }
289
290 pub async fn mini_doc(&self) -> Result<MiniDoc<'static>, MiniDocError> {
292 let Some(info) = self.info().await.map_err(MiniDocError::Other)? else {
293 return Err(MiniDocError::RepoNotFound);
294 };
295
296 #[cfg(feature = "indexer")]
298 let is_pending = {
299 let metadata_key = keys::repo_metadata_key(&self.did);
300 let app_state = self.state.clone();
301 tokio::task::spawn_blocking(move || {
302 let metadata_bytes = app_state
303 .db
304 .repo_metadata
305 .get(&metadata_key)
306 .into_diagnostic()?;
307 let Some(metadata_bytes) = metadata_bytes else {
308 return Ok::<_, miette::Report>(false);
309 };
310
311 let metadata = crate::db::deser_repo_meta(metadata_bytes.as_ref())?;
312 return Ok(app_state
313 .db
314 .pending
315 .get(crate::db::keys::pending_key(metadata.index_id))
316 .into_diagnostic()?
317 .is_some());
318 })
319 .await
320 .map_err(|e| MiniDocError::Other(miette::miette!(e)))?
321 .map_err(MiniDocError::Other)?
322 };
323 #[cfg(feature = "relay")]
324 let is_pending = false;
325
326 if is_pending {
327 return Err(MiniDocError::NotSynced);
328 }
329
330 let pds = info
331 .pds
332 .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?;
333 let signing_key = info
334 .signing_key
335 .ok_or_else(|| MiniDocError::CouldNotResolveIdentity)?
336 .into_static();
337
338 let handle = if let Some(h) = info.handle {
339 let is_valid = self
340 .state
341 .resolver
342 .verify_handle(&self.did, &h)
343 .await
344 .into_diagnostic()
345 .map_err(MiniDocError::Other)?;
346 is_valid.then_some(h).unwrap_or_else(invalid_handle)
347 } else {
348 invalid_handle()
349 };
350
351 Ok(MiniDoc {
352 did: self.did.clone().into_static(),
353 handle,
354 pds,
355 signing_key,
356 })
357 }
358}