1use std::fmt::{Debug, Display};
2
3use bytes::Bytes;
4use jacquard_common::types::cid::IpldCid;
5use jacquard_common::types::nsid::Nsid;
6use jacquard_common::types::string::{Did, Rkey};
7use jacquard_common::types::tid::Tid;
8use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
9use jacquard_repo::commit::Commit as AtpCommit;
10use serde::{Deserialize, Serialize, Serializer};
11use serde_json::Value;
12use smol_str::{SmolStr, ToSmolStr};
13
14use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
15use crate::resolver::MiniDoc;
16
17pub(crate) mod v2 {
18 use super::*;
19
20 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
21 pub enum RepoStatus {
22 Backfilling,
23 Synced,
24 Error(SmolStr),
25 Deactivated,
26 Takendown,
27 Suspended,
28 }
29
30 #[derive(Debug, Clone, Serialize, Deserialize)]
31 pub(crate) struct Commit {
32 pub version: i64,
33 pub rev: DbTid,
34 pub data: IpldCid,
35 pub prev: Option<IpldCid>,
36 #[serde(with = "jacquard_common::serde_bytes_helper")]
37 pub sig: Bytes,
38 }
39
40 #[derive(Debug, Clone, Serialize, Deserialize)]
41 #[serde(bound(deserialize = "'i: 'de"))]
42 pub(crate) struct RepoState<'i> {
43 pub status: RepoStatus,
44 pub root: Option<Commit>,
45 pub last_message_time: Option<i64>,
46 pub last_updated_at: i64,
47 pub tracked: bool,
48 pub index_id: u64,
49 #[serde(borrow)]
50 pub signing_key: Option<DidKey<'i>>,
51 #[serde(borrow)]
52 pub pds: Option<CowStr<'i>>,
53 #[serde(borrow)]
54 pub handle: Option<Handle<'i>>,
55 }
56}
57
58pub(crate) mod v4 {
59 use super::*;
60 pub(crate) use v2::Commit;
61
62 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
63 pub enum RepoStatus {
64 Synced,
66 Error(SmolStr),
68 Deactivated,
73 Takendown,
76 Suspended,
79 Deleted,
87 Desynchronized,
90 Throttled,
93 }
94
95 #[derive(Debug, Clone, Serialize, Deserialize)]
96 #[serde(bound(deserialize = "'i: 'de"))]
97 pub(crate) struct RepoState<'i> {
98 pub active: bool,
101 pub status: RepoStatus,
102 pub root: Option<Commit>,
103 pub last_message_time: Option<i64>,
107 pub last_updated_at: i64, #[serde(borrow)]
110 pub signing_key: Option<DidKey<'i>>,
111 #[serde(borrow)]
112 pub pds: Option<CowStr<'i>>,
113 #[serde(borrow)]
114 pub handle: Option<Handle<'i>>,
115 }
116
117 #[derive(Debug, Clone, Serialize, Deserialize)]
118 pub(crate) struct RepoMetadata {
119 pub tracked: bool,
121 pub index_id: u64,
123 }
124}
125
126pub(crate) use v4::*;
127
128impl<'c> From<AtpCommit<'c>> for Commit {
129 fn from(value: AtpCommit<'c>) -> Self {
130 Self {
131 data: value.data,
132 prev: value.prev,
133 rev: DbTid::from(&value.rev),
134 sig: value.sig,
135 version: value.version,
136 }
137 }
138}
139
140impl Commit {
141 pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> {
142 if self.version < 0 {
144 return None;
145 }
146 Some(AtpCommit {
147 did,
148 rev: self.rev.to_tid(),
149 data: self.data,
150 prev: self.prev,
151 sig: self.sig,
152 version: self.version,
153 })
154 }
155}
156
157impl Display for RepoStatus {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 match self {
160 RepoStatus::Synced => write!(f, "synced"),
161 RepoStatus::Error(e) => write!(f, "error({e})"),
162 RepoStatus::Deactivated => write!(f, "deactivated"),
163 RepoStatus::Takendown => write!(f, "takendown"),
164 RepoStatus::Suspended => write!(f, "suspended"),
165 RepoStatus::Deleted => write!(f, "deleted"),
166 RepoStatus::Desynchronized => write!(f, "desynchronized"),
167 RepoStatus::Throttled => write!(f, "throttled"),
168 }
169 }
170}
171
172impl RepoMetadata {}
173
174#[cfg(feature = "indexer")]
175mod indexer {
176 use super::*;
177
178 impl RepoMetadata {
179 pub fn backfilling(index_id: u64) -> Self {
180 Self {
181 index_id,
182 tracked: true,
183 }
184 }
185 }
186
187 impl ResyncState {
188 pub fn next_backoff(retry_count: u32) -> i64 {
189 let base = 60;
191 let cap = 3600;
192 let mult = 2u64.pow(retry_count.min(10)) as i64;
193 let delay = (base * mult).min(cap);
194
195 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
197 let delay = (delay as f64 + jitter) as i64;
198
199 chrono::Utc::now().timestamp() + delay
200 }
201 }
202
203 #[derive(Clone, Debug)]
204 pub(crate) enum BroadcastEvent {
205 #[allow(dead_code)]
206 Persisted(u64),
207 Ephemeral(Box<MarshallableEvt<'static>>),
208 }
209
210 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
211 pub(crate) enum GaugeState {
212 Synced,
213 Pending,
214 Resync(Option<ResyncErrorKind>),
215 }
216
217 impl GaugeState {
218 pub fn is_resync(&self) -> bool {
219 matches!(self, GaugeState::Resync(_))
220 }
221 }
222}
223
224#[cfg(feature = "indexer")]
225pub(crate) use indexer::*;
226
227impl<'i> RepoState<'i> {
228 pub fn backfilling() -> Self {
229 Self {
230 active: true,
231 status: RepoStatus::Desynchronized,
232 root: None,
233 last_updated_at: chrono::Utc::now().timestamp(),
234 handle: None,
235 pds: None,
236 signing_key: None,
237 last_message_time: None,
238 }
239 }
240
241 pub fn advance_message_time(&mut self, event_ms: i64) {
243 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
244 }
245
246 pub fn touch(&mut self) {
248 self.last_updated_at = chrono::Utc::now().timestamp();
249 }
250
251 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
252 let new_signing_key = doc.key.map(From::from);
253 let changed = self.pds.as_deref() != Some(doc.pds.as_str())
254 || self.handle != doc.handle
255 || self.signing_key != new_signing_key;
256 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
257 self.handle = doc.handle;
258 self.signing_key = new_signing_key;
259 changed
260 }
261}
262
263impl<'i> IntoStatic for RepoState<'i> {
264 type Output = RepoState<'static>;
265
266 fn into_static(self) -> Self::Output {
267 RepoState {
268 active: self.active,
269 status: self.status,
270 root: self.root,
271 last_updated_at: self.last_updated_at,
272 handle: self.handle.map(IntoStatic::into_static),
273 pds: self.pds.map(IntoStatic::into_static),
274 signing_key: self.signing_key.map(IntoStatic::into_static),
275 last_message_time: self.last_message_time,
276 }
277 }
278}
279
280#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
281pub(crate) enum ResyncErrorKind {
282 Ratelimited,
283 Transport,
284 Generic,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
288pub(crate) enum ResyncState {
289 Error {
290 kind: ResyncErrorKind,
291 retry_count: u32,
292 next_retry: i64, },
294 Gone {
295 status: RepoStatus, },
297}
298
299#[derive(Debug, Serialize, Clone)]
300pub enum EventType {
301 Record,
302 Identity,
303 Account,
304}
305
306impl AsRef<str> for EventType {
307 fn as_ref(&self) -> &str {
308 match self {
309 Self::Record => "record",
310 Self::Identity => "identity",
311 Self::Account => "account",
312 }
313 }
314}
315
316fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> {
317 s.serialize_str(v.as_ref())
318}
319
320#[derive(Debug, Serialize, Clone)]
321pub struct MarshallableEvt<'i> {
322 pub id: u64,
323 #[serde(rename = "type")]
324 #[serde(serialize_with = "event_type_ser_str")]
325 pub kind: EventType,
326 #[serde(borrow)]
327 #[serde(skip_serializing_if = "Option::is_none")]
328 pub record: Option<RecordEvt<'i>>,
329 #[serde(borrow)]
330 #[serde(skip_serializing_if = "Option::is_none")]
331 pub identity: Option<IdentityEvt<'i>>,
332 #[serde(borrow)]
333 #[serde(skip_serializing_if = "Option::is_none")]
334 pub account: Option<AccountEvt<'i>>,
335}
336
337#[derive(Debug, Serialize, Clone)]
338pub struct RecordEvt<'i> {
339 pub live: bool,
340 #[serde(borrow)]
341 pub did: Did<'i>,
342 pub rev: Tid,
343 pub collection: Nsid<'i>,
344 pub rkey: Rkey<'i>,
345 pub action: CowStr<'i>,
346 #[serde(skip_serializing_if = "Option::is_none")]
347 pub record: Option<Value>,
348 #[serde(skip_serializing_if = "Option::is_none")]
349 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
350 pub cid: Option<IpldCid>,
351}
352
353#[derive(Debug, Serialize, Clone)]
354pub struct IdentityEvt<'i> {
355 #[serde(borrow)]
356 pub did: Did<'i>,
357 #[serde(skip_serializing_if = "Option::is_none")]
358 pub handle: Option<Handle<'i>>,
359}
360
361#[derive(Debug, Serialize, Clone)]
362pub struct AccountEvt<'i> {
363 #[serde(borrow)]
364 pub did: Did<'i>,
365 pub active: bool,
366 #[serde(skip_serializing_if = "Option::is_none")]
367 pub status: Option<CowStr<'i>>,
368}
369
370#[derive(Serialize, Deserialize, Clone)]
371pub(crate) enum StoredData {
372 Nothing,
373 Ptr(IpldCid),
374 #[serde(with = "jacquard_common::serde_bytes_helper")]
375 Block(Bytes),
376}
377
378impl StoredData {
379 pub fn is_nothing(&self) -> bool {
380 matches!(self, StoredData::Nothing)
381 }
382}
383
384impl Default for StoredData {
385 fn default() -> Self {
386 Self::Nothing
387 }
388}
389
390impl Debug for StoredData {
391 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392 match self {
393 Self::Nothing => f.write_str("nothing"),
394 Self::Block(_) => f.write_str("<block>"),
395 Self::Ptr(cid) => write!(f, "{cid}"),
396 }
397 }
398}
399
400#[derive(Debug, Serialize, Deserialize, Clone)]
401#[serde(bound(deserialize = "'i: 'de"))]
402pub(crate) struct StoredEvent<'i> {
403 #[serde(default)]
404 pub live: bool,
405 #[serde(borrow)]
406 pub did: TrimmedDid<'i>,
407 pub rev: DbTid,
408 #[serde(borrow)]
409 pub collection: CowStr<'i>,
410 pub rkey: DbRkey,
411 pub action: DbAction,
412 #[serde(default)]
413 #[serde(skip_serializing_if = "StoredData::is_nothing")]
414 pub data: StoredData,
415}
416
417#[cfg(feature = "relay")]
418#[derive(Clone)]
419pub(crate) enum RelayBroadcast {
420 Persisted(#[allow(dead_code)] u64),
421 #[allow(dead_code)]
422 Ephemeral(bytes::Bytes),
423}