fix: remove redundant rec.get() — panproto transforms handle it - Pipeline algebraicChecks: DB projection path_extract already computes these via panproto expressions, remove manual extraction - RefUpdate breakingChanges: DB projection compute_array_len already counts them, remove manual array length calculation - Repo node_did: DB projection at_uri_extract_did already extracts it, use row.node_did from the transformed result instead of raw rec - Consolidate 3 identical AT-URI parse functions into single parse_at_uri - Remove dead extract_did_from_at_uri (no callers)

Author: Aaron Steven White
Commit e88c3d93a271ff11ff1045f766c151df76c15432
Parent: 5ec6183c44
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
1 file changed +28 -72
@@ -81,41 +81,34 @@ async fn dispatch_special_upsert(
8181     match collection {
8282         // ─── Repo (node URL lookup) ─────────────────────────────────
8383         "dev.cospan.repo" | "sh.tangled.repo" => {
84-            let node_uri = rec.get("node").and_then(|v| v.as_str()).unwrap_or("");
85-            let node_did = extract_did_from_at_uri(node_uri);
86-
87-            let node_url = {
88-                let nodes = db::node::list(&state.db, 1000, None).await?;
89-                nodes
90-                    .iter()
91-                    .find(|n| n.did == node_did)
92-                    .and_then(|n| n.public_endpoint.clone())
93-                    .unwrap_or_default()
94-            };
95-
84+            // DB projection extracts nodeDid from the AT-URI via panproto expression
9685             let mut row: db::repo::RepoRow =
9786                 serde_json::from_value(transform_record(state, collection, rec))?;
9887             row.did = did.to_string();
9988             row.rkey = rkey.to_string();
10089             row.indexed_at = Utc::now();
101-            row.node_did = node_did;
102-            row.node_url = node_url;
90+
91+            // Look up node URL from nodes table (business logic, not schema-derivable)
92+            if !row.node_did.is_empty() {
93+                let nodes = db::node::list(&state.db, 1000, None).await?;
94+                if let Some(url) = nodes
95+                    .iter()
96+                    .find(|n| n.did == row.node_did)
97+                    .and_then(|n| n.public_endpoint.clone())
98+                {
99+                    row.node_url = url;
100+                }
101+            }
103102             db::repo::upsert(&state.db, &row).await?;
104103         }
105104 
106-        // ─── Ref Update (breaking change count + SSE) ───────────────
105+        // ─── Ref Update (breaking change count computed by DB projection + SSE) ──
107106         "dev.cospan.vcs.refUpdate" | "sh.tangled.git.refUpdate" => {
108-            let breaking_changes = rec
109-                .get("breakingChanges")
110-                .and_then(|v| v.as_array())
111-                .map(|a| a.len() as i32)
112-                .unwrap_or(0);
113-
107+            // DB projection compute_array_len handles breakingChanges → breakingChangeCount
114108             let mut row: db::ref_update::RefUpdateRow =
115109                 serde_json::from_value(transform_record(state, collection, rec))?;
116110             row.rkey = rkey.to_string();
117111             row.indexed_at = Utc::now();
118-            row.breaking_change_count = breaking_changes;
119112             db::ref_update::upsert(&state.db, &row).await?;
120113 
121114             let _ = state.event_tx.send(IndexEvent::RefUpdate {
@@ -164,7 +157,7 @@ async fn dispatch_special_upsert(
164157             db::issue_comment::upsert(&state.db, &row).await?;
165158 
166159             if existing.is_none() {
167-                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
160+                let (issue_did, issue_rkey) = parse_at_uri(&issue_uri);
168161                 db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
169162             }
170163         }
@@ -189,7 +182,7 @@ async fn dispatch_special_upsert(
189182             row.indexed_at = Utc::now();
190183             db::issue_state::upsert(&state.db, &row).await?;
191184 
192-            let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
185+            let (issue_did, issue_rkey) = parse_at_uri(&issue_uri);
193186             if let Some(issue) = db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await? {
194187                 let old_state = &issue.state;
195188                 if old_state != &new_state {
@@ -261,7 +254,7 @@ async fn dispatch_special_upsert(
261254             db::pull_comment::upsert(&state.db, &row).await?;
262255 
263256             if existing.is_none() {
264-                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
257+                let (pull_did, pull_rkey) = parse_at_uri(&pull_uri);
265258                 db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
266259             }
267260         }
@@ -282,7 +275,7 @@ async fn dispatch_special_upsert(
282275             let new_state = row.state.clone();
283276             db::pull_state::upsert(&state.db, &row).await?;
284277 
285-            let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
278+            let (pull_did, pull_rkey) = parse_at_uri(&pull_uri);
286279             if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
287280                 let old_state = &pull.state;
288281                 if old_state != &new_state {
@@ -322,7 +315,7 @@ async fn dispatch_special_upsert(
322315             db::star::upsert(&state.db, &row).await?;
323316 
324317             if existing.is_none() {
325-                let (repo_did, repo_name) = parse_repo_at_uri(&row.subject);
318+                let (repo_did, repo_name) = parse_at_uri(&row.subject);
326319                 db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
327320 
328321                 // SSE only for cospan-native events
@@ -337,29 +330,12 @@ async fn dispatch_special_upsert(
337330 
338331         // ─── Pipeline (algebraicChecks extraction) ──────────────────
339332         "dev.cospan.pipeline" | "sh.tangled.pipeline" => {
340-            let checks = rec.get("algebraicChecks");
341-
333+            // DB projection path_extract transforms handle algebraicChecks flattening
342334             let mut row: db::pipeline::PipelineRow =
343335                 serde_json::from_value(transform_record(state, collection, rec))?;
344336             row.did = did.to_string();
345337             row.rkey = rkey.to_string();
346338             row.indexed_at = Utc::now();
347-            row.gat_type_check = checks
348-                .and_then(|c| c.get("gatTypeCheck"))
349-                .and_then(|v| v.as_str())
350-                .map(String::from);
351-            row.equation_verification = checks
352-                .and_then(|c| c.get("equationVerification"))
353-                .and_then(|v| v.as_str())
354-                .map(String::from);
355-            row.lens_law_check = checks
356-                .and_then(|c| c.get("lensLawCheck"))
357-                .and_then(|v| v.as_str())
358-                .map(String::from);
359-            row.breaking_change_check = checks
360-                .and_then(|c| c.get("breakingChangeCheck"))
361-                .and_then(|v| v.as_str())
362-                .map(String::from);
363339             db::pipeline::upsert(&state.db, &row).await?;
364340         }
365341 
@@ -408,7 +384,7 @@ async fn dispatch_special_upsert(
408384         // ─── Tangled Pipeline Status (SQL update) ───────────────────
409385         "sh.tangled.pipeline.status" => {
410386             let pipeline_uri = rec.get("pipeline").and_then(|v| v.as_str()).unwrap_or("");
411-            let (pipeline_did, pipeline_rkey) = parse_at_uri_did_rkey(pipeline_uri);
387+            let (pipeline_did, pipeline_rkey) = parse_at_uri(pipeline_uri);
412388 
413389             let raw_status = rec
414390                 .get("status")
@@ -500,7 +476,7 @@ async fn dispatch_special_delete(
500476         // ─── Issue Comment (counter decrement) ──────────────────────
501477         "dev.cospan.repo.issue.comment" | "sh.tangled.repo.issue.comment" => {
502478             if let Some(comment) = db::issue_comment::get(&state.db, did, rkey).await? {
503-                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&comment.issue_uri);
479+                let (issue_did, issue_rkey) = parse_at_uri(&comment.issue_uri);
504480                 db::issue::decrement_comment_count(&state.db, &issue_did, &issue_rkey).await?;
505481             }
506482             db::issue_comment::delete(&state.db, did, rkey).await?;
@@ -519,7 +495,7 @@ async fn dispatch_special_delete(
519495         // ─── Pull Comment (counter decrement) ───────────────────────
520496         "dev.cospan.repo.pull.comment" | "sh.tangled.repo.pull.comment" => {
521497             if let Some(comment) = db::pull_comment::get(&state.db, did, rkey).await? {
522-                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&comment.pull_uri);
498+                let (pull_did, pull_rkey) = parse_at_uri(&comment.pull_uri);
523499                 db::pull::decrement_comment_count(&state.db, &pull_did, &pull_rkey).await?;
524500             }
525501             db::pull_comment::delete(&state.db, did, rkey).await?;
@@ -528,7 +504,7 @@ async fn dispatch_special_delete(
528504         // ─── Star (counter decrement + SSE) ─────────────────────────
529505         "dev.cospan.feed.star" | "sh.tangled.feed.star" => {
530506             if let Some(star) = db::star::get(&state.db, did, rkey).await? {
531-                let (repo_did, repo_name) = parse_repo_at_uri(&star.subject);
507+                let (repo_did, repo_name) = parse_at_uri(&star.subject);
532508                 db::star::decrement_repo_star_count(&state.db, &repo_did, &repo_name).await?;
533509 
534510                 if collection.starts_with("dev.cospan.") {
@@ -578,29 +554,9 @@ async fn dispatch_special_delete(
578554 
579555 // ─── Helper functions ───────────────────────────────────────────────────────
580556 
581-fn extract_did_from_at_uri(uri: &str) -> String {
582-    // at://did:plc:abc123/collection/rkey -> did:plc:abc123
583-    uri.strip_prefix("at://")
584-        .and_then(|s| s.split('/').next())
585-        .unwrap_or("")
586-        .to_string()
587-}
588-
589-fn parse_repo_at_uri(uri: &str) -> (String, String) {
590-    // at://did:plc:abc123/dev.cospan.repo/repo-name -> (did:plc:abc123, repo-name)
591-    let parts: Vec<&str> = uri
592-        .strip_prefix("at://")
593-        .unwrap_or("")
594-        .splitn(3, '/')
595-        .collect();
596-    let did = parts.first().unwrap_or(&"").to_string();
597-    let name = parts.get(2).unwrap_or(&"").to_string();
598-    (did, name)
599-}
600-
601-/// Parse an AT-URI into (did, rkey) — used for issue/pull URIs.
602-/// at://did:plc:abc123/dev.cospan.repo.issue/tid123 -> (did:plc:abc123, tid123)
603-fn parse_at_uri_did_rkey(uri: &str) -> (String, String) {
557+/// Parse an AT-URI into (did, rkey/name).
558+/// `at://did:plc:abc/dev.cospan.repo/name` → `("did:plc:abc", "name")`
559+fn parse_at_uri(uri: &str) -> (String, String) {
604560     let parts: Vec<&str> = uri
605561         .strip_prefix("at://")
606562         .unwrap_or("")
cospan · schematic version control on atproto built on AT Protocol