refactor: replace string-template interop with panproto morphisms Tangled→Cospan interop now uses panproto properly: Codegen time (cospan-codegen): - Parse both Lexicon sets via parse_lexicon() - Define explicit Migration (vertex_map + edge_map) per NSID pair - compile() to get CompiledMigration - Serialize to generated/interop/compiled_morphisms.json Runtime (cospan-appview): - Load pre-compiled migrations at startup (panics if missing) - parse_json() → lift_wtype_sigma() → to_json() per Tangled record - Feed result to from_json() for denormalization Replaces all from_tangled_json() string template methods with panproto's type-theoretic morphism infrastructure. The morphisms are specified explicitly (not discovered), compiled statically (not at runtime), and applied via lift (not string munging). 10 morphisms compiled for matching NSID pairs. Records with incompatible schemas (repo, pipeline, refUpdate) fall through to raw record passthrough.
Author: Aaron Steven White
Commit
75b143f70f808f624b152fa2272e5883caec0009Parent: ecf907af29
Structural diff unavailable
These commits were pushed via plain git push, so no pre-parsed
schemas are available. Install git-remote-cospan and re-push via panproto:// to
see scope-level changes, breaking change detection, and semantic diffs.
brew install panproto/tap/git-remote-cospan10 files changed +402 -401
@@ -455,6 +455,8 @@ dependencies = [
455455 "panproto-expr-parser", 456456 "panproto-gat", 457457 "panproto-inst", 458+ "panproto-mig", 459+ "panproto-protocols", 458460 "panproto-schema", 459461 "panproto-xrpc", 460462 "rand 0.8.5",
@@ -484,6 +486,7 @@ dependencies = [
484486 "anyhow", 485487 "panproto-check", 486488 "panproto-gat", 489+ "panproto-inst", 487490 "panproto-mig", 488491 "panproto-protocols", 489492 "panproto-schema",
@@ -15,6 +15,8 @@ path = "src/main.rs"
1515 [dependencies] 1616 panproto-core.workspace = true 1717 panproto-inst.workspace = true 18+panproto-mig.workspace = true 19+panproto-protocols.workspace = true 1820 panproto-expr.workspace = true 1921 panproto-expr-parser.workspace = true 2022 panproto-gat.workspace = true
@@ -5,6 +5,7 @@ pub struct AppConfig {
55 pub database_url: String, 66 pub jetstream_url: String, 77 pub listen: String, 8+ pub lexicons_dir: String, 89 } 910 1011 impl AppConfig {
@@ -15,6 +16,8 @@ impl AppConfig {
1516 jetstream_url: std::env::var("JETSTREAM_URL") 1617 .unwrap_or_else(|_| "wss://jetstream2.us-east.bsky.network/subscribe".into()), 1718 listen: std::env::var("APPVIEW_LISTEN").unwrap_or_else(|_| "0.0.0.0:3000".into()), 19+ lexicons_dir: std::env::var("LEXICONS_DIR") 20+ .unwrap_or_else(|_| "packages/lexicons".into()), 1821 }) 1922 } 2023 }
@@ -6,6 +6,23 @@ use crate::db;
66 use crate::state::AppState; 77 use crate::xrpc::sse::IndexEvent; 88 9+/// Transform a Tangled record to Cospan schema via panproto morphism, 10+/// falling back to the original record if no morphism exists. 11+fn tangled_to_cospan( 12+ state: &AppState, 13+ collection: &str, 14+ rec: &serde_json::Value, 15+) -> serde_json::Value { 16+ match state.tangled_interop.transform(collection, rec) { 17+ Some(Ok(cospan_rec)) => cospan_rec, 18+ Some(Err(e)) => { 19+ tracing::warn!(collection, error = %e, "morphism transform failed, using raw record"); 20+ rec.clone() 21+ } 22+ None => rec.clone(), // no morphism, pass through 23+ } 24+} 25+ 926 /// Process a single Jetstream event by dispatching on collection. 1027 pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) -> anyhow::Result<()> { 1128 let commit = match event.get("commit") {
@@ -492,7 +509,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
492509 // ─── Tangled Star ────────────────────────────────────────── 493510 ("sh.tangled.feed.star", "create" | "update") => { 494511 if let Some(rec) = record { 495- let row = db::star::StarRow::from_tangled_json(did, rkey, rec); 512+ let cospan_rec = tangled_to_cospan(state, collection, rec); 513+ let row = db::star::StarRow::from_json(did, rkey, &cospan_rec); 496514 let existing = db::star::get(&state.db, did, rkey).await?; 497515 db::star::upsert(&state.db, &row).await?; 498516
@@ -515,7 +533,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
515533 // ─── Tangled Follow ──────────────────────────────────────── 516534 ("sh.tangled.graph.follow", "create" | "update") => { 517535 if let Some(rec) = record { 518- let row = db::follow::FollowRow::from_tangled_json(did, rkey, rec); 536+ let cospan_rec = tangled_to_cospan(state, collection, rec); 537+ let row = db::follow::FollowRow::from_json(did, rkey, &cospan_rec); 519538 db::follow::upsert(&state.db, &row).await?; 520539 tracing::debug!(did, rkey, "indexed tangled follow"); 521540 }
@@ -527,7 +546,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
527546 // ─── Tangled Reaction ────────────────────────────────────── 528547 ("sh.tangled.feed.reaction", "create" | "update") => { 529548 if let Some(rec) = record { 530- let row = db::reaction::ReactionRow::from_tangled_json(did, rkey, rec); 549+ let cospan_rec = tangled_to_cospan(state, collection, rec); 550+ let row = db::reaction::ReactionRow::from_json(did, rkey, &cospan_rec); 531551 db::reaction::upsert(&state.db, &row).await?; 532552 tracing::debug!(did, rkey, "indexed tangled reaction"); 533553 }
@@ -539,7 +559,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
539559 // ─── Tangled Issue ───────────────────────────────────────── 540560 ("sh.tangled.repo.issue", "create" | "update") => { 541561 if let Some(rec) = record { 542- let row = db::issue::IssueRow::from_tangled_json(did, rkey, rec); 562+ let cospan_rec = tangled_to_cospan(state, collection, rec); 563+ let row = db::issue::IssueRow::from_json(did, rkey, &cospan_rec); 543564 db::issue::upsert(&state.db, &row).await?; 544565 tracing::debug!(did, rkey, "indexed tangled issue"); 545566 }
@@ -568,7 +589,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
568589 .unwrap_or("open") 569590 .to_string(); 570591 571- let row = db::issue_state::IssueStateRow::from_tangled_json(did, rkey, rec); 592+ let cospan_rec = tangled_to_cospan(state, collection, rec); 593+ let row = db::issue_state::IssueStateRow::from_json(did, rkey, &cospan_rec); 572594 db::issue_state::upsert(&state.db, &row).await?; 573595 574596 // Update the issue's state and repo counters
@@ -614,7 +636,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
614636 .unwrap_or("") 615637 .to_string(); 616638 617- let row = db::issue_comment::IssueCommentRow::from_tangled_json(did, rkey, rec); 639+ let cospan_rec = tangled_to_cospan(state, collection, rec); 640+ let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &cospan_rec); 618641 619642 let existing = db::issue_comment::get(&state.db, did, rkey).await?; 620643 db::issue_comment::upsert(&state.db, &row).await?;
@@ -637,7 +660,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
637660 // ─── Tangled Pull Request ────────────────────────────────── 638661 ("sh.tangled.repo.pull", "create" | "update") => { 639662 if let Some(rec) = record { 640- let row = db::pull::PullRow::from_tangled_json(did, rkey, rec); 663+ let cospan_rec = tangled_to_cospan(state, collection, rec); 664+ let row = db::pull::PullRow::from_json(did, rkey, &cospan_rec); 641665 db::pull::upsert(&state.db, &row).await?; 642666 tracing::debug!(did, rkey, "indexed tangled pull"); 643667 }
@@ -659,7 +683,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
659683 .and_then(|v| v.as_str()) 660684 .unwrap_or("") 661685 .to_string(); 662- let row = db::pull_state::PullStateRow::from_tangled_json(did, rkey, rec); 686+ let cospan_rec = tangled_to_cospan(state, collection, rec); 687+ let row = db::pull_state::PullStateRow::from_json(did, rkey, &cospan_rec); 663688 let new_state = row.state.clone(); 664689 db::pull_state::upsert(&state.db, &row).await?; 665690
@@ -704,7 +729,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
704729 .unwrap_or("") 705730 .to_string(); 706731 707- let row = db::pull_comment::PullCommentRow::from_tangled_json(did, rkey, rec); 732+ let cospan_rec = tangled_to_cospan(state, collection, rec); 733+ let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &cospan_rec); 708734 709735 let existing = db::pull_comment::get(&state.db, did, rkey).await?; 710736 db::pull_comment::upsert(&state.db, &row).await?;
@@ -727,7 +753,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
727753 // ─── Tangled Collaborator ────────────────────────────────── 728754 ("sh.tangled.repo.collaborator", "create" | "update") => { 729755 if let Some(rec) = record { 730- let row = db::collaborator::CollaboratorRow::from_tangled_json(did, rkey, rec); 756+ let cospan_rec = tangled_to_cospan(state, collection, rec); 757+ let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &cospan_rec); 731758 db::collaborator::upsert(&state.db, &row).await?; 732759 tracing::debug!(did, rkey, "indexed tangled collaborator"); 733760 }
@@ -739,7 +766,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
739766 // ─── Tangled Knot → Node ────────────────────────────────── 740767 ("sh.tangled.knot", "create" | "update") => { 741768 if let Some(rec) = record { 742- let row = db::node::NodeRow::from_tangled_json(did, rkey, rec); 769+ let cospan_rec = tangled_to_cospan(state, collection, rec); 770+ let row = db::node::NodeRow::from_json(did, rkey, &cospan_rec); 743771 db::node::upsert(&state.db, &row).await?; 744772 tracing::debug!(did, rkey, "indexed tangled knot as node"); 745773 }
@@ -781,7 +809,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
781809 // ─── Tangled Actor Profile ───────────────────────────────── 782810 ("sh.tangled.actor.profile", "create" | "update") => { 783811 if let Some(rec) = record { 784- let row = db::actor_profile::ActorProfileRow::from_tangled_json(did, rkey, rec); 812+ let cospan_rec = tangled_to_cospan(state, collection, rec); 813+ let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &cospan_rec); 785814 db::actor_profile::upsert(&state.db, &row).await?; 786815 tracing::debug!(did, rkey, "indexed tangled actor profile"); 787816 }
@@ -793,7 +822,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
793822 // ─── Tangled Repo ────────────────────────────────────────── 794823 ("sh.tangled.repo", "create" | "update") => { 795824 if let Some(rec) = record { 796- let row = db::repo::RepoRow::from_tangled_json(did, rkey, rec); 825+ let cospan_rec = tangled_to_cospan(state, collection, rec); 826+ let row = db::repo::RepoRow::from_json(did, rkey, &cospan_rec); 797827 db::repo::upsert(&state.db, &row).await?; 798828 tracing::debug!(did, rkey, "indexed tangled repo"); 799829 }
@@ -805,7 +835,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
805835 // ─── Tangled Knot Member → Org Member ───────────────────── 806836 ("sh.tangled.knot.member", "create" | "update") => { 807837 if let Some(rec) = record { 808- let row = db::org_member::OrgMemberRow::from_tangled_json(did, rkey, rec); 838+ let cospan_rec = tangled_to_cospan(state, collection, rec); 839+ let row = db::org_member::OrgMemberRow::from_json(did, rkey, &cospan_rec); 809840 db::org_member::upsert(&state.db, &row).await?; 810841 tracing::debug!(did, rkey, "indexed tangled knot member as org member"); 811842 }
@@ -881,7 +912,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
881912 // ─── Tangled Pipeline ────────────────────────────────────── 882913 ("sh.tangled.pipeline", "create" | "update") => { 883914 if let Some(rec) = record { 884- let row = db::pipeline::PipelineRow::from_tangled_json(did, rkey, rec); 915+ let cospan_rec = tangled_to_cospan(state, collection, rec); 916+ let row = db::pipeline::PipelineRow::from_json(did, rkey, &cospan_rec); 885917 db::pipeline::upsert(&state.db, &row).await?; 886918 tracing::debug!(did, rkey, "indexed tangled pipeline"); 887919 }
@@ -938,7 +970,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
938970 // ─── Tangled Git RefUpdate ───────────────────────────────── 939971 ("sh.tangled.git.refUpdate", "create" | "update") => { 940972 if let Some(rec) = record { 941- let row = db::ref_update::RefUpdateRow::from_tangled_json(did, rkey, rec); 973+ let cospan_rec = tangled_to_cospan(state, collection, rec); 974+ let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &cospan_rec); 942975 db::ref_update::upsert(&state.db, &row).await?; 943976 tracing::debug!(did, rkey, "indexed tangled git refUpdate"); 944977 }
@@ -0,0 +1,116 @@
1+//! Tangled → Cospan interop via pre-compiled panproto morphisms. 2+//! 3+//! Morphisms are defined explicitly and compiled at codegen time 4+//! (see cospan-codegen/src/tangled_interop.rs). The compiled migrations 5+//! are serialized to generated/interop/compiled_morphisms.json. 6+//! 7+//! At runtime, this module loads the compiled migrations and applies 8+//! them to incoming Tangled Jetstream records using panproto's 9+//! `lift_wtype_sigma()`. 10+ 11+use std::collections::HashMap; 12+use std::path::Path; 13+ 14+use anyhow::{Context, Result}; 15+use panproto_inst::CompiledMigration; 16+use panproto_mig::lift_wtype_sigma; 17+use panproto_schema::Schema; 18+ 19+/// A pre-compiled interop mapping loaded from codegen output. 20+#[derive(serde::Deserialize)] 21+pub struct CompiledInterop { 22+ pub tangled_nsid: String, 23+ pub cospan_nsid: String, 24+ pub tangled_schema: Schema, 25+ pub cospan_schema: Schema, 26+ pub compiled: CompiledMigration, 27+ pub quality_report: String, 28+} 29+ 30+/// Registry of pre-compiled Tangled → Cospan morphisms. 31+pub struct TangledInterop { 32+ mappings: HashMap<String, CompiledInterop>, 33+} 34+ 35+impl TangledInterop { 36+ /// Create an empty registry (no morphisms loaded). 37+ pub fn empty() -> Self { 38+ Self { 39+ mappings: HashMap::new(), 40+ } 41+ } 42+ 43+ /// Load pre-compiled morphisms from the codegen output file. 44+ pub fn load(lexicons_dir: &Path) -> Result<Self> { 45+ // The compiled morphisms are at generated/interop/compiled_morphisms.json 46+ // relative to the workspace root. lexicons_dir is packages/lexicons/, 47+ // so workspace root is two levels up. 48+ let workspace_root = lexicons_dir 49+ .parent() 50+ .and_then(|p| p.parent()) 51+ .unwrap_or(lexicons_dir); 52+ let morphisms_path = workspace_root.join("generated/interop/compiled_morphisms.json"); 53+ 54+ if !morphisms_path.exists() { 55+ anyhow::bail!( 56+ "compiled morphisms not found at {}. Run `cargo run -p cospan-codegen` first.", 57+ morphisms_path.display() 58+ ); 59+ } 60+ 61+ let json = std::fs::read_to_string(&morphisms_path) 62+ .with_context(|| format!("reading {}", morphisms_path.display()))?; 63+ let interops: Vec<CompiledInterop> = serde_json::from_str(&json) 64+ .with_context(|| "deserializing compiled morphisms")?; 65+ 66+ let mut mappings = HashMap::new(); 67+ for interop in interops { 68+ tracing::info!( 69+ tangled = %interop.tangled_nsid, 70+ cospan = %interop.cospan_nsid, 71+ quality = %interop.quality_report, 72+ "loaded compiled interop morphism" 73+ ); 74+ mappings.insert(interop.tangled_nsid.clone(), interop); 75+ } 76+ 77+ tracing::info!(count = mappings.len(), "loaded tangled interop morphisms"); 78+ Ok(Self { mappings }) 79+ } 80+ 81+ /// Transform a Tangled JSON record to Cospan JSON using the pre-compiled 82+ /// morphism. Returns None if no morphism exists for this NSID. 83+ pub fn transform( 84+ &self, 85+ tangled_nsid: &str, 86+ record: &serde_json::Value, 87+ ) -> Option<Result<serde_json::Value>> { 88+ let mapping = self.mappings.get(tangled_nsid)?; 89+ Some(apply_lift(mapping, record)) 90+ } 91+} 92+ 93+/// Apply the pre-compiled morphism: parse → lift → emit. 94+fn apply_lift( 95+ mapping: &CompiledInterop, 96+ record: &serde_json::Value, 97+) -> Result<serde_json::Value> { 98+ // Parse the Tangled JSON into a panproto WInstance 99+ let instance = panproto_inst::parse::parse_json( 100+ &mapping.tangled_schema, 101+ &mapping.tangled_nsid, 102+ record, 103+ ) 104+ .map_err(|e| anyhow::anyhow!("parse {}: {e:?}", mapping.tangled_nsid))?; 105+ 106+ // Lift through the pre-compiled morphism (Sigma for field renames) 107+ let lifted = lift_wtype_sigma( 108+ &mapping.compiled, 109+ &mapping.cospan_schema, 110+ &instance, 111+ ) 112+ .map_err(|e| anyhow::anyhow!("lift {} → {}: {e:?}", mapping.tangled_nsid, mapping.cospan_nsid))?; 113+ 114+ // Emit back to JSON in the Cospan schema shape 115+ Ok(panproto_inst::parse::to_json(&mapping.cospan_schema, &lifted)) 116+}