feat: second Tap instance for knot DIDs, multi-Tap support - tap-knots: discovers knot DIDs via sh.tangled.knot signal collection, syncs refUpdates (commits), pipelines, and knot-authored records - TAP_URL now supports comma-separated URLs for multiple Tap instances - appview spawns one subscriber per Tap URL - init-tap-db.sh creates both tap and tap_knots databases This enables complete historical backfill of ALL Tangled data: user-authored records (issues, pulls, stars) via tap, knot-authored records (commits, pipelines) via tap-knots.
Author: Aaron Steven White
Commit
72fbdee6ce49b4d43a0b696272d9f3c3020a41f2Parent: 2b778e45dc
Structural diff unavailable
These commits were pushed via plain git push, so no pre-parsed
schemas are available. Install git-remote-cospan and re-push via panproto:// to
see scope-level changes, breaking change detection, and semantic diffs.
brew install panproto/tap/git-remote-cospan4 files changed +56 -27
@@ -9,22 +9,27 @@ use crate::state::AppState;
99 1010 /// Run the indexer. Spawns both Jetstream (live) and Tap (backfill + live) consumers. 1111 pub async fn run(state: Arc<AppState>) -> anyhow::Result<()> { 12- // Spawn Tap consumer if TAP_URL is configured 13- let tap_state = state.clone(); 14- tokio::spawn(async move { 15- loop { 16- match tap::subscribe(&tap_state).await { 17- Ok(()) => { 18- tracing::info!("tap connection closed, reconnecting in 5s"); 12+ // Spawn Tap consumers for each URL in TAP_URL (comma-separated) 13+ if let Ok(tap_urls) = std::env::var("TAP_URL") { 14+ for url in tap_urls.split(',').map(|s| s.trim().to_string()) { 15+ if url.is_empty() { continue; } 16+ let tap_state = state.clone(); 17+ tokio::spawn(async move { 18+ loop { 19+ match tap::subscribe_to(&tap_state, &url).await { 20+ Ok(()) => { 21+ tracing::info!(url = %url, "tap connection closed, reconnecting in 5s"); 22+ } 23+ Err(e) => { 24+ tracing::error!(url = %url, error = %e, "tap connection error, reconnecting in 10s"); 25+ tokio::time::sleep(std::time::Duration::from_secs(10)).await; 26+ } 27+ } 28+ tokio::time::sleep(std::time::Duration::from_secs(5)).await; 1929 } 20- Err(e) => { 21- tracing::error!(error = %e, "tap connection error, reconnecting in 10s"); 22- tokio::time::sleep(std::time::Duration::from_secs(10)).await; 23- } 24- } 25- tokio::time::sleep(std::time::Duration::from_secs(5)).await; 30+ }); 2631 } 27- }); 32+ } 2833 2934 // Run Jetstream consumer (primary live stream with cursor persistence) 3035 loop {
@@ -13,19 +13,11 @@ use tokio_tungstenite::tungstenite::Message;
1313 use super::consumer; 1414 use crate::state::AppState; 1515 16-/// Subscribe to a Tap instance and process events. 17-pub async fn subscribe(state: &Arc<AppState>) -> anyhow::Result<()> { 18- let tap_url = match std::env::var("TAP_URL") { 19- Ok(url) => url, 20- Err(_) => { 21- tracing::info!("TAP_URL not set, skipping Tap consumer"); 22- return Ok(()); 23- } 24- }; 25- 16+/// Subscribe to a Tap instance at the given URL. 17+pub async fn subscribe_to(state: &Arc<AppState>, tap_url: &str) -> anyhow::Result<()> { 2618 tracing::info!(url = %tap_url, "connecting to Tap"); 2719 28- let (ws, _) = connect_async(&tap_url).await?; 20+ let (ws, _) = connect_async(tap_url).await?; 2921 let (_, mut read) = ws.split(); 3022 3123 let mut event_count: u64 = 0;
@@ -122,6 +122,34 @@ services:
122122 retries: 10 123123 start_period: 300s 124124 125+ # Tap for knot DIDs: discovers git servers via sh.tangled.knot 126+ # Syncs refUpdates (commits), pipelines, and other knot-authored records 127+ tap-knots: 128+ image: ghcr.io/bluesky-social/indigo/tap:latest 129+ container_name: cospan-tap-knots 130+ restart: unless-stopped 131+ environment: 132+ TAP_DATABASE_URL: postgres://cospan:${POSTGRES_PASSWORD}@db:5432/tap_knots 133+ TAP_BIND: ":2480" 134+ TAP_SIGNAL_COLLECTION: sh.tangled.knot 135+ TAP_COLLECTION_FILTERS: "sh.tangled.*,dev.cospan.*" 136+ TAP_RELAY_URL: https://relay1.us-east.bsky.network 137+ TAP_DISABLE_ACKS: "true" 138+ TAP_LOG_LEVEL: info 139+ TAP_RESYNC_PARALLELISM: "3" 140+ TAP_FIREHOSE_PARALLELISM: "10" 141+ depends_on: 142+ db: 143+ condition: service_healthy 144+ networks: 145+ - cospan 146+ healthcheck: 147+ test: ["CMD-SHELL", "wget -q -O- http://localhost:2480/health || exit 1"] 148+ interval: 30s 149+ timeout: 10s 150+ retries: 10 151+ start_period: 300s 152+ 125153 # ATProto AppView 126154 appview: 127155 image: ghcr.io/cospan-dev/cospan-appview:${COSPAN_VERSION:-latest}
@@ -131,7 +159,7 @@ services:
131159 DATABASE_URL: postgres://cospan:${POSTGRES_PASSWORD}@db:5432/cospan 132160 REDIS_URL: redis://redis:6379 133161 JETSTREAM_URL: wss://jetstream2.us-east.bsky.network/subscribe 134- TAP_URL: ws://tap:2480/channel 162+ TAP_URL: ws://tap:2480/channel,ws://tap-knots:2480/channel 135163 APPVIEW_LISTEN: "0.0.0.0:3000" 136164 PUBLIC_URL: https://${DOMAIN:?DOMAIN is required} 137165 depends_on:
@@ -141,6 +169,8 @@ services:
141169 condition: service_healthy 142170 tap: 143171 condition: service_started 172+ tap-knots: 173+ condition: service_started 144174 networks: 145175 - cospan 146176 deploy:
@@ -1,7 +1,9 @@
11 #!/bin/bash 2-# Create the tap database for the Tap sync service 2+# Create databases for Tap sync services 33 set -e 44 psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL 55 CREATE DATABASE tap; 66 GRANT ALL PRIVILEGES ON DATABASE tap TO cospan; 7+ CREATE DATABASE tap_knots; 8+ GRANT ALL PRIVILEGES ON DATABASE tap_knots TO cospan; 79 EOSQL