feat: Tangled interop via generated morphisms

Add 74 Tangled lexicon files under packages/lexicons/sh/tangled/, fetched from tangled.org/tangled.org/core. Add fetch script at scripts/fetch-tangled-lexicons.sh for easy updates.

New tangled_interop module in cospan-codegen defines 19 morphisms mapping sh.tangled.* records to Cospan database Row types. Each morphism generates a from_tangled_json() method handling field name differences (knot→node, subject→member_did, repoDid→repo_did, boolean bluesky→string, etc.).

Replaced 15 manual Row constructions in consumer.rs with generated from_tangled_json() calls. 3 remain manual (spindle→OrgRow, spindle.member→OrgMemberRow, label.definition) due to incompatible field mappings.

Net: -732 lines of manual Tangled interop code replaced by generated morphisms derived from both Cospan and Tangled lexicon schemas.
Author: Aaron Steven White
Commit: 2fbc93db9d19be436e6c45deaecefd758ddc0426
Parent: 4d7aaedcd8
Structural diff unavailable
These commits were pushed via plain git push, so no pre-parsed
schemas are available. Install git-remote-cospan and re-push via panproto:// to
see scope-level changes, breaking change detection, and semantic diffs.
brew install panproto/tap/git-remote-cospan

81 files changed +4778 -732
@@ -14,5 +14,6 @@ planning/
1414 PROGRESS.md 1515 /generated/ 1616 crates/cospan-appview/src/db/generated/ 17+vendor/ 1718 .claude/ 1819 pnpm-lock.yaml 1819 \ No newline at end of file
@@ -29,16 +29,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
2929 // ─── Node ─────────────────────────────────────────────────── 3030 ("dev.cospan.node", "create" | "update") => { 3131 if let Some(rec) = record { 32- let row = db::node::NodeRow { 33- did: did.to_string(), 34- rkey: rkey.to_string(), 35- public_endpoint: rec 36- .get("publicEndpoint") 37- .and_then(|v| v.as_str()) 38- .map(String::from), 39- created_at: parse_datetime(rec, "createdAt"), 40- indexed_at: Utc::now(), 41- }; 32+ let row = db::node::NodeRow::from_json(did, rkey, rec); 4233 db::node::upsert(&state.db, &row).await?; 4334 } 4435 }
@@ -74,48 +65,9 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
7465 .unwrap_or_default() 7566 }; 7667 77- let row = db::repo::RepoRow { 78- did: did.to_string(), 79- rkey: rkey.to_string(), 80- name: rec 81- .get("name") 82- .and_then(|v| v.as_str()) 83- .unwrap_or("") 84- .to_string(), 85- description: rec 86- .get("description") 87- .and_then(|v| v.as_str()) 88- .map(String::from), 89- protocol: rec 90- .get("protocol") 91- .and_then(|v| v.as_str()) 92- .unwrap_or("") 93- .to_string(), 94- node_did, 95- node_url, 96- default_branch: Some(rec 97- .get("defaultBranch") 98- .and_then(|v| v.as_str()) 99- .unwrap_or("main") 100- .to_string()), 101- visibility: Some(rec 102- .get("visibility") 103- .and_then(|v| v.as_str()) 104- .unwrap_or("public") 105- .to_string()), 106- source_repo: rec 107- .get("sourceRepo") 108- .and_then(|v| v.as_str()) 109- .map(String::from), 110- star_count: 0, 111- fork_count: 0, 112- open_issue_count: 0, 113- open_mr_count: 0, 114- source: "cospan".to_string(), 115- source_uri: None, 116- created_at: parse_datetime(rec, "createdAt"), 117- indexed_at: Utc::now(), 118- }; 68+ let mut row = db::repo::RepoRow::from_json(did, rkey, rec); 69+ row.node_did = node_did; 70+ row.node_url = node_url; 11971 db::repo::upsert(&state.db, &row).await?; 12072 } 12173 }
@@ -140,50 +92,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
14092 .map(|a| a.len() as i32) 14193 .unwrap_or(0); 14294 143- let row = db::ref_update::RefUpdateRow { 144- id: 0, // auto-generated 145- repo_did, 146- repo_name, 147- rkey: rkey.to_string(), 148- committer_did: rec 149- .get("committerDid") 150- .and_then(|v| v.as_str()) 151- .unwrap_or(did) 152- .to_string(), 153- ref_name: rec 154- .get("ref") 155- .and_then(|v| v.as_str()) 156- .unwrap_or("") 157- .to_string(), 158- old_target: rec 159- .get("oldTarget") 160- .and_then(|v| v.as_str()) 161- .map(String::from), 162- new_target: rec 163- .get("newTarget") 164- .and_then(|v| v.as_str()) 165- .unwrap_or("") 166- .to_string(), 167- protocol: rec 168- .get("protocol") 169- .and_then(|v| v.as_str()) 170- .unwrap_or("") 171- .to_string(), 172- migration_id: rec 173- .get("migrationId") 174- .and_then(|v| v.as_str()) 175- .map(String::from), 176- breaking_change_count: breaking_changes, 177- lens_id: rec.get("lensId").and_then(|v| v.as_str()).map(String::from), 178- lens_quality: rec 179- .get("lensQuality") 180- .and_then(|v| v.as_f64()) 181- .map(|f| f as f32), 182- commit_count: rec.get("commitCount").and_then(|v| v.as_i64()).unwrap_or(0) 183- as i32, 184- created_at: parse_datetime(rec, "createdAt"), 185- indexed_at: Utc::now(), 186- }; 95+ let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, rec); 96+ row.breaking_change_count = breaking_changes; 18797 db::ref_update::upsert(&state.db, &row).await?; 18898 18999 // Publish SSE event
@@ -207,22 +117,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
207117 let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 208118 let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 209119 210- let row = db::issue::IssueRow { 211- did: did.to_string(), 212- rkey: rkey.to_string(), 213- repo_did, 214- repo_name, 215- title: rec 216- .get("title") 217- .and_then(|v| v.as_str()) 218- .unwrap_or("") 219- .to_string(), 220- body: rec.get("body").and_then(|v| v.as_str()).map(String::from), 221- state: "open".to_string(), 222- comment_count: 0, 223- created_at: parse_datetime(rec, "createdAt"), 224- indexed_at: Utc::now(), 225- }; 120+ let row = db::issue::IssueRow::from_json(did, rkey, rec); 226121 db::issue::upsert(&state.db, &row).await?; 227122 228123 // Publish SSE event
@@ -255,18 +150,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
255150 .unwrap_or("") 256151 .to_string(); 257152 258- let row = db::issue_comment::IssueCommentRow { 259- did: did.to_string(), 260- rkey: rkey.to_string(), 261- issue_uri: issue_uri.clone(), 262- body: rec 263- .get("body") 264- .and_then(|v| v.as_str()) 265- .unwrap_or("") 266- .to_string(), 267- created_at: parse_datetime(rec, "createdAt"), 268- indexed_at: Utc::now(), 269- }; 153+ let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, rec); 270154 271155 // Check if this is a new comment (not an update) for counter purposes 272156 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
@@ -302,15 +186,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
302186 .unwrap_or("open") 303187 .to_string(); 304188 305- let row = db::issue_state::IssueStateRow { 306- did: did.to_string(), 307- rkey: rkey.to_string(), 308- issue_uri: issue_uri.clone(), 309- state: new_state.clone(), 310- reason: rec.get("reason").and_then(|v| v.as_str()).map(String::from), 311- created_at: parse_datetime(rec, "createdAt"), 312- indexed_at: Utc::now(), 313- }; 189+ let row = db::issue_state::IssueStateRow::from_json(did, rkey, rec); 314190 db::issue_state::upsert(&state.db, &row).await?; 315191 316192 // Update the issue's state and repo counters
@@ -362,36 +238,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
362238 let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 363239 let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 364240 365- let row = db::pull::PullRow { 366- did: did.to_string(), 367- rkey: rkey.to_string(), 368- repo_did, 369- repo_name, 370- title: rec 371- .get("title") 372- .and_then(|v| v.as_str()) 373- .unwrap_or("") 374- .to_string(), 375- body: rec.get("body").and_then(|v| v.as_str()).map(String::from), 376- target_ref: rec 377- .get("targetRef") 378- .and_then(|v| v.as_str()) 379- .unwrap_or("") 380- .to_string(), 381- source_ref: rec 382- .get("sourceRef") 383- .and_then(|v| v.as_str()) 384- .unwrap_or("") 385- .to_string(), 386- source_repo: rec 387- .get("sourceRepo") 388- .and_then(|v| v.as_str()) 389- .map(String::from), 390- state: "open".to_string(), 391- comment_count: 0, 392- created_at: parse_datetime(rec, "createdAt"), 393- indexed_at: Utc::now(), 394- }; 241+ let row = db::pull::PullRow::from_json(did, rkey, rec); 395242 db::pull::upsert(&state.db, &row).await?; 396243 397244 // Publish SSE event
@@ -423,22 +270,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
423270 .unwrap_or("") 424271 .to_string(); 425272 426- let row = db::pull_comment::PullCommentRow { 427- did: did.to_string(), 428- rkey: rkey.to_string(), 429- pull_uri: pull_uri.clone(), 430- body: rec 431- .get("body") 432- .and_then(|v| v.as_str()) 433- .unwrap_or("") 434- .to_string(), 435- review_decision: rec 436- .get("reviewDecision") 437- .and_then(|v| v.as_str()) 438- .map(String::from), 439- created_at: parse_datetime(rec, "createdAt"), 440- indexed_at: Utc::now(), 441- }; 273+ let row = db::pull_comment::PullCommentRow::from_json(did, rkey, rec); 442274 443275 // Check if this is a new comment (not an update) for counter purposes 444276 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
@@ -472,18 +304,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
472304 .unwrap_or("open") 473305 .to_string(); 474306 475- let row = db::pull_state::PullStateRow { 476- did: did.to_string(), 477- rkey: rkey.to_string(), 478- pull_uri: pull_uri.clone(), 479- state: new_state.clone(), 480- merge_commit_id: rec 481- .get("mergeCommitId") 482- .and_then(|v| v.as_str()) 483- .map(String::from), 484- created_at: parse_datetime(rec, "createdAt"), 485- indexed_at: Utc::now(), 486- }; 307+ let row = db::pull_state::PullStateRow::from_json(did, rkey, rec); 487308 db::pull_state::upsert(&state.db, &row).await?; 488309 489310 // Update the pull's state and repo counters
@@ -536,13 +357,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
536357 .unwrap_or("") 537358 .to_string(); 538359 539- let row = db::star::StarRow { 540- did: did.to_string(), 541- rkey: rkey.to_string(), 542- subject: subject.clone(), 543- created_at: parse_datetime(rec, "createdAt"), 544- indexed_at: Utc::now(), 545- }; 360+ let row = db::star::StarRow::from_json(did, rkey, rec); 546361 547362 let existing = db::star::get(&state.db, did, rkey).await?; 548363 db::star::upsert(&state.db, &row).await?;
@@ -576,17 +391,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
576391 // ─── Follow ───────────────────────────────────────────────── 577392 ("dev.cospan.graph.follow", "create" | "update") => { 578393 if let Some(rec) = record { 579- let row = db::follow::FollowRow { 580- did: did.to_string(), 581- rkey: rkey.to_string(), 582- subject: rec 583- .get("subject") 584- .and_then(|v| v.as_str()) 585- .unwrap_or("") 586- .to_string(), 587- created_at: parse_datetime(rec, "createdAt"), 588- indexed_at: Utc::now(), 589- }; 394+ let row = db::follow::FollowRow::from_json(did, rkey, rec); 590395 db::follow::upsert(&state.db, &row).await?; 591396 } 592397 }
@@ -597,22 +402,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
597402 // ─── Reaction ─────────────────────────────────────────────── 598403 ("dev.cospan.feed.reaction", "create" | "update") => { 599404 if let Some(rec) = record { 600- let row = db::reaction::ReactionRow { 601- did: did.to_string(), 602- rkey: rkey.to_string(), 603- subject: rec 604- .get("subject") 605- .and_then(|v| v.as_str()) 606- .unwrap_or("") 607- .to_string(), 608- emoji: rec 609- .get("emoji") 610- .and_then(|v| v.as_str()) 611- .unwrap_or("") 612- .to_string(), 613- created_at: parse_datetime(rec, "createdAt"), 614- indexed_at: Utc::now(), 615- }; 405+ let row = db::reaction::ReactionRow::from_json(did, rkey, rec); 616406 db::reaction::upsert(&state.db, &row).await?; 617407 } 618408 }
@@ -626,28 +416,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
626416 let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 627417 let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 628418 629- let row = db::label::LabelRow { 630- did: did.to_string(), 631- rkey: rkey.to_string(), 632- repo_did, 633- repo_name, 634- name: rec 635- .get("name") 636- .and_then(|v| v.as_str()) 637- .unwrap_or("") 638- .to_string(), 639- color: rec 640- .get("color") 641- .and_then(|v| v.as_str()) 642- .unwrap_or("") 643- .to_string(), 644- description: rec 645- .get("description") 646- .and_then(|v| v.as_str()) 647- .map(String::from), 648- created_at: parse_datetime(rec, "createdAt"), 649- indexed_at: Utc::now(), 650- }; 419+ let row = db::label::LabelRow::from_json(did, rkey, rec); 651420 db::label::upsert(&state.db, &row).await?; 652421 } 653422 }
@@ -658,27 +427,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
658427 // ─── Org ──────────────────────────────────────────────────── 659428 ("dev.cospan.org", "create" | "update") => { 660429 if let Some(rec) = record { 661- let row = db::org::OrgRow { 662- did: did.to_string(), 663- rkey: rkey.to_string(), 664- name: rec 665- .get("name") 666- .and_then(|v| v.as_str()) 667- .unwrap_or("") 668- .to_string(), 669- description: rec 670- .get("description") 671- .and_then(|v| v.as_str()) 672- .map(String::from), 673- avatar_cid: rec 674- .get("avatar") 675- .and_then(|v| v.get("ref")) 676- .and_then(|v| v.get("$link")) 677- .and_then(|v| v.as_str()) 678- .map(String::from), 679- created_at: parse_datetime(rec, "createdAt"), 680- indexed_at: Utc::now(), 681- }; 430+ let row = db::org::OrgRow::from_json(did, rkey, rec); 682431 db::org::upsert(&state.db, &row).await?; 683432 } 684433 }
@@ -689,27 +438,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
689438 // ─── Org Member ───────────────────────────────────────────── 690439 ("dev.cospan.org.member", "create" | "update") => { 691440 if let Some(rec) = record { 692- let row = db::org_member::OrgMemberRow { 693- did: did.to_string(), 694- rkey: rkey.to_string(), 695- org_uri: rec 696- .get("org") 697- .and_then(|v| v.as_str()) 698- .unwrap_or("") 699- .to_string(), 700- member_did: rec 701- .get("member") 702- .and_then(|v| v.as_str()) 703- .unwrap_or("") 704- .to_string(), 705- role: rec 706- .get("role") 707- .and_then(|v| v.as_str()) 708- .unwrap_or("member") 709- .to_string(), 710- created_at: parse_datetime(rec, "createdAt"), 711- indexed_at: Utc::now(), 712- }; 441+ let row = db::org_member::OrgMemberRow::from_json(did, rkey, rec); 713442 db::org_member::upsert(&state.db, &row).await?; 714443 } 715444 }
@@ -723,24 +452,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
723452 let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 724453 let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 725454 726- let row = db::collaborator::CollaboratorRow { 727- did: did.to_string(), 728- rkey: rkey.to_string(), 729- repo_did, 730- repo_name, 731- member_did: rec 732- .get("did") 733- .and_then(|v| v.as_str()) 734- .unwrap_or("") 735- .to_string(), 736- role: rec 737- .get("role") 738- .and_then(|v| v.as_str()) 739- .unwrap_or("reader") 740- .to_string(), 741- created_at: parse_datetime(rec, "createdAt"), 742- indexed_at: Utc::now(), 743- }; 455+ let row = db::collaborator::CollaboratorRow::from_json(did, rkey, rec); 744456 db::collaborator::upsert(&state.db, &row).await?; 745457 } 746458 }
@@ -756,46 +468,23 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
756468 757469 let checks = rec.get("algebraicChecks"); 758470 759- let row = db::pipeline::PipelineRow { 760- did: did.to_string(), 761- rkey: rkey.to_string(), 762- repo_did, 763- repo_name, 764- commit_id: rec 765- .get("commitId") 766- .and_then(|v| v.as_str()) 767- .unwrap_or("") 768- .to_string(), 769- ref_name: rec.get("ref").and_then(|v| v.as_str()).map(String::from), 770- status: rec 771- .get("status") 772- .and_then(|v| v.as_str()) 773- .unwrap_or("pending") 774- .to_string(), 775- gat_type_check: checks 776- .and_then(|c| c.get("gatTypeCheck")) 777- .and_then(|v| v.as_str()) 778- .map(String::from), 779- equation_verification: checks 780- .and_then(|c| c.get("equationVerification")) 781- .and_then(|v| v.as_str()) 782- .map(String::from), 783- lens_law_check: checks 784- .and_then(|c| c.get("lensLawCheck")) 785- .and_then(|v| v.as_str()) 786- .map(String::from), 787- breaking_change_check: checks 788- .and_then(|c| c.get("breakingChangeCheck")) 789- .and_then(|v| v.as_str()) 790- .map(String::from), 791- created_at: parse_datetime(rec, "createdAt"), 792- completed_at: rec 793- .get("completedAt") 794- .and_then(|v| v.as_str()) 795- .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 796- .map(|dt| dt.with_timezone(&Utc)), 797- indexed_at: Utc::now(), 798- }; 471+ let mut row = db::pipeline::PipelineRow::from_json(did, rkey, rec); 472+ row.gat_type_check = checks 473+ .and_then(|c| c.get("gatTypeCheck")) 474+ .and_then(|v| v.as_str()) 475+ .map(String::from); 476+ row.equation_verification = checks 477+ .and_then(|c| c.get("equationVerification")) 478+ .and_then(|v| v.as_str()) 479+ .map(String::from); 480+ row.lens_law_check = checks 481+ .and_then(|c| c.get("lensLawCheck")) 482+ .and_then(|v| v.as_str()) 483+ .map(String::from); 484+ row.breaking_change_check = checks 485+ .and_then(|c| c.get("breakingChangeCheck")) 486+ .and_then(|v| v.as_str()) 487+ .map(String::from); 799488 db::pipeline::upsert(&state.db, &row).await?; 800489 } 801490 }
@@ -806,38 +495,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
806495 // ─── Dependency ───────────────────────────────────────────── 807496 ("dev.cospan.repo.dependency", "create" | "update") => { 808497 if let Some(rec) = record { 809- let source_uri = rec.get("sourceRepo").and_then(|v| v.as_str()).unwrap_or(""); 810- let target_uri = rec.get("targetRepo").and_then(|v| v.as_str()).unwrap_or(""); 811- let (source_repo_did, source_repo_name) = parse_repo_at_uri(source_uri); 812- let (target_repo_did, target_repo_name) = parse_repo_at_uri(target_uri); 813- 814- let row = db::dependency::DependencyRow { 815- did: did.to_string(), 816- rkey: rkey.to_string(), 817- source_repo_did, 818- source_repo_name, 819- target_repo_did, 820- target_repo_name, 821- morphism_id: rec 822- .get("morphismId") 823- .and_then(|v| v.as_str()) 824- .unwrap_or("") 825- .to_string(), 826- source_protocol: rec 827- .get("sourceProtocol") 828- .and_then(|v| v.as_str()) 829- .map(String::from), 830- target_protocol: rec 831- .get("targetProtocol") 832- .and_then(|v| v.as_str()) 833- .map(String::from), 834- description: rec 835- .get("description") 836- .and_then(|v| v.as_str()) 837- .map(String::from), 838- created_at: parse_datetime(rec, "createdAt"), 839- indexed_at: Utc::now(), 840- }; 498+ let row = db::dependency::DependencyRow::from_json(did, rkey, rec); 841499 db::dependency::upsert(&state.db, &row).await?; 842500 } 843501 }
@@ -852,25 +510,12 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
852510 // ─── Tangled Star ────────────────────────────────────────── 853511 ("sh.tangled.feed.star", "create" | "update") => { 854512 if let Some(rec) = record { 855- let subject = rec 856- .get("subject") 857- .and_then(|v| v.as_str()) 858- .unwrap_or("") 859- .to_string(); 860- 861- let row = db::star::StarRow { 862- did: did.to_string(), 863- rkey: rkey.to_string(), 864- subject: subject.clone(), 865- created_at: parse_datetime(rec, "createdAt"), 866- indexed_at: Utc::now(), 867- }; 868- 513+ let row = db::star::StarRow::from_tangled_json(did, rkey, rec); 869514 let existing = db::star::get(&state.db, did, rkey).await?; 870515 db::star::upsert(&state.db, &row).await?; 871516 872517 if existing.is_none() { 873- let (repo_did, repo_name) = parse_repo_at_uri(&subject); 518+ let (repo_did, repo_name) = parse_repo_at_uri(&row.subject); 874519 db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?; 875520 } 876521
@@ -888,17 +533,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
888533 // ─── Tangled Follow ──────────────────────────────────────── 889534 ("sh.tangled.graph.follow", "create" | "update") => { 890535 if let Some(rec) = record { 891- let row = db::follow::FollowRow { 892- did: did.to_string(), 893- rkey: rkey.to_string(), 894- subject: rec 895- .get("subject") 896- .and_then(|v| v.as_str()) 897- .unwrap_or("") 898- .to_string(), 899- created_at: parse_datetime(rec, "createdAt"), 900- indexed_at: Utc::now(), 901- }; 536+ let row = db::follow::FollowRow::from_tangled_json(did, rkey, rec); 902537 db::follow::upsert(&state.db, &row).await?; 903538 tracing::debug!(did, rkey, "indexed tangled follow"); 904539 }
@@ -910,24 +545,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
910545 // ─── Tangled Reaction ────────────────────────────────────── 911546 ("sh.tangled.feed.reaction", "create" | "update") => { 912547 if let Some(rec) = record { 913- let row = db::reaction::ReactionRow { 914- did: did.to_string(), 915- rkey: rkey.to_string(), 916- subject: rec 917- .get("subject") 918- .and_then(|v| v.as_str()) 919- .unwrap_or("") 920- .to_string(), 921- // Tangled uses "reaction" or "emoji" for the emoji field 922- emoji: rec 923- .get("emoji") 924- .or_else(|| rec.get("reaction")) 925- .and_then(|v| v.as_str()) 926- .unwrap_or("") 927- .to_string(), 928- created_at: parse_datetime(rec, "createdAt"), 929- indexed_at: Utc::now(), 930- }; 548+ let row = db::reaction::ReactionRow::from_tangled_json(did, rkey, rec); 931549 db::reaction::upsert(&state.db, &row).await?; 932550 tracing::debug!(did, rkey, "indexed tangled reaction"); 933551 }
@@ -939,25 +557,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
939557 // ─── Tangled Issue ───────────────────────────────────────── 940558 ("sh.tangled.repo.issue", "create" | "update") => { 941559 if let Some(rec) = record { 942- let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 943- let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 944- 945- let row = db::issue::IssueRow { 946- did: did.to_string(), 947- rkey: rkey.to_string(), 948- repo_did, 949- repo_name, 950- title: rec 951- .get("title") 952- .and_then(|v| v.as_str()) 953- .unwrap_or("") 954- .to_string(), 955- body: rec.get("body").and_then(|v| v.as_str()).map(String::from), 956- state: "open".to_string(), 957- comment_count: 0, 958- created_at: parse_datetime(rec, "createdAt"), 959- indexed_at: Utc::now(), 960- }; 560+ let row = db::issue::IssueRow::from_tangled_json(did, rkey, rec); 961561 db::issue::upsert(&state.db, &row).await?; 962562 tracing::debug!(did, rkey, "indexed tangled issue"); 963563 }
@@ -986,15 +586,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
986586 .unwrap_or("open") 987587 .to_string(); 988588 989- let row = db::issue_state::IssueStateRow { 990- did: did.to_string(), 991- rkey: rkey.to_string(), 992- issue_uri: issue_uri.clone(), 993- state: new_state.clone(), 994- reason: None, 995- created_at: parse_datetime(rec, "createdAt"), 996- indexed_at: Utc::now(), 997- }; 589+ let row = db::issue_state::IssueStateRow::from_tangled_json(did, rkey, rec); 998590 db::issue_state::upsert(&state.db, &row).await?; 999591 1000592 // Update the issue's state and repo counters
@@ -1040,18 +632,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1040632 .unwrap_or("") 1041633 .to_string(); 1042634 1043- let row = db::issue_comment::IssueCommentRow { 1044- did: did.to_string(), 1045- rkey: rkey.to_string(), 1046- issue_uri: issue_uri.clone(), 1047- body: rec 1048- .get("body") 1049- .and_then(|v| v.as_str()) 1050- .unwrap_or("") 1051- .to_string(), 1052- created_at: parse_datetime(rec, "createdAt"), 1053- indexed_at: Utc::now(), 1054- }; 635+ let row = db::issue_comment::IssueCommentRow::from_tangled_json(did, rkey, rec); 1055636 1056637 let existing = db::issue_comment::get(&state.db, did, rkey).await?; 1057638 db::issue_comment::upsert(&state.db, &row).await?;
@@ -1074,52 +655,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1074655 // ─── Tangled Pull Request ────────────────────────────────── 1075656 ("sh.tangled.repo.pull", "create" | "update") => { 1076657 if let Some(rec) = record { 1077- // Tangled PR structure: 1078- // target: { repo (at-uri), branch } 1079- // source: { branch, sha, repo (at-uri) } 1080- let target = rec.get("target"); 1081- let source = rec.get("source"); 1082- 1083- let repo_uri = target 1084- .and_then(|t| t.get("repo")) 1085- .and_then(|v| v.as_str()) 1086- .unwrap_or(""); 1087- let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 1088- 1089- let target_ref = target 1090- .and_then(|t| t.get("branch")) 1091- .and_then(|v| v.as_str()) 1092- .unwrap_or("") 1093- .to_string(); 1094- let source_ref = source 1095- .and_then(|s| s.get("branch")) 1096- .and_then(|v| v.as_str()) 1097- .unwrap_or("") 1098- .to_string(); 1099- let source_repo = source 1100- .and_then(|s| s.get("repo")) 1101- .and_then(|v| v.as_str()) 1102- .map(String::from); 1103- 1104- let row = db::pull::PullRow { 1105- did: did.to_string(), 1106- rkey: rkey.to_string(), 1107- repo_did, 1108- repo_name, 1109- title: rec 1110- .get("title") 1111- .and_then(|v| v.as_str()) 1112- .unwrap_or("") 1113- .to_string(), 1114- body: rec.get("body").and_then(|v| v.as_str()).map(String::from), 1115- target_ref, 1116- source_ref, 1117- source_repo, 1118- state: "open".to_string(), 1119- comment_count: 0, 1120- created_at: parse_datetime(rec, "createdAt"), 1121- indexed_at: Utc::now(), 1122- }; 658+ let row = db::pull::PullRow::from_tangled_json(did, rkey, rec); 1123659 db::pull::upsert(&state.db, &row).await?; 1124660 tracing::debug!(did, rkey, "indexed tangled pull"); 1125661 }
@@ -1141,22 +677,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1141677 .and_then(|v| v.as_str()) 1142678 .unwrap_or("") 1143679 .to_string(); 1144- // Tangled uses "status", map to Cospan "state" 1145- let new_state = rec 1146- .get("status") 1147- .and_then(|v| v.as_str()) 1148- .unwrap_or("open") 1149- .to_string(); 1150- 1151- let row = db::pull_state::PullStateRow { 1152- did: did.to_string(), 1153- rkey: rkey.to_string(), 1154- pull_uri: pull_uri.clone(), 1155- state: new_state.clone(), 1156- merge_commit_id: None, 1157- created_at: parse_datetime(rec, "createdAt"), 1158- indexed_at: Utc::now(), 1159- }; 680+ let row = db::pull_state::PullStateRow::from_tangled_json(did, rkey, rec); 681+ let new_state = row.state.clone(); 1160682 db::pull_state::upsert(&state.db, &row).await?; 1161683 1162684 // Update the pull's state and repo counters
@@ -1200,19 +722,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1200722 .unwrap_or("") 1201723 .to_string(); 1202724 1203- let row = db::pull_comment::PullCommentRow { 1204- did: did.to_string(), 1205- rkey: rkey.to_string(), 1206- pull_uri: pull_uri.clone(), 1207- body: rec 1208- .get("body") 1209- .and_then(|v| v.as_str()) 1210- .unwrap_or("") 1211- .to_string(), 1212- review_decision: None, // Tangled doesn't have review decisions 1213- created_at: parse_datetime(rec, "createdAt"), 1214- indexed_at: Utc::now(), 1215- }; 725+ let row = db::pull_comment::PullCommentRow::from_tangled_json(did, rkey, rec); 1216726 1217727 let existing = db::pull_comment::get(&state.db, did, rkey).await?; 1218728 db::pull_comment::upsert(&state.db, &row).await?;
@@ -1235,31 +745,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1235745 // ─── Tangled Collaborator ────────────────────────────────── 1236746 ("sh.tangled.repo.collaborator", "create" | "update") => { 1237747 if let Some(rec) = record { 1238- let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 1239- let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 1240- 1241- // Tangled uses "subject" or "did" for the collaborator DID 1242- let member_did = rec 1243- .get("subject") 1244- .or_else(|| rec.get("did")) 1245- .and_then(|v| v.as_str()) 1246- .unwrap_or("") 1247- .to_string(); 1248- 1249- let row = db::collaborator::CollaboratorRow { 1250- did: did.to_string(), 1251- rkey: rkey.to_string(), 1252- repo_did, 1253- repo_name, 1254- member_did, 1255- role: rec 1256- .get("role") 1257- .and_then(|v| v.as_str()) 1258- .unwrap_or("reader") 1259- .to_string(), 1260- created_at: parse_datetime(rec, "createdAt"), 1261- indexed_at: Utc::now(), 1262- }; 748+ let row = db::collaborator::CollaboratorRow::from_tangled_json(did, rkey, rec); 1263749 db::collaborator::upsert(&state.db, &row).await?; 1264750 tracing::debug!(did, rkey, "indexed tangled collaborator"); 1265751 }
@@ -1271,14 +757,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1271757 // ─── Tangled Knot → Node ────────────────────────────────── 1272758 ("sh.tangled.knot", "create" | "update") => { 1273759 if let Some(rec) = record { 1274- // Tangled knots don't have a publicEndpoint field 1275- let row = db::node::NodeRow { 1276- did: did.to_string(), 1277- rkey: rkey.to_string(), 1278- public_endpoint: None, 1279- created_at: parse_datetime(rec, "createdAt"), 1280- indexed_at: Utc::now(), 1281- }; 760+ let row = db::node::NodeRow::from_tangled_json(did, rkey, rec); 1282761 db::node::upsert(&state.db, &row).await?; 1283762 tracing::debug!(did, rkey, "indexed tangled knot as node"); 1284763 }
@@ -1320,35 +799,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1320799 // ─── Tangled Actor Profile ───────────────────────────────── 1321800 ("sh.tangled.actor.profile", "create" | "update") => { 1322801 if let Some(rec) = record { 1323- // Tangled stores bluesky as a bool; Cospan stores a handle string. 1324- // If true, we store the DID as a placeholder (best-effort without 1325- // async handle resolution in the hot path). 1326- let bluesky = match rec.get("bluesky") { 1327- Some(serde_json::Value::Bool(true)) => did.to_string(), 1328- Some(serde_json::Value::String(s)) => s.clone(), 1329- _ => String::new(), 1330- }; 1331- 1332- let row = db::actor_profile::ActorProfileRow { 1333- did: did.to_string(), 1334- rkey: rkey.to_string(), 1335- bluesky, 1336- display_name: rec 1337- .get("displayName") 1338- .and_then(|v| v.as_str()) 1339- .map(String::from), 1340- description: rec 1341- .get("description") 1342- .and_then(|v| v.as_str()) 1343- .map(String::from), 1344- avatar_cid: rec 1345- .get("avatar") 1346- .and_then(|v| v.get("ref")) 1347- .and_then(|v| v.get("$link")) 1348- .and_then(|v| v.as_str()) 1349- .map(String::from), 1350- indexed_at: Utc::now(), 1351- }; 802+ let row = db::actor_profile::ActorProfileRow::from_tangled_json(did, rkey, rec); 1352803 db::actor_profile::upsert(&state.db, &row).await?; 1353804 tracing::debug!(did, rkey, "indexed tangled actor profile"); 1354805 }
@@ -1360,48 +811,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1360811 // ─── Tangled Repo ────────────────────────────────────────── 1361812 ("sh.tangled.repo", "create" | "update") => { 1362813 if let Some(rec) = record { 1363- // Tangled stores knot as a hostname string; construct did:web and URL 1364- let knot = rec.get("knot").and_then(|v| v.as_str()).unwrap_or(""); 1365- let node_did = if knot.is_empty() { 1366- String::new() 1367- } else { 1368- format!("did:web:{knot}") 1369- }; 1370- let node_url = if knot.is_empty() { 1371- String::new() 1372- } else { 1373- format!("https://{knot}") 1374- }; 1375- 1376- let source_uri = format!("at://{did}/sh.tangled.repo/{rkey}"); 1377- 1378- let row = db::repo::RepoRow { 1379- did: did.to_string(), 1380- rkey: rkey.to_string(), 1381- name: rec 1382- .get("name") 1383- .and_then(|v| v.as_str()) 1384- .unwrap_or("") 1385- .to_string(), 1386- description: rec 1387- .get("description") 1388- .and_then(|v| v.as_str()) 1389- .map(String::from), 1390- protocol: "git".to_string(), // Tangled repos are always git 1391- node_did, 1392- node_url, 1393- default_branch: Some("main".to_string()), 1394- visibility: Some("public".to_string()), 1395- source_repo: None, 1396- star_count: 0, 1397- fork_count: 0, 1398- open_issue_count: 0, 1399- open_mr_count: 0, 1400- source: "tangled".to_string(), 1401- source_uri: Some(source_uri), 1402- created_at: parse_datetime(rec, "createdAt"), 1403- indexed_at: Utc::now(), 1404- }; 814+ let row = db::repo::RepoRow::from_tangled_json(did, rkey, rec); 1405815 db::repo::upsert(&state.db, &row).await?; 1406816 tracing::debug!(did, rkey, "indexed tangled repo"); 1407817 }
@@ -1413,23 +823,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1413823 // ─── Tangled Knot Member → Org Member ───────────────────── 1414824 ("sh.tangled.knot.member", "create" | "update") => { 1415825 if let Some(rec) = record { 1416- // Construct an org URI from the DID (knot owner) since knot.member 1417- // is associated with the knot owner's DID 1418- let org_uri = format!("at://{did}/sh.tangled.knot/self"); 1419- 1420- let row = db::org_member::OrgMemberRow { 1421- did: did.to_string(), 1422- rkey: rkey.to_string(), 1423- org_uri, 1424- member_did: rec 1425- .get("subject") 1426- .and_then(|v| v.as_str()) 1427- .unwrap_or("") 1428- .to_string(), 1429- role: "member".to_string(), // Tangled doesn't specify roles 1430- created_at: parse_datetime(rec, "createdAt"), 1431- indexed_at: Utc::now(), 1432- }; 826+ let row = db::org_member::OrgMemberRow::from_tangled_json(did, rkey, rec); 1433827 db::org_member::upsert(&state.db, &row).await?; 1434828 tracing::debug!(did, rkey, "indexed tangled knot member as org member"); 1435829 }
@@ -1505,35 +899,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1505899 // ─── Tangled Pipeline ────────────────────────────────────── 1506900 ("sh.tangled.pipeline", "create" | "update") => { 1507901 if let Some(rec) = record { 1508- let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or(""); 1509- let (repo_did, repo_name) = parse_repo_at_uri(repo_uri); 1510- 1511- // Extract commit ID from triggerMetadata if present 1512- let commit_id = rec 1513- .get("triggerMetadata") 1514- .and_then(|m| m.get("commitId")) 1515- .and_then(|v| v.as_str()) 1516- .or_else(|| rec.get("commitId").and_then(|v| v.as_str())) 1517- .unwrap_or("") 1518- .to_string(); 1519- 1520- let row = db::pipeline::PipelineRow { 1521- did: did.to_string(), 1522- rkey: rkey.to_string(), 1523- repo_did, 1524- repo_name, 1525- commit_id, 1526- ref_name: rec.get("ref").and_then(|v| v.as_str()).map(String::from), 1527- status: "pending".to_string(), 1528- // Tangled doesn't have algebraic checks — mark all as skipped 1529- gat_type_check: Some("skipped".to_string()), 1530- equation_verification: Some("skipped".to_string()), 1531- lens_law_check: Some("skipped".to_string()), 1532- breaking_change_check: Some("skipped".to_string()), 1533- created_at: parse_datetime(rec, "createdAt"), 1534- completed_at: None, 1535- indexed_at: Utc::now(), 1536- }; 902+ let row = db::pipeline::PipelineRow::from_tangled_json(did, rkey, rec); 1537903 db::pipeline::upsert(&state.db, &row).await?; 1538904 tracing::debug!(did, rkey, "indexed tangled pipeline"); 1539905 }
@@ -1590,49 +956,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1590956 // ─── Tangled Git RefUpdate ───────────────────────────────── 1591957 ("sh.tangled.git.refUpdate", "create" | "update") => { 1592958 if let Some(rec) = record { 1593- let repo_did = rec 1594- .get("repoDid") 1595- .and_then(|v| v.as_str()) 1596- .unwrap_or(did) 1597- .to_string(); 1598- let repo_name = rec 1599- .get("repoName") 1600- .and_then(|v| v.as_str()) 1601- .unwrap_or("") 1602- .to_string(); 1603- 1604- let row = db::ref_update::RefUpdateRow { 1605- id: 0, // auto-generated 1606- repo_did, 1607- repo_name, 1608- rkey: rkey.to_string(), 1609- committer_did: rec 1610- .get("committerDid") 1611- .and_then(|v| v.as_str()) 1612- .unwrap_or(did) 1613- .to_string(), 1614- ref_name: rec 1615- .get("ref") 1616- .and_then(|v| v.as_str()) 1617- .unwrap_or("") 1618- .to_string(), 1619- // Tangled uses oldSha/newSha — store git SHAs as-is 1620- old_target: rec.get("oldSha").and_then(|v| v.as_str()).map(String::from), 1621- new_target: rec 1622- .get("newSha") 1623- .and_then(|v| v.as_str()) 1624- .unwrap_or("") 1625- .to_string(), 1626- protocol: "git".to_string(), 1627- migration_id: None, 1628- breaking_change_count: 0, 1629- lens_id: None, 1630- lens_quality: None, 1631- commit_count: rec.get("commitCount").and_then(|v| v.as_i64()).unwrap_or(0) 1632- as i32, 1633- created_at: parse_datetime(rec, "createdAt"), 1634- indexed_at: Utc::now(), 1635- }; 959+ let row = db::ref_update::RefUpdateRow::from_tangled_json(did, rkey, rec); 1636960 db::ref_update::upsert(&state.db, &row).await?; 1637961 tracing::debug!(did, rkey, "indexed tangled git refUpdate"); 1638962 }
@@ -1,3 +1,4 @@
11 pub mod emit_rows; 22 pub mod morphism; 33 pub mod record_config; 4+pub mod tangled_interop;
@@ -12,6 +12,7 @@
1212 mod emit_rows; 1313 mod morphism; 1414 mod record_config; 15+mod tangled_interop; 1516 1617 use std::fs; 1718 use std::path::{Path, PathBuf};
@@ -180,11 +181,29 @@ fn main() -> Result<()> {
180181 } 181182 } 182183 184+ // --- Generate Tangled interop from_tangled_json() methods --- 185+ let tangled_morphisms = tangled_interop::all_tangled_morphisms(); 186+ let mut all_tangled_json = String::new(); 187+ all_tangled_json.push_str("// Generated by cospan-codegen — Tangled → Cospan interop\n"); 188+ all_tangled_json.push_str("// Do not edit manually.\n\n"); 189+ all_tangled_json.push_str("use super::types::*;\n\n"); 190+ 191+ match tangled_interop::emit_tangled_from_json(&tangled_morphisms) { 192+ Ok(code) => all_tangled_json.push_str(&code), 193+ Err(e) => eprintln!(" warn: emit_tangled_from_json: {e}"), 194+ } 195+ 196+ println!( 197+ " Generated {} Tangled interop morphisms", 198+ tangled_morphisms.len() 199+ ); 200+ 183201 // Write generated files (reference copies in generated/) 184202 fs::write(generated_dir.join("rust/types.rs"), &all_rust)?; 185203 fs::write(generated_dir.join("rust/row_types.rs"), &all_row_types)?; 186204 fs::write(generated_dir.join("rust/crud.rs"), &all_crud)?; 187205 fs::write(generated_dir.join("rust/from_json.rs"), &all_from_json)?; 206+ fs::write(generated_dir.join("rust/tangled_interop.rs"), &all_tangled_json)?; 188207 fs::write(generated_dir.join("sql/schema.sql"), &all_sql)?; 189208 fs::write(generated_dir.join("typescript/types.ts"), &all_ts)?; 190209
@@ -194,6 +213,7 @@ fn main() -> Result<()> {
194213 fs::write(appview_gen_dir.join("types.rs"), &all_row_types)?; 195214 fs::write(appview_gen_dir.join("crud.rs"), &all_crud)?; 196215 fs::write(appview_gen_dir.join("from_json.rs"), &all_from_json)?; 216+ fs::write(appview_gen_dir.join("tangled_interop.rs"), &all_tangled_json)?; 197217 fs::write(appview_gen_dir.join("mod.rs"), "\ 198218 // Generated by cospan-codegen — do not edit manually. 199219
@@ -203,6 +223,8 @@ pub mod types;
203223 pub mod crud; 204224 #[allow(dead_code, unused_imports)] 205225 pub mod from_json; 226+#[allow(dead_code, unused_imports)] 227+pub mod tangled_interop; 206228 207229 pub use types::*; 208230 ")?;
@@ -0,0 +1,445 @@
1+//! Tangled → Cospan interop morphisms. 2+//! 3+//! Each Tangled record type that maps to a Cospan database table gets a 4+//! `from_tangled_json()` method generated on the target Row type. 5+//! The morphism specifies how Tangled field names map to Cospan columns. 6+ 7+use anyhow::Result; 8+use panproto_protocols::emit::IndentWriter; 9+ 10+/// A field mapping from a Tangled Lexicon field to a Cospan Row column. 11+#[derive(Clone)] 12+pub enum FieldMapping { 13+ /// Direct copy: `rec.get("tangledField").as_str()` → column 14+ Direct { 15+ tangled_field: &'static str, 16+ required: bool, 17+ }, 18+ /// Constant value 19+ Constant(&'static str), 20+ /// Custom extraction expression (raw Rust code) 21+ Custom(&'static str), 22+} 23+ 24+/// Morphism from a Tangled record to a Cospan Row type. 25+pub struct TangledMorphism { 26+ pub tangled_nsid: &'static str, 27+ pub target_row: &'static str, 28+ pub target_table: &'static str, 29+ /// Ordered list of (column_name, mapping) pairs. 30+ /// Must match the Row struct field order. 
31+ pub fields: Vec<(&'static str, FieldMapping)>, 32+ /// Source tracking 33+ pub source_label: &'static str, 34+} 35+ 36+pub fn all_tangled_morphisms() -> Vec<TangledMorphism> { 37+ vec![ 38+ TangledMorphism { 39+ tangled_nsid: "sh.tangled.feed.star", 40+ target_row: "StarRow", 41+ target_table: "stars", 42+ source_label: "tangled", 43+ fields: vec![ 44+ ("did", FieldMapping::Custom("did.to_string()")), 45+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 46+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 47+ ("subject", FieldMapping::Direct { tangled_field: "subject", required: true }), 48+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 49+ ], 50+ }, 51+ TangledMorphism { 52+ tangled_nsid: "sh.tangled.graph.follow", 53+ target_row: "FollowRow", 54+ target_table: "follows", 55+ source_label: "tangled", 56+ fields: vec![ 57+ ("did", FieldMapping::Custom("did.to_string()")), 58+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 59+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 60+ ("subject", FieldMapping::Direct { tangled_field: "subject", required: true }), 61+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 62+ ], 63+ }, 64+ TangledMorphism { 65+ tangled_nsid: "sh.tangled.feed.reaction", 66+ target_row: "ReactionRow", 67+ target_table: "reactions", 68+ source_label: "tangled", 69+ fields: vec![ 70+ ("did", FieldMapping::Custom("did.to_string()")), 71+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 72+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 73+ ("emoji", FieldMapping::Direct { tangled_field: "emoji", required: true }), 74+ ("subject", FieldMapping::Direct { tangled_field: "subject", required: true }), 75+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 76+ ], 77+ }, 78+ TangledMorphism { 79+ tangled_nsid: "sh.tangled.repo.issue", 80+ target_row: "IssueRow", 81+ target_table: "issues", 82+ source_label: "tangled", 83+ 
fields: vec![ 84+ ("did", FieldMapping::Custom("did.to_string()")), 85+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 86+ ("repo_did", FieldMapping::Custom("parse_at_uri_did(rec, \"repo\")")), 87+ ("repo_name", FieldMapping::Custom("parse_at_uri_name(rec, \"repo\")")), 88+ ("body", FieldMapping::Direct { tangled_field: "body", required: false }), 89+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 90+ ("title", FieldMapping::Direct { tangled_field: "title", required: true }), 91+ ("state", FieldMapping::Constant("\"open\".to_string()")), 92+ ("comment_count", FieldMapping::Constant("0")), 93+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 94+ ], 95+ }, 96+ TangledMorphism { 97+ tangled_nsid: "sh.tangled.repo.issue.state", 98+ target_row: "IssueStateRow", 99+ target_table: "issue_states", 100+ source_label: "tangled", 101+ fields: vec![ 102+ ("did", FieldMapping::Custom("did.to_string()")), 103+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 104+ ("issue_uri", FieldMapping::Direct { tangled_field: "issue", required: true }), 105+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 106+ ("reason", FieldMapping::Direct { tangled_field: "reason", required: false }), 107+ ("state", FieldMapping::Direct { tangled_field: "state", required: true }), 108+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 109+ ], 110+ }, 111+ TangledMorphism { 112+ tangled_nsid: "sh.tangled.repo.issue.comment", 113+ target_row: "IssueCommentRow", 114+ target_table: "issue_comments", 115+ source_label: "tangled", 116+ fields: vec![ 117+ ("did", FieldMapping::Custom("did.to_string()")), 118+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 119+ ("issue_uri", FieldMapping::Direct { tangled_field: "issue", required: true }), 120+ ("body", FieldMapping::Direct { tangled_field: "body", required: true }), 121+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 122+ ("indexed_at", 
FieldMapping::Custom("chrono::Utc::now()")), 123+ ], 124+ }, 125+ TangledMorphism { 126+ tangled_nsid: "sh.tangled.repo.pull", 127+ target_row: "PullRow", 128+ target_table: "pulls", 129+ source_label: "tangled", 130+ fields: vec![ 131+ ("did", FieldMapping::Custom("did.to_string()")), 132+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 133+ ("repo_did", FieldMapping::Custom("parse_at_uri_did(rec, \"repo\")")), 134+ ("repo_name", FieldMapping::Custom("parse_at_uri_name(rec, \"repo\")")), 135+ ("body", FieldMapping::Direct { tangled_field: "body", required: false }), 136+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 137+ ("source_ref", FieldMapping::Direct { tangled_field: "sourceBranch", required: true }), 138+ ("source_repo", FieldMapping::Direct { tangled_field: "sourceRepo", required: false }), 139+ ("target_ref", FieldMapping::Direct { tangled_field: "targetBranch", required: true }), 140+ ("title", FieldMapping::Direct { tangled_field: "title", required: true }), 141+ ("state", FieldMapping::Constant("\"open\".to_string()")), 142+ ("comment_count", FieldMapping::Constant("0")), 143+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 144+ ], 145+ }, 146+ TangledMorphism { 147+ tangled_nsid: "sh.tangled.repo.pull.status", 148+ target_row: "PullStateRow", 149+ target_table: "pull_states", 150+ source_label: "tangled", 151+ fields: vec![ 152+ ("did", FieldMapping::Custom("did.to_string()")), 153+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 154+ ("pull_uri", FieldMapping::Direct { tangled_field: "pull", required: true }), 155+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 156+ ("merge_commit_id", FieldMapping::Direct { tangled_field: "mergeCommitId", required: false }), 157+ ("state", FieldMapping::Custom("{\ 158+ let s = rec.get(\"state\").and_then(|v| v.as_str()).unwrap_or(\"open\");\ 159+ match s { \"merged\" => \"merged\", \"closed\" => \"closed\", _ => \"open\" }.to_string()\ 
160+ }")), 161+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 162+ ], 163+ }, 164+ TangledMorphism { 165+ tangled_nsid: "sh.tangled.repo.pull.comment", 166+ target_row: "PullCommentRow", 167+ target_table: "pull_comments", 168+ source_label: "tangled", 169+ fields: vec![ 170+ ("did", FieldMapping::Custom("did.to_string()")), 171+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 172+ ("pull_uri", FieldMapping::Direct { tangled_field: "pull", required: true }), 173+ ("body", FieldMapping::Direct { tangled_field: "body", required: true }), 174+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 175+ ("review_decision", FieldMapping::Direct { tangled_field: "reviewDecision", required: false }), 176+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 177+ ], 178+ }, 179+ TangledMorphism { 180+ tangled_nsid: "sh.tangled.repo.collaborator", 181+ target_row: "CollaboratorRow", 182+ target_table: "collaborators", 183+ source_label: "tangled", 184+ fields: vec![ 185+ ("did", FieldMapping::Custom("did.to_string()")), 186+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 187+ ("repo_did", FieldMapping::Custom("parse_at_uri_did(rec, \"repo\")")), 188+ ("repo_name", FieldMapping::Custom("parse_at_uri_name(rec, \"repo\")")), 189+ // Tangled uses "subject" for the collaborator DID, Cospan uses "did" → "member_did" 190+ ("member_did", FieldMapping::Direct { tangled_field: "subject", required: true }), 191+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 192+ ("role", FieldMapping::Constant("\"contributor\".to_string()")), 193+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 194+ ], 195+ }, 196+ TangledMorphism { 197+ tangled_nsid: "sh.tangled.knot", 198+ target_row: "NodeRow", 199+ target_table: "nodes", 200+ source_label: "tangled", 201+ fields: vec![ 202+ ("did", FieldMapping::Custom("did.to_string()")), 203+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 204+ ("created_at", 
FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 205+ ("public_endpoint", FieldMapping::Custom("{\ 206+ let hostname = rec.get(\"hostname\").and_then(|v| v.as_str()).unwrap_or(\"\");\ 207+ if hostname.is_empty() { None } else { Some(format!(\"https://{hostname}\")) }\ 208+ }")), 209+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 210+ ], 211+ }, 212+ // sh.tangled.spindle also maps to NodeRow but uses from_tangled_spindle_json 213+ // to avoid duplicate impl with sh.tangled.knot 214+ TangledMorphism { 215+ tangled_nsid: "sh.tangled.spindle", 216+ target_row: "NodeRow", 217+ target_table: "nodes", 218+ source_label: "tangled", 219+ fields: vec![], // handled specially below 220+ }, 221+ TangledMorphism { 222+ tangled_nsid: "sh.tangled.actor.profile", 223+ target_row: "ActorProfileRow", 224+ target_table: "actor_profiles", 225+ source_label: "tangled", 226+ fields: vec![ 227+ ("did", FieldMapping::Custom("did.to_string()")), 228+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 229+ // Tangled bluesky is a boolean; we store the handle or empty string 230+ ("bluesky", FieldMapping::Custom("{\ 231+ match rec.get(\"bluesky\") {\ 232+ Some(serde_json::Value::Bool(true)) => did.to_string(),\ 233+ Some(serde_json::Value::String(s)) => s.clone(),\ 234+ _ => String::new(),\ 235+ }\ 236+ }")), 237+ ("description", FieldMapping::Direct { tangled_field: "description", required: false }), 238+ ("display_name", FieldMapping::Custom("None")), // Tangled profiles don't have displayName 239+ ("avatar_cid", FieldMapping::Custom("\ 240+ rec.get(\"avatar\")\ 241+ .and_then(|v| v.get(\"ref\"))\ 242+ .and_then(|v| v.get(\"$link\"))\ 243+ .and_then(|v| v.as_str())\ 244+ .map(String::from)")), 245+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 246+ ], 247+ }, 248+ TangledMorphism { 249+ tangled_nsid: "sh.tangled.repo", 250+ target_row: "RepoRow", 251+ target_table: "repos", 252+ source_label: "tangled", 253+ fields: vec![ 254+ ("did", 
FieldMapping::Custom("did.to_string()")), 255+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 256+ // Tangled uses "knot" (hostname), not AT-URI node 257+ ("node_did", FieldMapping::Custom("{\ 258+ let knot = rec.get(\"knot\").and_then(|v| v.as_str()).unwrap_or(\"\");\ 259+ if knot.is_empty() { String::new() } else { format!(\"did:web:{knot}\") }\ 260+ }")), 261+ ("node_url", FieldMapping::Custom("{\ 262+ let knot = rec.get(\"knot\").and_then(|v| v.as_str()).unwrap_or(\"\");\ 263+ if knot.is_empty() { String::new() } else { format!(\"https://{knot}\") }\ 264+ }")), 265+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 266+ ("default_branch", FieldMapping::Constant("Some(\"main\".to_string())")), 267+ ("description", FieldMapping::Direct { tangled_field: "description", required: false }), 268+ ("name", FieldMapping::Direct { tangled_field: "name", required: true }), 269+ ("protocol", FieldMapping::Constant("\"git\".to_string()")), 270+ ("source_repo", FieldMapping::Custom("None")), 271+ ("visibility", FieldMapping::Constant("Some(\"public\".to_string())")), 272+ ("star_count", FieldMapping::Constant("0")), 273+ ("fork_count", FieldMapping::Constant("0")), 274+ ("open_issue_count", FieldMapping::Constant("0")), 275+ ("open_mr_count", FieldMapping::Constant("0")), 276+ ("source", FieldMapping::Constant("\"tangled\".to_string()")), 277+ ("source_uri", FieldMapping::Custom("Some(format!(\"at://{did}/sh.tangled.repo/{rkey}\"))")), 278+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 279+ ], 280+ }, 281+ TangledMorphism { 282+ tangled_nsid: "sh.tangled.knot.member", 283+ target_row: "OrgMemberRow", 284+ target_table: "org_members", 285+ source_label: "tangled", 286+ fields: vec![ 287+ ("did", FieldMapping::Custom("did.to_string()")), 288+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 289+ ("org_uri", FieldMapping::Direct { tangled_field: "knot", required: true }), 290+ ("member_did", FieldMapping::Direct { tangled_field: 
"member", required: true }), 291+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 292+ ("role", FieldMapping::Direct { tangled_field: "role", required: true }), 293+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 294+ ], 295+ }, 296+ TangledMorphism { 297+ tangled_nsid: "sh.tangled.spindle.member", 298+ target_row: "OrgMemberRow", 299+ target_table: "org_members", 300+ source_label: "tangled", 301+ fields: vec![ 302+ ("did", FieldMapping::Custom("did.to_string()")), 303+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 304+ ("org_uri", FieldMapping::Direct { tangled_field: "spindle", required: true }), 305+ ("member_did", FieldMapping::Direct { tangled_field: "member", required: true }), 306+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 307+ ("role", FieldMapping::Direct { tangled_field: "role", required: true }), 308+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 309+ ], 310+ }, 311+ TangledMorphism { 312+ tangled_nsid: "sh.tangled.label.definition", 313+ target_row: "LabelRow", 314+ target_table: "labels", 315+ source_label: "tangled", 316+ fields: vec![ 317+ ("did", FieldMapping::Custom("did.to_string()")), 318+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 319+ // Tangled labels don't have a repo AT-URI directly, infer from context 320+ ("repo_did", FieldMapping::Custom("did.to_string()")), 321+ ("repo_name", FieldMapping::Constant("String::new()")), 322+ ("color", FieldMapping::Custom("rec.get(\"color\").and_then(|v| v.as_str()).unwrap_or(\"#6b7280\").to_string()")), 323+ ("created_at", FieldMapping::Custom("parse_datetime(rec, \"createdAt\")")), 324+ ("description", FieldMapping::Custom("rec.get(\"name\").and_then(|v| v.as_str()).map(String::from)")), 325+ ("name", FieldMapping::Direct { tangled_field: "name", required: true }), 326+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 327+ ], 328+ }, 329+ TangledMorphism { 330+ tangled_nsid: 
"sh.tangled.pipeline", 331+ target_row: "PipelineRow", 332+ target_table: "pipelines", 333+ source_label: "tangled", 334+ fields: vec![ 335+ ("did", FieldMapping::Custom("did.to_string()")), 336+ ("rkey", FieldMapping::Custom("rkey.to_string()")), 337+ // Extract repo info from triggerMetadata.repo 338+ ("repo_did", FieldMapping::Custom("rec.get(\"triggerMetadata\").and_then(|t| t.get(\"repo\")).and_then(|r| r.get(\"did\")).and_then(|v| v.as_str()).unwrap_or(\"\").to_string()")), 339+ ("repo_name", FieldMapping::Custom("rec.get(\"triggerMetadata\").and_then(|t| t.get(\"repo\")).and_then(|r| r.get(\"repo\")).and_then(|v| v.as_str()).unwrap_or(\"\").to_string()")), 340+ // Extract commit ID from push trigger data 341+ ("commit_id", FieldMapping::Custom("rec.get(\"triggerMetadata\").and_then(|t| t.get(\"push\")).and_then(|p| p.get(\"newSha\")).and_then(|v| v.as_str()).unwrap_or(\"\").to_string()")), 342+ ("completed_at", FieldMapping::Custom("None")), 343+ ("created_at", FieldMapping::Custom("chrono::Utc::now()")), 344+ ("ref_name", FieldMapping::Custom("rec.get(\"triggerMetadata\").and_then(|t| t.get(\"push\")).and_then(|p| p.get(\"ref\")).and_then(|v| v.as_str()).map(String::from)")), 345+ ("status", FieldMapping::Constant("\"pending\".to_string()")), 346+ ("gat_type_check", FieldMapping::Constant("Some(\"skipped\".to_string())")), 347+ ("equation_verification", FieldMapping::Constant("Some(\"skipped\".to_string())")), 348+ ("lens_law_check", FieldMapping::Constant("Some(\"skipped\".to_string())")), 349+ ("breaking_change_check", FieldMapping::Constant("Some(\"skipped\".to_string())")), 350+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 351+ ], 352+ }, 353+ TangledMorphism { 354+ tangled_nsid: "sh.tangled.git.refUpdate", 355+ target_row: "RefUpdateRow", 356+ target_table: "ref_updates", 357+ source_label: "tangled", 358+ fields: vec![ 359+ ("id", FieldMapping::Constant("0")), 360+ // No did column for ref_updates (include_did: false) 361+ ("rkey", 
FieldMapping::Custom("rkey.to_string()")), 362+ // Tangled uses repoDid/repoName directly instead of AT-URI 363+ ("repo_did", FieldMapping::Direct { tangled_field: "repoDid", required: true }), 364+ ("repo_name", FieldMapping::Direct { tangled_field: "repoName", required: true }), 365+ ("commit_count", FieldMapping::Constant("1")), 366+ ("committer_did", FieldMapping::Direct { tangled_field: "committerDid", required: true }), 367+ ("created_at", FieldMapping::Custom("chrono::Utc::now()")), 368+ ("lens_id", FieldMapping::Custom("None")), 369+ ("lens_quality", FieldMapping::Custom("None")), 370+ ("migration_id", FieldMapping::Custom("None")), 371+ ("new_target", FieldMapping::Direct { tangled_field: "newSha", required: true }), 372+ ("old_target", FieldMapping::Direct { tangled_field: "oldSha", required: false }), 373+ ("protocol", FieldMapping::Constant("\"git\".to_string()")), 374+ ("ref_name", FieldMapping::Direct { tangled_field: "ref", required: true }), 375+ ("breaking_change_count", FieldMapping::Constant("0")), 376+ ("indexed_at", FieldMapping::Custom("chrono::Utc::now()")), 377+ ], 378+ }, 379+ ] 380+} 381+ 382+/// Emit `from_tangled_json()` methods for all morphisms. 
383+pub fn emit_tangled_from_json(morphisms: &[TangledMorphism]) -> Result<String> { 384+ let mut w = IndentWriter::new(" "); 385+ 386+ // Track which row types already have from_tangled_json to avoid duplicate impls 387+ let mut seen_rows = std::collections::HashSet::new(); 388+ 389+ for m in morphisms { 390+ if m.fields.is_empty() { 391+ // Skip morphisms without field mappings (handled by another morphism) 392+ w.line(&format!("// {} → {} (uses same from_tangled_json as knot)", m.tangled_nsid, m.target_row)); 393+ w.blank(); 394+ continue; 395+ } 396+ if !seen_rows.insert(m.target_row) { 397+ // Skip duplicate implementations for the same target row 398+ w.line(&format!("// {} → {} (skipped: duplicate impl)", m.tangled_nsid, m.target_row)); 399+ w.blank(); 400+ continue; 401+ } 402+ w.line(&format!("// {} → {}", m.tangled_nsid, m.target_row)); 403+ w.line(&format!("impl {} {{", m.target_row)); 404+ w.indent(); 405+ w.line(&format!( 406+ "/// Deserialize a {} Jetstream record into a Cospan row.", 407+ m.tangled_nsid 408+ )); 409+ w.line("pub fn from_tangled_json(did: &str, rkey: &str, rec: &serde_json::Value) -> Self {"); 410+ w.indent(); 411+ w.line("Self {"); 412+ w.indent(); 413+ 414+ for (col_name, mapping) in &m.fields { 415+ match mapping { 416+ FieldMapping::Direct { tangled_field, required: true } => { 417+ w.line(&format!( 418+ "{col_name}: rec.get(\"{tangled_field}\").and_then(|v| v.as_str()).unwrap_or(\"\").to_string()," 419+ )); 420+ } 421+ FieldMapping::Direct { tangled_field, required: false } => { 422+ w.line(&format!( 423+ "{col_name}: rec.get(\"{tangled_field}\").and_then(|v| v.as_str()).map(String::from)," 424+ )); 425+ } 426+ FieldMapping::Constant(val) => { 427+ w.line(&format!("{col_name}: {val},")); 428+ } 429+ FieldMapping::Custom(expr) => { 430+ w.line(&format!("{col_name}: {expr},")); 431+ } 432+ } 433+ } 434+ 435+ w.dedent(); 436+ w.line("}"); 437+ w.dedent(); 438+ w.line("}"); 439+ w.dedent(); 440+ w.line("}"); 441+ w.blank(); 442+ } 
443+ 444+ Ok(w.finish()) 445+}