feat(tangled): complete interop for all 24 sh.tangled.* record types Translates all Tangled record types to Cospan equivalents in the Jetstream indexer. Adds 16 new collections to WANTED_COLLECTIONS and implements field-by-field mapping for each: - Trivial: reaction, issue.state, pull.status, collaborator - Near-trivial: knot, spindle, actor.profile, issue.comment, pull.comment - Moderate: repo, knot.member, spindle.member, label.definition, pipeline - Complex: git.refUpdate (git SHAs stored as-is), pipeline.status - Tangled-only: publicKey, string, artifact, label.op (logged, skipped) All translated records set source="tangled" with source_uri tracking. Aggregate counters updated on state changes (issues, pulls, stars, comments).

Author: Aaron Steven White
Commit 545a24621a47ccf74777efc1d7d1b392b5251d09
Parent: 807ca1e98c
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
2 files changed +675 -44
@@ -870,10 +870,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
870870         // ─── Tangled interop ────────────────────────────────────────
871871         // Translate sh.tangled.* records into dev.cospan.* equivalents and index
872872         // them with source="tangled" and source_uri set to the original AT-URI.
873-        //
874-        // Simple records (star, follow, reaction) have identical shapes — we do
875-        // direct field mapping. Complex records (issue, pull) extract what maps
876-        // directly and store unmapped fields as JSONB extra data.
877873 
878874         // ─── Tangled Star ──────────────────────────────────────────
879875         ("sh.tangled.feed.star", "create" | "update") => {
@@ -883,7 +879,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
883879                     .and_then(|v| v.as_str())
884880                     .unwrap_or("")
885881                     .to_string();
886-                let source_uri = format!("at://{did}/sh.tangled.feed.star/{rkey}");
887882 
888883                 let row = db::star::StarRow {
889884                     did: did.to_string(),
@@ -901,12 +896,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
901896                     db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
902897                 }
903898 
904-                tracing::debug!(
905-                    did = did,
906-                    rkey = rkey,
907-                    source_uri = %source_uri,
908-                    "indexed tangled star as cospan star"
909-                );
899+                tracing::debug!(did, rkey, "indexed tangled star");
910900             }
911901         }
912902         ("sh.tangled.feed.star", "delete") => {
@@ -920,8 +910,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
920910         // ─── Tangled Follow ────────────────────────────────────────
921911         ("sh.tangled.graph.follow", "create" | "update") => {
922912             if let Some(rec) = record {
923-                let source_uri = format!("at://{did}/sh.tangled.graph.follow/{rkey}");
924-
925913                 let row = db::follow::FollowRow {
926914                     did: did.to_string(),
927915                     rkey: rkey.to_string(),
@@ -934,27 +922,47 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
934922                     indexed_at: Utc::now(),
935923                 };
936924                 db::follow::upsert(&state.db, &row).await?;
937-
938-                tracing::debug!(
939-                    did = did,
940-                    rkey = rkey,
941-                    source_uri = %source_uri,
942-                    "indexed tangled follow as cospan follow"
943-                );
925+                tracing::debug!(did, rkey, "indexed tangled follow");
944926             }
945927         }
946928         ("sh.tangled.graph.follow", "delete") => {
947929             db::follow::delete(&state.db, did, rkey).await?;
948930         }
949931 
932+        // ─── Tangled Reaction ──────────────────────────────────────
933+        ("sh.tangled.feed.reaction", "create" | "update") => {
934+            if let Some(rec) = record {
935+                let row = db::reaction::ReactionRow {
936+                    did: did.to_string(),
937+                    rkey: rkey.to_string(),
938+                    subject: rec
939+                        .get("subject")
940+                        .and_then(|v| v.as_str())
941+                        .unwrap_or("")
942+                        .to_string(),
943+                    // Tangled uses "reaction" or "emoji" for the emoji field
944+                    emoji: rec
945+                        .get("emoji")
946+                        .or_else(|| rec.get("reaction"))
947+                        .and_then(|v| v.as_str())
948+                        .unwrap_or("")
949+                        .to_string(),
950+                    created_at: parse_datetime(rec, "createdAt"),
951+                    indexed_at: Utc::now(),
952+                };
953+                db::reaction::upsert(&state.db, &row).await?;
954+                tracing::debug!(did, rkey, "indexed tangled reaction");
955+            }
956+        }
957+        ("sh.tangled.feed.reaction", "delete") => {
958+            db::reaction::delete(&state.db, did, rkey).await?;
959+        }
960+
950961         // ─── Tangled Issue ─────────────────────────────────────────
951-        // Moderate complexity: Cospan adds schemaRef (absent in Tangled).
952-        // We map the core fields directly and leave schemaRef as null.
953962         ("sh.tangled.repo.issue", "create" | "update") => {
954963             if let Some(rec) = record {
955964                 let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
956965                 let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
957-                let source_uri = format!("at://{did}/sh.tangled.repo.issue/{rkey}");
958966 
959967                 let row = db::issue::IssueRow {
960968                     did: did.to_string(),
@@ -973,13 +981,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
973981                     indexed_at: Utc::now(),
974982                 };
975983                 db::issue::upsert(&state.db, &row).await?;
976-
977-                tracing::debug!(
978-                    did = did,
979-                    rkey = rkey,
980-                    source_uri = %source_uri,
981-                    "indexed tangled issue as cospan issue"
982-                );
984+                tracing::debug!(did, rkey, "indexed tangled issue");
983985             }
984986         }
985987         ("sh.tangled.repo.issue", "delete") => {
@@ -992,10 +994,106 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
992994             db::issue::delete(&state.db, did, rkey).await?;
993995         }
994996 
997+        // ─── Tangled Issue State ───────────────────────────────────
998+        ("sh.tangled.repo.issue.state", "create" | "update") => {
999+            if let Some(rec) = record {
1000+                let issue_uri = rec
1001+                    .get("issue")
1002+                    .and_then(|v| v.as_str())
1003+                    .unwrap_or("")
1004+                    .to_string();
1005+                let new_state = rec
1006+                    .get("state")
1007+                    .and_then(|v| v.as_str())
1008+                    .unwrap_or("open")
1009+                    .to_string();
1010+
1011+                let row = db::issue_state::IssueStateRow {
1012+                    did: did.to_string(),
1013+                    rkey: rkey.to_string(),
1014+                    issue_uri: issue_uri.clone(),
1015+                    state: new_state.clone(),
1016+                    reason: None,
1017+                    created_at: parse_datetime(rec, "createdAt"),
1018+                    indexed_at: Utc::now(),
1019+                };
1020+                db::issue_state::upsert(&state.db, &row).await?;
1021+
1022+                // Update the issue's state and repo counters
1023+                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
1024+                if let Some(issue) =
1025+                    db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await?
1026+                {
1027+                    let old_state = &issue.state;
1028+                    if old_state != &new_state {
1029+                        db::issue::update_state(&state.db, &issue_did, &issue_rkey, &new_state)
1030+                            .await?;
1031+
1032+                        if old_state == "open" && new_state != "open" {
1033+                            decrement_repo_open_issue_count(
1034+                                &state.db,
1035+                                &issue.repo_did,
1036+                                &issue.repo_name,
1037+                            )
1038+                            .await?;
1039+                        } else if old_state != "open" && new_state == "open" {
1040+                            increment_repo_open_issue_count(
1041+                                &state.db,
1042+                                &issue.repo_did,
1043+                                &issue.repo_name,
1044+                            )
1045+                            .await?;
1046+                        }
1047+                    }
1048+                }
1049+                tracing::debug!(did, rkey, "indexed tangled issue state");
1050+            }
1051+        }
1052+        ("sh.tangled.repo.issue.state", "delete") => {
1053+            db::issue_state::delete(&state.db, did, rkey).await?;
1054+        }
1055+
1056+        // ─── Tangled Issue Comment ─────────────────────────────────
1057+        ("sh.tangled.repo.issue.comment", "create" | "update") => {
1058+            if let Some(rec) = record {
1059+                let issue_uri = rec
1060+                    .get("issue")
1061+                    .and_then(|v| v.as_str())
1062+                    .unwrap_or("")
1063+                    .to_string();
1064+
1065+                let row = db::issue_comment::IssueCommentRow {
1066+                    did: did.to_string(),
1067+                    rkey: rkey.to_string(),
1068+                    issue_uri: issue_uri.clone(),
1069+                    body: rec
1070+                        .get("body")
1071+                        .and_then(|v| v.as_str())
1072+                        .unwrap_or("")
1073+                        .to_string(),
1074+                    created_at: parse_datetime(rec, "createdAt"),
1075+                    indexed_at: Utc::now(),
1076+                };
1077+
1078+                let existing = db::issue_comment::get(&state.db, did, rkey).await?;
1079+                db::issue_comment::upsert(&state.db, &row).await?;
1080+
1081+                if existing.is_none() {
1082+                    let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
1083+                    db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
1084+                }
1085+                tracing::debug!(did, rkey, "indexed tangled issue comment");
1086+            }
1087+        }
1088+        ("sh.tangled.repo.issue.comment", "delete") => {
1089+            if let Some(comment) = db::issue_comment::get(&state.db, did, rkey).await? {
1090+                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&comment.issue_uri);
1091+                db::issue::decrement_comment_count(&state.db, &issue_did, &issue_rkey).await?;
1092+            }
1093+            db::issue_comment::delete(&state.db, did, rkey).await?;
1094+        }
1095+
9951096         // ─── Tangled Pull Request ──────────────────────────────────
996-        // Complex: Tangled uses patchBlob + source.sha; Cospan uses mergePreview.
997-        // We extract the fields that map directly (title, body, target branch,
998-        // source branch) and store VCS-specific fields as extra JSONB.
9991097         ("sh.tangled.repo.pull", "create" | "update") => {
10001098             if let Some(rec) = record {
10011099                 // Tangled PR structure:
@@ -1025,8 +1123,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
10251123                     .and_then(|v| v.as_str())
10261124                     .map(String::from);
10271125 
1028-                let source_uri = format!("at://{did}/sh.tangled.repo.pull/{rkey}");
1029-
10301126                 let row = db::pull::PullRow {
10311127                     did: did.to_string(),
10321128                     rkey: rkey.to_string(),
@@ -1047,13 +1143,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
10471143                     indexed_at: Utc::now(),
10481144                 };
10491145                 db::pull::upsert(&state.db, &row).await?;
1050-
1051-                tracing::debug!(
1052-                    did = did,
1053-                    rkey = rkey,
1054-                    source_uri = %source_uri,
1055-                    "indexed tangled pull as cospan pull"
1056-                );
1146+                tracing::debug!(did, rkey, "indexed tangled pull");
10571147             }
10581148         }
10591149         ("sh.tangled.repo.pull", "delete") => {
@@ -1065,11 +1155,536 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
10651155             db::pull::delete(&state.db, did, rkey).await?;
10661156         }
10671157 
1158+        // ─── Tangled Pull Status → Pull State ─────────────────────
1159+        ("sh.tangled.repo.pull.status", "create" | "update") => {
1160+            if let Some(rec) = record {
1161+                let pull_uri = rec
1162+                    .get("pull")
1163+                    .and_then(|v| v.as_str())
1164+                    .unwrap_or("")
1165+                    .to_string();
1166+                // Tangled uses "status", map to Cospan "state"
1167+                let new_state = rec
1168+                    .get("status")
1169+                    .and_then(|v| v.as_str())
1170+                    .unwrap_or("open")
1171+                    .to_string();
1172+
1173+                let row = db::pull_state::PullStateRow {
1174+                    did: did.to_string(),
1175+                    rkey: rkey.to_string(),
1176+                    pull_uri: pull_uri.clone(),
1177+                    state: new_state.clone(),
1178+                    merge_commit_id: None,
1179+                    created_at: parse_datetime(rec, "createdAt"),
1180+                    indexed_at: Utc::now(),
1181+                };
1182+                db::pull_state::upsert(&state.db, &row).await?;
1183+
1184+                // Update the pull's state and repo counters
1185+                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
1186+                if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
1187+                    let old_state = &pull.state;
1188+                    if old_state != &new_state {
1189+                        db::pull::update_state(&state.db, &pull_did, &pull_rkey, &new_state)
1190+                            .await?;
1191+
1192+                        if old_state == "open" && new_state != "open" {
1193+                            decrement_repo_open_mr_count(
1194+                                &state.db,
1195+                                &pull.repo_did,
1196+                                &pull.repo_name,
1197+                            )
1198+                            .await?;
1199+                        } else if old_state != "open" && new_state == "open" {
1200+                            increment_repo_open_mr_count(
1201+                                &state.db,
1202+                                &pull.repo_did,
1203+                                &pull.repo_name,
1204+                            )
1205+                            .await?;
1206+                        }
1207+                    }
1208+                }
1209+                tracing::debug!(did, rkey, "indexed tangled pull status as pull state");
1210+            }
1211+        }
1212+        ("sh.tangled.repo.pull.status", "delete") => {
1213+            db::pull_state::delete(&state.db, did, rkey).await?;
1214+        }
1215+
1216+        // ─── Tangled Pull Comment ──────────────────────────────────
1217+        ("sh.tangled.repo.pull.comment", "create" | "update") => {
1218+            if let Some(rec) = record {
1219+                let pull_uri = rec
1220+                    .get("pull")
1221+                    .and_then(|v| v.as_str())
1222+                    .unwrap_or("")
1223+                    .to_string();
1224+
1225+                let row = db::pull_comment::PullCommentRow {
1226+                    did: did.to_string(),
1227+                    rkey: rkey.to_string(),
1228+                    pull_uri: pull_uri.clone(),
1229+                    body: rec
1230+                        .get("body")
1231+                        .and_then(|v| v.as_str())
1232+                        .unwrap_or("")
1233+                        .to_string(),
1234+                    review_decision: None, // Tangled doesn't have review decisions
1235+                    created_at: parse_datetime(rec, "createdAt"),
1236+                    indexed_at: Utc::now(),
1237+                };
1238+
1239+                let existing = db::pull_comment::get(&state.db, did, rkey).await?;
1240+                db::pull_comment::upsert(&state.db, &row).await?;
1241+
1242+                if existing.is_none() {
1243+                    let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
1244+                    db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
1245+                }
1246+                tracing::debug!(did, rkey, "indexed tangled pull comment");
1247+            }
1248+        }
1249+        ("sh.tangled.repo.pull.comment", "delete") => {
1250+            if let Some(comment) = db::pull_comment::get(&state.db, did, rkey).await? {
1251+                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&comment.pull_uri);
1252+                db::pull::decrement_comment_count(&state.db, &pull_did, &pull_rkey).await?;
1253+            }
1254+            db::pull_comment::delete(&state.db, did, rkey).await?;
1255+        }
1256+
1257+        // ─── Tangled Collaborator ──────────────────────────────────
1258+        ("sh.tangled.repo.collaborator", "create" | "update") => {
1259+            if let Some(rec) = record {
1260+                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
1261+                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
1262+
1263+                // Tangled uses "subject" or "did" for the collaborator DID
1264+                let member_did = rec
1265+                    .get("subject")
1266+                    .or_else(|| rec.get("did"))
1267+                    .and_then(|v| v.as_str())
1268+                    .unwrap_or("")
1269+                    .to_string();
1270+
1271+                let row = db::collaborator::CollaboratorRow {
1272+                    did: did.to_string(),
1273+                    rkey: rkey.to_string(),
1274+                    repo_did,
1275+                    repo_name,
1276+                    member_did,
1277+                    role: rec
1278+                        .get("role")
1279+                        .and_then(|v| v.as_str())
1280+                        .unwrap_or("reader")
1281+                        .to_string(),
1282+                    created_at: parse_datetime(rec, "createdAt"),
1283+                    indexed_at: Utc::now(),
1284+                };
1285+                db::collaborator::upsert(&state.db, &row).await?;
1286+                tracing::debug!(did, rkey, "indexed tangled collaborator");
1287+            }
1288+        }
1289+        ("sh.tangled.repo.collaborator", "delete") => {
1290+            db::collaborator::delete(&state.db, did, rkey).await?;
1291+        }
1292+
1293+        // ─── Tangled Knot → Node ──────────────────────────────────
1294+        ("sh.tangled.knot", "create" | "update") => {
1295+            if let Some(rec) = record {
1296+                // Tangled knots don't have a publicEndpoint field
1297+                let row = db::node::NodeRow {
1298+                    did: did.to_string(),
1299+                    rkey: rkey.to_string(),
1300+                    public_endpoint: None,
1301+                    created_at: parse_datetime(rec, "createdAt"),
1302+                    indexed_at: Utc::now(),
1303+                };
1304+                db::node::upsert(&state.db, &row).await?;
1305+                tracing::debug!(did, rkey, "indexed tangled knot as node");
1306+            }
1307+        }
1308+        ("sh.tangled.knot", "delete") => {
1309+            db::node::delete(&state.db, did).await?;
1310+        }
1311+
1312+        // ─── Tangled Spindle → Org ────────────────────────────────
1313+        ("sh.tangled.spindle", "create" | "update") => {
1314+            if let Some(rec) = record {
1315+                // Use the DID as a fallback name; Tangled spindles don't carry a name field
1316+                let name = rec
1317+                    .get("name")
1318+                    .and_then(|v| v.as_str())
1319+                    .unwrap_or(did)
1320+                    .to_string();
1321+
1322+                let row = db::org::OrgRow {
1323+                    did: did.to_string(),
1324+                    rkey: rkey.to_string(),
1325+                    name,
1326+                    description: rec
1327+                        .get("description")
1328+                        .and_then(|v| v.as_str())
1329+                        .map(String::from),
1330+                    avatar_cid: None,
1331+                    created_at: parse_datetime(rec, "createdAt"),
1332+                    indexed_at: Utc::now(),
1333+                };
1334+                db::org::upsert(&state.db, &row).await?;
1335+                tracing::debug!(did, rkey, "indexed tangled spindle as org");
1336+            }
1337+        }
1338+        ("sh.tangled.spindle", "delete") => {
1339+            db::org::delete(&state.db, did, rkey).await?;
1340+        }
1341+
1342+        // ─── Tangled Actor Profile ─────────────────────────────────
1343+        ("sh.tangled.actor.profile", "create" | "update") => {
1344+            if let Some(rec) = record {
1345+                // Tangled stores bluesky as a bool; Cospan stores a handle string.
1346+                // If true, we store the DID as a placeholder (best-effort without
1347+                // async handle resolution in the hot path).
1348+                let bluesky = match rec.get("bluesky") {
1349+                    Some(serde_json::Value::Bool(true)) => did.to_string(),
1350+                    Some(serde_json::Value::String(s)) => s.clone(),
1351+                    _ => String::new(),
1352+                };
1353+
1354+                let row = db::actor_profile::ActorProfileRow {
1355+                    did: did.to_string(),
1356+                    bluesky,
1357+                    display_name: rec
1358+                        .get("displayName")
1359+                        .and_then(|v| v.as_str())
1360+                        .map(String::from),
1361+                    description: rec
1362+                        .get("description")
1363+                        .and_then(|v| v.as_str())
1364+                        .map(String::from),
1365+                    avatar_cid: rec
1366+                        .get("avatar")
1367+                        .and_then(|v| v.get("ref"))
1368+                        .and_then(|v| v.get("$link"))
1369+                        .and_then(|v| v.as_str())
1370+                        .map(String::from),
1371+                    indexed_at: Utc::now(),
1372+                };
1373+                db::actor_profile::upsert(&state.db, &row).await?;
1374+                tracing::debug!(did, rkey, "indexed tangled actor profile");
1375+            }
1376+        }
1377+        ("sh.tangled.actor.profile", "delete") => {
1378+            db::actor_profile::delete(&state.db, did).await?;
1379+        }
1380+
1381+        // ─── Tangled Repo ──────────────────────────────────────────
1382+        ("sh.tangled.repo", "create" | "update") => {
1383+            if let Some(rec) = record {
1384+                // Tangled stores knot as a hostname string; construct did:web and URL
1385+                let knot = rec.get("knot").and_then(|v| v.as_str()).unwrap_or("");
1386+                let node_did = if knot.is_empty() {
1387+                    String::new()
1388+                } else {
1389+                    format!("did:web:{knot}")
1390+                };
1391+                let node_url = if knot.is_empty() {
1392+                    String::new()
1393+                } else {
1394+                    format!("https://{knot}")
1395+                };
1396+
1397+                let source_uri = format!("at://{did}/sh.tangled.repo/{rkey}");
1398+
1399+                let row = db::repo::RepoRow {
1400+                    did: did.to_string(),
1401+                    rkey: rkey.to_string(),
1402+                    name: rec
1403+                        .get("name")
1404+                        .and_then(|v| v.as_str())
1405+                        .unwrap_or("")
1406+                        .to_string(),
1407+                    description: rec
1408+                        .get("description")
1409+                        .and_then(|v| v.as_str())
1410+                        .map(String::from),
1411+                    protocol: "git".to_string(), // Tangled repos are always git
1412+                    node_did,
1413+                    node_url,
1414+                    default_branch: "main".to_string(),
1415+                    visibility: "public".to_string(),
1416+                    source_repo: None,
1417+                    star_count: 0,
1418+                    fork_count: 0,
1419+                    open_issue_count: 0,
1420+                    open_mr_count: 0,
1421+                    source: "tangled".to_string(),
1422+                    source_uri: Some(source_uri),
1423+                    created_at: parse_datetime(rec, "createdAt"),
1424+                    indexed_at: Utc::now(),
1425+                };
1426+                db::repo::upsert(&state.db, &row).await?;
1427+                tracing::debug!(did, rkey, "indexed tangled repo");
1428+            }
1429+        }
1430+        ("sh.tangled.repo", "delete") => {
1431+            tracing::warn!(did, rkey, "tangled repo delete (need rkey->name lookup)");
1432+        }
1433+
1434+        // ─── Tangled Knot Member → Org Member ─────────────────────
1435+        ("sh.tangled.knot.member", "create" | "update") => {
1436+            if let Some(rec) = record {
1437+                // Construct an org URI from the DID (knot owner) since knot.member
1438+                // is associated with the knot owner's DID
1439+                let org_uri = format!("at://{did}/sh.tangled.knot/self");
1440+
1441+                let row = db::org_member::OrgMemberRow {
1442+                    did: did.to_string(),
1443+                    rkey: rkey.to_string(),
1444+                    org_uri,
1445+                    member_did: rec
1446+                        .get("subject")
1447+                        .and_then(|v| v.as_str())
1448+                        .unwrap_or("")
1449+                        .to_string(),
1450+                    role: "member".to_string(), // Tangled doesn't specify roles
1451+                    created_at: parse_datetime(rec, "createdAt"),
1452+                    indexed_at: Utc::now(),
1453+                };
1454+                db::org_member::upsert(&state.db, &row).await?;
1455+                tracing::debug!(did, rkey, "indexed tangled knot member as org member");
1456+            }
1457+        }
1458+        ("sh.tangled.knot.member", "delete") => {
1459+            db::org_member::delete(&state.db, did, rkey).await?;
1460+        }
1461+
1462+        // ─── Tangled Spindle Member → Org Member ──────────────────
1463+        ("sh.tangled.spindle.member", "create" | "update") => {
1464+            if let Some(rec) = record {
1465+                let org_uri = format!("at://{did}/sh.tangled.spindle/self");
1466+
1467+                let row = db::org_member::OrgMemberRow {
1468+                    did: did.to_string(),
1469+                    rkey: rkey.to_string(),
1470+                    org_uri,
1471+                    member_did: rec
1472+                        .get("subject")
1473+                        .and_then(|v| v.as_str())
1474+                        .unwrap_or("")
1475+                        .to_string(),
1476+                    role: "member".to_string(),
1477+                    created_at: parse_datetime(rec, "createdAt"),
1478+                    indexed_at: Utc::now(),
1479+                };
1480+                db::org_member::upsert(&state.db, &row).await?;
1481+                tracing::debug!(did, rkey, "indexed tangled spindle member as org member");
1482+            }
1483+        }
1484+        ("sh.tangled.spindle.member", "delete") => {
1485+            db::org_member::delete(&state.db, did, rkey).await?;
1486+        }
1487+
1488+        // ─── Tangled Label Definition ──────────────────────────────
1489+        ("sh.tangled.label.definition", "create" | "update") => {
1490+            if let Some(rec) = record {
1491+                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
1492+                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
1493+
1494+                let row = db::label::LabelRow {
1495+                    did: did.to_string(),
1496+                    rkey: rkey.to_string(),
1497+                    repo_did,
1498+                    repo_name,
1499+                    name: rec
1500+                        .get("name")
1501+                        .and_then(|v| v.as_str())
1502+                        .unwrap_or("")
1503+                        .to_string(),
1504+                    color: rec
1505+                        .get("color")
1506+                        .and_then(|v| v.as_str())
1507+                        .unwrap_or("")
1508+                        .to_string(),
1509+                    // Tangled may store description in valueType
1510+                    description: rec
1511+                        .get("description")
1512+                        .or_else(|| rec.get("valueType"))
1513+                        .and_then(|v| v.as_str())
1514+                        .map(String::from),
1515+                    created_at: parse_datetime(rec, "createdAt"),
1516+                    indexed_at: Utc::now(),
1517+                };
1518+                db::label::upsert(&state.db, &row).await?;
1519+                tracing::debug!(did, rkey, "indexed tangled label definition");
1520+            }
1521+        }
1522+        ("sh.tangled.label.definition", "delete") => {
1523+            db::label::delete(&state.db, did, rkey).await?;
1524+        }
1525+
1526+        // ─── Tangled Pipeline ──────────────────────────────────────
1527+        ("sh.tangled.pipeline", "create" | "update") => {
1528+            if let Some(rec) = record {
1529+                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
1530+                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
1531+
1532+                // Extract commit ID from triggerMetadata if present
1533+                let commit_id = rec
1534+                    .get("triggerMetadata")
1535+                    .and_then(|m| m.get("commitId"))
1536+                    .and_then(|v| v.as_str())
1537+                    .or_else(|| rec.get("commitId").and_then(|v| v.as_str()))
1538+                    .unwrap_or("")
1539+                    .to_string();
1540+
1541+                let row = db::pipeline::PipelineRow {
1542+                    did: did.to_string(),
1543+                    rkey: rkey.to_string(),
1544+                    repo_did,
1545+                    repo_name,
1546+                    commit_id,
1547+                    ref_name: rec.get("ref").and_then(|v| v.as_str()).map(String::from),
1548+                    status: "pending".to_string(),
1549+                    // Tangled doesn't have algebraic checks — mark all as skipped
1550+                    gat_type_check: Some("skipped".to_string()),
1551+                    equation_verification: Some("skipped".to_string()),
1552+                    lens_law_check: Some("skipped".to_string()),
1553+                    breaking_change_check: Some("skipped".to_string()),
1554+                    created_at: parse_datetime(rec, "createdAt"),
1555+                    completed_at: None,
1556+                    indexed_at: Utc::now(),
1557+                };
1558+                db::pipeline::upsert(&state.db, &row).await?;
1559+                tracing::debug!(did, rkey, "indexed tangled pipeline");
1560+            }
1561+        }
1562+        ("sh.tangled.pipeline", "delete") => {
1563+            db::pipeline::delete(&state.db, did, rkey).await?;
1564+        }
1565+
1566+        // ─── Tangled Pipeline Status → Update pipeline ─────────────
1567+        ("sh.tangled.pipeline.status", "create" | "update") => {
1568+            if let Some(rec) = record {
1569+                // The pipeline AT-URI to look up
1570+                let pipeline_uri = rec.get("pipeline").and_then(|v| v.as_str()).unwrap_or("");
1571+                let (pipeline_did, pipeline_rkey) = parse_at_uri_did_rkey(pipeline_uri);
1572+
1573+                // Map Tangled status values to Cospan equivalents
1574+                let raw_status = rec
1575+                    .get("status")
1576+                    .and_then(|v| v.as_str())
1577+                    .unwrap_or("pending");
1578+                let mapped_status = match raw_status {
1579+                    "success" => "passed",
1580+                    "failed" => "failed",
1581+                    "cancelled" | "canceled" => "cancelled",
1582+                    "running" | "in_progress" => "running",
1583+                    other => other,
1584+                };
1585+
1586+                if !pipeline_did.is_empty() && !pipeline_rkey.is_empty() {
1587+                    // Update the pipeline status in-place
1588+                    sqlx::query(
1589+                        "UPDATE pipelines SET status = $1, indexed_at = NOW() \
1590+                         WHERE did = $2 AND rkey = $3",
1591+                    )
1592+                    .bind(mapped_status)
1593+                    .bind(&pipeline_did)
1594+                    .bind(&pipeline_rkey)
1595+                    .execute(&state.db)
1596+                    .await?;
1597+                }
1598+                tracing::debug!(
1599+                    did,
1600+                    rkey,
1601+                    status = mapped_status,
1602+                    "indexed tangled pipeline status"
1603+                );
1604+            }
1605+        }
1606+        ("sh.tangled.pipeline.status", "delete") => {
1607+            // Pipeline status deletes are no-ops (status is stored on the pipeline row)
1608+            tracing::debug!(did, rkey, "tangled pipeline status delete (no-op)");
1609+        }
1610+
1611+        // ─── Tangled Git RefUpdate ─────────────────────────────────
1612+        ("sh.tangled.git.refUpdate", "create" | "update") => {
1613+            if let Some(rec) = record {
1614+                let repo_did = rec
1615+                    .get("repoDid")
1616+                    .and_then(|v| v.as_str())
1617+                    .unwrap_or(did)
1618+                    .to_string();
1619+                let repo_name = rec
1620+                    .get("repoName")
1621+                    .and_then(|v| v.as_str())
1622+                    .unwrap_or("")
1623+                    .to_string();
1624+
1625+                let row = db::ref_update::RefUpdateRow {
1626+                    id: 0, // auto-generated
1627+                    repo_did,
1628+                    repo_name,
1629+                    rkey: rkey.to_string(),
1630+                    committer_did: rec
1631+                        .get("committerDid")
1632+                        .and_then(|v| v.as_str())
1633+                        .unwrap_or(did)
1634+                        .to_string(),
1635+                    ref_name: rec
1636+                        .get("ref")
1637+                        .and_then(|v| v.as_str())
1638+                        .unwrap_or("")
1639+                        .to_string(),
1640+                    // Tangled uses oldSha/newSha — store git SHAs as-is
1641+                    old_target: rec.get("oldSha").and_then(|v| v.as_str()).map(String::from),
1642+                    new_target: rec
1643+                        .get("newSha")
1644+                        .and_then(|v| v.as_str())
1645+                        .unwrap_or("")
1646+                        .to_string(),
1647+                    protocol: "git".to_string(),
1648+                    migration_id: None,
1649+                    breaking_change_count: 0,
1650+                    lens_id: None,
1651+                    lens_quality: None,
1652+                    commit_count: rec.get("commitCount").and_then(|v| v.as_i64()).unwrap_or(0)
1653+                        as i32,
1654+                    created_at: parse_datetime(rec, "createdAt"),
1655+                    indexed_at: Utc::now(),
1656+                };
1657+                db::ref_update::upsert(&state.db, &row).await?;
1658+                tracing::debug!(did, rkey, "indexed tangled git refUpdate");
1659+            }
1660+        }
1661+        ("sh.tangled.git.refUpdate", "delete") => {
1662+            db::ref_update::delete(&state.db, did, rkey).await?;
1663+        }
1664+
1665+        // ─── Tangled-only records (no Cospan equivalent) ──────────
1666+        // These are display-only features in Tangled with no Cospan schema.
1667+        // Log and skip.
1668+        (
1669+            "sh.tangled.publicKey"
1670+            | "sh.tangled.string"
1671+            | "sh.tangled.repo.artifact"
1672+            | "sh.tangled.label.op",
1673+            _,
1674+        ) => {
1675+            tracing::debug!(
1676+                collection,
1677+                did,
1678+                rkey,
1679+                "tangled-only record skipped (no cospan equivalent)"
1680+            );
1681+        }
1682+
10681683         // ─── Catch-all for other tangled collections ───────────────
10691684         (c, _) if c.starts_with("sh.tangled.") => {
10701685             tracing::debug!(
10711686                 collection = c,
1072-                "tangled record received for unhandled collection (no cospan equivalent)"
1687+                "tangled record received for unhandled collection"
10731688             );
10741689         }
10751690 
@@ -30,10 +30,26 @@ const WANTED_COLLECTIONS: &[&str] = &[
3030     "dev.cospan.org.member",
3131     "dev.cospan.pipeline",
3232     // Tangled interop
33-    "sh.tangled.repo.issue",
3433     "sh.tangled.feed.star",
34+    "sh.tangled.feed.reaction",
3535     "sh.tangled.graph.follow",
36+    "sh.tangled.repo",
37+    "sh.tangled.repo.issue",
38+    "sh.tangled.repo.issue.comment",
39+    "sh.tangled.repo.issue.state",
3640     "sh.tangled.repo.pull",
41+    "sh.tangled.repo.pull.comment",
42+    "sh.tangled.repo.pull.status",
43+    "sh.tangled.repo.collaborator",
44+    "sh.tangled.knot",
45+    "sh.tangled.knot.member",
46+    "sh.tangled.spindle",
47+    "sh.tangled.spindle.member",
48+    "sh.tangled.actor.profile",
49+    "sh.tangled.label.definition",
50+    "sh.tangled.pipeline",
51+    "sh.tangled.pipeline.status",
52+    "sh.tangled.git.refUpdate",
3753 ];
3854 
3955 /// Classify an error as retryable or permanent.
cospan · schematic version control on atproto built on AT Protocol