refactor: eliminate from_json string munging, use serde after panproto Delete the entire from_json.rs generated file (577 lines of rec.get() chains). Records are now deserialized via serde_json::from_value() after the panproto parse/lift/emit pipeline transforms them. The pipeline: parse_json() → lift_wtype_sigma() (with DB projection FieldTransforms) → to_json() → serde_json::from_value::<XxxRow>() DB projection transforms output camelCase keys to match the Row structs' #[serde(rename_all = "camelCase")] attribute. ATProto envelope fields (did, rkey, indexed_at) are set after deserialization since they come from the Jetstream event, not the record. Deleted: emit_from_json(), json_extractor(), emit_helper_functions(), parse_at_uri_did/name/parse_datetime from generated code. Net: -501 lines of string-munging codegen.

Author: Aaron Steven White
Commit a6e7a01e446f7c6335f383b5c3cfffcfd93ffecf
Parent: 10256d5754
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
5 files changed +256 -501
@@ -46,11 +46,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
4646         // ─── Node ───────────────────────────────────────────────────
4747         ("dev.cospan.node", "create" | "update") => {
4848             if let Some(rec) = record {
49-                let row = db::node::NodeRow::from_json(
50-                    did,
51-                    rkey,
52-                    &transform_record(state, collection, rec),
53-                );
49+                let mut row: db::node::NodeRow =
50+                    serde_json::from_value(transform_record(state, collection, rec))?;
51+                row.did = did.to_string();
52+                row.rkey = rkey.to_string();
53+                row.indexed_at = Utc::now();
5454                 db::node::upsert(&state.db, &row).await?;
5555             }
5656         }
@@ -61,11 +61,10 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
6161         // ─── Actor Profile ──────────────────────────────────────────
6262         ("dev.cospan.actor.profile", "create" | "update") => {
6363             if let Some(rec) = record {
64-                let row = db::actor_profile::ActorProfileRow::from_json(
65-                    did,
66-                    rkey,
67-                    &transform_record(state, collection, rec),
68-                );
64+                let mut row: db::actor_profile::ActorProfileRow =
65+                    serde_json::from_value(transform_record(state, collection, rec))?;
66+                row.did = did.to_string();
67+                row.indexed_at = Utc::now();
6968                 db::actor_profile::upsert(&state.db, &row).await?;
7069             }
7170         }
@@ -90,11 +89,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
9089                         .unwrap_or_default()
9190                 };
9291 
93-                let mut row = db::repo::RepoRow::from_json(
94-                    did,
95-                    rkey,
96-                    &transform_record(state, collection, rec),
97-                );
92+                let mut row: db::repo::RepoRow =
93+                    serde_json::from_value(transform_record(state, collection, rec))?;
94+                row.did = did.to_string();
95+                row.rkey = rkey.to_string();
96+                row.indexed_at = Utc::now();
9897                 row.node_did = node_did;
9998                 row.node_url = node_url;
10099                 db::repo::upsert(&state.db, &row).await?;
@@ -118,11 +117,10 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
118117                     .map(|a| a.len() as i32)
119118                     .unwrap_or(0);
120119 
121-                let mut row = db::ref_update::RefUpdateRow::from_json(
122-                    did,
123-                    rkey,
124-                    &transform_record(state, collection, rec),
125-                );
120+                let mut row: db::ref_update::RefUpdateRow =
121+                    serde_json::from_value(transform_record(state, collection, rec))?;
122+                row.rkey = rkey.to_string();
123+                row.indexed_at = Utc::now();
126124                 row.breaking_change_count = breaking_changes;
127125                 db::ref_update::upsert(&state.db, &row).await?;
128126 
@@ -144,11 +142,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
144142         // ─── Issue ──────────────────────────────────────────────────
145143         ("dev.cospan.repo.issue", "create" | "update") => {
146144             if let Some(rec) = record {
147-                let row = db::issue::IssueRow::from_json(
148-                    did,
149-                    rkey,
150-                    &transform_record(state, collection, rec),
151-                );
145+                let mut row: db::issue::IssueRow =
146+                    serde_json::from_value(transform_record(state, collection, rec))?;
147+                row.did = did.to_string();
148+                row.rkey = rkey.to_string();
149+                row.indexed_at = Utc::now();
152150                 db::issue::upsert(&state.db, &row).await?;
153151 
154152                 // Publish SSE event
@@ -181,11 +179,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
181179                     .unwrap_or("")
182180                     .to_string();
183181 
184-                let row = db::issue_comment::IssueCommentRow::from_json(
185-                    did,
186-                    rkey,
187-                    &transform_record(state, collection, rec),
188-                );
182+                let mut row: db::issue_comment::IssueCommentRow =
183+                    serde_json::from_value(transform_record(state, collection, rec))?;
184+                row.did = did.to_string();
185+                row.rkey = rkey.to_string();
186+                row.indexed_at = Utc::now();
189187 
190188                 // Check if this is a new comment (not an update) for counter purposes
191189                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
@@ -221,11 +219,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
221219                     .unwrap_or("open")
222220                     .to_string();
223221 
224-                let row = db::issue_state::IssueStateRow::from_json(
225-                    did,
226-                    rkey,
227-                    &transform_record(state, collection, rec),
228-                );
222+                let mut row: db::issue_state::IssueStateRow =
223+                    serde_json::from_value(transform_record(state, collection, rec))?;
224+                row.did = did.to_string();
225+                row.rkey = rkey.to_string();
226+                row.indexed_at = Utc::now();
229227                 db::issue_state::upsert(&state.db, &row).await?;
230228 
231229                 // Update the issue's state and repo counters
@@ -274,11 +272,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
274272         // ─── Pull Request ───────────────────────────────────────────
275273         ("dev.cospan.repo.pull", "create" | "update") => {
276274             if let Some(rec) = record {
277-                let row = db::pull::PullRow::from_json(
278-                    did,
279-                    rkey,
280-                    &transform_record(state, collection, rec),
281-                );
275+                let mut row: db::pull::PullRow =
276+                    serde_json::from_value(transform_record(state, collection, rec))?;
277+                row.did = did.to_string();
278+                row.rkey = rkey.to_string();
279+                row.indexed_at = Utc::now();
282280                 db::pull::upsert(&state.db, &row).await?;
283281 
284282                 // Publish SSE event
@@ -310,11 +308,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
310308                     .unwrap_or("")
311309                     .to_string();
312310 
313-                let row = db::pull_comment::PullCommentRow::from_json(
314-                    did,
315-                    rkey,
316-                    &transform_record(state, collection, rec),
317-                );
311+                let mut row: db::pull_comment::PullCommentRow =
312+                    serde_json::from_value(transform_record(state, collection, rec))?;
313+                row.did = did.to_string();
314+                row.rkey = rkey.to_string();
315+                row.indexed_at = Utc::now();
318316 
319317                 // Check if this is a new comment (not an update) for counter purposes
320318                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
@@ -348,11 +346,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
348346                     .unwrap_or("open")
349347                     .to_string();
350348 
351-                let row = db::pull_state::PullStateRow::from_json(
352-                    did,
353-                    rkey,
354-                    &transform_record(state, collection, rec),
355-                );
349+                let mut row: db::pull_state::PullStateRow =
350+                    serde_json::from_value(transform_record(state, collection, rec))?;
351+                row.did = did.to_string();
352+                row.rkey = rkey.to_string();
353+                row.indexed_at = Utc::now();
356354                 db::pull_state::upsert(&state.db, &row).await?;
357355 
358356                 // Update the pull's state and repo counters
@@ -405,11 +403,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
405403                     .unwrap_or("")
406404                     .to_string();
407405 
408-                let row = db::star::StarRow::from_json(
409-                    did,
410-                    rkey,
411-                    &transform_record(state, collection, rec),
412-                );
406+                let mut row: db::star::StarRow =
407+                    serde_json::from_value(transform_record(state, collection, rec))?;
408+                row.did = did.to_string();
409+                row.rkey = rkey.to_string();
410+                row.indexed_at = Utc::now();
413411 
414412                 let existing = db::star::get(&state.db, did, rkey).await?;
415413                 db::star::upsert(&state.db, &row).await?;
@@ -443,11 +441,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
443441         // ─── Follow ─────────────────────────────────────────────────
444442         ("dev.cospan.graph.follow", "create" | "update") => {
445443             if let Some(rec) = record {
446-                let row = db::follow::FollowRow::from_json(
447-                    did,
448-                    rkey,
449-                    &transform_record(state, collection, rec),
450-                );
444+                let mut row: db::follow::FollowRow =
445+                    serde_json::from_value(transform_record(state, collection, rec))?;
446+                row.did = did.to_string();
447+                row.rkey = rkey.to_string();
448+                row.indexed_at = Utc::now();
451449                 db::follow::upsert(&state.db, &row).await?;
452450             }
453451         }
@@ -458,11 +456,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
458456         // ─── Reaction ───────────────────────────────────────────────
459457         ("dev.cospan.feed.reaction", "create" | "update") => {
460458             if let Some(rec) = record {
461-                let row = db::reaction::ReactionRow::from_json(
462-                    did,
463-                    rkey,
464-                    &transform_record(state, collection, rec),
465-                );
459+                let mut row: db::reaction::ReactionRow =
460+                    serde_json::from_value(transform_record(state, collection, rec))?;
461+                row.did = did.to_string();
462+                row.rkey = rkey.to_string();
463+                row.indexed_at = Utc::now();
466464                 db::reaction::upsert(&state.db, &row).await?;
467465             }
468466         }
@@ -473,11 +471,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
473471         // ─── Label Definition ───────────────────────────────────────
474472         ("dev.cospan.label.definition", "create" | "update") => {
475473             if let Some(rec) = record {
476-                let row = db::label::LabelRow::from_json(
477-                    did,
478-                    rkey,
479-                    &transform_record(state, collection, rec),
480-                );
474+                let mut row: db::label::LabelRow =
475+                    serde_json::from_value(transform_record(state, collection, rec))?;
476+                row.did = did.to_string();
477+                row.rkey = rkey.to_string();
478+                row.indexed_at = Utc::now();
481479                 db::label::upsert(&state.db, &row).await?;
482480             }
483481         }
@@ -488,11 +486,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
488486         // ─── Org ────────────────────────────────────────────────────
489487         ("dev.cospan.org", "create" | "update") => {
490488             if let Some(rec) = record {
491-                let row = db::org::OrgRow::from_json(
492-                    did,
493-                    rkey,
494-                    &transform_record(state, collection, rec),
495-                );
489+                let mut row: db::org::OrgRow =
490+                    serde_json::from_value(transform_record(state, collection, rec))?;
491+                row.did = did.to_string();
492+                row.rkey = rkey.to_string();
493+                row.indexed_at = Utc::now();
496494                 db::org::upsert(&state.db, &row).await?;
497495             }
498496         }
@@ -503,11 +501,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
503501         // ─── Org Member ─────────────────────────────────────────────
504502         ("dev.cospan.org.member", "create" | "update") => {
505503             if let Some(rec) = record {
506-                let row = db::org_member::OrgMemberRow::from_json(
507-                    did,
508-                    rkey,
509-                    &transform_record(state, collection, rec),
510-                );
504+                let mut row: db::org_member::OrgMemberRow =
505+                    serde_json::from_value(transform_record(state, collection, rec))?;
506+                row.did = did.to_string();
507+                row.rkey = rkey.to_string();
508+                row.indexed_at = Utc::now();
511509                 db::org_member::upsert(&state.db, &row).await?;
512510             }
513511         }
@@ -518,11 +516,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
518516         // ─── Collaborator ───────────────────────────────────────────
519517         ("dev.cospan.repo.collaborator", "create" | "update") => {
520518             if let Some(rec) = record {
521-                let row = db::collaborator::CollaboratorRow::from_json(
522-                    did,
523-                    rkey,
524-                    &transform_record(state, collection, rec),
525-                );
519+                let mut row: db::collaborator::CollaboratorRow =
520+                    serde_json::from_value(transform_record(state, collection, rec))?;
521+                row.did = did.to_string();
522+                row.rkey = rkey.to_string();
523+                row.indexed_at = Utc::now();
526524                 db::collaborator::upsert(&state.db, &row).await?;
527525             }
528526         }
@@ -535,11 +533,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
535533             if let Some(rec) = record {
536534                 let checks = rec.get("algebraicChecks");
537535 
538-                let mut row = db::pipeline::PipelineRow::from_json(
539-                    did,
540-                    rkey,
541-                    &transform_record(state, collection, rec),
542-                );
536+                let mut row: db::pipeline::PipelineRow =
537+                    serde_json::from_value(transform_record(state, collection, rec))?;
538+                row.did = did.to_string();
539+                row.rkey = rkey.to_string();
540+                row.indexed_at = Utc::now();
543541                 row.gat_type_check = checks
544542                     .and_then(|c| c.get("gatTypeCheck"))
545543                     .and_then(|v| v.as_str())
@@ -566,11 +564,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
566564         // ─── Dependency ─────────────────────────────────────────────
567565         ("dev.cospan.repo.dependency", "create" | "update") => {
568566             if let Some(rec) = record {
569-                let row = db::dependency::DependencyRow::from_json(
570-                    did,
571-                    rkey,
572-                    &transform_record(state, collection, rec),
573-                );
567+                let mut row: db::dependency::DependencyRow =
568+                    serde_json::from_value(transform_record(state, collection, rec))?;
569+                row.did = did.to_string();
570+                row.rkey = rkey.to_string();
571+                row.indexed_at = Utc::now();
574572                 db::dependency::upsert(&state.db, &row).await?;
575573             }
576574         }
@@ -585,11 +583,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
585583         // ─── Tangled Star ──────────────────────────────────────────
586584         ("sh.tangled.feed.star", "create" | "update") => {
587585             if let Some(rec) = record {
588-                let row = db::star::StarRow::from_json(
589-                    did,
590-                    rkey,
591-                    &transform_record(state, collection, rec),
592-                );
586+                let mut row: db::star::StarRow =
587+                    serde_json::from_value(transform_record(state, collection, rec))?;
588+                row.did = did.to_string();
589+                row.rkey = rkey.to_string();
590+                row.indexed_at = Utc::now();
593591                 let existing = db::star::get(&state.db, did, rkey).await?;
594592                 db::star::upsert(&state.db, &row).await?;
595593 
@@ -612,11 +610,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
612610         // ─── Tangled Follow ────────────────────────────────────────
613611         ("sh.tangled.graph.follow", "create" | "update") => {
614612             if let Some(rec) = record {
615-                let row = db::follow::FollowRow::from_json(
616-                    did,
617-                    rkey,
618-                    &transform_record(state, collection, rec),
619-                );
613+                let mut row: db::follow::FollowRow =
614+                    serde_json::from_value(transform_record(state, collection, rec))?;
615+                row.did = did.to_string();
616+                row.rkey = rkey.to_string();
617+                row.indexed_at = Utc::now();
620618                 db::follow::upsert(&state.db, &row).await?;
621619                 tracing::debug!(did, rkey, "indexed tangled follow");
622620             }
@@ -628,11 +626,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
628626         // ─── Tangled Reaction ──────────────────────────────────────
629627         ("sh.tangled.feed.reaction", "create" | "update") => {
630628             if let Some(rec) = record {
631-                let row = db::reaction::ReactionRow::from_json(
632-                    did,
633-                    rkey,
634-                    &transform_record(state, collection, rec),
635-                );
629+                let mut row: db::reaction::ReactionRow =
630+                    serde_json::from_value(transform_record(state, collection, rec))?;
631+                row.did = did.to_string();
632+                row.rkey = rkey.to_string();
633+                row.indexed_at = Utc::now();
636634                 db::reaction::upsert(&state.db, &row).await?;
637635                 tracing::debug!(did, rkey, "indexed tangled reaction");
638636             }
@@ -644,11 +642,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
644642         // ─── Tangled Issue ─────────────────────────────────────────
645643         ("sh.tangled.repo.issue", "create" | "update") => {
646644             if let Some(rec) = record {
647-                let row = db::issue::IssueRow::from_json(
648-                    did,
649-                    rkey,
650-                    &transform_record(state, collection, rec),
651-                );
645+                let mut row: db::issue::IssueRow =
646+                    serde_json::from_value(transform_record(state, collection, rec))?;
647+                row.did = did.to_string();
648+                row.rkey = rkey.to_string();
649+                row.indexed_at = Utc::now();
652650                 db::issue::upsert(&state.db, &row).await?;
653651                 tracing::debug!(did, rkey, "indexed tangled issue");
654652             }
@@ -677,11 +675,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
677675                     .unwrap_or("open")
678676                     .to_string();
679677 
680-                let row = db::issue_state::IssueStateRow::from_json(
681-                    did,
682-                    rkey,
683-                    &transform_record(state, collection, rec),
684-                );
678+                let mut row: db::issue_state::IssueStateRow =
679+                    serde_json::from_value(transform_record(state, collection, rec))?;
680+                row.did = did.to_string();
681+                row.rkey = rkey.to_string();
682+                row.indexed_at = Utc::now();
685683                 db::issue_state::upsert(&state.db, &row).await?;
686684 
687685                 // Update the issue's state and repo counters
@@ -727,11 +725,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
727725                     .unwrap_or("")
728726                     .to_string();
729727 
730-                let row = db::issue_comment::IssueCommentRow::from_json(
731-                    did,
732-                    rkey,
733-                    &transform_record(state, collection, rec),
734-                );
728+                let mut row: db::issue_comment::IssueCommentRow =
729+                    serde_json::from_value(transform_record(state, collection, rec))?;
730+                row.did = did.to_string();
731+                row.rkey = rkey.to_string();
732+                row.indexed_at = Utc::now();
735733 
736734                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
737735                 db::issue_comment::upsert(&state.db, &row).await?;
@@ -754,11 +752,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
754752         // ─── Tangled Pull Request ──────────────────────────────────
755753         ("sh.tangled.repo.pull", "create" | "update") => {
756754             if let Some(rec) = record {
757-                let row = db::pull::PullRow::from_json(
758-                    did,
759-                    rkey,
760-                    &transform_record(state, collection, rec),
761-                );
755+                let mut row: db::pull::PullRow =
756+                    serde_json::from_value(transform_record(state, collection, rec))?;
757+                row.did = did.to_string();
758+                row.rkey = rkey.to_string();
759+                row.indexed_at = Utc::now();
762760                 db::pull::upsert(&state.db, &row).await?;
763761                 tracing::debug!(did, rkey, "indexed tangled pull");
764762             }
@@ -780,11 +778,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
780778                     .and_then(|v| v.as_str())
781779                     .unwrap_or("")
782780                     .to_string();
783-                let row = db::pull_state::PullStateRow::from_json(
784-                    did,
785-                    rkey,
786-                    &transform_record(state, collection, rec),
787-                );
781+                let mut row: db::pull_state::PullStateRow =
782+                    serde_json::from_value(transform_record(state, collection, rec))?;
783+                row.did = did.to_string();
784+                row.rkey = rkey.to_string();
785+                row.indexed_at = Utc::now();
788786                 let new_state = row.state.clone();
789787                 db::pull_state::upsert(&state.db, &row).await?;
790788 
@@ -829,11 +827,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
829827                     .unwrap_or("")
830828                     .to_string();
831829 
832-                let row = db::pull_comment::PullCommentRow::from_json(
833-                    did,
834-                    rkey,
835-                    &transform_record(state, collection, rec),
836-                );
830+                let mut row: db::pull_comment::PullCommentRow =
831+                    serde_json::from_value(transform_record(state, collection, rec))?;
832+                row.did = did.to_string();
833+                row.rkey = rkey.to_string();
834+                row.indexed_at = Utc::now();
837835 
838836                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
839837                 db::pull_comment::upsert(&state.db, &row).await?;
@@ -856,11 +854,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
856854         // ─── Tangled Collaborator ──────────────────────────────────
857855         ("sh.tangled.repo.collaborator", "create" | "update") => {
858856             if let Some(rec) = record {
859-                let row = db::collaborator::CollaboratorRow::from_json(
860-                    did,
861-                    rkey,
862-                    &transform_record(state, collection, rec),
863-                );
857+                let mut row: db::collaborator::CollaboratorRow =
858+                    serde_json::from_value(transform_record(state, collection, rec))?;
859+                row.did = did.to_string();
860+                row.rkey = rkey.to_string();
861+                row.indexed_at = Utc::now();
864862                 db::collaborator::upsert(&state.db, &row).await?;
865863                 tracing::debug!(did, rkey, "indexed tangled collaborator");
866864             }
@@ -872,11 +870,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
872870         // ─── Tangled Knot → Node ──────────────────────────────────
873871         ("sh.tangled.knot", "create" | "update") => {
874872             if let Some(rec) = record {
875-                let row = db::node::NodeRow::from_json(
876-                    did,
877-                    rkey,
878-                    &transform_record(state, collection, rec),
879-                );
873+                let mut row: db::node::NodeRow =
874+                    serde_json::from_value(transform_record(state, collection, rec))?;
875+                row.did = did.to_string();
876+                row.rkey = rkey.to_string();
877+                row.indexed_at = Utc::now();
880878                 db::node::upsert(&state.db, &row).await?;
881879                 tracing::debug!(did, rkey, "indexed tangled knot as node");
882880             }
@@ -918,11 +916,10 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
918916         // ─── Tangled Actor Profile ─────────────────────────────────
919917         ("sh.tangled.actor.profile", "create" | "update") => {
920918             if let Some(rec) = record {
921-                let row = db::actor_profile::ActorProfileRow::from_json(
922-                    did,
923-                    rkey,
924-                    &transform_record(state, collection, rec),
925-                );
919+                let mut row: db::actor_profile::ActorProfileRow =
920+                    serde_json::from_value(transform_record(state, collection, rec))?;
921+                row.did = did.to_string();
922+                row.indexed_at = Utc::now();
926923                 db::actor_profile::upsert(&state.db, &row).await?;
927924                 tracing::debug!(did, rkey, "indexed tangled actor profile");
928925             }
@@ -934,11 +931,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
934931         // ─── Tangled Repo ──────────────────────────────────────────
935932         ("sh.tangled.repo", "create" | "update") => {
936933             if let Some(rec) = record {
937-                let row = db::repo::RepoRow::from_json(
938-                    did,
939-                    rkey,
940-                    &transform_record(state, collection, rec),
941-                );
934+                let mut row: db::repo::RepoRow =
935+                    serde_json::from_value(transform_record(state, collection, rec))?;
936+                row.did = did.to_string();
937+                row.rkey = rkey.to_string();
938+                row.indexed_at = Utc::now();
942939                 db::repo::upsert(&state.db, &row).await?;
943940                 tracing::debug!(did, rkey, "indexed tangled repo");
944941             }
@@ -950,11 +947,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
950947         // ─── Tangled Knot Member → Org Member ─────────────────────
951948         ("sh.tangled.knot.member", "create" | "update") => {
952949             if let Some(rec) = record {
953-                let row = db::org_member::OrgMemberRow::from_json(
954-                    did,
955-                    rkey,
956-                    &transform_record(state, collection, rec),
957-                );
950+                let mut row: db::org_member::OrgMemberRow =
951+                    serde_json::from_value(transform_record(state, collection, rec))?;
952+                row.did = did.to_string();
953+                row.rkey = rkey.to_string();
954+                row.indexed_at = Utc::now();
958955                 db::org_member::upsert(&state.db, &row).await?;
959956                 tracing::debug!(did, rkey, "indexed tangled knot member as org member");
960957             }
@@ -1030,11 +1027,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
10301027         // ─── Tangled Pipeline ──────────────────────────────────────
10311028         ("sh.tangled.pipeline", "create" | "update") => {
10321029             if let Some(rec) = record {
1033-                let row = db::pipeline::PipelineRow::from_json(
1034-                    did,
1035-                    rkey,
1036-                    &transform_record(state, collection, rec),
1037-                );
1030+                let mut row: db::pipeline::PipelineRow =
1031+                    serde_json::from_value(transform_record(state, collection, rec))?;
1032+                row.did = did.to_string();
1033+                row.rkey = rkey.to_string();
1034+                row.indexed_at = Utc::now();
10381035                 db::pipeline::upsert(&state.db, &row).await?;
10391036                 tracing::debug!(did, rkey, "indexed tangled pipeline");
10401037             }
@@ -1091,11 +1088,10 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
10911088         // ─── Tangled Git RefUpdate ─────────────────────────────────
10921089         ("sh.tangled.git.refUpdate", "create" | "update") => {
10931090             if let Some(rec) = record {
1094-                let row = db::ref_update::RefUpdateRow::from_json(
1095-                    did,
1096-                    rkey,
1097-                    &transform_record(state, collection, rec),
1098-                );
1091+                let mut row: db::ref_update::RefUpdateRow =
1092+                    serde_json::from_value(transform_record(state, collection, rec))?;
1093+                row.rkey = rkey.to_string();
1094+                row.indexed_at = Utc::now();
10991095                 db::ref_update::upsert(&state.db, &row).await?;
11001096                 tracing::debug!(did, rkey, "indexed tangled git refUpdate");
11011097             }
@@ -25,22 +25,19 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
2525     let body_vertex = record_body_vertex(nsid);
2626 
2727     match nsid {
28+        // All target keys are camelCase to match #[serde(rename_all = "camelCase")]
29+        // on the Row structs, so serde can deserialize the panproto output directly.
2830         "dev.cospan.repo" => {
2931             transforms.insert(
3032                 Name::from(body_vertex),
3133                 vec![
32-                    // node (AT-URI) → node_did (extract DID)
33-                    at_uri_extract_did("node", "node_did"),
34-                    // node_url is looked up at ingestion time, add empty default
35-                    add_field_str("node_url", ""),
36-                    // Counter defaults (excluded from insert, managed separately)
37-                    add_field_int("star_count", 0),
38-                    add_field_int("fork_count", 0),
39-                    add_field_int("open_issue_count", 0),
40-                    add_field_int("open_mr_count", 0),
41-                    // Source tracking
34+                    at_uri_extract_did("node", "nodeDid"),
35+                    add_field_str("nodeUrl", ""),
36+                    add_field_int("starCount", 0),
37+                    add_field_int("forkCount", 0),
38+                    add_field_int("openIssueCount", 0),
39+                    add_field_int("openMrCount", 0),
4240                     add_field_str("source", "pds"),
43-                    // Drop complex fields not stored in DB columns
4441                     drop_field("node"),
4542                 ],
4643             );
@@ -49,10 +46,9 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
4946             transforms.insert(
5047                 Name::from(body_vertex),
5148                 vec![
52-                    at_uri_extract_did("repo", "repo_did"),
53-                    at_uri_extract_name("repo", "repo_name"),
54-                    // Count breaking changes from array length
55-                    compute_array_len("breakingChanges", "breaking_change_count"),
49+                    at_uri_extract_did("repo", "repoDid"),
50+                    at_uri_extract_name("repo", "repoName"),
51+                    compute_array_len("breakingChanges", "breakingChangeCount"),
5652                     drop_field("repo"),
5753                     drop_field("breakingChanges"),
5854                 ],
@@ -62,10 +58,10 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
6258             transforms.insert(
6359                 Name::from(body_vertex),
6460                 vec![
65-                    at_uri_extract_did("repo", "repo_did"),
66-                    at_uri_extract_name("repo", "repo_name"),
61+                    at_uri_extract_did("repo", "repoDid"),
62+                    at_uri_extract_name("repo", "repoName"),
6763                     add_field_str("state", "open"),
68-                    add_field_int("comment_count", 0),
64+                    add_field_int("commentCount", 0),
6965                     drop_field("repo"),
7066                     drop_field("schemaRefs"),
7167                     drop_field("labels"),
@@ -78,7 +74,7 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
7874             transforms.insert(
7975                 Name::from(body_vertex),
8076                 vec![
81-                    rename_field("issue", "issue_uri"),
77+                    rename_field("issue", "issueUri"),
8278                     drop_field("schemaRefs"),
8379                     drop_field("mentions"),
8480                 ],
@@ -87,17 +83,17 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
8783         "dev.cospan.repo.issue.state" => {
8884             transforms.insert(
8985                 Name::from(body_vertex),
90-                vec![rename_field("issue", "issue_uri")],
86+                vec![rename_field("issue", "issueUri")],
9187             );
9288         }
9389         "dev.cospan.repo.pull" => {
9490             transforms.insert(
9591                 Name::from(body_vertex),
9692                 vec![
97-                    at_uri_extract_did("repo", "repo_did"),
98-                    at_uri_extract_name("repo", "repo_name"),
93+                    at_uri_extract_did("repo", "repoDid"),
94+                    at_uri_extract_name("repo", "repoName"),
9995                     add_field_str("state", "open"),
100-                    add_field_int("comment_count", 0),
96+                    add_field_int("commentCount", 0),
10197                     drop_field("repo"),
10298                     drop_field("mergePreview"),
10399                     drop_field("mentions"),
@@ -109,7 +105,7 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
109105             transforms.insert(
110106                 Name::from(body_vertex),
111107                 vec![
112-                    rename_field("pull", "pull_uri"),
108+                    rename_field("pull", "pullUri"),
113109                     drop_field("schemaRefs"),
114110                     drop_field("mentions"),
115111                 ],
@@ -118,15 +114,14 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
118114         "dev.cospan.repo.pull.state" => {
119115             transforms.insert(
120116                 Name::from(body_vertex),
121-                vec![rename_field("pull", "pull_uri")],
117+                vec![rename_field("pull", "pullUri")],
122118             );
123119         }
124120         "dev.cospan.actor.profile" => {
125121             transforms.insert(
126122                 Name::from(body_vertex),
127123                 vec![
128-                    // Extract avatar CID from blob ref
129-                    path_extract("avatar", vec!["ref", "$link"], "avatar_cid"),
124+                    path_extract("avatar", vec!["ref", "$link"], "avatarCid"),
130125                     drop_field("avatar"),
131126                     drop_field("links"),
132127                 ],
@@ -136,8 +131,8 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
136131             transforms.insert(
137132                 Name::from(body_vertex),
138133                 vec![
139-                    at_uri_extract_did("repo", "repo_did"),
140-                    at_uri_extract_name("repo", "repo_name"),
134+                    at_uri_extract_did("repo", "repoDid"),
135+                    at_uri_extract_name("repo", "repoName"),
141136                     drop_field("repo"),
142137                 ],
143138             );
@@ -146,7 +141,7 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
146141             transforms.insert(
147142                 Name::from(body_vertex),
148143                 vec![
149-                    path_extract("avatar", vec!["ref", "$link"], "avatar_cid"),
144+                    path_extract("avatar", vec!["ref", "$link"], "avatarCid"),
150145                     drop_field("avatar"),
151146                 ],
152147             );
@@ -155,8 +150,8 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
155150             transforms.insert(
156151                 Name::from(body_vertex),
157152                 vec![
158-                    rename_field("org", "org_uri"),
159-                    rename_field("member", "member_did"),
153+                    rename_field("org", "orgUri"),
154+                    rename_field("member", "memberDid"),
160155                 ],
161156             );
162157         }
@@ -164,10 +159,9 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
164159             transforms.insert(
165160                 Name::from(body_vertex),
166161                 vec![
167-                    at_uri_extract_did("repo", "repo_did"),
168-                    at_uri_extract_name("repo", "repo_name"),
169-                    // Lexicon `did` field (collaborator DID) → `member_did`
170-                    rename_field("did", "member_did"),
162+                    at_uri_extract_did("repo", "repoDid"),
163+                    at_uri_extract_name("repo", "repoName"),
164+                    rename_field("did", "memberDid"),
171165                     drop_field("repo"),
172166                 ],
173167             );
@@ -176,10 +170,10 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
176170             transforms.insert(
177171                 Name::from(body_vertex),
178172                 vec![
179-                    at_uri_extract_did("sourceRepo", "source_repo_did"),
180-                    at_uri_extract_name("sourceRepo", "source_repo_name"),
181-                    at_uri_extract_did("targetRepo", "target_repo_did"),
182-                    at_uri_extract_name("targetRepo", "target_repo_name"),
173+                    at_uri_extract_did("sourceRepo", "sourceRepoDid"),
174+                    at_uri_extract_name("sourceRepo", "sourceRepoName"),
175+                    at_uri_extract_did("targetRepo", "targetRepoDid"),
176+                    at_uri_extract_name("targetRepo", "targetRepoName"),
183177                     drop_field("sourceRepo"),
184178                     drop_field("targetRepo"),
185179                 ],
@@ -189,20 +183,19 @@ pub fn db_transforms(nsid: &str) -> HashMap<Name, Vec<FieldTransform>> {
189183             transforms.insert(
190184                 Name::from(body_vertex),
191185                 vec![
192-                    at_uri_extract_did("repo", "repo_did"),
193-                    at_uri_extract_name("repo", "repo_name"),
194-                    // Flatten algebraicChecks sub-object
195-                    path_extract("algebraicChecks", vec!["gatTypeCheck"], "gat_type_check"),
186+                    at_uri_extract_did("repo", "repoDid"),
187+                    at_uri_extract_name("repo", "repoName"),
188+                    path_extract("algebraicChecks", vec!["gatTypeCheck"], "gatTypeCheck"),
196189                     path_extract(
197190                         "algebraicChecks",
198191                         vec!["equationVerification"],
199-                        "equation_verification",
192+                        "equationVerification",
200193                     ),
201-                    path_extract("algebraicChecks", vec!["lensLawCheck"], "lens_law_check"),
194+                    path_extract("algebraicChecks", vec!["lensLawCheck"], "lensLawCheck"),
202195                     path_extract(
203196                         "algebraicChecks",
204197                         vec!["breakingChangeCheck"],
205-                        "breaking_change_check",
198+                        "breakingChangeCheck",
206199                     ),
207200                     drop_field("repo"),
208201                     drop_field("algebraicChecks"),
@@ -331,6 +324,38 @@ fn compute_array_len(source_field: &str, target_field: &str) -> FieldTransform {
331324     }
332325 }
333326 
327+/// Auto-generate camelCase → snake_case renames for all Lexicon fields.
328+/// This ensures `to_json()` output has snake_case keys matching Row struct field names.
329+pub fn auto_camel_to_snake_renames(
330+    schema: &panproto_schema::Schema,
331+    nsid: &str,
332+) -> Vec<FieldTransform> {
333+    let body_vertex = record_body_vertex(nsid);
334+    let props = panproto_protocols::emit::children_by_edge(schema, &body_vertex, "prop");
335+    let mut renames = Vec::new();
336+    for (edge, _) in &props {
337+        if let Some(name) = edge.name.as_ref() {
338+            let camel = name.as_str();
339+            let snake = camel_to_snake(camel);
340+            if camel != snake {
341+                renames.push(rename_field(camel, &snake));
342+            }
343+        }
344+    }
345+    renames
346+}
347+
348+fn camel_to_snake(s: &str) -> String {
349+    let mut r = String::with_capacity(s.len() + 4);
350+    for (i, c) in s.chars().enumerate() {
351+        if c.is_uppercase() && i > 0 {
352+            r.push('_');
353+        }
354+        r.push(c.to_lowercase().next().unwrap_or(c));
355+    }
356+    r
357+}
358+
334359 /// Get the record body vertex ID for a given NSID.
335360 /// ATProto Lexicon schemas have the body under `{nsid}.record`.
336361 fn record_body_vertex(nsid: &str) -> String {
@@ -206,6 +206,15 @@ pub fn emit_row_types(
206206     w.line(&format!("pub struct {} {{", config.row_struct_name));
207207     w.indent();
208208     for col in &cols {
209+        // Add #[serde(default)] for fields not in the Lexicon record
210+        // (did, rkey, indexed_at, counters) so serde_json::from_value works
211+        if col.name == "indexed_at" {
212+            w.line("#[serde(default = \"default_now\")]");
213+        } else if col.rust_type.starts_with("DateTime") {
214+            // DateTime fields from Lexicon are already in the transformed JSON
215+        } else if col.is_counter || col.name == "did" || col.name == "rkey" || col.name == "id" {
216+            w.line("#[serde(default)]");
217+        }
209218         w.line(&format!("pub {}: {},", col.name, col.rust_type));
210219     }
211220     w.dedent();
@@ -385,172 +394,6 @@ pub fn emit_crud(
385394 }
386395 
387396 // ---------------------------------------------------------------------------
388-// Emit from_json deserializer
389-// ---------------------------------------------------------------------------
390-
391-pub fn emit_from_json(
392-    schema: &panproto_schema::Schema,
393-    nsid: &str,
394-    config: &RecordConfig,
395-) -> Result<String> {
396-    let mut w = IndentWriter::new("    ");
397-    let row_name = config.row_struct_name;
398-
399-    w.line(&format!("impl {row_name} {{"));
400-    w.indent();
401-    w.line("/// Deserialize a Jetstream record JSON into a row.");
402-    w.line("#[allow(unused_variables)]");
403-    w.line("pub fn from_json(did: &str, rkey: &str, rec: &serde_json::Value) -> Self {");
404-    w.indent();
405-    w.line("Self {");
406-    w.indent();
407-
408-    if config.has_serial_id {
409-        w.line("id: 0, // assigned by database");
410-    }
411-    if config.include_did {
412-        w.line("did: did.to_string(),");
413-    }
414-    if config.include_rkey {
415-        w.line("rkey: rkey.to_string(),");
416-    }
417-
418-    // URI decompositions
419-    for decomp in config.uri_decompositions {
420-        w.line(&format!(
421-            "// Decompose {} AT-URI → ({}, {})",
422-            decomp.source_field, decomp.did_column, decomp.name_column
423-        ));
424-        // Special case: "node" AT-URI is at-uri pointing to the node record,
425-        // and we look up the node_url from the nodes table at ingestion time.
426-        // For most URIs we just split the AT-URI.
427-        if decomp.did_column == "node_did" {
428-            w.line(&format!(
429-                "{}: parse_at_uri_did(rec, \"{}\"),",
430-                decomp.did_column, decomp.source_field
431-            ));
432-            w.line(&format!(
433-                "{}: String::new(), // looked up from nodes table at ingestion",
434-                decomp.name_column
435-            ));
436-        } else {
437-            w.line(&format!(
438-                "{}: parse_at_uri_did(rec, \"{}\"),",
439-                decomp.did_column, decomp.source_field
440-            ));
441-            w.line(&format!(
442-                "{}: parse_at_uri_name(rec, \"{}\"),",
443-                decomp.name_column, decomp.source_field
444-            ));
445-        }
446-    }
447-
448-    // URI storages (store full AT-URI as renamed column)
449-    for storage in config.uri_storages {
450-        w.line(&format!(
451-            "{}: rec.get(\"{}\").and_then(|v| v.as_str()).unwrap_or(\"\").to_string(),",
452-            storage.column_name, storage.source_field
453-        ));
454-    }
455-
456-    // Field renames
457-    for rename in config.field_renames {
458-        w.line(&format!(
459-            "{}: rec.get(\"{}\").and_then(|v| v.as_str()).unwrap_or(\"\").to_string(),",
460-            rename.column_name, rename.source_field
461-        ));
462-    }
463-
464-    // Lexicon fields
465-    let body_id = find_record_body(schema, nsid);
466-    let props = children_by_edge(schema, &body_id, "prop");
467-    for (edge, prop_vertex) in &props {
468-        let field_name = edge.name.as_ref().map(|n| n.as_str()).unwrap_or("unknown");
469-
470-        if config.skip_fields.contains(&field_name) || field_name == "did" || field_name == "rkey" {
471-            continue;
472-        }
473-
474-        // Check for type overrides that change the extraction
475-        let type_override = config
476-            .type_overrides
477-            .iter()
478-            .find(|o| o.source_field == field_name);
479-
480-        let mut snake = camel_to_snake(field_name);
481-        if snake == "ref" {
482-            snake = "ref_name".to_string();
483-        } else if snake == "type" {
484-            snake = "type_name".to_string();
485-        }
486-
487-        if let Some(ovr) = type_override {
488-            // Generate type-specific extractor based on override
489-            let extractor = match ovr.rust_type {
490-                "Option<f32>" => {
491-                    format!("rec.get(\"{field_name}\").and_then(|v| v.as_f64()).map(|v| v as f32)")
492-                }
493-                "f32" => format!(
494-                    "rec.get(\"{field_name}\").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32"
495-                ),
496-                "i32" => format!(
497-                    "rec.get(\"{field_name}\").and_then(|v| v.as_i64()).unwrap_or(0) as i32"
498-                ),
499-                "Option<i32>" => {
500-                    format!("rec.get(\"{field_name}\").and_then(|v| v.as_i64()).map(|v| v as i32)")
501-                }
502-                _ => format!(
503-                    "rec.get(\"{field_name}\").and_then(|v| v.as_str()).unwrap_or(\"\").to_string()"
504-                ),
505-            };
506-            w.line(&format!("{snake}: {extractor},"));
507-        } else {
508-            let is_required = is_field_required(schema, &body_id, field_name);
509-            let extractor = json_extractor(&prop_vertex.kind, field_name, is_required);
510-            w.line(&format!("{snake}: {extractor},"));
511-        }
512-    }
513-
514-    // Extra denormalized columns
515-    for extra in config.extra_columns {
516-        if extra.name == "avatar_cid" {
517-            w.line("avatar_cid: rec.get(\"avatar\").and_then(|v| v.get(\"ref\")).and_then(|v| v.get(\"$link\")).and_then(|v| v.as_str()).map(String::from),");
518-        } else if extra.exclude_from_insert {
519-            // Counter/auto-managed — use zero/default
520-            let val: &str = match extra.rust_type {
521-                "i32" | "i64" => "0",
522-                "String" => "String::new()",
523-                _ if extra.rust_type.starts_with("Option<") => "None",
524-                _ => "Default::default()",
525-            };
526-            w.line(&format!("{}: {val},", extra.name));
527-        } else if extra.optional {
528-            w.line(&format!("{}: None,", extra.name));
529-        } else {
530-            // Non-optional, non-counter extra — use a sensible default
531-            let val: &str = match extra.rust_type {
532-                "String" => "String::new()",
533-                "i32" | "i64" => "0",
534-                "bool" => "false",
535-                _ => "Default::default()",
536-            };
537-            w.line(&format!("{}: {val},", extra.name));
538-        }
539-    }
540-
541-    w.line("indexed_at: chrono::Utc::now(),");
542-    w.dedent();
543-    w.line("}");
544-    w.dedent();
545-    w.line("}");
546-    w.dedent();
547-    w.line("}");
548-    w.blank();
549-
550-    Ok(w.finish())
551-}
552-
553-// ---------------------------------------------------------------------------
554397 // Helpers
555398 // ---------------------------------------------------------------------------
556399 
@@ -599,59 +442,6 @@ fn lexicon_kind_to_db_types(kind: &panproto_gat::Name, field_name: &str) -> (Str
599442     }
600443 }
601444 
602-fn json_extractor(kind: &panproto_gat::Name, field_name: &str, required: bool) -> String {
603-    // DateTime fields
604-    if field_name.ends_with("At") && kind.as_str() == "string" {
605-        if required {
606-            return format!("parse_datetime(rec, \"{field_name}\")");
607-        } else {
608-            return format!(
609-                "rec.get(\"{field_name}\").and_then(|v| v.as_str()).and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()).map(|dt| dt.with_timezone(&chrono::Utc))"
610-            );
611-        }
612-    }
613-
614-    match kind.as_str() {
615-        "string" | "ref" | "token" | "cid-link" => {
616-            if required {
617-                format!(
618-                    "rec.get(\"{field_name}\").and_then(|v| v.as_str()).unwrap_or(\"\").to_string()"
619-                )
620-            } else {
621-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_str()).map(String::from)")
622-            }
623-        }
624-        "integer" => {
625-            if required {
626-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_i64()).unwrap_or(0)")
627-            } else {
628-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_i64())")
629-            }
630-        }
631-        "number" => {
632-            if required {
633-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_f64()).unwrap_or(0.0)")
634-            } else {
635-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_f64())")
636-            }
637-        }
638-        "boolean" => {
639-            if required {
640-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_bool()).unwrap_or(false)")
641-            } else {
642-                format!("rec.get(\"{field_name}\").and_then(|v| v.as_bool())")
643-            }
644-        }
645-        _ => {
646-            if required {
647-                format!("rec.get(\"{field_name}\").cloned().unwrap_or(serde_json::Value::Null)")
648-            } else {
649-                format!("rec.get(\"{field_name}\").cloned()")
650-            }
651-        }
652-    }
653-}
654-
655445 fn camel_to_snake(s: &str) -> String {
656446     let mut r = String::with_capacity(s.len() + 4);
657447     for (i, c) in s.chars().enumerate() {
@@ -680,38 +470,3 @@ fn snake_to_camel(s: &str) -> String {
680470     }
681471     result
682472 }
683-
684-// Helper functions emitted into the generated output
685-pub fn emit_helper_functions() -> String {
686-    r#"
687-pub fn parse_datetime(rec: &serde_json::Value, field: &str) -> DateTime<Utc> {
688-    rec.get(field)
689-        .and_then(|v| v.as_str())
690-        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
691-        .map(|dt| dt.with_timezone(&Utc))
692-        .unwrap_or_else(Utc::now)
693-}
694-
695-pub fn parse_at_uri_did(rec: &serde_json::Value, field: &str) -> String {
696-    rec.get(field)
697-        .and_then(|v| v.as_str())
698-        .and_then(|uri| uri.strip_prefix("at://"))
699-        .and_then(|rest| rest.split('/').next())
700-        .unwrap_or("")
701-        .to_string()
702-}
703-
704-pub fn parse_at_uri_name(rec: &serde_json::Value, field: &str) -> String {
705-    rec.get(field)
706-        .and_then(|v| v.as_str())
707-        .and_then(|uri| uri.strip_prefix("at://"))
708-        .map(|rest| {
709-            let parts: Vec<&str> = rest.splitn(3, '/').collect();
710-            parts.get(2).copied().unwrap_or("")
711-        })
712-        .unwrap_or("")
713-        .to_string()
714-}
715-"#
716-    .to_string()
717-}
@@ -53,7 +53,6 @@ fn main() -> Result<()> {
5353     let mut all_ts = String::new();
5454     let mut all_row_types = String::new();
5555     let mut all_crud = String::new();
56-    let mut all_from_json = String::new();
5756 
5857     // Headers
5958     all_rust.push_str("// Generated by cospan-codegen from packages/lexicons/\n");
@@ -69,19 +68,14 @@ fn main() -> Result<()> {
6968     all_row_types.push_str("// Generated by cospan-codegen from packages/lexicons/\n");
7069     all_row_types.push_str("// Do not edit manually.\n\n");
7170     all_row_types.push_str(
72-        "use chrono::{DateTime, Utc};\nuse sqlx::PgPool;\nuse serde::{Serialize, Deserialize};\n\n",
71+        "use chrono::{DateTime, Utc};\nuse sqlx::PgPool;\nuse serde::{Serialize, Deserialize};\n\n\
72+         fn default_now() -> DateTime<Utc> { Utc::now() }\n\n",
7373     );
74-    all_row_types.push_str(&emit_rows::emit_helper_functions());
75-    all_row_types.push('\n');
7674 
7775     all_crud.push_str("// Generated by cospan-codegen from packages/lexicons/\n");
7876     all_crud.push_str("// Do not edit manually.\n\n");
7977     all_crud.push_str("use chrono::{DateTime, Utc};\nuse sqlx::PgPool;\nuse super::types::*;\n\n");
8078 
81-    all_from_json.push_str("// Generated by cospan-codegen from packages/lexicons/\n");
82-    all_from_json.push_str("// Do not edit manually.\n\n");
83-    all_from_json.push_str("use super::types::*;\n\n");
84-
8579     for lexicon_path in &lexicon_files {
8680         let relative = lexicon_path.strip_prefix(&lexicons_dir).unwrap();
8781         println!("  Processing: {}", relative.display());
@@ -175,15 +169,6 @@ fn main() -> Result<()> {
175169                 }
176170                 Err(e) => eprintln!("  warn: emit_crud for {nsid}: {e}"),
177171             }
178-
179-            // from_json deserializer
180-            match emit_rows::emit_from_json(&atproto_schema, nsid, config) {
181-                Ok(code) => {
182-                    all_from_json.push_str(&format!("// {nsid}\n"));
183-                    all_from_json.push_str(&code);
184-                }
185-                Err(e) => eprintln!("  warn: emit_from_json for {nsid}: {e}"),
186-            }
187172         }
188173     }
189174 
@@ -216,17 +201,15 @@ fn main() -> Result<()> {
216201     fs::write(generated_dir.join("rust/types.rs"), &all_rust)?;
217202     fs::write(generated_dir.join("rust/row_types.rs"), &all_row_types)?;
218203     fs::write(generated_dir.join("rust/crud.rs"), &all_crud)?;
219-    fs::write(generated_dir.join("rust/from_json.rs"), &all_from_json)?;
220204     fs::write(generated_dir.join("sql/schema.sql"), &all_sql)?;
221205     fs::write(generated_dir.join("typescript/types.ts"), &all_ts)?;
222206 
223207     // Write appview-integrated copies and format them
224208     let appview_gen_dir = workspace_root.join("crates/cospan-appview/src/db/generated");
225209     fs::create_dir_all(&appview_gen_dir)?;
226-    let gen_rs_files = ["types.rs", "crud.rs", "from_json.rs"];
210+    let gen_rs_files = ["types.rs", "crud.rs"];
227211     fs::write(appview_gen_dir.join("types.rs"), &all_row_types)?;
228212     fs::write(appview_gen_dir.join("crud.rs"), &all_crud)?;
229-    fs::write(appview_gen_dir.join("from_json.rs"), &all_from_json)?;
230213     fs::write(
231214         appview_gen_dir.join("mod.rs"),
232215         "\
@@ -236,9 +219,6 @@ fn main() -> Result<()> {
236219 pub mod types;
237220 #[allow(dead_code, unused_imports)]
238221 pub mod crud;
239-#[allow(dead_code, unused_imports)]
240-pub mod from_json;
241-
242222 pub use types::*;
243223 ",
244224     )?;
@@ -264,7 +244,6 @@ pub use types::*;
264244     println!("  generated/rust/types.rs                        (Lexicon record types)");
265245     println!("  generated/rust/row_types.rs                    (database Row types)");
266246     println!("  generated/rust/crud.rs                         (CRUD functions)");
267-    println!("  generated/rust/from_json.rs                    (Jetstream deserializers)");
268247     println!("  generated/sql/schema.sql                       (PostgreSQL DDL)");
269248     println!("  generated/sql/baseline.json                    (schema baseline for --check)");
270249     println!("  generated/typescript/types.ts                  (TypeScript interfaces)");
@@ -289,7 +289,7 @@ pub fn compile_db_projections(lexicons_dir: &Path) -> Result<Vec<CompiledDbProje
289289         let mut compiled = panproto_mig::compile(&schema, &schema, &migration)
290290             .map_err(|e| anyhow::anyhow!("compile db projection for {nsid}: {e:?}"))?;
291291 
292-        // Inject DB projection field transforms
292+        // Inject DB projection field transforms (AT-URI decomp, renames, defaults)
293293         let db_transforms = crate::db_projection::db_transforms(nsid);
294294         for (vertex, transforms) in db_transforms {
295295             compiled
cospan · schematic version control on atproto built on AT Protocol