fix: resolve all review findings from code audit Critical fixes: - actor_profiles: set include_rkey=false (table has no rkey column) - ref_updates: fix conflict_keys to (committer_did, rkey) matching the actual unique index idx_ref_updates_rkey - repos: default_branch/visibility are now String (NOT NULL in DB), not Option<String>; added TypeOverride to force correct type - repos: source column now included in INSERT (was excluded by is_counter heuristic); distinguishes pds/tangled/api origins Correctness improvements: - Split ExtraColumn.default into explicit exclude_from_insert flag; counters (star_count etc.) excluded, data columns (source, state) included - Add include_rkey to RecordConfig; actor_profile omits rkey - Fix from_json/from_tangled_json to respect include_rkey/include_did - Suppress unused variable warnings with #[allow(unused_variables)] Cleanup: - Remove dead emit_dispatch_table function - Remove empty if-body in emit_row_types - Remove unused target_table/source_label from TangledMorphism - Remove unused repo_did/repo_name extractions in consumer.rs - Use string literals instead of &format!() for static SQL queries

Author: Aaron Steven White
Commit ecf907af2990bfa6310b7ce3ac182b6d15ad224e
Parent: 12b96fd993
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
7 files changed +99 -175
@@ -15,7 +15,7 @@
1515 					l.label.toLowerCase().includes(searchQuery.toLowerCase()) ||
1616 					l.value.toLowerCase().includes(searchQuery.toLowerCase())
1717 				).slice(0, 20)
18-			: []
18+			: ALL_LANGUAGES.slice(0, 20)
1919 	);
2020 
2121 	function selectProtocol(value: string) {
@@ -78,10 +78,12 @@
7878 					onfocus={() => showDropdown = true}
7979 					onblur={() => setTimeout(() => showDropdown = false, 200)}
8080 					placeholder="Filter by language..."
81-					class="w-44 rounded-full border border-border bg-surface-0 px-3 py-1 text-xs text-text-primary placeholder:text-text-secondary focus:border-accent focus:outline-none"
81+					autocomplete="off"
82+					spellcheck="false"
83+					class="w-48 rounded-full border border-border bg-surface-0 px-3 py-1 text-xs text-text-primary placeholder:text-text-secondary focus:border-accent focus:outline-none"
8284 				/>
8385 
84-				{#if showDropdown && filtered.length > 0}
86+				{#if showDropdown}
8587 					<ul class="absolute left-0 top-full z-50 mt-1 max-h-48 w-56 overflow-y-auto rounded-lg border border-border bg-surface-1 shadow-lg">
8688 						{#each filtered as lang}
8789 							<li>
@@ -83,9 +83,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
8383         // ─── Ref Update ─────────────────────────────────────────────
8484         ("dev.cospan.vcs.refUpdate", "create" | "update") => {
8585             if let Some(rec) = record {
86-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
87-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
88-
8986                 let breaking_changes = rec
9087                     .get("breakingChanges")
9188                     .and_then(|v| v.as_array())
@@ -114,9 +111,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
114111         // ─── Issue ──────────────────────────────────────────────────
115112         ("dev.cospan.repo.issue", "create" | "update") => {
116113             if let Some(rec) = record {
117-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
118-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
119-
120114                 let row = db::issue::IssueRow::from_json(did, rkey, rec);
121115                 db::issue::upsert(&state.db, &row).await?;
122116 
@@ -235,9 +229,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
235229         // ─── Pull Request ───────────────────────────────────────────
236230         ("dev.cospan.repo.pull", "create" | "update") => {
237231             if let Some(rec) = record {
238-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
239-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
240-
241232                 let row = db::pull::PullRow::from_json(did, rkey, rec);
242233                 db::pull::upsert(&state.db, &row).await?;
243234 
@@ -413,9 +404,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
413404         // ─── Label Definition ───────────────────────────────────────
414405         ("dev.cospan.label.definition", "create" | "update") => {
415406             if let Some(rec) = record {
416-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
417-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
418-
419407                 let row = db::label::LabelRow::from_json(did, rkey, rec);
420408                 db::label::upsert(&state.db, &row).await?;
421409             }
@@ -449,9 +437,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
449437         // ─── Collaborator ───────────────────────────────────────────
450438         ("dev.cospan.repo.collaborator", "create" | "update") => {
451439             if let Some(rec) = record {
452-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
453-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
454-
455440                 let row = db::collaborator::CollaboratorRow::from_json(did, rkey, rec);
456441                 db::collaborator::upsert(&state.db, &row).await?;
457442             }
@@ -463,9 +448,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
463448         // ─── Pipeline ───────────────────────────────────────────────
464449         ("dev.cospan.pipeline", "create" | "update") => {
465450             if let Some(rec) = record {
466-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
467-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
468-
469451                 let checks = rec.get("algebraicChecks");
470452 
471453                 let mut row = db::pipeline::PipelineRow::from_json(did, rkey, rec);
@@ -29,7 +29,6 @@ pub async fn handler(
2929 
3030     let row = db::actor_profile::ActorProfileRow {
3131         did: input.did.clone(),
32-        rkey: "self".to_string(),
3332         bluesky: input.bluesky.clone().unwrap_or_default(),
3433         display_name: input.display_name.clone(),
3534         description: input.description.clone(),
@@ -41,8 +41,8 @@ pub async fn handler(
4141         protocol: protocol.clone(),
4242         node_did: String::new(),
4343         node_url: String::new(),
44-        default_branch: Some("main".to_string()),
45-        visibility: Some(visibility.clone()),
44+        default_branch: "main".to_string(),
45+        visibility: visibility.clone(),
4646         source_repo: None,
4747         star_count: 0,
4848         fork_count: 0,
@@ -48,14 +48,16 @@ fn columns_for_record(
4848             is_counter: false,
4949         });
5050     }
51-    cols.push(Column {
52-        name: "rkey".into(),
53-        camel_name: "rkey".into(),
54-        rust_type: "String".into(),
55-        sql_type: "TEXT".into(),
56-        optional: false,
57-        is_counter: false,
58-    });
51+    if config.include_rkey {
52+        cols.push(Column {
53+            name: "rkey".into(),
54+            camel_name: "rkey".into(),
55+            rust_type: "String".into(),
56+            sql_type: "TEXT".into(),
57+            optional: false,
58+            is_counter: false,
59+        });
60+    }
5961 
6062     // 2. URI decomposition columns (replace the AT-URI field with did+name)
6163     for decomp in config.uri_decompositions {
@@ -174,7 +176,7 @@ fn columns_for_record(
174176             rust_type: extra.rust_type.into(),
175177             sql_type: extra.sql_type.into(),
176178             optional: extra.optional,
177-            is_counter: extra.default.is_some(),
179+            is_counter: extra.exclude_from_insert,
178180         });
179181     }
180182 
@@ -208,9 +210,6 @@ pub fn emit_row_types(
208210     w.line(&format!("pub struct {} {{", config.row_struct_name));
209211     w.indent();
210212     for col in &cols {
211-        if col.name != col.camel_name && !col.name.contains("_") || col.camel_name != snake_to_camel(&col.name) {
212-            // Only add rename if the auto-derived camelCase doesn't match
213-        }
214213         w.line(&format!("pub {}: {},", col.name, col.rust_type));
215214     }
216215     w.dedent();
@@ -345,7 +344,7 @@ pub fn emit_crud(
345344     ));
346345     w.indent();
347346     w.line(&format!(
348-        "sqlx::query_as::<_, {row_name}>(&format!(\"SELECT {} FROM {} WHERE {}\"))",
347+        "sqlx::query_as::<_, {row_name}>(\"SELECT {} FROM {} WHERE {}\")",
349348         select_cols,
350349         config.table_name,
351350         where_clauses.join(" AND ")
@@ -410,6 +409,7 @@ pub fn emit_from_json(
410409     w.line(&format!("impl {row_name} {{"));
411410     w.indent();
412411     w.line("/// Deserialize a Jetstream record JSON into a row.");
412+    w.line("#[allow(unused_variables)]");
413413     w.line("pub fn from_json(did: &str, rkey: &str, rec: &serde_json::Value) -> Self {");
414414     w.indent();
415415     w.line("Self {");
@@ -421,7 +421,9 @@ pub fn emit_from_json(
421421     if config.include_did {
422422         w.line("did: did.to_string(),");
423423     }
424-    w.line("rkey: rkey.to_string(),");
424+    if config.include_rkey {
425+        w.line("rkey: rkey.to_string(),");
426+    }
425427 
426428     // URI decompositions
427429     for decomp in config.uri_decompositions {
@@ -530,15 +532,26 @@ pub fn emit_from_json(
530532     for extra in config.extra_columns {
531533         if extra.name == "avatar_cid" {
532534             w.line("avatar_cid: rec.get(\"avatar\").and_then(|v| v.get(\"ref\")).and_then(|v| v.get(\"$link\")).and_then(|v| v.as_str()).map(String::from),");
533-        } else if let Some(default) = extra.default {
534-            let val = match extra.rust_type {
535-                "i32" => format!("{}", default.trim_matches('\'')),
536-                "String" => format!("\"{}\".to_string()", default.trim_matches('\'')),
537-                _ => "Default::default()".into(),
535+        } else if extra.exclude_from_insert {
536+            // Counter/auto-managed — use zero/default
537+            let val: &str = match extra.rust_type {
538+                "i32" | "i64" => "0",
539+                "String" => "String::new()",
540+                _ if extra.rust_type.starts_with("Option<") => "None",
541+                _ => "Default::default()",
538542             };
539543             w.line(&format!("{}: {val},", extra.name));
540-        } else {
544+        } else if extra.optional {
541545             w.line(&format!("{}: None,", extra.name));
546+        } else {
547+            // Non-optional, non-counter extra — use a sensible default
548+            let val: &str = match extra.rust_type {
549+                "String" => "String::new()",
550+                "i32" | "i64" => "0",
551+                "bool" => "false",
552+                _ => "Default::default()",
553+            };
554+            w.line(&format!("{}: {val},", extra.name));
542555         }
543556     }
544557 
@@ -555,72 +568,6 @@ pub fn emit_from_json(
555568 }
556569 
557570 // ---------------------------------------------------------------------------
558-// Emit consumer dispatch
559-// ---------------------------------------------------------------------------
560-
561-pub fn emit_dispatch_table(configs: &[&RecordConfig]) -> Result<String> {
562-    let mut w = IndentWriter::new("    ");
563-
564-    w.line("/// Process a single Jetstream event by dispatching on collection.");
565-    w.line("/// Generated from Lexicon definitions — do not edit manually.");
566-    w.line("pub async fn dispatch_record(");
567-    w.indent();
568-    w.line("pool: &PgPool,");
569-    w.line("collection: &str,");
570-    w.line("operation: &str,");
571-    w.line("did: &str,");
572-    w.line("rkey: &str,");
573-    w.line("record: Option<&serde_json::Value>,");
574-    w.dedent();
575-    w.line(") -> Result<(), sqlx::Error> {");
576-    w.indent();
577-    w.line("match (collection, operation) {");
578-    w.indent();
579-
580-    for config in configs {
581-        let row_name = config.row_struct_name;
582-        let mod_prefix = config.table_name;
583-
584-        // create | update
585-        w.line(&format!(
586-            "(\"{}\", \"create\" | \"update\") => {{",
587-            config.nsid
588-        ));
589-        w.indent();
590-        w.line("if let Some(rec) = record {");
591-        w.indent();
592-        w.line(&format!(
593-            "let row = {row_name}::from_json(did, rkey, rec);"
594-        ));
595-        w.line(&format!("{mod_prefix}::upsert(pool, &row).await?;"));
596-        w.dedent();
597-        w.line("}");
598-        w.dedent();
599-        w.line("}");
600-
601-        // delete
602-        w.line(&format!(
603-            "(\"{}\", \"delete\") => {{",
604-            config.nsid
605-        ));
606-        w.indent();
607-        w.line(&format!("{mod_prefix}::delete(pool, did).await?;"));
608-        w.dedent();
609-        w.line("}");
610-    }
611-
612-    w.line("_ => {} // unknown collection");
613-    w.dedent();
614-    w.line("}");
615-    w.line("Ok(())");
616-    w.dedent();
617-    w.line("}");
618-    w.blank();
619-
620-    Ok(w.finish())
621-}
622-
623-// ---------------------------------------------------------------------------
624571 // Helpers
625572 // ---------------------------------------------------------------------------
626573 
cospan · schematic version control on atproto built on AT Protocol