1use std::fmt::{Debug, Display};
2
3use jacquard_common::types::cid::IpldCid;
4use jacquard_common::types::nsid::Nsid;
5use jacquard_common::types::string::{Did, Rkey};
6use jacquard_common::types::tid::Tid;
7use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
8use serde::{Deserialize, Serialize, Serializer};
9use serde_json::Value;
10use smol_str::{SmolStr, ToSmolStr};
11
12use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
13use crate::resolver::MiniDoc;
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub enum RepoStatus {
17 Backfilling,
18 Synced,
19 Error(SmolStr),
20 Deactivated,
21 Takendown,
22 Suspended,
23}
24
25impl Display for RepoStatus {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 match self {
28 RepoStatus::Backfilling => write!(f, "backfilling"),
29 RepoStatus::Synced => write!(f, "synced"),
30 RepoStatus::Error(e) => write!(f, "error({e})"),
31 RepoStatus::Deactivated => write!(f, "deactivated"),
32 RepoStatus::Takendown => write!(f, "takendown"),
33 RepoStatus::Suspended => write!(f, "suspended"),
34 }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(bound(deserialize = "'i: 'de"))]
40pub(crate) struct RepoState<'i> {
41 pub status: RepoStatus,
42 pub rev: Option<DbTid>,
43 pub data: Option<IpldCid>,
44 #[serde(default)]
51 pub last_message_time: Option<i64>,
52 pub last_updated_at: i64, pub tracked: bool,
56 pub index_id: u64,
58 #[serde(borrow)]
59 pub signing_key: Option<DidKey<'i>>,
60 #[serde(borrow)]
61 pub pds: Option<CowStr<'i>>,
62 #[serde(borrow)]
63 pub handle: Option<Handle<'i>>,
64}
65
66impl<'i> RepoState<'i> {
67 pub fn backfilling(index_id: u64) -> Self {
68 Self {
69 status: RepoStatus::Backfilling,
70 rev: None,
71 data: None,
72 last_updated_at: chrono::Utc::now().timestamp(),
73 index_id,
74 tracked: true,
75 handle: None,
76 pds: None,
77 signing_key: None,
78 last_message_time: None,
79 }
80 }
81
82 pub fn untracked(index_id: u64) -> Self {
84 Self {
85 tracked: false,
86 ..Self::backfilling(index_id)
87 }
88 }
89
90 pub fn advance_message_time(&mut self, event_ms: i64) {
92 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
93 }
94
95 pub fn touch(&mut self) {
97 self.last_updated_at = chrono::Utc::now().timestamp();
98 }
99
100 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
101 let new_signing_key = doc.key.map(From::from);
102 let changed = self.pds.as_deref() != Some(doc.pds.as_str())
103 || self.handle != doc.handle
104 || self.signing_key != new_signing_key;
105 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
106 self.handle = doc.handle;
107 self.signing_key = new_signing_key;
108 changed
109 }
110}
111
112impl<'i> IntoStatic for RepoState<'i> {
113 type Output = RepoState<'static>;
114
115 fn into_static(self) -> Self::Output {
116 RepoState {
117 status: self.status,
118 rev: self.rev,
119 data: self.data,
120 last_updated_at: self.last_updated_at,
121 index_id: self.index_id,
122 tracked: self.tracked,
123 handle: self.handle.map(IntoStatic::into_static),
124 pds: self.pds.map(IntoStatic::into_static),
125 signing_key: self.signing_key.map(IntoStatic::into_static),
126 last_message_time: self.last_message_time,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
132pub(crate) enum ResyncErrorKind {
133 Ratelimited,
134 Transport,
135 Generic,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub(crate) enum ResyncState {
140 Error {
141 kind: ResyncErrorKind,
142 retry_count: u32,
143 next_retry: i64, },
145 Gone {
146 status: RepoStatus, },
148}
149
150impl ResyncState {
151 pub fn next_backoff(retry_count: u32) -> i64 {
152 let base = 60;
154 let cap = 3600;
155 let mult = 2u64.pow(retry_count.min(10)) as i64;
156 let delay = (base * mult).min(cap);
157
158 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
160 let delay = (delay as f64 + jitter) as i64;
161
162 chrono::Utc::now().timestamp() + delay
163 }
164}
165
166#[derive(Debug, Serialize, Clone)]
167pub enum EventType {
168 Record,
169 Identity,
170 Account,
171}
172
173impl AsRef<str> for EventType {
174 fn as_ref(&self) -> &str {
175 match self {
176 Self::Record => "record",
177 Self::Identity => "identity",
178 Self::Account => "account",
179 }
180 }
181}
182
183fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> {
184 s.serialize_str(v.as_ref())
185}
186
187#[derive(Debug, Serialize, Clone)]
188pub struct MarshallableEvt<'i> {
189 pub id: u64,
190 #[serde(rename = "type")]
191 #[serde(serialize_with = "event_type_ser_str")]
192 pub kind: EventType,
193 #[serde(borrow)]
194 #[serde(skip_serializing_if = "Option::is_none")]
195 pub record: Option<RecordEvt<'i>>,
196 #[serde(borrow)]
197 #[serde(skip_serializing_if = "Option::is_none")]
198 pub identity: Option<IdentityEvt<'i>>,
199 #[serde(borrow)]
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub account: Option<AccountEvt<'i>>,
202}
203
204#[derive(Clone, Debug)]
205pub(crate) enum BroadcastEvent {
206 #[allow(dead_code)]
207 Persisted(u64),
208 Ephemeral(Box<MarshallableEvt<'static>>),
209}
210
211#[derive(Debug, Serialize, Clone)]
212pub struct RecordEvt<'i> {
213 pub live: bool,
214 #[serde(borrow)]
215 pub did: Did<'i>,
216 pub rev: Tid,
217 pub collection: Nsid<'i>,
218 pub rkey: Rkey<'i>,
219 pub action: CowStr<'i>,
220 #[serde(skip_serializing_if = "Option::is_none")]
221 pub record: Option<Value>,
222 #[serde(skip_serializing_if = "Option::is_none")]
223 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
224 pub cid: Option<IpldCid>,
225}
226
227#[derive(Debug, Serialize, Clone)]
228pub struct IdentityEvt<'i> {
229 #[serde(borrow)]
230 pub did: Did<'i>,
231 #[serde(skip_serializing_if = "Option::is_none")]
232 pub handle: Option<Handle<'i>>,
233}
234
235#[derive(Debug, Serialize, Clone)]
236pub struct AccountEvt<'i> {
237 #[serde(borrow)]
238 pub did: Did<'i>,
239 pub active: bool,
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub status: Option<CowStr<'i>>,
242}
243
244use jacquard_common::bytes::Bytes;
245
246#[derive(Serialize, Deserialize, Clone)]
247pub(crate) enum StoredData {
248 Nothing,
249 Ptr(IpldCid),
250 #[serde(with = "serde_bytes_squared")]
251 Block(Bytes),
252}
253
254impl StoredData {
255 pub fn is_nothing(&self) -> bool {
256 matches!(self, StoredData::Nothing)
257 }
258}
259
260impl Default for StoredData {
261 fn default() -> Self {
262 Self::Nothing
263 }
264}
265
266impl Debug for StoredData {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 match self {
269 Self::Nothing => f.write_str("nothing"),
270 Self::Block(_) => f.write_str("<block>"),
271 Self::Ptr(cid) => write!(f, "{cid}"),
272 }
273 }
274}
275
276#[derive(Debug, Serialize, Deserialize, Clone)]
277#[serde(bound(deserialize = "'i: 'de"))]
278pub(crate) struct StoredEvent<'i> {
279 #[serde(default)]
280 pub live: bool,
281 #[serde(borrow)]
282 pub did: TrimmedDid<'i>,
283 pub rev: DbTid,
284 #[serde(borrow)]
285 pub collection: CowStr<'i>,
286 pub rkey: DbRkey,
287 pub action: DbAction,
288 #[serde(default)]
289 #[serde(skip_serializing_if = "StoredData::is_nothing")]
290 pub data: StoredData,
291}
292
293mod serde_bytes_squared {
294 use jacquard_common::bytes::Bytes;
295 use serde::{Deserialize, Deserializer, Serializer};
296
297 pub fn serialize<S: Serializer>(v: impl AsRef<[u8]>, s: S) -> Result<S::Ok, S::Error> {
298 s.serialize_bytes(serde_bytes::Bytes::new(v.as_ref()))
299 }
300
301 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Bytes, D::Error> {
302 serde_bytes::ByteBuf::deserialize(d).map(|b| b.into_vec().into())
303 }
304}
305
306#[derive(Debug, PartialEq, Eq, Clone, Copy)]
307pub(crate) enum GaugeState {
308 Synced,
309 Pending,
310 Resync(Option<ResyncErrorKind>),
311}
312
313impl GaugeState {
314 pub fn is_resync(&self) -> bool {
315 matches!(self, GaugeState::Resync(_))
316 }
317}