1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use jacquard_common::cowstr::ToCowStr;
5use jacquard_common::types::cid::{Cid, IpldCid};
6use jacquard_common::types::ident::AtIdentifier;
7use jacquard_common::types::string::{Did, Handle, Rkey};
8use jacquard_common::types::tid::Tid;
9use jacquard_common::{CowStr, Data, IntoStatic};
10use miette::{IntoDiagnostic, Result};
11use rand::Rng;
12use smol_str::ToSmolStr;
13use url::Url;
14
15use crate::db::types::DbRkey;
16use crate::db::{self, keys, ser_repo_state};
17use crate::state::AppState;
18use crate::types::{GaugeState, RepoState, RepoStatus};
19
20#[derive(Debug, Clone, serde::Serialize)]
22pub struct RepoInfo {
23 pub did: Did<'static>,
25 #[serde(serialize_with = "crate::util::repo_status_serialize_str")]
27 pub status: RepoStatus,
28 pub tracked: bool,
31 #[serde(skip_serializing_if = "Option::is_none")]
33 pub rev: Option<Tid>,
34 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub data: Option<IpldCid>,
38 #[serde(skip_serializing_if = "Option::is_none")]
40 pub handle: Option<Handle<'static>>,
41 #[serde(skip_serializing_if = "Option::is_none")]
43 pub pds: Option<Url>,
44 #[serde(skip_serializing_if = "Option::is_none")]
46 pub signing_key: Option<String>,
47 #[serde(skip_serializing_if = "Option::is_none")]
49 pub last_updated_at: Option<DateTime<Utc>>,
50 #[serde(skip_serializing_if = "Option::is_none")]
53 pub last_message_at: Option<DateTime<Utc>>,
54}
55
56#[derive(Clone)]
66pub struct ReposControl(pub(super) Arc<AppState>);
67
68impl ReposControl {
69 pub fn get<'i>(&self, did: &Did<'i>) -> Result<RepoHandle<'i>> {
71 Ok(RepoHandle {
72 state: self.0.clone(),
73 did: did.clone(),
74 })
75 }
76
77 pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> {
80 let did = self.0.resolver.resolve_did(repo).await?;
81 Ok(RepoHandle {
82 state: self.0.clone(),
83 did,
84 })
85 }
86
87 pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> {
90 self.get(did)?.info().await
91 }
92
93 pub async fn track(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> {
99 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
100 let state = self.0.clone();
101
102 let (new_count, transitions) = tokio::task::spawn_blocking(move || {
103 let db = &state.db;
104 let mut batch = db.inner.batch();
105 let mut added = 0i64;
106 let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
107 let mut rng = rand::rng();
108
109 for did in &dids {
110 let did_key = keys::repo_key(did);
111 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
112 let existing = repo_bytes
113 .as_deref()
114 .map(db::deser_repo_state)
115 .transpose()?;
116
117 if let Some(mut repo_state) = existing {
118 if !repo_state.tracked {
119 let resync = db.resync.get(&did_key).into_diagnostic()?;
120 let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
121 repo_state.tracked = true;
122 batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
123 batch.insert(
124 &db.pending,
125 keys::pending_key(repo_state.index_id),
126 &did_key,
127 );
128 batch.remove(&db.resync, &did_key);
129 transitions.push((old, GaugeState::Pending));
130 }
131 } else {
132 let repo_state = RepoState::backfilling(rng.next_u64());
133 batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
134 batch.insert(
135 &db.pending,
136 keys::pending_key(repo_state.index_id),
137 &did_key,
138 );
139 added += 1;
140 transitions.push((GaugeState::Synced, GaugeState::Pending));
141 }
142 }
143
144 batch.commit().into_diagnostic()?;
145 Ok::<_, miette::Report>((added, transitions))
146 })
147 .await
148 .into_diagnostic()??;
149
150 if new_count > 0 {
151 self.0.db.update_count_async("repos", new_count).await;
152 }
153 for (old, new) in transitions {
154 self.0.db.update_gauge_diff_async(&old, &new).await;
155 }
156 self.0.notify_backfill();
157 Ok(())
158 }
159
160 pub async fn untrack(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> {
164 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect();
165 let state = self.0.clone();
166
167 let gauge_decrements = tokio::task::spawn_blocking(move || {
168 let db = &state.db;
169 let mut batch = db.inner.batch();
170 let mut gauge_decrements = Vec::new();
171
172 for did in &dids {
173 let did_key = keys::repo_key(did);
174 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?;
175 let existing = repo_bytes
176 .as_deref()
177 .map(db::deser_repo_state)
178 .transpose()?;
179
180 if let Some(repo_state) = existing {
181 if repo_state.tracked {
182 let resync = db.resync.get(&did_key).into_diagnostic()?;
183 let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref());
184 let mut repo_state = repo_state.into_static();
185 repo_state.tracked = false;
186 batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?);
187 batch.remove(&db.pending, keys::pending_key(repo_state.index_id));
188 batch.remove(&db.resync, &did_key);
189 if old != GaugeState::Synced {
190 gauge_decrements.push(old);
191 }
192 }
193 }
194 }
195
196 batch.commit().into_diagnostic()?;
197 Ok::<_, miette::Report>(gauge_decrements)
198 })
199 .await
200 .into_diagnostic()??;
201
202 for gauge in gauge_decrements {
203 self.0
204 .db
205 .update_gauge_diff_async(&gauge, &GaugeState::Synced)
206 .await;
207 }
208 Ok(())
209 }
210}
211
212pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo {
213 RepoInfo {
214 did,
215 status: s.status,
216 tracked: s.tracked,
217 rev: s.rev.map(|r| r.to_tid()),
218 data: s.data,
219 handle: s.handle.map(|h| h.into_static()),
220 pds: s.pds.and_then(|p| p.parse().ok()),
221 signing_key: s.signing_key.map(|k| k.encode()),
222 last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
223 last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
224 }
225}
226
227pub struct Record {
228 pub did: Did<'static>,
229 pub cid: Cid<'static>,
230 pub value: Data<'static>,
231}
232
233pub struct ListedRecord {
234 pub rkey: Rkey<'static>,
235 pub cid: Cid<'static>,
236 pub value: Data<'static>,
237}
238
239pub struct RecordList {
240 pub records: Vec<ListedRecord>,
241 pub cursor: Option<Rkey<'static>>,
242}
243
244#[derive(Clone)]
246pub struct RepoHandle<'i> {
247 state: Arc<AppState>,
248 pub did: Did<'i>,
249}
250
251impl<'i> RepoHandle<'i> {
252 pub async fn info(&self) -> Result<Option<RepoInfo>> {
253 let did_key = keys::repo_key(&self.did);
254 let state = self.state.clone();
255 let did = self.did.clone().into_static();
256
257 tokio::task::spawn_blocking(move || {
258 let bytes = state.db.repos.get(&did_key).into_diagnostic()?;
259 let state = bytes.as_deref().map(db::deser_repo_state).transpose()?;
260 Ok(state.map(|s| repo_state_to_info(did, s)))
261 })
262 .await
263 .into_diagnostic()?
264 }
265
266 pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> {
267 let did = self.did.clone().into_static();
268 let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey));
269
270 let collection = collection.to_smolstr();
271 let state = self.state.clone();
272 tokio::task::spawn_blocking(move || {
273 use miette::WrapErr;
274
275 let cid_bytes = state.db.records.get(db_key).into_diagnostic()?;
276 let Some(cid_bytes) = cid_bytes else {
277 return Ok(None);
278 };
279
280 let block_key = keys::block_key(&collection, &cid_bytes);
282 let Some(block_bytes) = state.db.blocks.get(block_key).into_diagnostic()? else {
283 miette::bail!("block {cid_bytes:?} not found, this is a bug!!");
284 };
285
286 let value = serde_ipld_dagcbor::from_slice::<Data>(&block_bytes)
287 .into_diagnostic()
288 .wrap_err("cant parse block")?
289 .into_static();
290 let cid = Cid::new(&cid_bytes)
291 .into_diagnostic()
292 .wrap_err("cant parse block cid")?;
293 let cid = Cid::Str(cid.to_cowstr().into_static());
294
295 Ok(Some(Record { did, cid, value }))
296 })
297 .await
298 .into_diagnostic()?
299 }
300
301 pub async fn list_records(
302 &self,
303 collection: &str,
304 limit: usize,
305 reverse: bool,
306 cursor: Option<&str>,
307 ) -> Result<RecordList> {
308 let did = self.did.clone().into_static();
309
310 let state = self.state.clone();
311 let prefix = keys::record_prefix_collection(&did, collection);
312 let collection = collection.to_smolstr();
313 let cursor = cursor.map(|c| c.to_smolstr());
314
315 tokio::task::spawn_blocking(move || {
316 let mut results = Vec::new();
317 let mut next_cursor = None;
318
319 let iter: Box<dyn Iterator<Item = _>> = if !reverse {
320 let mut end_prefix = prefix.clone();
321 if let Some(last) = end_prefix.last_mut() {
322 *last += 1;
323 }
324
325 let end_key = if let Some(cursor) = &cursor {
326 let mut k = prefix.clone();
327 k.extend_from_slice(cursor.as_bytes());
328 k
329 } else {
330 end_prefix
331 };
332
333 Box::new(
334 state
335 .db
336 .records
337 .range(prefix.as_slice()..end_key.as_slice())
338 .rev(),
339 )
340 } else {
341 let start_key = if let Some(cursor) = &cursor {
342 let mut k = prefix.clone();
343 k.extend_from_slice(cursor.as_bytes());
344 k.push(0);
345 k
346 } else {
347 prefix.clone()
348 };
349
350 Box::new(state.db.records.range(start_key.as_slice()..))
351 };
352
353 for item in iter {
354 let (key, cid_bytes) = item.into_inner().into_diagnostic()?;
355
356 if !key.starts_with(prefix.as_slice()) {
357 break;
358 }
359
360 let rkey = keys::parse_rkey(&key[prefix.len()..])?;
361 if results.len() >= limit {
362 next_cursor = Some(rkey);
363 break;
364 }
365
366 if let Ok(Some(block_bytes)) = state
368 .db
369 .blocks
370 .get(&keys::block_key(collection.as_str(), &cid_bytes))
371 {
372 let value: Data =
373 serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null);
374 let cid = Cid::new(&cid_bytes).into_diagnostic()?;
375 let cid = Cid::Str(cid.to_cowstr().into_static());
376 results.push(ListedRecord {
377 rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr()))
378 .expect("that rkey is validated"),
379 cid,
380 value: value.into_static(),
381 });
382 }
383 }
384 Result::<_, miette::Report>::Ok((results, next_cursor))
385 })
386 .await
387 .into_diagnostic()?
388 .map(|(records, next_cursor)| RecordList {
389 records,
390 cursor: next_cursor.map(|rkey| {
391 Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated")
392 }),
393 })
394 }
395
396 pub async fn count_records(&self, collection: &str) -> Result<u64> {
397 let did = self.did.clone().into_static();
398 let state = self.state.clone();
399 let collection = collection.to_string();
400 tokio::task::spawn_blocking(move || db::get_record_count(&state.db, &did, &collection))
401 .await
402 .into_diagnostic()?
403 }
404}