hydrant/control/
repos.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use jacquard_common::cowstr::ToCowStr;
5use jacquard_common::types::cid::{Cid, IpldCid};
6use jacquard_common::types::ident::AtIdentifier;
7use jacquard_common::types::string::{Did, Handle, Rkey};
8use jacquard_common::types::tid::Tid;
9use jacquard_common::{CowStr, Data, IntoStatic};
10use miette::{IntoDiagnostic, Result};
11use rand::Rng;
12use smol_str::ToSmolStr;
13use url::Url;
14
15use crate::db::types::DbRkey;
16use crate::db::{self, keys, ser_repo_state};
17use crate::state::AppState;
18use crate::types::{GaugeState, RepoState, RepoStatus};
19
20/// information about a tracked or known repository. returned by [`ReposControl`] methods.
21#[derive(Debug, Clone, serde::Serialize)]
22pub struct RepoInfo {
23    /// the DID of the repository.
24    pub did: Did<'static>,
25    /// the status of the repository.
26    #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
27    pub status: RepoStatus,
28    /// whether this repository is tracked or not.
29    /// untracked repositories are not updated and they stay frozen.
30    pub tracked: bool,
31    /// the revision of the root commit of this repository.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub rev: Option<Tid>,
34    /// the CID of the root commit of this repository.
35    #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub data: Option<IpldCid>,
38    /// the handle for the DID of this repository.
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub handle: Option<Handle<'static>>,
41    /// the URL for the PDS in which this repository is hosted on.
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub pds: Option<Url>,
44    /// ATProto signing key of this repository.
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub signing_key: Option<String>,
47    /// when this repository was last touched (status update, commit ingested, etc.).
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub last_updated_at: Option<DateTime<Utc>>,
50    /// the time of the last message gotten from the firehose for this repository.
51    /// this is equal to the `time` field.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub last_message_at: Option<DateTime<Utc>>,
54}
55
/// control over which repositories are tracked and access to their state.
///
/// in `filter` mode, a repo is only indexed if it either matches a signal or is
/// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are indexed
/// and tracking is implicit.
///
/// tracking a DID that hydrant has never seen enqueues an immediate backfill.
/// tracking a DID that hydrant already knows about (but has marked untracked)
/// re-enqueues it for backfill.
///
/// this is a thin wrapper around the shared [`AppState`]; cloning it only clones the
/// inner `Arc`.
#[derive(Clone)]
pub struct ReposControl(pub(super) Arc<AppState>);
67
68impl ReposControl {
69    /// gets a handle for a repository to allow acting upon it.
70    pub fn get<'i>(&self, did: &Did<'i>) -> Result<RepoHandle<'i>> {
71        Ok(RepoHandle {
72            state: self.0.clone(),
73            did: did.clone(),
74        })
75    }
76
77    /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be
78    /// either a handle or a DID.
79    pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
80        let did = self.0.resolver.resolve_did(repo).await?;
81        Ok(RepoHandle {
82            state: self.0.clone(),
83            did,
84        })
85    }
86
87    /// fetch the current state of a single repository. returns `None` if hydrant
88    /// has never seen this DID.
89    pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
90        self.get(did)?.info().await
91    }
92
93    /// explicitly track one or more repositories, enqueuing them for backfill if needed.
94    ///
95    /// - if a DID is new, a fresh [`RepoState`] is created and backfill is queued.
96    /// - if a DID is already known but untracked, it is marked tracked and re-enqueued.
97    /// - if a DID is already tracked, this is a no-op.
98    pub async fn track(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> {
99        let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
100        let state = self.0.clone();
101
102        let (new_count, transitions) = tokio::task::spawn_blocking(move || {
103            let db = &state.db;
104            let mut batch = db.inner.batch();
105            let mut added = 0i64;
106            let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
107            let mut rng = rand::rng();
108
109            for did in &dids {
110                let did_key = keys::repo_key(did);
111                let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
112                let existing = repo_bytes
113                    .as_deref()
114                    .map(db::deser_repo_state)
115                    .transpose()?;
116
117                if let Some(mut repo_state) = existing {
118                    if !repo_state.tracked {
119                        let resync = db.resync.get(&did_key).into_diagnostic()?;
120                        let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
121                        repo_state.tracked = true;
122                        batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
123                        batch.insert(
124                            &db.pending,
125                            keys::pending_key(repo_state.index_id),
126                            &did_key,
127                        );
128                        batch.remove(&db.resync, &did_key);
129                        transitions.push((old, GaugeState::Pending));
130                    }
131                } else {
132                    let repo_state = RepoState::backfilling(rng.next_u64());
133                    batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
134                    batch.insert(
135                        &db.pending,
136                        keys::pending_key(repo_state.index_id),
137                        &did_key,
138                    );
139                    added += 1;
140                    transitions.push((GaugeState::Synced, GaugeState::Pending));
141                }
142            }
143
144            batch.commit().into_diagnostic()?;
145            Ok::<_, miette::Report>((added, transitions))
146        })
147        .await
148        .into_diagnostic()??;
149
150        if new_count > 0 {
151            self.0.db.update_count_async("repos", new_count).await;
152        }
153        for (old, new) in transitions {
154            self.0.db.update_gauge_diff_async(&old, &new).await;
155        }
156        self.0.notify_backfill();
157        Ok(())
158    }
159
160    /// stop tracking one or more repositories. hydrant will stop processing new events
161    /// for them and remove them from the pending/resync queues, but existing indexed
162    /// records are **not** deleted.
163    pub async fn untrack(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> {
164        let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
165        let state = self.0.clone();
166
167        let gauge_decrements = tokio::task::spawn_blocking(move || {
168            let db = &state.db;
169            let mut batch = db.inner.batch();
170            let mut gauge_decrements = Vec::new();
171
172            for did in &dids {
173                let did_key = keys::repo_key(did);
174                let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
175                let existing = repo_bytes
176                    .as_deref()
177                    .map(db::deser_repo_state)
178                    .transpose()?;
179
180                if let Some(repo_state) = existing {
181                    if repo_state.tracked {
182                        let resync = db.resync.get(&did_key).into_diagnostic()?;
183                        let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
184                        let mut repo_state = repo_state.into_static();
185                        repo_state.tracked = false;
186                        batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
187                        batch.remove(&db.pending, keys::pending_key(repo_state.index_id));
188                        batch.remove(&db.resync, &did_key);
189                        if old != GaugeState::Synced {
190                            gauge_decrements.push(old);
191                        }
192                    }
193                }
194            }
195
196            batch.commit().into_diagnostic()?;
197            Ok::<_, miette::Report>(gauge_decrements)
198        })
199        .await
200        .into_diagnostic()??;
201
202        for gauge in gauge_decrements {
203            self.0
204                .db
205                .update_gauge_diff_async(&gauge, &GaugeState::Synced)
206                .await;
207        }
208        Ok(())
209    }
210}
211
212pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo {
213    RepoInfo {
214        did,
215        status: s.status,
216        tracked: s.tracked,
217        rev: s.rev.map(|r| r.to_tid()),
218        data: s.data,
219        handle: s.handle.map(|h| h.into_static()),
220        pds: s.pds.and_then(|p| p.parse().ok()),
221        signing_key: s.signing_key.map(|k| k.encode()),
222        last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
223        last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
224    }
225}
226
227pub struct Record {
228    pub did: Did<'static>,
229    pub cid: Cid<'static>,
230    pub value: Data<'static>,
231}
232
233pub struct ListedRecord {
234    pub rkey: Rkey<'static>,
235    pub cid: Cid<'static>,
236    pub value: Data<'static>,
237}
238
239pub struct RecordList {
240    pub records: Vec<ListedRecord>,
241    pub cursor: Option<Rkey<'static>>,
242}
243
/// handle to access data related to this repository.
///
/// a handle only pairs the shared application state with a DID; creating one does not
/// check that the repository is known.
#[derive(Clone)]
pub struct RepoHandle<'i> {
    // shared application state; every method reads the database through it.
    state: Arc<AppState>,
    /// the DID of the repository this handle refers to.
    pub did: Did<'i>,
}
250
251impl<'i> RepoHandle<'i> {
252    pub async fn info(&self) -> Result<Option<RepoInfo>> {
253        let did_key = keys::repo_key(&self.did);
254        let state = self.state.clone();
255        let did = self.did.clone().into_static();
256
257        tokio::task::spawn_blocking(move || {
258            let bytes = state.db.repos.get(&did_key).into_diagnostic()?;
259            let state = bytes.as_deref().map(db::deser_repo_state).transpose()?;
260            Ok(state.map(|s| repo_state_to_info(did, s)))
261        })
262        .await
263        .into_diagnostic()?
264    }
265
266    pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
267        let did = self.did.clone().into_static();
268        let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey));
269
270        let collection = collection.to_smolstr();
271        let state = self.state.clone();
272        tokio::task::spawn_blocking(move || {
273            use miette::WrapErr;
274
275            let cid_bytes = state.db.records.get(db_key).into_diagnostic()?;
276            let Some(cid_bytes) = cid_bytes else {
277                return Ok(None);
278            };
279
280            // lookup block using col|cid key
281            let block_key = keys::block_key(&collection, &cid_bytes);
282            let Some(block_bytes) = state.db.blocks.get(block_key).into_diagnostic()? else {
283                miette::bail!("block {cid_bytes:?} not found, this is a bug!!");
284            };
285
286            let value = serde_ipld_dagcbor::from_slice::<Data>(&block_bytes)
287                .into_diagnostic()
288                .wrap_err("cant parse block")?
289                .into_static();
290            let cid = Cid::new(&cid_bytes)
291                .into_diagnostic()
292                .wrap_err("cant parse block cid")?;
293            let cid = Cid::Str(cid.to_cowstr().into_static());
294
295            Ok(Some(Record { did, cid, value }))
296        })
297        .await
298        .into_diagnostic()?
299    }
300
301    pub async fn list_records(
302        &self,
303        collection: &str,
304        limit: usize,
305        reverse: bool,
306        cursor: Option<&str>,
307    ) -> Result<RecordList> {
308        let did = self.did.clone().into_static();
309
310        let state = self.state.clone();
311        let prefix = keys::record_prefix_collection(&did, collection);
312        let collection = collection.to_smolstr();
313        let cursor = cursor.map(|c| c.to_smolstr());
314
315        tokio::task::spawn_blocking(move || {
316            let mut results = Vec::new();
317            let mut next_cursor = None;
318
319            let iter: Box<dyn Iterator<Item = _>> = if !reverse {
320                let mut end_prefix = prefix.clone();
321                if let Some(last) = end_prefix.last_mut() {
322                    *last += 1;
323                }
324
325                let end_key = if let Some(cursor) = &cursor {
326                    let mut k = prefix.clone();
327                    k.extend_from_slice(cursor.as_bytes());
328                    k
329                } else {
330                    end_prefix
331                };
332
333                Box::new(
334                    state
335                        .db
336                        .records
337                        .range(prefix.as_slice()..end_key.as_slice())
338                        .rev(),
339                )
340            } else {
341                let start_key = if let Some(cursor) = &cursor {
342                    let mut k = prefix.clone();
343                    k.extend_from_slice(cursor.as_bytes());
344                    k.push(0);
345                    k
346                } else {
347                    prefix.clone()
348                };
349
350                Box::new(state.db.records.range(start_key.as_slice()..))
351            };
352
353            for item in iter {
354                let (key, cid_bytes) = item.into_inner().into_diagnostic()?;
355
356                if !key.starts_with(prefix.as_slice()) {
357                    break;
358                }
359
360                let rkey = keys::parse_rkey(&key[prefix.len()..])?;
361                if results.len() >= limit {
362                    next_cursor = Some(rkey);
363                    break;
364                }
365
366                // look up using col|cid key built from collection and binary cid bytes
367                if let Ok(Some(block_bytes)) = state
368                    .db
369                    .blocks
370                    .get(&keys::block_key(collection.as_str(), &cid_bytes))
371                {
372                    let value: Data =
373                        serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null);
374                    let cid = Cid::new(&cid_bytes).into_diagnostic()?;
375                    let cid = Cid::Str(cid.to_cowstr().into_static());
376                    results.push(ListedRecord {
377                        rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr()))
378                            .expect("that rkey is validated"),
379                        cid,
380                        value: value.into_static(),
381                    });
382                }
383            }
384            Result::<_, miette::Report>::Ok((results, next_cursor))
385        })
386        .await
387        .into_diagnostic()?
388        .map(|(records, next_cursor)| RecordList {
389            records,
390            cursor: next_cursor.map(|rkey| {
391                Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated")
392            }),
393        })
394    }
395
396    pub async fn count_records(&self, collection: &str) -> Result<u64> {
397        let did = self.did.clone().into_static();
398        let state = self.state.clone();
399        let collection = collection.to_string();
400        tokio::task::spawn_blocking(move || db::get_record_count(&state.db, &did, &collection))
401            .await
402            .into_diagnostic()?
403    }
404}