hydrant/
types.rs

1use std::fmt::{Debug, Display};
2
3use bytes::Bytes;
4use jacquard_common::types::cid::IpldCid;
5use jacquard_common::types::nsid::Nsid;
6use jacquard_common::types::string::{Did, Rkey};
7use jacquard_common::types::tid::Tid;
8use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
9use jacquard_repo::commit::Commit as AtpCommit;
10use serde::{Deserialize, Serialize, Serializer};
11use serde_json::Value;
12use smol_str::{SmolStr, ToSmolStr};
13
14use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
15use crate::resolver::MiniDoc;
16
17pub(crate) mod v2 {
18    use super::*;
19
20    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
21    pub enum RepoStatus {
22        Backfilling,
23        Synced,
24        Error(SmolStr),
25        Deactivated,
26        Takendown,
27        Suspended,
28    }
29
30    #[derive(Debug, Clone, Serialize, Deserialize)]
31    pub(crate) struct Commit {
32        pub version: i64,
33        pub rev: DbTid,
34        pub data: IpldCid,
35        pub prev: Option<IpldCid>,
36        #[serde(with = "jacquard_common::serde_bytes_helper")]
37        pub sig: Bytes,
38    }
39
40    #[derive(Debug, Clone, Serialize, Deserialize)]
41    #[serde(bound(deserialize = "'i: 'de"))]
42    pub(crate) struct RepoState<'i> {
43        pub status: RepoStatus,
44        pub root: Option<Commit>,
45        pub last_message_time: Option<i64>,
46        pub last_updated_at: i64,
47        pub tracked: bool,
48        pub index_id: u64,
49        #[serde(borrow)]
50        pub signing_key: Option<DidKey<'i>>,
51        #[serde(borrow)]
52        pub pds: Option<CowStr<'i>>,
53        #[serde(borrow)]
54        pub handle: Option<Handle<'i>>,
55    }
56}
57
58pub(crate) mod v4 {
59    use super::*;
60    pub(crate) use v2::Commit;
61
62    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63    pub enum RepoStatus {
64        /// repo is synced to latest commit from what we know of
65        Synced,
66        /// some unclassified fatal error
67        Error(SmolStr),
68        /// user has temporarily paused their overall account. content should
69        /// not be displayed or redistributed, but does not need to be deleted
70        /// from infrastructure. implied time-limited. also the initial state
71        /// for an account after migrating to another pds instance.
72        Deactivated,
73        /// host or service has takendown the account. implied permanent or
74        /// long-term, though may be reverted.
75        Takendown,
76        /// host or service has temporarily paused the account. implied
77        /// time-limited.
78        Suspended,
79        /// user or host has deleted the account, and content should be removed
80        /// from the network. implied permanent or long-term, though may be
81        /// reverted (deleted accounts may reactivate on the same or another
82        /// host).
83        ///
84        /// account is deleted; kept as a tombstone so stale commits arriving from the upstream
85        /// backfill window are not forwarded. active=false per spec.
86        Deleted,
87        /// host detected a repo sync problem. active may be true or false per spec;
88        /// the `active` field on `RepoState` is authoritative.
89        Desynchronized,
90        /// resource rate-limit exceeded. active may be true or false per spec;
91        /// the `active` field on `RepoState` is authoritative.
92        Throttled,
93    }
94
95    #[derive(Debug, Clone, Serialize, Deserialize)]
96    #[serde(bound(deserialize = "'i: 'de"))]
97    pub(crate) struct RepoState<'i> {
98        /// whether the upstream considers this account active.
99        /// services should use the `active` flag to control overall account visibility
100        pub active: bool,
101        pub status: RepoStatus,
102        pub root: Option<Commit>,
103        /// ms since epoch of the last firehose message we processed for this repo.
104        /// used to deduplicate identity / account events that can arrive from multiple relays at
105        /// different wall-clock times but represent the same underlying PDS event.
106        pub last_message_time: Option<i64>,
107        /// this is when we *ingested* any last updates
108        pub last_updated_at: i64, // unix timestamp
109        #[serde(borrow)]
110        pub signing_key: Option<DidKey<'i>>,
111        #[serde(borrow)]
112        pub pds: Option<CowStr<'i>>,
113        #[serde(borrow)]
114        pub handle: Option<Handle<'i>>,
115    }
116
117    #[derive(Debug, Clone, Serialize, Deserialize)]
118    pub(crate) struct RepoMetadata {
119        /// whether we are ingesting events for this repo
120        pub tracked: bool,
121        /// index id in pending keyspace (if backfilling)
122        pub index_id: u64,
123    }
124}
125
126pub(crate) use v4::*;
127
128impl<'c> From<AtpCommit<'c>> for Commit {
129    fn from(value: AtpCommit<'c>) -> Self {
130        Self {
131            data: value.data,
132            prev: value.prev,
133            rev: DbTid::from(&value.rev),
134            sig: value.sig,
135            version: value.version,
136        }
137    }
138}
139
140impl Commit {
141    pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> {
142        // version < 0 is a sentinel used in v2 migration for repos with no commit data
143        if self.version < 0 {
144            return None;
145        }
146        Some(AtpCommit {
147            did,
148            rev: self.rev.to_tid(),
149            data: self.data,
150            prev: self.prev,
151            sig: self.sig,
152            version: self.version,
153        })
154    }
155}
156
157impl Display for RepoStatus {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        match self {
160            RepoStatus::Synced => write!(f, "synced"),
161            RepoStatus::Error(e) => write!(f, "error({e})"),
162            RepoStatus::Deactivated => write!(f, "deactivated"),
163            RepoStatus::Takendown => write!(f, "takendown"),
164            RepoStatus::Suspended => write!(f, "suspended"),
165            RepoStatus::Deleted => write!(f, "deleted"),
166            RepoStatus::Desynchronized => write!(f, "desynchronized"),
167            RepoStatus::Throttled => write!(f, "throttled"),
168        }
169    }
170}
171
172impl RepoMetadata {}
173
174#[cfg(feature = "indexer")]
175mod indexer {
176    use super::*;
177
178    impl RepoMetadata {
179        pub fn backfilling(index_id: u64) -> Self {
180            Self {
181                index_id,
182                tracked: true,
183            }
184        }
185    }
186
187    impl ResyncState {
188        pub fn next_backoff(retry_count: u32) -> i64 {
189            // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
190            let base = 60;
191            let cap = 3600;
192            let mult = 2u64.pow(retry_count.min(10)) as i64;
193            let delay = (base * mult).min(cap);
194
195            // add +/- 10% jitter
196            let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
197            let delay = (delay as f64 + jitter) as i64;
198
199            chrono::Utc::now().timestamp() + delay
200        }
201    }
202
203    #[derive(Clone, Debug)]
204    pub(crate) enum BroadcastEvent {
205        #[allow(dead_code)]
206        Persisted(u64),
207        Ephemeral(Box<MarshallableEvt<'static>>),
208    }
209
210    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
211    pub(crate) enum GaugeState {
212        Synced,
213        Pending,
214        Resync(Option<ResyncErrorKind>),
215    }
216
217    impl GaugeState {
218        pub fn is_resync(&self) -> bool {
219            matches!(self, GaugeState::Resync(_))
220        }
221    }
222}
223
224#[cfg(feature = "indexer")]
225pub(crate) use indexer::*;
226
227impl<'i> RepoState<'i> {
228    pub fn backfilling() -> Self {
229        Self {
230            active: true,
231            status: RepoStatus::Desynchronized,
232            root: None,
233            last_updated_at: chrono::Utc::now().timestamp(),
234            handle: None,
235            pds: None,
236            signing_key: None,
237            last_message_time: None,
238        }
239    }
240
241    // advances the high-water mark to event_ms if it's newer than what we've seen
242    pub fn advance_message_time(&mut self, event_ms: i64) {
243        self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
244    }
245
246    // updates last_updated_at to now
247    pub fn touch(&mut self) {
248        self.last_updated_at = chrono::Utc::now().timestamp();
249    }
250
251    pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
252        let new_signing_key = doc.key.map(From::from);
253        let changed = self.pds.as_deref() != Some(doc.pds.as_str())
254            || self.handle != doc.handle
255            || self.signing_key != new_signing_key;
256        self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
257        self.handle = doc.handle;
258        self.signing_key = new_signing_key;
259        changed
260    }
261}
262
263impl<'i> IntoStatic for RepoState<'i> {
264    type Output = RepoState<'static>;
265
266    fn into_static(self) -> Self::Output {
267        RepoState {
268            active: self.active,
269            status: self.status,
270            root: self.root,
271            last_updated_at: self.last_updated_at,
272            handle: self.handle.map(IntoStatic::into_static),
273            pds: self.pds.map(IntoStatic::into_static),
274            signing_key: self.signing_key.map(IntoStatic::into_static),
275            last_message_time: self.last_message_time,
276        }
277    }
278}
279
280#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
281pub(crate) enum ResyncErrorKind {
282    Ratelimited,
283    Transport,
284    Generic,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
288pub(crate) enum ResyncState {
289    Error {
290        kind: ResyncErrorKind,
291        retry_count: u32,
292        next_retry: i64, // unix timestamp
293    },
294    Gone {
295        status: RepoStatus, // deactivated, takendown, suspended
296    },
297}
298
299#[derive(Debug, Serialize, Clone)]
300pub enum EventType {
301    Record,
302    Identity,
303    Account,
304}
305
306impl AsRef<str> for EventType {
307    fn as_ref(&self) -> &str {
308        match self {
309            Self::Record => "record",
310            Self::Identity => "identity",
311            Self::Account => "account",
312        }
313    }
314}
315
316fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> {
317    s.serialize_str(v.as_ref())
318}
319
320#[derive(Debug, Serialize, Clone)]
321pub struct MarshallableEvt<'i> {
322    pub id: u64,
323    #[serde(rename = "type")]
324    #[serde(serialize_with = "event_type_ser_str")]
325    pub kind: EventType,
326    #[serde(borrow)]
327    #[serde(skip_serializing_if = "Option::is_none")]
328    pub record: Option<RecordEvt<'i>>,
329    #[serde(borrow)]
330    #[serde(skip_serializing_if = "Option::is_none")]
331    pub identity: Option<IdentityEvt<'i>>,
332    #[serde(borrow)]
333    #[serde(skip_serializing_if = "Option::is_none")]
334    pub account: Option<AccountEvt<'i>>,
335}
336
337#[derive(Debug, Serialize, Clone)]
338pub struct RecordEvt<'i> {
339    pub live: bool,
340    #[serde(borrow)]
341    pub did: Did<'i>,
342    pub rev: Tid,
343    pub collection: Nsid<'i>,
344    pub rkey: Rkey<'i>,
345    pub action: CowStr<'i>,
346    #[serde(skip_serializing_if = "Option::is_none")]
347    pub record: Option<Value>,
348    #[serde(skip_serializing_if = "Option::is_none")]
349    #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
350    pub cid: Option<IpldCid>,
351}
352
353#[derive(Debug, Serialize, Clone)]
354pub struct IdentityEvt<'i> {
355    #[serde(borrow)]
356    pub did: Did<'i>,
357    #[serde(skip_serializing_if = "Option::is_none")]
358    pub handle: Option<Handle<'i>>,
359}
360
361#[derive(Debug, Serialize, Clone)]
362pub struct AccountEvt<'i> {
363    #[serde(borrow)]
364    pub did: Did<'i>,
365    pub active: bool,
366    #[serde(skip_serializing_if = "Option::is_none")]
367    pub status: Option<CowStr<'i>>,
368}
369
370#[derive(Serialize, Deserialize, Clone)]
371pub(crate) enum StoredData {
372    Nothing,
373    Ptr(IpldCid),
374    #[serde(with = "jacquard_common::serde_bytes_helper")]
375    Block(Bytes),
376}
377
378impl StoredData {
379    pub fn is_nothing(&self) -> bool {
380        matches!(self, StoredData::Nothing)
381    }
382}
383
384impl Default for StoredData {
385    fn default() -> Self {
386        Self::Nothing
387    }
388}
389
390impl Debug for StoredData {
391    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392        match self {
393            Self::Nothing => f.write_str("nothing"),
394            Self::Block(_) => f.write_str("<block>"),
395            Self::Ptr(cid) => write!(f, "{cid}"),
396        }
397    }
398}
399
400#[derive(Debug, Serialize, Deserialize, Clone)]
401#[serde(bound(deserialize = "'i: 'de"))]
402pub(crate) struct StoredEvent<'i> {
403    #[serde(default)]
404    pub live: bool,
405    #[serde(borrow)]
406    pub did: TrimmedDid<'i>,
407    pub rev: DbTid,
408    #[serde(borrow)]
409    pub collection: CowStr<'i>,
410    pub rkey: DbRkey,
411    pub action: DbAction,
412    #[serde(default)]
413    #[serde(skip_serializing_if = "StoredData::is_nothing")]
414    pub data: StoredData,
415}
416
417#[cfg(feature = "relay")]
418#[derive(Clone)]
419pub(crate) enum RelayBroadcast {
420    Persisted(#[allow(dead_code)] u64),
421    #[allow(dead_code)]
422    Ephemeral(bytes::Bytes),
423}