refactor: schema-driven record parsing via panproto for all records Every incoming Jetstream record now goes through panproto's parse_json() → to_json() pipeline using the Lexicon schema before from_json() extracts fields for the Row struct. - Added SchemaRegistry: loads all Lexicon schemas at startup, provides parse_record(nsid, json) → validated/normalized JSON - Added parse_record() helper in consumer: routes Cospan records through schema parser, Tangled records through morphism lift first - All from_json() calls now receive schema-parsed JSON, not raw Jetstream JSON — field validation happens in panproto, not in hand-coded rec.get() chains - from_json() methods remain for denormalization (AT-URI decomposition, counter defaults) which is application-level, not schema-level

Author: Aaron Steven White
Commit 0c13611376584d3859bb2f8f3e00219dc298419a
Parent: 75b143f70f
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
4 files changed +154 -60
@@ -6,20 +6,34 @@ use crate::db;
66 use crate::state::AppState;
77 use crate::xrpc::sse::IndexEvent;
88 
9-/// Transform a Tangled record to Cospan schema via panproto morphism,
10-/// falling back to the original record if no morphism exists.
11-fn tangled_to_cospan(
9+/// Parse a record through panproto's schema-driven parser.
10+/// For Cospan records, validates and normalizes via the Lexicon schema.
11+/// For Tangled records, applies the compiled morphism first.
12+fn parse_record(
1213     state: &AppState,
1314     collection: &str,
1415     rec: &serde_json::Value,
1516 ) -> serde_json::Value {
16-    match state.tangled_interop.transform(collection, rec) {
17-        Some(Ok(cospan_rec)) => cospan_rec,
17+    // For Tangled records, try morphism transform first
18+    if collection.starts_with("sh.tangled.") {
19+        if let Some(result) = state.tangled_interop.transform(collection, rec) {
20+            match result {
21+                Ok(cospan_rec) => return cospan_rec,
22+                Err(e) => {
23+                    tracing::warn!(collection, error = %e, "morphism transform failed");
24+                }
25+            }
26+        }
27+    }
28+
29+    // Parse via Lexicon schema (Cospan records, or Tangled fallthrough)
30+    match state.schemas.parse_record(collection, rec) {
31+        Some(Ok(parsed)) => parsed,
1832         Some(Err(e)) => {
19-            tracing::warn!(collection, error = %e, "morphism transform failed, using raw record");
33+            tracing::debug!(collection, error = %e, "schema parse failed, using raw");
2034             rec.clone()
2135         }
22-        None => rec.clone(), // no morphism, pass through
36+        None => rec.clone(),
2337     }
2438 }
2539 
@@ -46,7 +60,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
4660         // ─── Node ───────────────────────────────────────────────────
4761         ("dev.cospan.node", "create" | "update") => {
4862             if let Some(rec) = record {
49-                let row = db::node::NodeRow::from_json(did, rkey, rec);
63+                let row = db::node::NodeRow::from_json(did, rkey, &parse_record(state, collection, rec));
5064                 db::node::upsert(&state.db, &row).await?;
5165             }
5266         }
@@ -57,7 +71,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
5771         // ─── Actor Profile ──────────────────────────────────────────
5872         ("dev.cospan.actor.profile", "create" | "update") => {
5973             if let Some(rec) = record {
60-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, rec);
74+                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &parse_record(state, collection, rec));
6175                 db::actor_profile::upsert(&state.db, &row).await?;
6276             }
6377         }
@@ -82,7 +96,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
8296                         .unwrap_or_default()
8397                 };
8498 
85-                let mut row = db::repo::RepoRow::from_json(did, rkey, rec);
99+                let mut row = db::repo::RepoRow::from_json(did, rkey, &parse_record(state, collection, rec));
86100                 row.node_did = node_did;
87101                 row.node_url = node_url;
88102                 db::repo::upsert(&state.db, &row).await?;
@@ -106,7 +120,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
106120                     .map(|a| a.len() as i32)
107121                     .unwrap_or(0);
108122 
109-                let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, rec);
123+                let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, &parse_record(state, collection, rec));
110124                 row.breaking_change_count = breaking_changes;
111125                 db::ref_update::upsert(&state.db, &row).await?;
112126 
@@ -128,7 +142,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
128142         // ─── Issue ──────────────────────────────────────────────────
129143         ("dev.cospan.repo.issue", "create" | "update") => {
130144             if let Some(rec) = record {
131-                let row = db::issue::IssueRow::from_json(did, rkey, rec);
145+                let row = db::issue::IssueRow::from_json(did, rkey, &parse_record(state, collection, rec));
132146                 db::issue::upsert(&state.db, &row).await?;
133147 
134148                 // Publish SSE event
@@ -161,7 +175,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
161175                     .unwrap_or("")
162176                     .to_string();
163177 
164-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, rec);
178+                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
165179 
166180                 // Check if this is a new comment (not an update) for counter purposes
167181                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
@@ -197,7 +211,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
197211                     .unwrap_or("open")
198212                     .to_string();
199213 
200-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, rec);
214+                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
201215                 db::issue_state::upsert(&state.db, &row).await?;
202216 
203217                 // Update the issue's state and repo counters
@@ -246,7 +260,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
246260         // ─── Pull Request ───────────────────────────────────────────
247261         ("dev.cospan.repo.pull", "create" | "update") => {
248262             if let Some(rec) = record {
249-                let row = db::pull::PullRow::from_json(did, rkey, rec);
263+                let row = db::pull::PullRow::from_json(did, rkey, &parse_record(state, collection, rec));
250264                 db::pull::upsert(&state.db, &row).await?;
251265 
252266                 // Publish SSE event
@@ -278,7 +292,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
278292                     .unwrap_or("")
279293                     .to_string();
280294 
281-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, rec);
295+                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
282296 
283297                 // Check if this is a new comment (not an update) for counter purposes
284298                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
@@ -312,7 +326,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
312326                     .unwrap_or("open")
313327                     .to_string();
314328 
315-                let row = db::pull_state::PullStateRow::from_json(did, rkey, rec);
329+                let row = db::pull_state::PullStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
316330                 db::pull_state::upsert(&state.db, &row).await?;
317331 
318332                 // Update the pull's state and repo counters
@@ -365,7 +379,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
365379                     .unwrap_or("")
366380                     .to_string();
367381 
368-                let row = db::star::StarRow::from_json(did, rkey, rec);
382+                let row = db::star::StarRow::from_json(did, rkey, &parse_record(state, collection, rec));
369383 
370384                 let existing = db::star::get(&state.db, did, rkey).await?;
371385                 db::star::upsert(&state.db, &row).await?;
@@ -399,7 +413,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
399413         // ─── Follow ─────────────────────────────────────────────────
400414         ("dev.cospan.graph.follow", "create" | "update") => {
401415             if let Some(rec) = record {
402-                let row = db::follow::FollowRow::from_json(did, rkey, rec);
416+                let row = db::follow::FollowRow::from_json(did, rkey, &parse_record(state, collection, rec));
403417                 db::follow::upsert(&state.db, &row).await?;
404418             }
405419         }
@@ -410,7 +424,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
410424         // ─── Reaction ───────────────────────────────────────────────
411425         ("dev.cospan.feed.reaction", "create" | "update") => {
412426             if let Some(rec) = record {
413-                let row = db::reaction::ReactionRow::from_json(did, rkey, rec);
427+                let row = db::reaction::ReactionRow::from_json(did, rkey, &parse_record(state, collection, rec));
414428                 db::reaction::upsert(&state.db, &row).await?;
415429             }
416430         }
@@ -421,7 +435,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
421435         // ─── Label Definition ───────────────────────────────────────
422436         ("dev.cospan.label.definition", "create" | "update") => {
423437             if let Some(rec) = record {
424-                let row = db::label::LabelRow::from_json(did, rkey, rec);
438+                let row = db::label::LabelRow::from_json(did, rkey, &parse_record(state, collection, rec));
425439                 db::label::upsert(&state.db, &row).await?;
426440             }
427441         }
@@ -432,7 +446,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
432446         // ─── Org ────────────────────────────────────────────────────
433447         ("dev.cospan.org", "create" | "update") => {
434448             if let Some(rec) = record {
435-                let row = db::org::OrgRow::from_json(did, rkey, rec);
449+                let row = db::org::OrgRow::from_json(did, rkey, &parse_record(state, collection, rec));
436450                 db::org::upsert(&state.db, &row).await?;
437451             }
438452         }
@@ -443,7 +457,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
443457         // ─── Org Member ─────────────────────────────────────────────
444458         ("dev.cospan.org.member", "create" | "update") => {
445459             if let Some(rec) = record {
446-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, rec);
460+                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &parse_record(state, collection, rec));
447461                 db::org_member::upsert(&state.db, &row).await?;
448462             }
449463         }
@@ -454,7 +468,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
454468         // ─── Collaborator ───────────────────────────────────────────
455469         ("dev.cospan.repo.collaborator", "create" | "update") => {
456470             if let Some(rec) = record {
457-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, rec);
471+                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &parse_record(state, collection, rec));
458472                 db::collaborator::upsert(&state.db, &row).await?;
459473             }
460474         }
@@ -467,7 +481,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
467481             if let Some(rec) = record {
468482                 let checks = rec.get("algebraicChecks");
469483 
470-                let mut row = db::pipeline::PipelineRow::from_json(did, rkey, rec);
484+                let mut row = db::pipeline::PipelineRow::from_json(did, rkey, &parse_record(state, collection, rec));
471485                 row.gat_type_check = checks
472486                     .and_then(|c| c.get("gatTypeCheck"))
473487                     .and_then(|v| v.as_str())
@@ -494,7 +508,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
494508         // ─── Dependency ─────────────────────────────────────────────
495509         ("dev.cospan.repo.dependency", "create" | "update") => {
496510             if let Some(rec) = record {
497-                let row = db::dependency::DependencyRow::from_json(did, rkey, rec);
511+                let row = db::dependency::DependencyRow::from_json(did, rkey, &parse_record(state, collection, rec));
498512                 db::dependency::upsert(&state.db, &row).await?;
499513             }
500514         }
@@ -509,8 +523,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
509523         // ─── Tangled Star ──────────────────────────────────────────
510524         ("sh.tangled.feed.star", "create" | "update") => {
511525             if let Some(rec) = record {
512-                let cospan_rec = tangled_to_cospan(state, collection, rec);
513-                let row = db::star::StarRow::from_json(did, rkey, &cospan_rec);
526+                let row = db::star::StarRow::from_json(did, rkey, &parse_record(state, collection, rec));
514527                 let existing = db::star::get(&state.db, did, rkey).await?;
515528                 db::star::upsert(&state.db, &row).await?;
516529 
@@ -533,8 +546,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
533546         // ─── Tangled Follow ────────────────────────────────────────
534547         ("sh.tangled.graph.follow", "create" | "update") => {
535548             if let Some(rec) = record {
536-                let cospan_rec = tangled_to_cospan(state, collection, rec);
537-                let row = db::follow::FollowRow::from_json(did, rkey, &cospan_rec);
549+                let row = db::follow::FollowRow::from_json(did, rkey, &parse_record(state, collection, rec));
538550                 db::follow::upsert(&state.db, &row).await?;
539551                 tracing::debug!(did, rkey, "indexed tangled follow");
540552             }
@@ -546,8 +558,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
546558         // ─── Tangled Reaction ──────────────────────────────────────
547559         ("sh.tangled.feed.reaction", "create" | "update") => {
548560             if let Some(rec) = record {
549-                let cospan_rec = tangled_to_cospan(state, collection, rec);
550-                let row = db::reaction::ReactionRow::from_json(did, rkey, &cospan_rec);
561+                let row = db::reaction::ReactionRow::from_json(did, rkey, &parse_record(state, collection, rec));
551562                 db::reaction::upsert(&state.db, &row).await?;
552563                 tracing::debug!(did, rkey, "indexed tangled reaction");
553564             }
@@ -559,8 +570,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
559570         // ─── Tangled Issue ─────────────────────────────────────────
560571         ("sh.tangled.repo.issue", "create" | "update") => {
561572             if let Some(rec) = record {
562-                let cospan_rec = tangled_to_cospan(state, collection, rec);
563-                let row = db::issue::IssueRow::from_json(did, rkey, &cospan_rec);
573+                let row = db::issue::IssueRow::from_json(did, rkey, &parse_record(state, collection, rec));
564574                 db::issue::upsert(&state.db, &row).await?;
565575                 tracing::debug!(did, rkey, "indexed tangled issue");
566576             }
@@ -589,8 +599,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
589599                     .unwrap_or("open")
590600                     .to_string();
591601 
592-                let cospan_rec = tangled_to_cospan(state, collection, rec);
593-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &cospan_rec);
602+                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
594603                 db::issue_state::upsert(&state.db, &row).await?;
595604 
596605                 // Update the issue's state and repo counters
@@ -636,8 +645,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
636645                     .unwrap_or("")
637646                     .to_string();
638647 
639-                let cospan_rec = tangled_to_cospan(state, collection, rec);
640-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &cospan_rec);
648+                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
641649 
642650                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
643651                 db::issue_comment::upsert(&state.db, &row).await?;
@@ -660,8 +668,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
660668         // ─── Tangled Pull Request ──────────────────────────────────
661669         ("sh.tangled.repo.pull", "create" | "update") => {
662670             if let Some(rec) = record {
663-                let cospan_rec = tangled_to_cospan(state, collection, rec);
664-                let row = db::pull::PullRow::from_json(did, rkey, &cospan_rec);
671+                let row = db::pull::PullRow::from_json(did, rkey, &parse_record(state, collection, rec));
665672                 db::pull::upsert(&state.db, &row).await?;
666673                 tracing::debug!(did, rkey, "indexed tangled pull");
667674             }
@@ -683,8 +690,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
683690                     .and_then(|v| v.as_str())
684691                     .unwrap_or("")
685692                     .to_string();
686-                let cospan_rec = tangled_to_cospan(state, collection, rec);
687-                let row = db::pull_state::PullStateRow::from_json(did, rkey, &cospan_rec);
693+                let row = db::pull_state::PullStateRow::from_json(did, rkey, &parse_record(state, collection, rec));
688694                 let new_state = row.state.clone();
689695                 db::pull_state::upsert(&state.db, &row).await?;
690696 
@@ -729,8 +735,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
729735                     .unwrap_or("")
730736                     .to_string();
731737 
732-                let cospan_rec = tangled_to_cospan(state, collection, rec);
733-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &cospan_rec);
738+                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &parse_record(state, collection, rec));
734739 
735740                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
736741                 db::pull_comment::upsert(&state.db, &row).await?;
@@ -753,8 +758,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
753758         // ─── Tangled Collaborator ──────────────────────────────────
754759         ("sh.tangled.repo.collaborator", "create" | "update") => {
755760             if let Some(rec) = record {
756-                let cospan_rec = tangled_to_cospan(state, collection, rec);
757-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &cospan_rec);
761+                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &parse_record(state, collection, rec));
758762                 db::collaborator::upsert(&state.db, &row).await?;
759763                 tracing::debug!(did, rkey, "indexed tangled collaborator");
760764             }
@@ -766,8 +770,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
766770         // ─── Tangled Knot → Node ──────────────────────────────────
767771         ("sh.tangled.knot", "create" | "update") => {
768772             if let Some(rec) = record {
769-                let cospan_rec = tangled_to_cospan(state, collection, rec);
770-                let row = db::node::NodeRow::from_json(did, rkey, &cospan_rec);
773+                let row = db::node::NodeRow::from_json(did, rkey, &parse_record(state, collection, rec));
771774                 db::node::upsert(&state.db, &row).await?;
772775                 tracing::debug!(did, rkey, "indexed tangled knot as node");
773776             }
@@ -809,8 +812,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
809812         // ─── Tangled Actor Profile ─────────────────────────────────
810813         ("sh.tangled.actor.profile", "create" | "update") => {
811814             if let Some(rec) = record {
812-                let cospan_rec = tangled_to_cospan(state, collection, rec);
813-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &cospan_rec);
815+                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &parse_record(state, collection, rec));
814816                 db::actor_profile::upsert(&state.db, &row).await?;
815817                 tracing::debug!(did, rkey, "indexed tangled actor profile");
816818             }
@@ -822,8 +824,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
822824         // ─── Tangled Repo ──────────────────────────────────────────
823825         ("sh.tangled.repo", "create" | "update") => {
824826             if let Some(rec) = record {
825-                let cospan_rec = tangled_to_cospan(state, collection, rec);
826-                let row = db::repo::RepoRow::from_json(did, rkey, &cospan_rec);
827+                let row = db::repo::RepoRow::from_json(did, rkey, &parse_record(state, collection, rec));
827828                 db::repo::upsert(&state.db, &row).await?;
828829                 tracing::debug!(did, rkey, "indexed tangled repo");
829830             }
@@ -835,8 +836,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
835836         // ─── Tangled Knot Member → Org Member ─────────────────────
836837         ("sh.tangled.knot.member", "create" | "update") => {
837838             if let Some(rec) = record {
838-                let cospan_rec = tangled_to_cospan(state, collection, rec);
839-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &cospan_rec);
839+                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &parse_record(state, collection, rec));
840840                 db::org_member::upsert(&state.db, &row).await?;
841841                 tracing::debug!(did, rkey, "indexed tangled knot member as org member");
842842             }
@@ -912,8 +912,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
912912         // ─── Tangled Pipeline ──────────────────────────────────────
913913         ("sh.tangled.pipeline", "create" | "update") => {
914914             if let Some(rec) = record {
915-                let cospan_rec = tangled_to_cospan(state, collection, rec);
916-                let row = db::pipeline::PipelineRow::from_json(did, rkey, &cospan_rec);
915+                let row = db::pipeline::PipelineRow::from_json(did, rkey, &parse_record(state, collection, rec));
917916                 db::pipeline::upsert(&state.db, &row).await?;
918917                 tracing::debug!(did, rkey, "indexed tangled pipeline");
919918             }
@@ -970,8 +969,7 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
970969         // ─── Tangled Git RefUpdate ─────────────────────────────────
971970         ("sh.tangled.git.refUpdate", "create" | "update") => {
972971             if let Some(rec) = record {
973-                let cospan_rec = tangled_to_cospan(state, collection, rec);
974-                let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &cospan_rec);
972+                let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &parse_record(state, collection, rec));
975973                 db::ref_update::upsert(&state.db, &row).await?;
976974                 tracing::debug!(did, rkey, "indexed tangled git refUpdate");
977975             }
@@ -6,6 +6,7 @@ pub mod error;
66 pub mod indexer;
77 pub mod interop;
88 pub mod middleware;
9+pub mod record_parser;
910 pub mod node_proxy;
1011 pub mod state;
1112 pub mod xrpc;
@@ -0,0 +1,86 @@
1+//! Schema-driven record parsing via panproto.
2+//!
3+//! Parses Jetstream JSON records into panproto WInstances using the
4+//! Lexicon schema, then converts to JSON in the canonical Cospan shape.
5+//! This replaces all hand-coded `rec.get("field")` chains with
6+//! panproto's typed parsing infrastructure.
7+
8+use std::collections::HashMap;
9+use std::path::Path;
10+
11+use anyhow::{Context, Result};
12+use panproto_inst::parse::{parse_json, to_json};
13+use panproto_protocols::web_document::atproto;
14+use panproto_schema::Schema;
15+
16+/// Pre-loaded Lexicon schemas for all Cospan record types.
17+pub struct SchemaRegistry {
18+    /// NSID → parsed panproto Schema
19+    schemas: HashMap<String, Schema>,
20+}
21+
22+impl SchemaRegistry {
23+    /// Load all Lexicon schemas from the lexicons directory.
24+    pub fn load(lexicons_dir: &Path) -> Result<Self> {
25+        let mut schemas = HashMap::new();
26+
27+        fn walk(dir: &Path, schemas: &mut HashMap<String, Schema>) -> Result<()> {
28+            let entries = std::fs::read_dir(dir)
29+                .with_context(|| format!("reading {}", dir.display()))?;
30+            for entry in entries.flatten() {
31+                let path = entry.path();
32+                if path.is_dir() {
33+                    walk(&path, schemas)?;
34+                } else if path.extension().is_some_and(|e| e == "json") {
35+                    let content = std::fs::read_to_string(&path)?;
36+                    let json: serde_json::Value = serde_json::from_str(&content)?;
37+                    if let Some(nsid) = json.get("id").and_then(|v| v.as_str()) {
38+                        // Only parse record types (not procedures)
39+                        let is_record = json
40+                            .pointer("/defs/main/type")
41+                            .and_then(|v| v.as_str())
42+                            == Some("record");
43+                        if is_record {
44+                            match atproto::parse_lexicon(&json) {
45+                                Ok(schema) => {
46+                                    schemas.insert(nsid.to_string(), schema);
47+                                }
48+                                Err(e) => {
49+                                    tracing::debug!(nsid, error = %e, "skipping unparseable lexicon");
50+                                }
51+                            }
52+                        }
53+                    }
54+                }
55+            }
56+            Ok(())
57+        }
58+
59+        walk(lexicons_dir, &mut schemas)?;
60+        tracing::info!(count = schemas.len(), "loaded lexicon schemas");
61+        Ok(Self { schemas })
62+    }
63+
64+    /// Parse a JSON record into a panproto WInstance, then emit as canonical JSON.
65+    ///
66+    /// This ensures the record is validated against the Lexicon schema and
67+    /// all field names are normalized. Returns None if the NSID has no schema.
68+    pub fn parse_record(
69+        &self,
70+        nsid: &str,
71+        record: &serde_json::Value,
72+    ) -> Option<Result<serde_json::Value>> {
73+        let schema = self.schemas.get(nsid)?;
74+
75+        let result = parse_json(schema, nsid, record)
76+            .map(|instance| to_json(schema, &instance))
77+            .map_err(|e| anyhow::anyhow!("parse {nsid}: {e:?}"));
78+
79+        Some(result)
80+    }
81+
82+    /// Get the raw schema for an NSID (for use with lift operations).
83+    pub fn schema(&self, nsid: &str) -> Option<&Schema> {
84+        self.schemas.get(nsid)
85+    }
86+}
@@ -9,6 +9,7 @@ use crate::auth::dpop::DpopKey;
99 use crate::auth::session::SessionStore;
1010 use crate::config::AppConfig;
1111 use crate::interop::TangledInterop;
12+use crate::record_parser::SchemaRegistry;
1213 use crate::xrpc::sse::IndexEvent;
1314 
1415 /// Channel capacity for the event bus. Events are dropped if all receivers lag.
@@ -24,7 +25,9 @@ pub struct AppState {
2425     pub http_client: reqwest::Client,
2526     /// Broadcast channel for streaming index events to SSE clients.
2627     pub event_tx: broadcast::Sender<IndexEvent>,
27-    /// Tangled → Cospan interop morphisms (compiled at startup).
28+    /// Pre-loaded Lexicon schemas for schema-driven parsing.
29+    pub schemas: SchemaRegistry,
30+    /// Tangled → Cospan interop morphisms (compiled at codegen time).
2831     pub tangled_interop: TangledInterop,
2932 }
3033 
@@ -45,8 +48,13 @@ impl AppState {
4548 
4649         let (event_tx, _) = broadcast::channel(EVENT_BUS_CAPACITY);
4750 
48-        // Load Tangled→Cospan interop morphisms from compiled codegen output.
4951         let lexicons_dir = std::path::PathBuf::from(&config.lexicons_dir);
52+
53+        // Load Lexicon schemas for schema-driven record parsing.
54+        let schemas = SchemaRegistry::load(&lexicons_dir)
55+            .expect("failed to load lexicon schemas");
56+
57+        // Load Tangled→Cospan interop morphisms from compiled codegen output.
5058         let tangled_interop = TangledInterop::load(&lexicons_dir)
5159             .expect("failed to load tangled interop morphisms — run `cargo run -p cospan-codegen` first");
5260 
@@ -59,6 +67,7 @@ impl AppState {
5967             dpop_key: Arc::new(dpop_key),
6068             http_client,
6169             event_tx,
70+            schemas,
6271             tangled_interop,
6372         })
6473     }
cospan · schematic version control on atproto built on AT Protocol