fix: use msgpack for compiled morphisms, run rustfmt on generated code CompiledMigration has HashMap<Edge, Edge> and HashMap<(Name, Name), Edge> which JSON can't serialize (non-string keys). Switch to msgpack for the compiled morphism/projection serialization. Also run rustfmt on generated .rs files after writing to ensure cargo fmt --check passes in CI.

Author: Aaron Steven White
Commit 10256d57545d4e630e1caf8864361a82f3a1dbd6
Parent: f00c2c3f6e
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
11 files changed +412 -250
@@ -462,6 +462,7 @@ dependencies = [
462462  "rand 0.8.5",
463463  "redis",
464464  "reqwest",
465+ "rmp-serde",
465466  "serde",
466467  "serde_json",
467468  "sha2",
@@ -491,6 +492,7 @@ dependencies = [
491492  "panproto-mig",
492493  "panproto-protocols",
493494  "panproto-schema",
495+ "rmp-serde",
494496  "serde",
495497  "serde_json",
496498 ]
@@ -31,6 +31,7 @@ sqlx.workspace = true
3131 
3232 serde.workspace = true
3333 serde_json.workspace = true
34+rmp-serde.workspace = true
3435 reqwest.workspace = true
3536 tokio-tungstenite.workspace = true
3637 futures-util.workspace = true
@@ -1,2 +1,2 @@
1-pub use super::generated::types::ActorProfileRow;
21 pub use super::generated::crud::actor_profiles::{delete, get, list, upsert};
2+pub use super::generated::types::ActorProfileRow;
@@ -1,2 +1,2 @@
1-pub use super::generated::types::NodeRow;
21 pub use super::generated::crud::nodes::{delete, get, list, upsert};
2+pub use super::generated::types::NodeRow;
@@ -46,7 +46,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
4646         // ─── Node ───────────────────────────────────────────────────
4747         ("dev.cospan.node", "create" | "update") => {
4848             if let Some(rec) = record {
49-                let row = db::node::NodeRow::from_json(did, rkey, &transform_record(state, collection, rec));
49+                let row = db::node::NodeRow::from_json(
50+                    did,
51+                    rkey,
52+                    &transform_record(state, collection, rec),
53+                );
5054                 db::node::upsert(&state.db, &row).await?;
5155             }
5256         }
@@ -57,7 +61,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
5761         // ─── Actor Profile ──────────────────────────────────────────
5862         ("dev.cospan.actor.profile", "create" | "update") => {
5963             if let Some(rec) = record {
60-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &transform_record(state, collection, rec));
64+                let row = db::actor_profile::ActorProfileRow::from_json(
65+                    did,
66+                    rkey,
67+                    &transform_record(state, collection, rec),
68+                );
6169                 db::actor_profile::upsert(&state.db, &row).await?;
6270             }
6371         }
@@ -82,7 +90,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
8290                         .unwrap_or_default()
8391                 };
8492 
85-                let mut row = db::repo::RepoRow::from_json(did, rkey, &transform_record(state, collection, rec));
93+                let mut row = db::repo::RepoRow::from_json(
94+                    did,
95+                    rkey,
96+                    &transform_record(state, collection, rec),
97+                );
8698                 row.node_did = node_did;
8799                 row.node_url = node_url;
88100                 db::repo::upsert(&state.db, &row).await?;
@@ -106,7 +118,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
106118                     .map(|a| a.len() as i32)
107119                     .unwrap_or(0);
108120 
109-                let mut row = db::ref_update::RefUpdateRow::from_json(did, rkey, &transform_record(state, collection, rec));
121+                let mut row = db::ref_update::RefUpdateRow::from_json(
122+                    did,
123+                    rkey,
124+                    &transform_record(state, collection, rec),
125+                );
110126                 row.breaking_change_count = breaking_changes;
111127                 db::ref_update::upsert(&state.db, &row).await?;
112128 
@@ -128,7 +144,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
128144         // ─── Issue ──────────────────────────────────────────────────
129145         ("dev.cospan.repo.issue", "create" | "update") => {
130146             if let Some(rec) = record {
131-                let row = db::issue::IssueRow::from_json(did, rkey, &transform_record(state, collection, rec));
147+                let row = db::issue::IssueRow::from_json(
148+                    did,
149+                    rkey,
150+                    &transform_record(state, collection, rec),
151+                );
132152                 db::issue::upsert(&state.db, &row).await?;
133153 
134154                 // Publish SSE event
@@ -161,7 +181,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
161181                     .unwrap_or("")
162182                     .to_string();
163183 
164-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
184+                let row = db::issue_comment::IssueCommentRow::from_json(
185+                    did,
186+                    rkey,
187+                    &transform_record(state, collection, rec),
188+                );
165189 
166190                 // Check if this is a new comment (not an update) for counter purposes
167191                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
@@ -197,7 +221,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
197221                     .unwrap_or("open")
198222                     .to_string();
199223 
200-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
224+                let row = db::issue_state::IssueStateRow::from_json(
225+                    did,
226+                    rkey,
227+                    &transform_record(state, collection, rec),
228+                );
201229                 db::issue_state::upsert(&state.db, &row).await?;
202230 
203231                 // Update the issue's state and repo counters
@@ -246,7 +274,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
246274         // ─── Pull Request ───────────────────────────────────────────
247275         ("dev.cospan.repo.pull", "create" | "update") => {
248276             if let Some(rec) = record {
249-                let row = db::pull::PullRow::from_json(did, rkey, &transform_record(state, collection, rec));
277+                let row = db::pull::PullRow::from_json(
278+                    did,
279+                    rkey,
280+                    &transform_record(state, collection, rec),
281+                );
250282                 db::pull::upsert(&state.db, &row).await?;
251283 
252284                 // Publish SSE event
@@ -278,7 +310,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
278310                     .unwrap_or("")
279311                     .to_string();
280312 
281-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
313+                let row = db::pull_comment::PullCommentRow::from_json(
314+                    did,
315+                    rkey,
316+                    &transform_record(state, collection, rec),
317+                );
282318 
283319                 // Check if this is a new comment (not an update) for counter purposes
284320                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
@@ -312,7 +348,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
312348                     .unwrap_or("open")
313349                     .to_string();
314350 
315-                let row = db::pull_state::PullStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
351+                let row = db::pull_state::PullStateRow::from_json(
352+                    did,
353+                    rkey,
354+                    &transform_record(state, collection, rec),
355+                );
316356                 db::pull_state::upsert(&state.db, &row).await?;
317357 
318358                 // Update the pull's state and repo counters
@@ -365,7 +405,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
365405                     .unwrap_or("")
366406                     .to_string();
367407 
368-                let row = db::star::StarRow::from_json(did, rkey, &transform_record(state, collection, rec));
408+                let row = db::star::StarRow::from_json(
409+                    did,
410+                    rkey,
411+                    &transform_record(state, collection, rec),
412+                );
369413 
370414                 let existing = db::star::get(&state.db, did, rkey).await?;
371415                 db::star::upsert(&state.db, &row).await?;
@@ -399,7 +443,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
399443         // ─── Follow ─────────────────────────────────────────────────
400444         ("dev.cospan.graph.follow", "create" | "update") => {
401445             if let Some(rec) = record {
402-                let row = db::follow::FollowRow::from_json(did, rkey, &transform_record(state, collection, rec));
446+                let row = db::follow::FollowRow::from_json(
447+                    did,
448+                    rkey,
449+                    &transform_record(state, collection, rec),
450+                );
403451                 db::follow::upsert(&state.db, &row).await?;
404452             }
405453         }
@@ -410,7 +458,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
410458         // ─── Reaction ───────────────────────────────────────────────
411459         ("dev.cospan.feed.reaction", "create" | "update") => {
412460             if let Some(rec) = record {
413-                let row = db::reaction::ReactionRow::from_json(did, rkey, &transform_record(state, collection, rec));
461+                let row = db::reaction::ReactionRow::from_json(
462+                    did,
463+                    rkey,
464+                    &transform_record(state, collection, rec),
465+                );
414466                 db::reaction::upsert(&state.db, &row).await?;
415467             }
416468         }
@@ -421,7 +473,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
421473         // ─── Label Definition ───────────────────────────────────────
422474         ("dev.cospan.label.definition", "create" | "update") => {
423475             if let Some(rec) = record {
424-                let row = db::label::LabelRow::from_json(did, rkey, &transform_record(state, collection, rec));
476+                let row = db::label::LabelRow::from_json(
477+                    did,
478+                    rkey,
479+                    &transform_record(state, collection, rec),
480+                );
425481                 db::label::upsert(&state.db, &row).await?;
426482             }
427483         }
@@ -432,7 +488,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
432488         // ─── Org ────────────────────────────────────────────────────
433489         ("dev.cospan.org", "create" | "update") => {
434490             if let Some(rec) = record {
435-                let row = db::org::OrgRow::from_json(did, rkey, &transform_record(state, collection, rec));
491+                let row = db::org::OrgRow::from_json(
492+                    did,
493+                    rkey,
494+                    &transform_record(state, collection, rec),
495+                );
436496                 db::org::upsert(&state.db, &row).await?;
437497             }
438498         }
@@ -443,7 +503,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
443503         // ─── Org Member ─────────────────────────────────────────────
444504         ("dev.cospan.org.member", "create" | "update") => {
445505             if let Some(rec) = record {
446-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &transform_record(state, collection, rec));
506+                let row = db::org_member::OrgMemberRow::from_json(
507+                    did,
508+                    rkey,
509+                    &transform_record(state, collection, rec),
510+                );
447511                 db::org_member::upsert(&state.db, &row).await?;
448512             }
449513         }
@@ -454,7 +518,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
454518         // ─── Collaborator ───────────────────────────────────────────
455519         ("dev.cospan.repo.collaborator", "create" | "update") => {
456520             if let Some(rec) = record {
457-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &transform_record(state, collection, rec));
521+                let row = db::collaborator::CollaboratorRow::from_json(
522+                    did,
523+                    rkey,
524+                    &transform_record(state, collection, rec),
525+                );
458526                 db::collaborator::upsert(&state.db, &row).await?;
459527             }
460528         }
@@ -467,7 +535,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
467535             if let Some(rec) = record {
468536                 let checks = rec.get("algebraicChecks");
469537 
470-                let mut row = db::pipeline::PipelineRow::from_json(did, rkey, &transform_record(state, collection, rec));
538+                let mut row = db::pipeline::PipelineRow::from_json(
539+                    did,
540+                    rkey,
541+                    &transform_record(state, collection, rec),
542+                );
471543                 row.gat_type_check = checks
472544                     .and_then(|c| c.get("gatTypeCheck"))
473545                     .and_then(|v| v.as_str())
@@ -494,7 +566,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
494566         // ─── Dependency ─────────────────────────────────────────────
495567         ("dev.cospan.repo.dependency", "create" | "update") => {
496568             if let Some(rec) = record {
497-                let row = db::dependency::DependencyRow::from_json(did, rkey, &transform_record(state, collection, rec));
569+                let row = db::dependency::DependencyRow::from_json(
570+                    did,
571+                    rkey,
572+                    &transform_record(state, collection, rec),
573+                );
498574                 db::dependency::upsert(&state.db, &row).await?;
499575             }
500576         }
@@ -509,7 +585,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
509585         // ─── Tangled Star ──────────────────────────────────────────
510586         ("sh.tangled.feed.star", "create" | "update") => {
511587             if let Some(rec) = record {
512-                let row = db::star::StarRow::from_json(did, rkey, &transform_record(state, collection, rec));
588+                let row = db::star::StarRow::from_json(
589+                    did,
590+                    rkey,
591+                    &transform_record(state, collection, rec),
592+                );
513593                 let existing = db::star::get(&state.db, did, rkey).await?;
514594                 db::star::upsert(&state.db, &row).await?;
515595 
@@ -532,7 +612,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
532612         // ─── Tangled Follow ────────────────────────────────────────
533613         ("sh.tangled.graph.follow", "create" | "update") => {
534614             if let Some(rec) = record {
535-                let row = db::follow::FollowRow::from_json(did, rkey, &transform_record(state, collection, rec));
615+                let row = db::follow::FollowRow::from_json(
616+                    did,
617+                    rkey,
618+                    &transform_record(state, collection, rec),
619+                );
536620                 db::follow::upsert(&state.db, &row).await?;
537621                 tracing::debug!(did, rkey, "indexed tangled follow");
538622             }
@@ -544,7 +628,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
544628         // ─── Tangled Reaction ──────────────────────────────────────
545629         ("sh.tangled.feed.reaction", "create" | "update") => {
546630             if let Some(rec) = record {
547-                let row = db::reaction::ReactionRow::from_json(did, rkey, &transform_record(state, collection, rec));
631+                let row = db::reaction::ReactionRow::from_json(
632+                    did,
633+                    rkey,
634+                    &transform_record(state, collection, rec),
635+                );
548636                 db::reaction::upsert(&state.db, &row).await?;
549637                 tracing::debug!(did, rkey, "indexed tangled reaction");
550638             }
@@ -556,7 +644,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
556644         // ─── Tangled Issue ─────────────────────────────────────────
557645         ("sh.tangled.repo.issue", "create" | "update") => {
558646             if let Some(rec) = record {
559-                let row = db::issue::IssueRow::from_json(did, rkey, &transform_record(state, collection, rec));
647+                let row = db::issue::IssueRow::from_json(
648+                    did,
649+                    rkey,
650+                    &transform_record(state, collection, rec),
651+                );
560652                 db::issue::upsert(&state.db, &row).await?;
561653                 tracing::debug!(did, rkey, "indexed tangled issue");
562654             }
@@ -585,7 +677,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
585677                     .unwrap_or("open")
586678                     .to_string();
587679 
588-                let row = db::issue_state::IssueStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
680+                let row = db::issue_state::IssueStateRow::from_json(
681+                    did,
682+                    rkey,
683+                    &transform_record(state, collection, rec),
684+                );
589685                 db::issue_state::upsert(&state.db, &row).await?;
590686 
591687                 // Update the issue's state and repo counters
@@ -631,7 +727,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
631727                     .unwrap_or("")
632728                     .to_string();
633729 
634-                let row = db::issue_comment::IssueCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
730+                let row = db::issue_comment::IssueCommentRow::from_json(
731+                    did,
732+                    rkey,
733+                    &transform_record(state, collection, rec),
734+                );
635735 
636736                 let existing = db::issue_comment::get(&state.db, did, rkey).await?;
637737                 db::issue_comment::upsert(&state.db, &row).await?;
@@ -654,7 +754,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
654754         // ─── Tangled Pull Request ──────────────────────────────────
655755         ("sh.tangled.repo.pull", "create" | "update") => {
656756             if let Some(rec) = record {
657-                let row = db::pull::PullRow::from_json(did, rkey, &transform_record(state, collection, rec));
757+                let row = db::pull::PullRow::from_json(
758+                    did,
759+                    rkey,
760+                    &transform_record(state, collection, rec),
761+                );
658762                 db::pull::upsert(&state.db, &row).await?;
659763                 tracing::debug!(did, rkey, "indexed tangled pull");
660764             }
@@ -676,7 +780,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
676780                     .and_then(|v| v.as_str())
677781                     .unwrap_or("")
678782                     .to_string();
679-                let row = db::pull_state::PullStateRow::from_json(did, rkey, &transform_record(state, collection, rec));
783+                let row = db::pull_state::PullStateRow::from_json(
784+                    did,
785+                    rkey,
786+                    &transform_record(state, collection, rec),
787+                );
680788                 let new_state = row.state.clone();
681789                 db::pull_state::upsert(&state.db, &row).await?;
682790 
@@ -721,7 +829,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
721829                     .unwrap_or("")
722830                     .to_string();
723831 
724-                let row = db::pull_comment::PullCommentRow::from_json(did, rkey, &transform_record(state, collection, rec));
832+                let row = db::pull_comment::PullCommentRow::from_json(
833+                    did,
834+                    rkey,
835+                    &transform_record(state, collection, rec),
836+                );
725837 
726838                 let existing = db::pull_comment::get(&state.db, did, rkey).await?;
727839                 db::pull_comment::upsert(&state.db, &row).await?;
@@ -744,7 +856,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
744856         // ─── Tangled Collaborator ──────────────────────────────────
745857         ("sh.tangled.repo.collaborator", "create" | "update") => {
746858             if let Some(rec) = record {
747-                let row = db::collaborator::CollaboratorRow::from_json(did, rkey, &transform_record(state, collection, rec));
859+                let row = db::collaborator::CollaboratorRow::from_json(
860+                    did,
861+                    rkey,
862+                    &transform_record(state, collection, rec),
863+                );
748864                 db::collaborator::upsert(&state.db, &row).await?;
749865                 tracing::debug!(did, rkey, "indexed tangled collaborator");
750866             }
@@ -756,7 +872,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
756872         // ─── Tangled Knot → Node ──────────────────────────────────
757873         ("sh.tangled.knot", "create" | "update") => {
758874             if let Some(rec) = record {
759-                let row = db::node::NodeRow::from_json(did, rkey, &transform_record(state, collection, rec));
875+                let row = db::node::NodeRow::from_json(
876+                    did,
877+                    rkey,
878+                    &transform_record(state, collection, rec),
879+                );
760880                 db::node::upsert(&state.db, &row).await?;
761881                 tracing::debug!(did, rkey, "indexed tangled knot as node");
762882             }
@@ -798,7 +918,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
798918         // ─── Tangled Actor Profile ─────────────────────────────────
799919         ("sh.tangled.actor.profile", "create" | "update") => {
800920             if let Some(rec) = record {
801-                let row = db::actor_profile::ActorProfileRow::from_json(did, rkey, &transform_record(state, collection, rec));
921+                let row = db::actor_profile::ActorProfileRow::from_json(
922+                    did,
923+                    rkey,
924+                    &transform_record(state, collection, rec),
925+                );
802926                 db::actor_profile::upsert(&state.db, &row).await?;
803927                 tracing::debug!(did, rkey, "indexed tangled actor profile");
804928             }
@@ -810,7 +934,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
810934         // ─── Tangled Repo ──────────────────────────────────────────
811935         ("sh.tangled.repo", "create" | "update") => {
812936             if let Some(rec) = record {
813-                let row = db::repo::RepoRow::from_json(did, rkey, &transform_record(state, collection, rec));
937+                let row = db::repo::RepoRow::from_json(
938+                    did,
939+                    rkey,
940+                    &transform_record(state, collection, rec),
941+                );
814942                 db::repo::upsert(&state.db, &row).await?;
815943                 tracing::debug!(did, rkey, "indexed tangled repo");
816944             }
@@ -822,7 +950,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
822950         // ─── Tangled Knot Member → Org Member ─────────────────────
823951         ("sh.tangled.knot.member", "create" | "update") => {
824952             if let Some(rec) = record {
825-                let row = db::org_member::OrgMemberRow::from_json(did, rkey, &transform_record(state, collection, rec));
953+                let row = db::org_member::OrgMemberRow::from_json(
954+                    did,
955+                    rkey,
956+                    &transform_record(state, collection, rec),
957+                );
826958                 db::org_member::upsert(&state.db, &row).await?;
827959                 tracing::debug!(did, rkey, "indexed tangled knot member as org member");
828960             }
@@ -898,7 +1030,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
8981030         // ─── Tangled Pipeline ──────────────────────────────────────
8991031         ("sh.tangled.pipeline", "create" | "update") => {
9001032             if let Some(rec) = record {
901-                let row = db::pipeline::PipelineRow::from_json(did, rkey, &transform_record(state, collection, rec));
1033+                let row = db::pipeline::PipelineRow::from_json(
1034+                    did,
1035+                    rkey,
1036+                    &transform_record(state, collection, rec),
1037+                );
9021038                 db::pipeline::upsert(&state.db, &row).await?;
9031039                 tracing::debug!(did, rkey, "indexed tangled pipeline");
9041040             }
@@ -955,7 +1091,11 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
9551091         // ─── Tangled Git RefUpdate ─────────────────────────────────
9561092         ("sh.tangled.git.refUpdate", "create" | "update") => {
9571093             if let Some(rec) = record {
958-                let row = db::ref_update::RefUpdateRow::from_json(did, rkey, &transform_record(state, collection, rec));
1094+                let row = db::ref_update::RefUpdateRow::from_json(
1095+                    did,
1096+                    rkey,
1097+                    &transform_record(state, collection, rec),
1098+                );
9591099                 db::ref_update::upsert(&state.db, &row).await?;
9601100                 tracing::debug!(did, rkey, "indexed tangled git refUpdate");
9611101             }
cospan · schematic version control on atproto built on AT Protocol