feat: knot event consumer for commits/refUpdates

Tangled knots serve refUpdate events via WebSocket at /events, not via the ATProto relay. This consumer:
1. Discovers knot URLs from the nodes table
2. Connects to each knot's event stream (max 10 concurrent)
3. Processes events through the same panproto consumer pipeline
4. Re-discovers knots every 5 minutes

Enables ingestion of ALL historical commits and pipelines.
Author: Aaron Steven White
Commit
b30532d50e102ae7d3abf779b2a32cb2f9c2fde3
Parent: 72fbdee6ce
Structural diff unavailable
These commits were pushed via plain git push, so no pre-parsed
schemas are available. Install git-remote-cospan and re-push via panproto:// to
see scope-level changes, breaking change detection, and semantic diffs.
brew install panproto/tap/git-remote-cospan
2 files changed +162 -0
@@ -0,0 +1,155 @@
1+//! Knot event consumer — connects to Tangled knot servers' WebSocket event 2+//! streams to ingest refUpdates (commits) and other knot-authored records. 3+//! 4+//! Tangled knots serve events at wss://{host}/events (not via ATProto relay). 5+//! This consumer discovers knots from the nodes table and subscribes to each. 6+ 7+use std::sync::Arc; 8+ 9+use futures_util::StreamExt; 10+use tokio_tungstenite::connect_async; 11+use tokio_tungstenite::tungstenite::Message; 12+ 13+use super::consumer; 14+use crate::db; 15+use crate::state::AppState; 16+ 17+/// Discover all knot URLs and spawn a consumer for each. 18+pub async fn run(state: Arc<AppState>) { 19+ // Wait for initial data to be ingested before discovering knots 20+ tokio::time::sleep(std::time::Duration::from_secs(30)).await; 21+ 22+ loop { 23+ match discover_and_consume(&state).await { 24+ Ok(()) => { 25+ tracing::info!("knot consumer cycle complete, re-discovering in 5m"); 26+ } 27+ Err(e) => { 28+ tracing::error!(error = %e, "knot consumer error, retrying in 1m"); 29+ } 30+ } 31+ // Re-discover knots periodically 32+ tokio::time::sleep(std::time::Duration::from_secs(300)).await; 33+ } 34+} 35+ 36+async fn discover_and_consume(state: &Arc<AppState>) -> anyhow::Result<()> { 37+ let nodes = db::node::list(&state.db, 1000, None).await?; 38+ let knot_urls: Vec<String> = nodes 39+ .iter() 40+ .filter_map(|n| n.public_endpoint.as_ref()) 41+ .filter(|url| !url.is_empty() && !url.contains("localhost") && !url.contains("192.168.")) 42+ .map(|url| { 43+ // Convert https://knot.example.com to wss://knot.example.com/events 44+ let ws_url = url.replace("https://", "wss://").replace("http://", "ws://"); 45+ format!("{ws_url}/events") 46+ }) 47+ .collect(); 48+ 49+ if knot_urls.is_empty() { 50+ tracing::info!("no knots discovered, waiting for nodes to be ingested"); 51+ return Ok(()); 52+ } 53+ 54+ tracing::info!(count = knot_urls.len(), "discovered knots, connecting to event streams"); 55+ 56+ // Spawn a consumer for 
each knot (with concurrency limit) 57+ let semaphore = Arc::new(tokio::sync::Semaphore::new(10)); // max 10 concurrent connections 58+ let mut handles = Vec::new(); 59+ 60+ for url in knot_urls { 61+ let state = state.clone(); 62+ let sem = semaphore.clone(); 63+ handles.push(tokio::spawn(async move { 64+ let _permit = sem.acquire().await; 65+ if let Err(e) = consume_knot(&state, &url).await { 66+ tracing::debug!(url = %url, error = %e, "knot connection failed"); 67+ } 68+ })); 69+ } 70+ 71+ // Wait for all to complete (they'll timeout or disconnect) 72+ for handle in handles { 73+ let _ = handle.await; 74+ } 75+ 76+ Ok(()) 77+} 78+ 79+/// Connect to a single knot's event stream and process events. 80+async fn consume_knot(state: &Arc<AppState>, url: &str) -> anyhow::Result<()> { 81+ let (ws, _) = tokio::time::timeout( 82+ std::time::Duration::from_secs(10), 83+ connect_async(url), 84+ ) 85+ .await 86+ .map_err(|_| anyhow::anyhow!("connection timeout"))??; 87+ 88+ let (_, mut read) = ws.split(); 89+ let mut count = 0u64; 90+ 91+ // Read events with a timeout per message 92+ while let Ok(Some(msg)) = tokio::time::timeout( 93+ std::time::Duration::from_secs(30), 94+ read.next(), 95+ ) 96+ .await 97+ { 98+ let msg = msg?; 99+ let data = match msg { 100+ Message::Text(text) => text.as_bytes().to_vec(), 101+ Message::Binary(bin) => bin.to_vec(), 102+ Message::Ping(_) | Message::Pong(_) => continue, 103+ Message::Close(_) => break, 104+ _ => continue, 105+ }; 106+ 107+ let event: serde_json::Value = match serde_json::from_slice(&data) { 108+ Ok(v) => v, 109+ Err(_) => continue, 110+ }; 111+ 112+ // Knot event format: { "rkey": "...", "nsid": "...", "event": { ... 
} } 113+ let nsid = event.get("nsid").and_then(|v| v.as_str()).unwrap_or(""); 114+ let rkey = event.get("rkey").and_then(|v| v.as_str()).unwrap_or(""); 115+ let record = event.get("event"); 116+ 117+ if nsid.is_empty() || rkey.is_empty() || record.is_none() { 118+ continue; 119+ } 120+ 121+ // Extract the DID from the record (knot events don't have a top-level DID) 122+ let did = record 123+ .and_then(|r| r.get("repoDid").or(r.get("committerDid"))) 124+ .and_then(|v| v.as_str()) 125+ .unwrap_or(""); 126+ 127+ // Build a Jetstream-compatible event for the consumer pipeline 128+ let compat_event = serde_json::json!({ 129+ "did": did, 130+ "commit": { 131+ "collection": nsid, 132+ "operation": "create", 133+ "rkey": rkey, 134+ "record": record, 135+ } 136+ }); 137+ 138+ if let Err(e) = consumer::process_event(state, &compat_event).await { 139+ tracing::debug!( 140+ error = %e, 141+ nsid, 142+ rkey, 143+ "knot event processing error" 144+ ); 145+ } 146+ 147+ count += 1; 148+ } 149+ 150+ if count > 0 { 151+ tracing::info!(url = %url, events = count, "knot stream processed"); 152+ } 153+ 154+ Ok(()) 155+}
@@ -1,6 +1,7 @@
11 pub(crate) mod consumer; 22 mod dispatch; 33 mod jetstream; 4+mod knot_consumer; 45 mod tap; 56 67 use std::sync::Arc;
@@ -31,6 +32,12 @@ pub async fn run(state: Arc<AppState>) -> anyhow::Result<()> {
3132 } 3233 } 3334 35+ // Spawn knot event consumer (discovers knots, connects to each /events WebSocket) 36+ let knot_state = state.clone(); 37+ tokio::spawn(async move { 38+ knot_consumer::run(knot_state).await; 39+ }); 40+ 3441 // Run Jetstream consumer (primary live stream with cursor persistence) 3542 loop { 3643 tracing::info!("connecting to jetstream");