refactor: DB projections via panproto expression language Replace hand-coded from_json() string templates with panproto FieldTransforms compiled at codegen time: - AT-URI decomposition: ComputeField with split/replace/head/index expressions (e.g., repo → repo_did, repo_name) - Field renames: RenameField (e.g., issue → issue_uri, member → member_did) - Nested extraction: ComputeField with Field path traversal (e.g., avatar.ref.$link → avatar_cid) - Default values: AddField with Value::Str/Value::Int (e.g., state="open", comment_count=0) - Field drops: DropField for JSON arrays not stored in DB All transforms are compiled into CompiledMigration.field_transforms at codegen time and applied at runtime via lift_wtype_sigma(). Unified RecordTransformer replaces separate TangledInterop + SchemaRegistry. Single transform() call handles both Cospan (DB projection) and Tangled (interop morphism + DB projection) records through the same panproto pipeline. from_json() methods still exist for final Value→Row struct mapping but receive schema-validated, field-transformed JSON — not raw Jetstream payloads.

Author: Aaron Steven White
Commit f00c2c3f6e3b2731bb1de18a34ac54a088c36620
Parent: 0c13611376
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
11 files changed +579 -216
@@ -485,6 +485,7 @@ version = "0.3.2"
485485 dependencies = [
486486  "anyhow",
487487  "panproto-check",
488+ "panproto-expr",
488489  "panproto-gat",
489490  "panproto-inst",
490491  "panproto-mig",
@@ -6,31 +6,17 @@ use crate::db;
66 use crate::state::AppState;
77 use crate::xrpc::sse::IndexEvent;
88 
9-/// Parse a record through panproto's schema-driven parser.
10-/// For Cospan records, validates and normalizes via the Lexicon schema.
11-/// For Tangled records, applies the compiled morphism first.
12-fn parse_record(
9+/// Transform a record through the pre-compiled panproto morphism.
10+/// Handles both Cospan (DB projection) and Tangled (interop + DB projection).
11+fn transform_record(
1312     state: &AppState,
1413     collection: &str,
1514     rec: &serde_json::Value,
1615 ) -> serde_json::Value {
17-    // For Tangled records, try morphism transform first
18-    if collection.starts_with("sh.tangled.") {
19-        if let Some(result) = state.tangled_interop.transform(collection, rec) {
20-            match result {
21-                Ok(cospan_rec) => return cospan_rec,
22-                Err(e) => {
23-                    tracing::warn!(collection, error = %e, "morphism transform failed");
24-                }
25-            }
26-        }
27-    }
28-
29-    // Parse via Lexicon schema (Cospan records, or Tangled fallthrough)
30-    match state.schemas.parse_record(collection, rec) {
31-        Some(Ok(parsed)) => parsed,
16+    match state.transformer.transform(collection, rec) {
17+        Some(Ok(transformed)) => transformed,
3218         Some(Err(e)) => {
33-            tracing::debug!(collection, error = %e, "schema parse failed, using raw");
19+            tracing::warn!(collection, error = %e, "panproto transform failed, using raw");
3420             rec.clone()
3521         }
3622         None => rec.clone(),
@@ -60,7 +46,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
6046         // ─── Node ───────────────────────────────────────────────────
6147         ("dev.cospan.node", "create" | "update") => {
6248             if let Some(rec) = record {
63-                let row = db::node::NodeRow::from_json(did, rkey, &parse_record(state, collection, rec));
49+                let row = db::node::NodeRow::from_json(did, rkey, &transform_record(state, collection, rec));
6450                 db::node::upsert(&state.db, &row).await?;
6551             }
6652         }
@@ -71,7 +57,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
7157         // ─── Actor Profile ──────────────────────────────────────────
7258         ("dev.cospan.actor.profile", "create" | "update") => {
7359             if let Some(rec) = record {
74-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &parse_record(state, collection, rec));
60+                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &transform_record(state, collection, rec));
7561                 db::actor_profile::upsert(&state.db, &row).await?;
7662             }
7763         }
@@ -96,7 +82,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
9682                         .unwrap_or_default()
9783                 };
9884 
99-                let mut row = db::repo::RepoRow::from_json(did, rkey, &parse_record(state, collection, rec));
85+                let mut row = db::repo::RepoRow::from_json(did, rkey, &transform_record(state, collection, rec));
10086                 row.node_did = node_did;
10187                 row.node_url = node_url;
10288                 db::repo::upsert(&state.db, &row).await?;
@@ -120,7 +106,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
120106                     .map(|a| a.len() as i32)
121107                     .unwrap_or(0);
122108 
123-                let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, &parse_record(state, collection, rec));
109+                let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, &transform_record(state, collection, rec));
124110                 row.breaking_change_count = breaking_changes;
125111                 db::ref_update::upsert(&state.db, &row).await?;
126112 
@@ -142,7 +128,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
142128         // ─── Issue ──────────────────────────────────────────────────
143129         ("dev.cospan.repo.issue", "create" | "update") => {
144130             if let Some(rec) = record {
145-                let row = db::issue::IssueRow::from_json(did, rkey, &parse_record(state, collection, rec));
131+                let row = db::issue::IssueRow::from_json(did, rkey, &transform_record(state, collection, rec));
146132                 db::issue::upsert(&state.db, &row).await?;
147133 
148134                 // Publish SSE event
@@ -175,7 +161,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
175161                     .unwrap_or("")
176162                     .to_string();
177163 
178-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
164+                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
179165 
180166                 // Check if this is a new comment (not an update) for counter purposes
181167                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
@@ -211,7 +197,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
211197                     .unwrap_or("open")
212198                     .to_string();
213199 
214-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
200+                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
215201                 db::issue_state::upsert(&state.db, &row).await?;
216202 
217203                 // Update the issue's state and repo counters
@@ -260,7 +246,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
260246         // ─── Pull Request ───────────────────────────────────────────
261247         ("dev.cospan.repo.pull", "create" | "update") => {
262248             if let Some(rec) = record {
263-                let row = db::pull::PullRow::from_json(did, rkey, &parse_record(state, collection, rec));
249+                let row = db::pull::PullRow::from_json(did, rkey, &transform_record(state, collection, rec));
264250                 db::pull::upsert(&state.db, &row).await?;
265251 
266252                 // Publish SSE event
@@ -292,7 +278,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
292278                     .unwrap_or("")
293279                     .to_string();
294280 
295-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
281+                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
296282 
297283                 // Check if this is a new comment (not an update) for counter purposes
298284                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
@@ -326,7 +312,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
326312                     .unwrap_or("open")
327313                     .to_string();
328314 
329-                let row = db::pull_state::PullStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
315+                let row = db::pull_state::PullStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
330316                 db::pull_state::upsert(&state.db, &row).await?;
331317 
332318                 // Update the pull's state and repo counters
@@ -379,7 +365,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
379365                     .unwrap_or("")
380366                     .to_string();
381367 
382-                let row = db::star::StarRow::from_json(did, rkey, &parse_record(state, collection, rec));
368+                let row = db::star::StarRow::from_json(did, rkey, &transform_record(state, collection, rec));
383369 
384370                 let existing = db::star::get(&state.db, did, rkey).await?;
385371                 db::star::upsert(&state.db, &row).await?;
@@ -413,7 +399,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
413399         // ─── Follow ─────────────────────────────────────────────────
414400         ("dev.cospan.graph.follow", "create" | "update") => {
415401             if let Some(rec) = record {
416-                let row = db::follow::FollowRow::from_json(did, rkey, &parse_record(state, collection, rec));
402+                let row = db::follow::FollowRow::from_json(did, rkey, &transform_record(state, collection, rec));
417403                 db::follow::upsert(&state.db, &row).await?;
418404             }
419405         }
@@ -424,7 +410,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
424410         // ─── Reaction ───────────────────────────────────────────────
425411         ("dev.cospan.feed.reaction", "create" | "update") => {
426412             if let Some(rec) = record {
427-                let row = db::reaction::ReactionRow::from_json(did, rkey, &parse_record(state, collection, rec));
413+                let row = db::reaction::ReactionRow::from_json(did, rkey, &transform_record(state, collection, rec));
428414                 db::reaction::upsert(&state.db, &row).await?;
429415             }
430416         }
@@ -435,7 +421,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
435421         // ─── Label Definition ───────────────────────────────────────
436422         ("dev.cospan.label.definition", "create" | "update") => {
437423             if let Some(rec) = record {
438-                let row = db::label::LabelRow::from_json(did, rkey, &parse_record(state, collection, rec));
424+                let row = db::label::LabelRow::from_json(did, rkey, &transform_record(state, collection, rec));
439425                 db::label::upsert(&state.db, &row).await?;
440426             }
441427         }
@@ -446,7 +432,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
446432         // ─── Org ────────────────────────────────────────────────────
447433         ("dev.cospan.org", "create" | "update") => {
448434             if let Some(rec) = record {
449-                let row = db::org::OrgRow::from_json(did, rkey, &parse_record(state, collection, rec));
435+                let row = db::org::OrgRow::from_json(did, rkey, &transform_record(state, collection, rec));
450436                 db::org::upsert(&state.db, &row).await?;
451437             }
452438         }
@@ -457,7 +443,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
457443         // ─── Org Member ─────────────────────────────────────────────
458444         ("dev.cospan.org.member", "create" | "update") => {
459445             if let Some(rec) = record {
460-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &parse_record(state, collection, rec));
446+                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &transform_record(state, collection, rec));
461447                 db::org_member::upsert(&state.db, &row).await?;
462448             }
463449         }
@@ -468,7 +454,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
468454         // ─── Collaborator ───────────────────────────────────────────
469455         ("dev.cospan.repo.collaborator", "create" | "update") => {
470456             if let Some(rec) = record {
471-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &parse_record(state, collection, rec));
457+                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &transform_record(state, collection, rec));
472458                 db::collaborator::upsert(&state.db, &row).await?;
473459             }
474460         }
@@ -481,7 +467,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
481467             if let Some(rec) = record {
482468                 let checks = rec.get("algebraicChecks");
483469 
484-                let mut row = db::pipeline::PipelineRow::from_json(did, rkey, &parse_record(state, collection, rec));
470+                let mut row = db::pipeline::PipelineRow::from_json(did, rkey, &transform_record(state, collection, rec));
485471                 row.gat_type_check = checks
486472                     .and_then(|c| c.get("gatTypeCheck"))
487473                     .and_then(|v| v.as_str())
@@ -508,7 +494,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
508494         // ─── Dependency ─────────────────────────────────────────────
509495         ("dev.cospan.repo.dependency", "create" | "update") => {
510496             if let Some(rec) = record {
511-                let row = db::dependency::DependencyRow::from_json(did, rkey, &parse_record(state, collection, rec));
497+                let row = db::dependency::DependencyRow::from_json(did, rkey, &transform_record(state, collection, rec));
512498                 db::dependency::upsert(&state.db, &row).await?;
513499             }
514500         }
@@ -523,7 +509,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
523509         // ─── Tangled Star ──────────────────────────────────────────
524510         ("sh.tangled.feed.star", "create" | "update") => {
525511             if let Some(rec) = record {
526-                let row = db::star::StarRow::from_json(did, rkey, &parse_record(state, collection, rec));
512+                let row = db::star::StarRow::from_json(did, rkey, &transform_record(state, collection, rec));
527513                 let existing = db::star::get(&state.db, did, rkey).await?;
528514                 db::star::upsert(&state.db, &row).await?;
529515 
@@ -546,7 +532,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
546532         // ─── Tangled Follow ────────────────────────────────────────
547533         ("sh.tangled.graph.follow", "create" | "update") => {
548534             if let Some(rec) = record {
549-                let row = db::follow::FollowRow::from_json(did, rkey, &parse_record(state, collection, rec));
535+                let row = db::follow::FollowRow::from_json(did, rkey, &transform_record(state, collection, rec));
550536                 db::follow::upsert(&state.db, &row).await?;
551537                 tracing::debug!(did, rkey, "indexed tangled follow");
552538             }
@@ -558,7 +544,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
558544         // ─── Tangled Reaction ──────────────────────────────────────
559545         ("sh.tangled.feed.reaction", "create" | "update") => {
560546             if let Some(rec) = record {
561-                let row = db::reaction::ReactionRow::from_json(did, rkey, &parse_record(state, collection, rec));
547+                let row = db::reaction::ReactionRow::from_json(did, rkey, &transform_record(state, collection, rec));
562548                 db::reaction::upsert(&state.db, &row).await?;
563549                 tracing::debug!(did, rkey, "indexed tangled reaction");
564550             }
@@ -570,7 +556,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
570556         // ─── Tangled Issue ─────────────────────────────────────────
571557         ("sh.tangled.repo.issue", "create" | "update") => {
572558             if let Some(rec) = record {
573-                let row = db::issue::IssueRow::from_json(did, rkey, &parse_record(state, collection, rec));
559+                let row = db::issue::IssueRow::from_json(did, rkey, &transform_record(state, collection, rec));
574560                 db::issue::upsert(&state.db, &row).await?;
575561                 tracing::debug!(did, rkey, "indexed tangled issue");
576562             }
@@ -599,7 +585,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
599585                     .unwrap_or("open")
600586                     .to_string();
601587 
602-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
588+                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
603589                 db::issue_state::upsert(&state.db, &row).await?;
604590 
605591                 // Update the issue's state and repo counters
@@ -645,7 +631,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
645631                     .unwrap_or("")
646632                     .to_string();
647633 
648-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
634+                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
649635 
650636                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
651637                 db::issue_comment::upsert(&state.db, &row).await?;
@@ -668,7 +654,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
668654         // ─── Tangled Pull Request ──────────────────────────────────
669655         ("sh.tangled.repo.pull", "create" | "update") => {
670656             if let Some(rec) = record {
671-                let row = db::pull::PullRow::from_json(did, rkey, &parse_record(state, collection, rec));
657+                let row = db::pull::PullRow::from_json(did, rkey, &transform_record(state, collection, rec));
672658                 db::pull::upsert(&state.db, &row).await?;
673659                 tracing::debug!(did, rkey, "indexed tangled pull");
674660             }
@@ -690,7 +676,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
690676                     .and_then(|v| v.as_str())
691677                     .unwrap_or("")
692678                     .to_string();
693-                let row = db::pull_state::PullStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
679+                let row = db::pull_state::PullStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
694680                 let new_state = row.state.clone();
695681                 db::pull_state::upsert(&state.db, &row).await?;
696682 
@@ -735,7 +721,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
735721                     .unwrap_or("")
736722                     .to_string();
737723 
738-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
724+                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
739725 
740726                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
741727                 db::pull_comment::upsert(&state.db, &row).await?;
@@ -758,7 +744,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
758744         // ─── Tangled Collaborator ──────────────────────────────────
759745         ("sh.tangled.repo.collaborator", "create" | "update") => {
760746             if let Some(rec) = record {
761-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &parse_record(state, collection, rec));
747+                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &transform_record(state, collection, rec));
762748                 db::collaborator::upsert(&state.db, &row).await?;
763749                 tracing::debug!(did, rkey, "indexed tangled collaborator");
764750             }
@@ -770,7 +756,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
770756         // ─── Tangled Knot → Node ──────────────────────────────────
771757         ("sh.tangled.knot", "create" | "update") => {
772758             if let Some(rec) = record {
773-                let row = db::node::NodeRow::from_json(did, rkey, &parse_record(state, collection, rec));
759+                let row = db::node::NodeRow::from_json(did, rkey, &transform_record(state, collection, rec));
774760                 db::node::upsert(&state.db, &row).await?;
775761                 tracing::debug!(did, rkey, "indexed tangled knot as node");
776762             }
@@ -812,7 +798,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
812798         // ─── Tangled Actor Profile ─────────────────────────────────
813799         ("sh.tangled.actor.profile", "create" | "update") => {
814800             if let Some(rec) = record {
815-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &parse_record(state, collection, rec));
801+                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &transform_record(state, collection, rec));
816802                 db::actor_profile::upsert(&state.db, &row).await?;
817803                 tracing::debug!(did, rkey, "indexed tangled actor profile");
818804             }
@@ -824,7 +810,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
824810         // ─── Tangled Repo ──────────────────────────────────────────
825811         ("sh.tangled.repo", "create" | "update") => {
826812             if let Some(rec) = record {
827-                let row = db::repo::RepoRow::from_json(did, rkey, &parse_record(state, collection, rec));
813+                let row = db::repo::RepoRow::from_json(did, rkey, &transform_record(state, collection, rec));
828814                 db::repo::upsert(&state.db, &row).await?;
829815                 tracing::debug!(did, rkey, "indexed tangled repo");
830816             }
@@ -836,7 +822,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
836822         // ─── Tangled Knot Member → Org Member ─────────────────────
837823         ("sh.tangled.knot.member", "create" | "update") => {
838824             if let Some(rec) = record {
839-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &parse_record(state, collection, rec));
825+                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &transform_record(state, collection, rec));
840826                 db::org_member::upsert(&state.db, &row).await?;
841827                 tracing::debug!(did, rkey, "indexed tangled knot member as org member");
842828             }
@@ -912,7 +898,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
912898         // ─── Tangled Pipeline ──────────────────────────────────────
913899         ("sh.tangled.pipeline", "create" | "update") => {
914900             if let Some(rec) = record {
915-                let row = db::pipeline::PipelineRow::from_json(did, rkey, &parse_record(state, collection, rec));
901+                let row = db::pipeline::PipelineRow::from_json(did, rkey, &transform_record(state, collection, rec));
916902                 db::pipeline::upsert(&state.db, &row).await?;
917903                 tracing::debug!(did, rkey, "indexed tangled pipeline");
918904             }
@@ -969,7 +955,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
969955         // ─── Tangled Git RefUpdate ─────────────────────────────────
970956         ("sh.tangled.git.refUpdate", "create" | "update") => {
971957             if let Some(rec) = record {
972-                let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &parse_record(state, collection, rec));
958+                let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &transform_record(state, collection, rec));
973959                 db::ref_update::upsert(&state.db, &row).await?;
974960                 tracing::debug!(did, rkey, "indexed tangled git refUpdate");
975961             }
@@ -1,12 +1,12 @@
1-//! Tangled → Cospan interop via pre-compiled panproto morphisms.
1+//! Schema-driven record transformation via pre-compiled panproto morphisms.
22 //!
3-//! Morphisms are defined explicitly and compiled at codegen time
4-//! (see cospan-codegen/src/tangled_interop.rs). The compiled migrations
5-//! are serialized to generated/interop/compiled_morphisms.json.
3+//! All record transformations flow through panproto:
64 //!
7-//! At runtime, this module loads the compiled migrations and applies
8-//! them to incoming Tangled Jetstream records using panproto's
9-//! `lift_wtype_sigma()`.
5+//! **Cospan records**: parse_json → lift (DB projection transforms) → to_json
6+//! **Tangled records**: parse_json → lift (Tangled→Cospan + DB transforms) → to_json
7+//!
8+//! Morphisms and field transforms are compiled at codegen time and serialized.
9+//! This module loads them at startup and applies them at runtime.
1010 
1111 use std::collections::HashMap;
1212 use std::path::Path;
@@ -16,7 +16,7 @@ use panproto_inst::CompiledMigration;
1616 use panproto_mig::lift_wtype_sigma;
1717 use panproto_schema::Schema;
1818 
19-/// A pre-compiled interop mapping loaded from codegen output.
19+/// A pre-compiled Tangled → Cospan morphism (includes DB projection).
2020 #[derive(serde::Deserialize)]
2121 pub struct CompiledInterop {
2222     pub tangled_nsid: String,
@@ -27,75 +27,151 @@ pub struct CompiledInterop {
2727     pub quality_report: String,
2828 }
2929 
30-/// Registry of pre-compiled Tangled → Cospan morphisms.
31-pub struct TangledInterop {
32-    mappings: HashMap<String, CompiledInterop>,
30+/// A pre-compiled Cospan → Database projection.
31+#[derive(serde::Deserialize)]
32+pub struct CompiledDbProjection {
33+    pub nsid: String,
34+    pub schema: Schema,
35+    pub compiled: CompiledMigration,
3336 }
3437 
35-impl TangledInterop {
36-    /// Create an empty registry (no morphisms loaded).
38+/// Registry of all pre-compiled panproto morphisms.
39+pub struct RecordTransformer {
40+    /// Tangled NSID → compiled Tangled→Cospan+DB morphism
41+    tangled_morphisms: HashMap<String, CompiledInterop>,
42+    /// Cospan NSID → compiled Cospan→DB projection
43+    db_projections: HashMap<String, CompiledDbProjection>,
44+}
45+
46+impl RecordTransformer {
47+    /// Create an empty transformer (no morphisms loaded).
3748     pub fn empty() -> Self {
3849         Self {
39-            mappings: HashMap::new(),
50+            tangled_morphisms: HashMap::new(),
51+            db_projections: HashMap::new(),
4052         }
4153     }
4254 
43-    /// Load pre-compiled morphisms from the codegen output file.
55+    /// Load pre-compiled morphisms from codegen output.
4456     pub fn load(lexicons_dir: &Path) -> Result<Self> {
45-        // The compiled morphisms are at generated/interop/compiled_morphisms.json
46-        // relative to the workspace root. lexicons_dir is packages/lexicons/,
47-        // so workspace root is two levels up.
4857         let workspace_root = lexicons_dir
4958             .parent()
5059             .and_then(|p| p.parent())
5160             .unwrap_or(lexicons_dir);
52-        let morphisms_path = workspace_root.join("generated/interop/compiled_morphisms.json");
61+        let interop_dir = workspace_root.join("generated/interop");
62+
63+        // Load Tangled morphisms
64+        let morphisms_path = interop_dir.join("compiled_morphisms.json");
65+        let tangled_morphisms = if morphisms_path.exists() {
66+            let json = std::fs::read_to_string(&morphisms_path)
67+                .with_context(|| format!("reading {}", morphisms_path.display()))?;
68+            let interops: Vec<CompiledInterop> = serde_json::from_str(&json)
69+                .with_context(|| "deserializing compiled morphisms")?;
70+            let mut map = HashMap::new();
71+            for interop in interops {
72+                tracing::info!(
73+                    tangled = %interop.tangled_nsid,
74+                    cospan = %interop.cospan_nsid,
75+                    quality = %interop.quality_report,
76+                    "loaded tangled morphism"
77+                );
78+                map.insert(interop.tangled_nsid.clone(), interop);
79+            }
80+            map
81+        } else {
82+            tracing::warn!("no compiled morphisms found, tangled interop disabled");
83+            HashMap::new()
84+        };
5385 
54-        if !morphisms_path.exists() {
86+        // Load DB projections
87+        let projections_path = interop_dir.join("db_projections.json");
88+        let db_projections = if projections_path.exists() {
89+            let json = std::fs::read_to_string(&projections_path)
90+                .with_context(|| format!("reading {}", projections_path.display()))?;
91+            let projections: Vec<CompiledDbProjection> = serde_json::from_str(&json)
92+                .with_context(|| "deserializing DB projections")?;
93+            let mut map = HashMap::new();
94+            for proj in projections {
95+                tracing::info!(nsid = %proj.nsid, "loaded DB projection");
96+                map.insert(proj.nsid.clone(), proj);
97+            }
98+            map
99+        } else {
55100             anyhow::bail!(
56-                "compiled morphisms not found at {}. Run `cargo run -p cospan-codegen` first.",
57-                morphisms_path.display()
101+                "no DB projections found at {}. Run `cargo run -p cospan-codegen` first.",
102+                projections_path.display()
58103             );
59-        }
104+        };
60105 
61-        let json = std::fs::read_to_string(&morphisms_path)
62-            .with_context(|| format!("reading {}", morphisms_path.display()))?;
63-        let interops: Vec<CompiledInterop> = serde_json::from_str(&json)
64-            .with_context(|| "deserializing compiled morphisms")?;
65-
66-        let mut mappings = HashMap::new();
67-        for interop in interops {
68-            tracing::info!(
69-                tangled = %interop.tangled_nsid,
70-                cospan = %interop.cospan_nsid,
71-                quality = %interop.quality_report,
72-                "loaded compiled interop morphism"
73-            );
74-            mappings.insert(interop.tangled_nsid.clone(), interop);
75-        }
106+        tracing::info!(
107+            tangled = tangled_morphisms.len(),
108+            db = db_projections.len(),
109+            "loaded panproto morphisms"
110+        );
76111 
77-        tracing::info!(count = mappings.len(), "loaded tangled interop morphisms");
78-        Ok(Self { mappings })
112+        Ok(Self {
113+            tangled_morphisms,
114+            db_projections,
115+        })
79116     }
80117 
81-    /// Transform a Tangled JSON record to Cospan JSON using the pre-compiled
82-    /// morphism. Returns None if no morphism exists for this NSID.
118+    /// Transform a record through the appropriate panproto morphism.
119+    ///
120+    /// For Cospan records: applies DB projection (AT-URI decomposition, renames).
121+    /// For Tangled records: applies Tangled→Cospan morphism + DB projection.
122+    /// Returns None if no morphism exists for this NSID.
83123     pub fn transform(
84124         &self,
125+        collection: &str,
126+        record: &serde_json::Value,
127+    ) -> Option<Result<serde_json::Value>> {
128+        if collection.starts_with("sh.tangled.") {
129+            self.transform_tangled(collection, record)
130+        } else {
131+            self.transform_cospan(collection, record)
132+        }
133+    }
134+
135+    fn transform_cospan(
136+        &self,
137+        nsid: &str,
138+        record: &serde_json::Value,
139+    ) -> Option<Result<serde_json::Value>> {
140+        let proj = self.db_projections.get(nsid)?;
141+        Some(apply_projection(&proj.schema, &proj.compiled, nsid, record))
142+    }
143+
144+    fn transform_tangled(
145+        &self,
85146         tangled_nsid: &str,
86147         record: &serde_json::Value,
87148     ) -> Option<Result<serde_json::Value>> {
88-        let mapping = self.mappings.get(tangled_nsid)?;
89-        Some(apply_lift(mapping, record))
149+        let morphism = self.tangled_morphisms.get(tangled_nsid)?;
150+        Some(apply_morphism(morphism, record))
90151     }
91152 }
92153 
93-/// Apply the pre-compiled morphism: parse → lift → emit.
94-fn apply_lift(
154+/// Apply a DB projection: parse → lift (with field transforms) → emit.
155+fn apply_projection(
156+    schema: &Schema,
157+    compiled: &CompiledMigration,
158+    nsid: &str,
159+    record: &serde_json::Value,
160+) -> Result<serde_json::Value> {
161+    let instance = panproto_inst::parse::parse_json(schema, nsid, record)
162+        .map_err(|e| anyhow::anyhow!("parse {nsid}: {e:?}"))?;
163+
164+    let lifted = lift_wtype_sigma(compiled, schema, &instance)
165+        .map_err(|e| anyhow::anyhow!("project {nsid}: {e:?}"))?;
166+
167+    Ok(panproto_inst::parse::to_json(schema, &lifted))
168+}
169+
170+/// Apply a Tangled→Cospan morphism: parse → lift (rename + DB transforms) → emit.
171+fn apply_morphism(
95172     mapping: &CompiledInterop,
96173     record: &serde_json::Value,
97174 ) -> Result<serde_json::Value> {
98-    // Parse the Tangled JSON into a panproto WInstance
99175     let instance = panproto_inst::parse::parse_json(
100176         &mapping.tangled_schema,
101177         &mapping.tangled_nsid,
@@ -103,14 +179,21 @@ fn apply_lift(
103179     )
104180     .map_err(|e| anyhow::anyhow!("parse {}: {e:?}", mapping.tangled_nsid))?;
105181 
106-    // Lift through the pre-compiled morphism (Sigma for field renames)
107182     let lifted = lift_wtype_sigma(
108183         &mapping.compiled,
109184         &mapping.cospan_schema,
110185         &instance,
111186     )
112-    .map_err(|e| anyhow::anyhow!("lift {} → {}: {e:?}", mapping.tangled_nsid, mapping.cospan_nsid))?;
187+    .map_err(|e| {
188+        anyhow::anyhow!(
189+            "lift {} → {}: {e:?}",
190+            mapping.tangled_nsid,
191+            mapping.cospan_nsid
192+        )
193+    })?;
113194 
114-    // Emit back to JSON in the Cospan schema shape
115-    Ok(panproto_inst::parse::to_json(&mapping.cospan_schema, &lifted))
195+    Ok(panproto_inst::parse::to_json(
196+        &mapping.cospan_schema,
197+        &lifted,
198+    ))
116199 }
@@ -6,7 +6,6 @@ pub mod error;
66 pub mod indexer;
77 pub mod interop;
88 pub mod middleware;
9-pub mod record_parser;
109 pub mod node_proxy;
1110 pub mod state;
1211 pub mod xrpc;
@@ -1,86 +0,0 @@
1-//! Schema-driven record parsing via panproto.
2-//!
3-//! Parses Jetstream JSON records into panproto WInstances using the
4-//! Lexicon schema, then converts to JSON in the canonical Cospan shape.
5-//! This replaces all hand-coded `rec.get("field")` chains with
6-//! panproto's typed parsing infrastructure.
7-
8-use std::collections::HashMap;
9-use std::path::Path;
10-
11-use anyhow::{Context, Result};
12-use panproto_inst::parse::{parse_json, to_json};
13-use panproto_protocols::web_document::atproto;
14-use panproto_schema::Schema;
15-
16-/// Pre-loaded Lexicon schemas for all Cospan record types.
17-pub struct SchemaRegistry {
18-    /// NSID → parsed panproto Schema
19-    schemas: HashMap<String, Schema>,
20-}
21-
22-impl SchemaRegistry {
23-    /// Load all Lexicon schemas from the lexicons directory.
24-    pub fn load(lexicons_dir: &Path) -> Result<Self> {
25-        let mut schemas = HashMap::new();
26-
27-        fn walk(dir: &Path, schemas: &mut HashMap<String, Schema>) -> Result<()> {
28-            let entries = std::fs::read_dir(dir)
29-                .with_context(|| format!("reading {}", dir.display()))?;
30-            for entry in entries.flatten() {
31-                let path = entry.path();
32-                if path.is_dir() {
33-                    walk(&path, schemas)?;
34-                } else if path.extension().is_some_and(|e| e == "json") {
35-                    let content = std::fs::read_to_string(&path)?;
36-                    let json: serde_json::Value = serde_json::from_str(&content)?;
37-                    if let Some(nsid) = json.get("id").and_then(|v| v.as_str()) {
38-                        // Only parse record types (not procedures)
39-                        let is_record = json
40-                            .pointer("/defs/main/type")
41-                            .and_then(|v| v.as_str())
42-                            == Some("record");
43-                        if is_record {
44-                            match atproto::parse_lexicon(&json) {
45-                                Ok(schema) => {
46-                                    schemas.insert(nsid.to_string(), schema);
47-                                }
48-                                Err(e) => {
49-                                    tracing::debug!(nsid, error = %e, "skipping unparseable lexicon");
50-                                }
51-                            }
52-                        }
53-                    }
54-                }
55-            }
56-            Ok(())
57-        }
58-
59-        walk(lexicons_dir, &mut schemas)?;
60-        tracing::info!(count = schemas.len(), "loaded lexicon schemas");
61-        Ok(Self { schemas })
62-    }
63-
64-    /// Parse a JSON record into a panproto WInstance, then emit as canonical JSON.
65-    ///
66-    /// This ensures the record is validated against the Lexicon schema and
67-    /// all field names are normalized. Returns None if the NSID has no schema.
68-    pub fn parse_record(
69-        &self,
70-        nsid: &str,
71-        record: &serde_json::Value,
72-    ) -> Option<Result<serde_json::Value>> {
73-        let schema = self.schemas.get(nsid)?;
74-
75-        let result = parse_json(schema, nsid, record)
76-            .map(|instance| to_json(schema, &instance))
77-            .map_err(|e| anyhow::anyhow!("parse {nsid}: {e:?}"));
78-
79-        Some(result)
80-    }
81-
82-    /// Get the raw schema for an NSID (for use with lift operations).
83-    pub fn schema(&self, nsid: &str) -> Option<&Schema> {
84-        self.schemas.get(nsid)
85-    }
86-}
cospan · schematic version control on atproto built on AT Protocol