feat: PDS backfill for pull.status and issue.state records Fetches state-tracking records directly from each user's PDS via com.atproto.repo.listRecords. Runs 60s after startup to fill in merged/closed states that the Tap may have missed due to timing.

Author: Aaron Steven White
Commit f2e98c33a2f3027266afaedf5ba12776c4b20c3c
Parent: 48ca659281
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
3 files changed +182 -0
@@ -101,6 +101,14 @@ impl DidResolver {
101101         }
102102     }
103103 
104+    /// Resolve a DID to its PDS service endpoint URL.
105+    pub async fn resolve_pds(&self, did: &str) -> Option<String> {
106+        self.resolve_did(did)
107+            .await
108+            .ok()
109+            .and_then(|doc| doc.pds_endpoint().map(String::from))
110+    }
111+
104112     /// Resolve a DID to its DID document.
105113     /// Supports did:plc (via plc.directory) and did:web (.well-known/did.json).
106114     pub async fn resolve_did(&self, did: &str) -> anyhow::Result<DidDocument> {
@@ -2,6 +2,7 @@ pub(crate) mod consumer;
22 mod dispatch;
33 mod jetstream;
44 mod knot_consumer;
5+mod pds_backfill;
56 mod tap;
67 
78 use std::sync::Arc;
@@ -38,6 +39,13 @@ pub async fn run(state: Arc<AppState>) -> anyhow::Result<()> {
3839         knot_consumer::run(knot_state).await;
3940     });
4041 
42+    // Run PDS backfill after 60s delay (let Tap deliver what it can first)
43+    let backfill_state = state.clone();
44+    tokio::spawn(async move {
45+        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
46+        pds_backfill::run(backfill_state).await;
47+    });
48+
4149     // Run Jetstream consumer (primary live stream with cursor persistence)
4250     loop {
4351         tracing::info!("connecting to jetstream");
@@ -0,0 +1,166 @@
1+//! One-time PDS backfill for record types that the Tap may have missed.
2+//!
3+//! Fetches records directly from each user's PDS via `com.atproto.repo.listRecords`.
4+//! Used for state-tracking records (pull.status, issue.state) that arrive as separate
5+//! records and may have been emitted before the appview was connected to the Tap.
6+
7+use std::sync::Arc;
8+
9+use crate::state::AppState;
10+
11+use super::consumer;
12+
13+/// Collections to backfill from each user's PDS.
14+const BACKFILL_COLLECTIONS: &[&str] = &[
15+    "sh.tangled.repo.pull.status",
16+    "sh.tangled.repo.issue.state",
17+];
18+
19+/// Run the PDS backfill. Fetches state records for all known DIDs.
20+pub async fn run(state: Arc<AppState>) {
21+    tracing::info!("starting PDS backfill for state records");
22+
23+    // Get all unique DIDs that have pulls or issues
24+    let dids = match sqlx::query_scalar::<_, String>(
25+        "SELECT DISTINCT did FROM pulls \
26+         UNION \
27+         SELECT DISTINCT did FROM issues",
28+    )
29+    .fetch_all(&state.db)
30+    .await
31+    {
32+        Ok(dids) => dids,
33+        Err(e) => {
34+            tracing::error!(error = %e, "failed to query DIDs for backfill");
35+            return;
36+        }
37+    };
38+
39+    tracing::info!(did_count = dids.len(), "backfilling state records from PDS");
40+
41+    let mut total_processed = 0u64;
42+    let mut total_errors = 0u64;
43+
44+    for did in &dids {
45+        // Resolve DID to PDS URL
46+        let pds_url = match state.did_resolver.resolve_pds(did).await {
47+            Some(url) => url,
48+            None => {
49+                tracing::debug!(did, "could not resolve PDS, skipping");
50+                total_errors += 1;
51+                continue;
52+            }
53+        };
54+
55+        for collection in BACKFILL_COLLECTIONS {
56+            match fetch_and_process(&state, &pds_url, did, collection).await {
57+                Ok(count) => total_processed += count,
58+                Err(e) => {
59+                    tracing::debug!(
60+                        did, collection, error = %e,
61+                        "PDS backfill error for collection"
62+                    );
63+                    total_errors += 1;
64+                }
65+            }
66+        }
67+    }
68+
69+    tracing::info!(
70+        total_processed,
71+        total_errors,
72+        "PDS backfill complete"
73+    );
74+}
75+
76+/// Fetch all records of a collection from a user's PDS and process them.
77+async fn fetch_and_process(
78+    state: &Arc<AppState>,
79+    pds_url: &str,
80+    did: &str,
81+    collection: &str,
82+) -> anyhow::Result<u64> {
83+    let mut cursor: Option<String> = None;
84+    let mut count = 0u64;
85+
86+    loop {
87+        let mut url = format!(
88+            "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit=100",
89+            pds_url.trim_end_matches('/'),
90+            did,
91+            collection,
92+        );
93+        if let Some(ref c) = cursor {
94+            url.push_str(&format!("&cursor={}", c));
95+        }
96+
97+        let resp = state
98+            .http_client
99+            .get(&url)
100+            .send()
101+            .await?;
102+
103+        if !resp.status().is_success() {
104+            return Ok(count);
105+        }
106+
107+        let body: serde_json::Value = resp.json().await?;
108+
109+        let records = body
110+            .get("records")
111+            .and_then(|v| v.as_array())
112+            .cloned()
113+            .unwrap_or_default();
114+
115+        if records.is_empty() {
116+            break;
117+        }
118+
119+        for record in &records {
120+            let uri = record.get("uri").and_then(|v| v.as_str()).unwrap_or("");
121+            let value = record.get("value");
122+
123+            // Parse AT-URI: at://did/collection/rkey
124+            let parts: Vec<&str> = uri
125+                .trim_start_matches("at://")
126+                .splitn(3, '/')
127+                .collect();
128+            if parts.len() < 3 {
129+                continue;
130+            }
131+            let rkey = parts[2];
132+
133+            if let Some(val) = value {
134+                let compat_event = serde_json::json!({
135+                    "did": did,
136+                    "commit": {
137+                        "collection": collection,
138+                        "operation": "create",
139+                        "rkey": rkey,
140+                        "record": val,
141+                    }
142+                });
143+
144+                if let Err(e) = consumer::process_event(state, &compat_event).await {
145+                    tracing::debug!(
146+                        error = %e, did, collection, rkey,
147+                        "backfill record processing error"
148+                    );
149+                }
150+                count += 1;
151+            }
152+        }
153+
154+        // Check for next page
155+        cursor = body
156+            .get("cursor")
157+            .and_then(|v| v.as_str())
158+            .map(|s| s.to_string());
159+
160+        if cursor.is_none() {
161+            break;
162+        }
163+    }
164+
165+    Ok(count)
166+}
cospan · schematic version control on atproto built on AT Protocol