hydrant/
types.rs

1use std::fmt::{Debug, Display};
2
3use jacquard_common::types::cid::IpldCid;
4use jacquard_common::types::nsid::Nsid;
5use jacquard_common::types::string::{Did, Rkey};
6use jacquard_common::types::tid::Tid;
7use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
8use serde::{Deserialize, Serialize, Serializer};
9use serde_json::Value;
10use smol_str::{SmolStr, ToSmolStr};
11
12use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
13use crate::resolver::MiniDoc;
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub enum RepoStatus {
17    Backfilling,
18    Synced,
19    Error(SmolStr),
20    Deactivated,
21    Takendown,
22    Suspended,
23}
24
25impl Display for RepoStatus {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        match self {
28            RepoStatus::Backfilling => write!(f, "backfilling"),
29            RepoStatus::Synced => write!(f, "synced"),
30            RepoStatus::Error(e) => write!(f, "error({e})"),
31            RepoStatus::Deactivated => write!(f, "deactivated"),
32            RepoStatus::Takendown => write!(f, "takendown"),
33            RepoStatus::Suspended => write!(f, "suspended"),
34        }
35    }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(bound(deserialize = "'i: 'de"))]
40pub(crate) struct RepoState<'i> {
41    pub status: RepoStatus,
42    pub rev: Option<DbTid>,
43    pub data: Option<IpldCid>,
44    // todo: is this actually valid? the spec says this is informal and intermadiate
45    // services may change it. we should probably document it. if we cant use this
46    // then how do we dedup account / identity ops?
47    /// ms since epoch of the last firehose message we processed for this repo.
48    /// used to deduplicate identity / account events that can arrive from multiple relays at
49    /// different wall-clock times but represent the same underlying PDS event.
50    #[serde(default)]
51    pub last_message_time: Option<i64>,
52    /// this is when we *ingested* any last updates
53    pub last_updated_at: i64, // unix timestamp
54    /// whether we are ingesting events for this repo
55    pub tracked: bool,
56    /// index id in pending keyspace
57    pub index_id: u64,
58    #[serde(borrow)]
59    pub signing_key: Option<DidKey<'i>>,
60    #[serde(borrow)]
61    pub pds: Option<CowStr<'i>>,
62    #[serde(borrow)]
63    pub handle: Option<Handle<'i>>,
64}
65
66impl<'i> RepoState<'i> {
67    pub fn backfilling(index_id: u64) -> Self {
68        Self {
69            status: RepoStatus::Backfilling,
70            rev: None,
71            data: None,
72            last_updated_at: chrono::Utc::now().timestamp(),
73            index_id,
74            tracked: true,
75            handle: None,
76            pds: None,
77            signing_key: None,
78            last_message_time: None,
79        }
80    }
81
82    /// backfilling, but not tracked yet
83    pub fn untracked(index_id: u64) -> Self {
84        Self {
85            tracked: false,
86            ..Self::backfilling(index_id)
87        }
88    }
89
90    // advances the high-water mark to event_ms if it's newer than what we've seen
91    pub fn advance_message_time(&mut self, event_ms: i64) {
92        self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
93    }
94
95    // updates last_updated_at to now
96    pub fn touch(&mut self) {
97        self.last_updated_at = chrono::Utc::now().timestamp();
98    }
99
100    pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
101        let new_signing_key = doc.key.map(From::from);
102        let changed = self.pds.as_deref() != Some(doc.pds.as_str())
103            || self.handle != doc.handle
104            || self.signing_key != new_signing_key;
105        self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
106        self.handle = doc.handle;
107        self.signing_key = new_signing_key;
108        changed
109    }
110}
111
112impl<'i> IntoStatic for RepoState<'i> {
113    type Output = RepoState<'static>;
114
115    fn into_static(self) -> Self::Output {
116        RepoState {
117            status: self.status,
118            rev: self.rev,
119            data: self.data,
120            last_updated_at: self.last_updated_at,
121            index_id: self.index_id,
122            tracked: self.tracked,
123            handle: self.handle.map(IntoStatic::into_static),
124            pds: self.pds.map(IntoStatic::into_static),
125            signing_key: self.signing_key.map(IntoStatic::into_static),
126            last_message_time: self.last_message_time,
127        }
128    }
129}
130
131#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
132pub(crate) enum ResyncErrorKind {
133    Ratelimited,
134    Transport,
135    Generic,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub(crate) enum ResyncState {
140    Error {
141        kind: ResyncErrorKind,
142        retry_count: u32,
143        next_retry: i64, // unix timestamp
144    },
145    Gone {
146        status: RepoStatus, // deactivated, takendown, suspended
147    },
148}
149
150impl ResyncState {
151    pub fn next_backoff(retry_count: u32) -> i64 {
152        // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
153        let base = 60;
154        let cap = 3600;
155        let mult = 2u64.pow(retry_count.min(10)) as i64;
156        let delay = (base * mult).min(cap);
157
158        // add +/- 10% jitter
159        let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
160        let delay = (delay as f64 + jitter) as i64;
161
162        chrono::Utc::now().timestamp() + delay
163    }
164}
165
166#[derive(Debug, Serialize, Clone)]
167pub enum EventType {
168    Record,
169    Identity,
170    Account,
171}
172
173impl AsRef<str> for EventType {
174    fn as_ref(&self) -> &str {
175        match self {
176            Self::Record => "record",
177            Self::Identity => "identity",
178            Self::Account => "account",
179        }
180    }
181}
182
183fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> {
184    s.serialize_str(v.as_ref())
185}
186
187#[derive(Debug, Serialize, Clone)]
188pub struct MarshallableEvt<'i> {
189    pub id: u64,
190    #[serde(rename = "type")]
191    #[serde(serialize_with = "event_type_ser_str")]
192    pub kind: EventType,
193    #[serde(borrow)]
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub record: Option<RecordEvt<'i>>,
196    #[serde(borrow)]
197    #[serde(skip_serializing_if = "Option::is_none")]
198    pub identity: Option<IdentityEvt<'i>>,
199    #[serde(borrow)]
200    #[serde(skip_serializing_if = "Option::is_none")]
201    pub account: Option<AccountEvt<'i>>,
202}
203
204#[derive(Clone, Debug)]
205pub(crate) enum BroadcastEvent {
206    #[allow(dead_code)]
207    Persisted(u64),
208    Ephemeral(Box<MarshallableEvt<'static>>),
209}
210
211#[derive(Debug, Serialize, Clone)]
212pub struct RecordEvt<'i> {
213    pub live: bool,
214    #[serde(borrow)]
215    pub did: Did<'i>,
216    pub rev: Tid,
217    pub collection: Nsid<'i>,
218    pub rkey: Rkey<'i>,
219    pub action: CowStr<'i>,
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub record: Option<Value>,
222    #[serde(skip_serializing_if = "Option::is_none")]
223    #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
224    pub cid: Option<IpldCid>,
225}
226
227#[derive(Debug, Serialize, Clone)]
228pub struct IdentityEvt<'i> {
229    #[serde(borrow)]
230    pub did: Did<'i>,
231    #[serde(skip_serializing_if = "Option::is_none")]
232    pub handle: Option<Handle<'i>>,
233}
234
235#[derive(Debug, Serialize, Clone)]
236pub struct AccountEvt<'i> {
237    #[serde(borrow)]
238    pub did: Did<'i>,
239    pub active: bool,
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub status: Option<CowStr<'i>>,
242}
243
244use jacquard_common::bytes::Bytes;
245
246#[derive(Serialize, Deserialize, Clone)]
247pub(crate) enum StoredData {
248    Nothing,
249    Ptr(IpldCid),
250    #[serde(with = "serde_bytes_squared")]
251    Block(Bytes),
252}
253
254impl StoredData {
255    pub fn is_nothing(&self) -> bool {
256        matches!(self, StoredData::Nothing)
257    }
258}
259
260impl Default for StoredData {
261    fn default() -> Self {
262        Self::Nothing
263    }
264}
265
266impl Debug for StoredData {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        match self {
269            Self::Nothing => f.write_str("nothing"),
270            Self::Block(_) => f.write_str("<block>"),
271            Self::Ptr(cid) => write!(f, "{cid}"),
272        }
273    }
274}
275
276#[derive(Debug, Serialize, Deserialize, Clone)]
277#[serde(bound(deserialize = "'i: 'de"))]
278pub(crate) struct StoredEvent<'i> {
279    #[serde(default)]
280    pub live: bool,
281    #[serde(borrow)]
282    pub did: TrimmedDid<'i>,
283    pub rev: DbTid,
284    #[serde(borrow)]
285    pub collection: CowStr<'i>,
286    pub rkey: DbRkey,
287    pub action: DbAction,
288    #[serde(default)]
289    #[serde(skip_serializing_if = "StoredData::is_nothing")]
290    pub data: StoredData,
291}
292
293mod serde_bytes_squared {
294    use jacquard_common::bytes::Bytes;
295    use serde::{Deserialize, Deserializer, Serializer};
296
297    pub fn serialize<S: Serializer>(v: impl AsRef<[u8]>, s: S) -> Result<S::Ok, S::Error> {
298        s.serialize_bytes(serde_bytes::Bytes::new(v.as_ref()))
299    }
300
301    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Bytes, D::Error> {
302        serde_bytes::ByteBuf::deserialize(d).map(|b| b.into_vec().into())
303    }
304}
305
306#[derive(Debug, PartialEq, Eq, Clone, Copy)]
307pub(crate) enum GaugeState {
308    Synced,
309    Pending,
310    Resync(Option<ResyncErrorKind>),
311}
312
313impl GaugeState {
314    pub fn is_resync(&self) -> bool {
315        matches!(self, GaugeState::Resync(_))
316    }
317}