feat: full backfill via Tap — all historical Tangled + Cospan records Integrate Tap (github.com/bluesky-social/indigo/cmd/tap) as a sidecar service for complete historical backfill. Tap discovers all repos with sh.tangled.* or dev.cospan.* records via the relay's listReposByCollection, downloads full history from each PDS, and streams events ordered (backfill first, then live). New: - docker-compose: tap service with TAP_SIGNAL_COLLECTION and TAP_COLLECTION_FILTERS configured for Tangled + Cospan - indexer/tap.rs: WebSocket consumer that processes Tap events through the same consumer pipeline as Jetstream - Runs both Tap and Jetstream consumers concurrently - init-tap-db.sh: creates the tap database in PostgreSQL

Author: Aaron Steven White
Commit 2a6e5aaee5a79c3b56358e6ea5465a2e1b6de63b
Parent: aedb601e59
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
7 files changed +221 -6
@@ -1,5 +1,16 @@
11 # Changelog
22 
3+## v0.6.0
4+
5+### Full network backfill via Tap
6+
7+- Integrated [Tap](https://github.com/bluesky-social/indigo/cmd/tap) for complete historical backfill of all Tangled and Cospan records
8+- Tap discovers all repos with `sh.tangled.*` or `dev.cospan.*` records via `com.atproto.sync.listReposByCollection`
9+- Downloads full repo history from each PDS, delivers historical events before switching to live firehose
10+- Runs as a sidecar in docker-compose alongside the appview
11+- Appview consumes from both Tap (backfill + live) and Jetstream (live with cursor) simultaneously
12+- All Row struct fields have `#[serde(default)]` for resilient deserialization
13+
314 ## v0.5.3
415 
516 - All Row struct fields now have `#[serde(default)]` so panproto transforms that don't produce every field don't crash deserialization
@@ -436,7 +436,7 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
436436 
437437 [[package]]
438438 name = "cospan-appview"
439-version = "0.5.3"
439+version = "0.6.0"
440440 dependencies = [
441441  "anyhow",
442442  "async-trait",
@@ -482,7 +482,7 @@ dependencies = [
482482 
483483 [[package]]
484484 name = "cospan-codegen"
485-version = "0.5.3"
485+version = "0.6.0"
486486 dependencies = [
487487  "anyhow",
488488  "panproto-check",
@@ -499,7 +499,7 @@ dependencies = [
499499 
500500 [[package]]
501501 name = "cospan-node"
502-version = "0.5.3"
502+version = "0.6.0"
503503 dependencies = [
504504  "anyhow",
505505  "async-trait",
@@ -7,7 +7,7 @@ members = [
77 resolver = "2"
88 
99 [workspace.package]
10-version = "0.5.3"
10+version = "0.6.0"
1111 edition = "2024"
1212 license = "AGPL-3.0-or-later"
1313 repository = "https://github.com/cospan-dev/cospan"
@@ -1,13 +1,32 @@
1-mod consumer;
1+pub(crate) mod consumer;
22 mod dispatch;
33 mod jetstream;
4+mod tap;
45 
56 use std::sync::Arc;
67 
78 use crate::state::AppState;
89 
9-/// Run the firehose indexer. Connects to Jetstream and processes events.
10+/// Run the indexer. Spawns both Jetstream (live) and Tap (backfill + live) consumers.
1011 pub async fn run(state: Arc<AppState>) -> anyhow::Result<()> {
12+    // Spawn Tap consumer if TAP_URL is configured
13+    let tap_state = state.clone();
14+    tokio::spawn(async move {
15+        loop {
16+            match tap::subscribe(&tap_state).await {
17+                Ok(()) => {
18+                    tracing::info!("tap connection closed, reconnecting in 5s");
19+                }
20+                Err(e) => {
21+                    tracing::error!(error = %e, "tap connection error, reconnecting in 10s");
22+                    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
23+                }
24+            }
25+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
26+        }
27+    });
28+
29+    // Run Jetstream consumer (primary live stream with cursor persistence)
1130     loop {
1231         tracing::info!("connecting to jetstream");
1332         match jetstream::subscribe(&state).await {
@@ -0,0 +1,141 @@
1+//! Tap WebSocket consumer for full historical backfill + live events.
2+//!
3+//! Connects to a Tap instance (github.com/bluesky-social/indigo/cmd/tap)
4+//! which handles repo discovery, backfill from PDS, and live firehose events.
5+//! Events arrive as JSON with a `live` boolean indicating backfill vs live.
6+
7+use std::sync::Arc;
8+
9+use futures_util::StreamExt;
10+use tokio_tungstenite::connect_async;
11+use tokio_tungstenite::tungstenite::Message;
12+
13+use super::consumer;
14+use crate::state::AppState;
15+
16+/// Subscribe to a Tap instance and process events.
17+pub async fn subscribe(state: &Arc<AppState>) -> anyhow::Result<()> {
18+    let tap_url = match std::env::var("TAP_URL") {
19+        Ok(url) => url,
20+        Err(_) => {
21+            tracing::info!("TAP_URL not set, skipping Tap consumer");
22+            return Ok(());
23+        }
24+    };
25+
26+    tracing::info!(url = %tap_url, "connecting to Tap");
27+
28+    let (ws, _) = connect_async(&tap_url).await?;
29+    let (_, mut read) = ws.split();
30+
31+    let mut event_count: u64 = 0;
32+    let mut backfill_count: u64 = 0;
33+    let mut live_count: u64 = 0;
34+
35+    while let Some(msg) = read.next().await {
36+        let msg = msg?;
37+        let data = match msg {
38+            Message::Text(text) => text.as_bytes().to_vec(),
39+            Message::Binary(bin) => bin.to_vec(),
40+            Message::Ping(_) | Message::Pong(_) => continue,
41+            Message::Close(_) => break,
42+            _ => continue,
43+        };
44+
45+        let event: serde_json::Value = match serde_json::from_slice(&data) {
46+            Ok(v) => v,
47+            Err(e) => {
48+                tracing::warn!(error = %e, "failed to parse Tap event");
49+                continue;
50+            }
51+        };
52+
53+        // Tap event format: { "id": N, "type": "record"|"identity", "record": { ... } }
54+        let event_type = event.get("type").and_then(|v| v.as_str()).unwrap_or("");
55+        if event_type != "record" {
56+            continue; // Skip identity events for now
57+        }
58+
59+        let record_event = match event.get("record") {
60+            Some(r) => r,
61+            None => continue,
62+        };
63+
64+        let is_live = record_event
65+            .get("live")
66+            .and_then(|v| v.as_bool())
67+            .unwrap_or(true);
68+        let action = record_event
69+            .get("action")
70+            .and_then(|v| v.as_str())
71+            .unwrap_or("");
72+        let collection = record_event
73+            .get("collection")
74+            .and_then(|v| v.as_str())
75+            .unwrap_or("");
76+        let did = record_event
77+            .get("did")
78+            .and_then(|v| v.as_str())
79+            .unwrap_or("");
80+        let rkey = record_event
81+            .get("rkey")
82+            .and_then(|v| v.as_str())
83+            .unwrap_or("");
84+        let record_data = record_event.get("record");
85+
86+        // Map Tap action to Jetstream-compatible operation
87+        let operation = match action {
88+            "create" | "update" => action,
89+            "delete" => "delete",
90+            _ => continue,
91+        };
92+
93+        // Build a Jetstream-compatible event structure so the consumer can handle it
94+        let compat_event = serde_json::json!({
95+            "did": did,
96+            "commit": {
97+                "collection": collection,
98+                "operation": operation,
99+                "rkey": rkey,
100+                "record": record_data,
101+            }
102+        });
103+
104+        // Process through the same consumer pipeline
105+        if let Err(e) = consumer::process_event(state, &compat_event).await {
106+            tracing::warn!(
107+                error = %e,
108+                collection,
109+                did,
110+                rkey,
111+                live = is_live,
112+                "tap event processing error"
113+            );
114+        }
115+
116+        event_count += 1;
117+        if is_live {
118+            live_count += 1;
119+        } else {
120+            backfill_count += 1;
121+        }
122+
123+        if event_count.is_multiple_of(1000) {
124+            tracing::info!(
125+                total = event_count,
126+                backfill = backfill_count,
127+                live = live_count,
128+                "tap progress"
129+            );
130+        }
131+    }
132+
133+    tracing::info!(
134+        total = event_count,
135+        backfill = backfill_count,
136+        live = live_count,
137+        "tap connection closed"
138+    );
139+
140+    Ok(())
141+}
cospan · schematic version control on atproto built on AT Protocol