feat: generate XRPC Input/Params types from Lexicon definitions Add 36 Lexicon files for all query and procedure XRPC endpoints (previously only record types had Lexicons). New emit_xrpc.rs generates Rust Input/Params structs from the parsed Lexicon query/procedure schemas via panproto's ATProto parse_query_procedure_def(). 24 types generated. Output: generated/rust/xrpc_types.rs

Author: Aaron Steven White
Commit 4af490f94bd5fc3aeda3a02f258e5c5700439fa4
Parent: a6e7a01e44
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
46 files changed +2381 -1007
@@ -6,9 +6,11 @@ use crate::db;
66 use crate::state::AppState;
77 use crate::xrpc::sse::IndexEvent;
88 
9+use super::dispatch;
10+
911 /// Transform a record through the pre-compiled panproto morphism.
1012 /// Handles both Cospan (DB projection) and Tangled (interop + DB projection).
11-fn transform_record(
13+pub(super) fn transform_record(
1214     state: &AppState,
1315     collection: &str,
1416     rec: &serde_json::Value,
@@ -42,218 +44,178 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
4244     let rkey = commit.get("rkey").and_then(|v| v.as_str()).unwrap_or("");
4345     let record = commit.get("record");
4446 
45-    match (collection, operation) {
46-        // ─── Node ───────────────────────────────────────────────────
47-        ("dev.cospan.node", "create" | "update") => {
47+    match operation {
48+        "create" | "update" => {
4849             if let Some(rec) = record {
49-                let mut row: db::node::NodeRow =
50-                    serde_json::from_value(transform_record(state, collection, rec))?;
51-                row.did = did.to_string();
52-                row.rkey = rkey.to_string();
53-                row.indexed_at = Utc::now();
54-                db::node::upsert(&state.db, &row).await?;
50+                // Try generic dispatch first for simple records
51+                if dispatch::dispatch_simple_upsert(state, collection, did, rkey, rec).await? {
52+                    return Ok(());
53+                }
54+                // Fall through to special-case handling
55+                dispatch_special_upsert(state, collection, did, rkey, rec).await?;
5556             }
5657         }
57-        ("dev.cospan.node", "delete") => {
58-            db::node::delete(&state.db, did).await?;
59-        }
60-
61-        // ─── Actor Profile ──────────────────────────────────────────
62-        ("dev.cospan.actor.profile", "create" | "update") => {
63-            if let Some(rec) = record {
64-                let mut row: db::actor_profile::ActorProfileRow =
65-                    serde_json::from_value(transform_record(state, collection, rec))?;
66-                row.did = did.to_string();
67-                row.indexed_at = Utc::now();
68-                db::actor_profile::upsert(&state.db, &row).await?;
58+        "delete" => {
59+            // Try generic dispatch first for simple deletes
60+            if dispatch::dispatch_simple_delete(state, collection, did, rkey).await? {
61+                return Ok(());
6962             }
63+            // Fall through to special-case handling
64+            dispatch_special_delete(state, collection, did, rkey).await?;
7065         }
71-        ("dev.cospan.actor.profile", "delete") => {
72-            db::actor_profile::delete(&state.db, did).await?;
73-        }
66+        _ => {}
67+    }
7468 
75-        // ─── Repo ───────────────────────────────────────────────────
76-        ("dev.cospan.repo", "create" | "update") => {
77-            if let Some(rec) = record {
78-                // Extract node DID and URL from the node AT-URI
79-                let node_uri = rec.get("node").and_then(|v| v.as_str()).unwrap_or("");
80-                let node_did = extract_did_from_at_uri(node_uri);
81-
82-                // Look up node URL from nodes table
83-                let node_url = {
84-                    let nodes = db::node::list(&state.db, 1000, None).await?;
85-                    nodes
86-                        .iter()
87-                        .find(|n| n.did == node_did)
88-                        .and_then(|n| n.public_endpoint.clone())
89-                        .unwrap_or_default()
90-                };
91-
92-                let mut row: db::repo::RepoRow =
93-                    serde_json::from_value(transform_record(state, collection, rec))?;
94-                row.did = did.to_string();
95-                row.rkey = rkey.to_string();
96-                row.indexed_at = Utc::now();
97-                row.node_did = node_did;
98-                row.node_url = node_url;
99-                db::repo::upsert(&state.db, &row).await?;
100-            }
101-        }
102-        ("dev.cospan.repo", "delete") => {
103-            // Need to look up repo name from rkey - for now just log
104-            tracing::warn!(
105-                did,
106-                rkey,
107-                "repo delete not fully implemented (need rkey->name lookup)"
108-            );
109-        }
69+    Ok(())
70+}
11071 
111-        // ─── Ref Update ─────────────────────────────────────────────
112-        ("dev.cospan.vcs.refUpdate", "create" | "update") => {
113-            if let Some(rec) = record {
114-                let breaking_changes = rec
115-                    .get("breakingChanges")
116-                    .and_then(|v| v.as_array())
117-                    .map(|a| a.len() as i32)
118-                    .unwrap_or(0);
119-
120-                let mut row: db::ref_update::RefUpdateRow =
121-                    serde_json::from_value(transform_record(state, collection, rec))?;
122-                row.rkey = rkey.to_string();
123-                row.indexed_at = Utc::now();
124-                row.breaking_change_count = breaking_changes;
125-                db::ref_update::upsert(&state.db, &row).await?;
126-
127-                // Publish SSE event
128-                let _ = state.event_tx.send(IndexEvent::RefUpdate {
129-                    repo_did: row.repo_did.clone(),
130-                    repo_name: row.repo_name.clone(),
131-                    ref_name: row.ref_name.clone(),
132-                    new_target: row.new_target.clone(),
133-                    committer_did: row.committer_did.clone(),
134-                    breaking_change_count: row.breaking_change_count,
135-                });
136-            }
137-        }
138-        ("dev.cospan.vcs.refUpdate", "delete") => {
139-            db::ref_update::delete(&state.db, did, rkey).await?;
140-        }
72+// ─── Special-case upserts (records with side effects) ─────────────────────
14173 
142-        // ─── Issue ──────────────────────────────────────────────────
143-        ("dev.cospan.repo.issue", "create" | "update") => {
144-            if let Some(rec) = record {
145-                let mut row: db::issue::IssueRow =
146-                    serde_json::from_value(transform_record(state, collection, rec))?;
147-                row.did = did.to_string();
148-                row.rkey = rkey.to_string();
149-                row.indexed_at = Utc::now();
150-                db::issue::upsert(&state.db, &row).await?;
151-
152-                // Publish SSE event
153-                let _ = state.event_tx.send(IndexEvent::IssueCreated {
154-                    repo_did: row.repo_did.clone(),
155-                    repo_name: row.repo_name.clone(),
156-                    issue_rkey: row.rkey.clone(),
157-                    title: row.title.clone(),
158-                    author_did: row.did.clone(),
159-                });
160-            }
161-        }
162-        ("dev.cospan.repo.issue", "delete") => {
163-            // Look up the issue to decrement repo counter
164-            if let Some(issue) = db::issue::get_by_pk(&state.db, did, rkey).await?
165-                && issue.state == "open"
74+async fn dispatch_special_upsert(
75+    state: &Arc<AppState>,
76+    collection: &str,
77+    did: &str,
78+    rkey: &str,
79+    rec: &serde_json::Value,
80+) -> anyhow::Result<()> {
81+    match collection {
82+        // ─── Repo (node URL lookup) ─────────────────────────────────
83+        "dev.cospan.repo" => {
84+            let node_uri = rec.get("node").and_then(|v| v.as_str()).unwrap_or("");
85+            let node_did = extract_did_from_at_uri(node_uri);
86+
87+            let node_url = {
88+                let nodes = db::node::list(&state.db, 1000, None).await?;
89+                nodes
90+                    .iter()
91+                    .find(|n| n.did == node_did)
92+                    .and_then(|n| n.public_endpoint.clone())
93+                    .unwrap_or_default()
94+            };
95+
96+            let mut row: db::repo::RepoRow =
97+                serde_json::from_value(transform_record(state, collection, rec))?;
98+            row.did = did.to_string();
99+            row.rkey = rkey.to_string();
100+            row.indexed_at = Utc::now();
101+            row.node_did = node_did;
102+            row.node_url = node_url;
103+            db::repo::upsert(&state.db, &row).await?;
104+        }
105+
106+        // ─── Ref Update (breaking change count + SSE) ───────────────
107+        "dev.cospan.vcs.refUpdate" => {
108+            let breaking_changes = rec
109+                .get("breakingChanges")
110+                .and_then(|v| v.as_array())
111+                .map(|a| a.len() as i32)
112+                .unwrap_or(0);
113+
114+            let mut row: db::ref_update::RefUpdateRow =
115+                serde_json::from_value(transform_record(state, collection, rec))?;
116+            row.rkey = rkey.to_string();
117+            row.indexed_at = Utc::now();
118+            row.breaking_change_count = breaking_changes;
119+            db::ref_update::upsert(&state.db, &row).await?;
120+
121+            let _ = state.event_tx.send(IndexEvent::RefUpdate {
122+                repo_did: row.repo_did.clone(),
123+                repo_name: row.repo_name.clone(),
124+                ref_name: row.ref_name.clone(),
125+                new_target: row.new_target.clone(),
126+                committer_did: row.committer_did.clone(),
127+                breaking_change_count: row.breaking_change_count,
128+            });
129+        }
130+
131+        // ─── Issue (SSE event on create) ────────────────────────────
132+        "dev.cospan.repo.issue" => {
133+            let mut row: db::issue::IssueRow =
134+                serde_json::from_value(transform_record(state, collection, rec))?;
135+            row.did = did.to_string();
136+            row.rkey = rkey.to_string();
137+            row.indexed_at = Utc::now();
138+            db::issue::upsert(&state.db, &row).await?;
139+
140+            let _ = state.event_tx.send(IndexEvent::IssueCreated {
141+                repo_did: row.repo_did.clone(),
142+                repo_name: row.repo_name.clone(),
143+                issue_rkey: row.rkey.clone(),
144+                title: row.title.clone(),
145+                author_did: row.did.clone(),
146+            });
147+        }
148+
149+        // ─── Issue Comment (comment count increment) ────────────────
150+        "dev.cospan.repo.issue.comment" | "sh.tangled.repo.issue.comment" => {
151+            let issue_uri = rec
152+                .get("issue")
153+                .and_then(|v| v.as_str())
154+                .unwrap_or("")
155+                .to_string();
156+
157+            let mut row: db::issue_comment::IssueCommentRow =
158+                serde_json::from_value(transform_record(state, collection, rec))?;
159+            row.did = did.to_string();
160+            row.rkey = rkey.to_string();
161+            row.indexed_at = Utc::now();
162+
163+            let existing = db::issue_comment::get(&state.db, did, rkey).await?;
164+            db::issue_comment::upsert(&state.db, &row).await?;
165+
166+            if existing.is_none() {
167+                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
168+                db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
169+            }
170+        }
171+
172+        // ─── Issue State (state transitions + counter adjustments + SSE) ─
173+        "dev.cospan.repo.issue.state" | "sh.tangled.repo.issue.state" => {
174+            let issue_uri = rec
175+                .get("issue")
176+                .and_then(|v| v.as_str())
177+                .unwrap_or("")
178+                .to_string();
179+            let new_state = rec
180+                .get("state")
181+                .and_then(|v| v.as_str())
182+                .unwrap_or("open")
183+                .to_string();
184+
185+            let mut row: db::issue_state::IssueStateRow =
186+                serde_json::from_value(transform_record(state, collection, rec))?;
187+            row.did = did.to_string();
188+            row.rkey = rkey.to_string();
189+            row.indexed_at = Utc::now();
190+            db::issue_state::upsert(&state.db, &row).await?;
191+
192+            let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
193+            if let Some(issue) =
194+                db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await?
166195             {
167-                decrement_repo_open_issue_count(&state.db, &issue.repo_did, &issue.repo_name)
168-                    .await?;
169-            }
170-            db::issue::delete(&state.db, did, rkey).await?;
171-        }
172-
173-        // ─── Issue Comment ──────────────────────────────────────────
174-        ("dev.cospan.repo.issue.comment", "create" | "update") => {
175-            if let Some(rec) = record {
176-                let issue_uri = rec
177-                    .get("issue")
178-                    .and_then(|v| v.as_str())
179-                    .unwrap_or("")
180-                    .to_string();
181-
182-                let mut row: db::issue_comment::IssueCommentRow =
183-                    serde_json::from_value(transform_record(state, collection, rec))?;
184-                row.did = did.to_string();
185-                row.rkey = rkey.to_string();
186-                row.indexed_at = Utc::now();
187-
188-                // Check if this is a new comment (not an update) for counter purposes
189-                let existing = db::issue_comment::get(&state.db, did, rkey).await?;
190-                db::issue_comment::upsert(&state.db, &row).await?;
191-
192-                if existing.is_none() {
193-                    // Increment comment count on the issue
194-                    let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
195-                    db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
196-                }
197-            }
198-        }
199-        ("dev.cospan.repo.issue.comment", "delete") => {
200-            // Look up the comment to decrement the issue counter
201-            if let Some(comment) = db::issue_comment::get(&state.db, did, rkey).await? {
202-                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&comment.issue_uri);
203-                db::issue::decrement_comment_count(&state.db, &issue_did, &issue_rkey).await?;
204-            }
205-            db::issue_comment::delete(&state.db, did, rkey).await?;
206-        }
196+                let old_state = &issue.state;
197+                if old_state != &new_state {
198+                    db::issue::update_state(&state.db, &issue_did, &issue_rkey, &new_state)
199+                        .await?;
200+
201+                    if old_state == "open" && new_state != "open" {
202+                        decrement_repo_open_issue_count(
203+                            &state.db,
204+                            &issue.repo_did,
205+                            &issue.repo_name,
206+                        )
207+                        .await?;
208+                    } else if old_state != "open" && new_state == "open" {
209+                        increment_repo_open_issue_count(
210+                            &state.db,
211+                            &issue.repo_did,
212+                            &issue.repo_name,
213+                        )
214+                        .await?;
215+                    }
207216 
208-        // ─── Issue State ────────────────────────────────────────────
209-        ("dev.cospan.repo.issue.state", "create" | "update") => {
210-            if let Some(rec) = record {
211-                let issue_uri = rec
212-                    .get("issue")
213-                    .and_then(|v| v.as_str())
214-                    .unwrap_or("")
215-                    .to_string();
216-                let new_state = rec
217-                    .get("state")
218-                    .and_then(|v| v.as_str())
219-                    .unwrap_or("open")
220-                    .to_string();
221-
222-                let mut row: db::issue_state::IssueStateRow =
223-                    serde_json::from_value(transform_record(state, collection, rec))?;
224-                row.did = did.to_string();
225-                row.rkey = rkey.to_string();
226-                row.indexed_at = Utc::now();
227-                db::issue_state::upsert(&state.db, &row).await?;
228-
229-                // Update the issue's state and repo counters
230-                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
231-                if let Some(issue) =
232-                    db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await?
233-                {
234-                    let old_state = &issue.state;
235-                    if old_state != &new_state {
236-                        db::issue::update_state(&state.db, &issue_did, &issue_rkey, &new_state)
237-                            .await?;
238-
239-                        // Adjust repo open_issue_count
240-                        if old_state == "open" && new_state != "open" {
241-                            decrement_repo_open_issue_count(
242-                                &state.db,
243-                                &issue.repo_did,
244-                                &issue.repo_name,
245-                            )
246-                            .await?;
247-                        } else if old_state != "open" && new_state == "open" {
248-                            increment_repo_open_issue_count(
249-                                &state.db,
250-                                &issue.repo_did,
251-                                &issue.repo_name,
252-                            )
253-                            .await?;
254-                        }
255-
256-                        // Publish SSE event
217+                    // SSE only for cospan-native events
218+                    if collection.starts_with("dev.cospan.") {
257219                         let _ = state.event_tx.send(IndexEvent::IssueStateChanged {
258220                             repo_did: issue.repo_did.clone(),
259221                             repo_name: issue.repo_name.clone(),
@@ -265,120 +227,89 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
265227                 }
266228             }
267229         }
268-        ("dev.cospan.repo.issue.state", "delete") => {
269-            db::issue_state::delete(&state.db, did, rkey).await?;
270-        }
271230 
272-        // ─── Pull Request ───────────────────────────────────────────
273-        ("dev.cospan.repo.pull", "create" | "update") => {
274-            if let Some(rec) = record {
275-                let mut row: db::pull::PullRow =
276-                    serde_json::from_value(transform_record(state, collection, rec))?;
277-                row.did = did.to_string();
278-                row.rkey = rkey.to_string();
279-                row.indexed_at = Utc::now();
280-                db::pull::upsert(&state.db, &row).await?;
281-
282-                // Publish SSE event
283-                let _ = state.event_tx.send(IndexEvent::PullCreated {
284-                    repo_did: row.repo_did.clone(),
285-                    repo_name: row.repo_name.clone(),
286-                    pull_rkey: row.rkey.clone(),
287-                    title: row.title.clone(),
288-                    author_did: row.did.clone(),
289-                });
290-            }
291-        }
292-        ("dev.cospan.repo.pull", "delete") => {
293-            // Look up the pull to decrement repo counter
294-            if let Some(pull) = db::pull::get_by_pk(&state.db, did, rkey).await?
295-                && pull.state == "open"
296-            {
297-                decrement_repo_open_mr_count(&state.db, &pull.repo_did, &pull.repo_name).await?;
298-            }
299-            db::pull::delete(&state.db, did, rkey).await?;
300-        }
231+        // ─── Pull Request (SSE event on create) ─────────────────────
232+        "dev.cospan.repo.pull" => {
233+            let mut row: db::pull::PullRow =
234+                serde_json::from_value(transform_record(state, collection, rec))?;
235+            row.did = did.to_string();
236+            row.rkey = rkey.to_string();
237+            row.indexed_at = Utc::now();
238+            db::pull::upsert(&state.db, &row).await?;
301239 
302-        // ─── Pull Comment ───────────────────────────────────────────
303-        ("dev.cospan.repo.pull.comment", "create" | "update") => {
304-            if let Some(rec) = record {
305-                let pull_uri = rec
306-                    .get("pull")
307-                    .and_then(|v| v.as_str())
308-                    .unwrap_or("")
309-                    .to_string();
310-
311-                let mut row: db::pull_comment::PullCommentRow =
312-                    serde_json::from_value(transform_record(state, collection, rec))?;
313-                row.did = did.to_string();
314-                row.rkey = rkey.to_string();
315-                row.indexed_at = Utc::now();
316-
317-                // Check if this is a new comment (not an update) for counter purposes
318-                let existing = db::pull_comment::get(&state.db, did, rkey).await?;
319-                db::pull_comment::upsert(&state.db, &row).await?;
320-
321-                if existing.is_none() {
322-                    let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
323-                    db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
324-                }
325-            }
326-        }
327-        ("dev.cospan.repo.pull.comment", "delete") => {
328-            if let Some(comment) = db::pull_comment::get(&state.db, did, rkey).await? {
329-                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&comment.pull_uri);
330-                db::pull::decrement_comment_count(&state.db, &pull_did, &pull_rkey).await?;
331-            }
332-            db::pull_comment::delete(&state.db, did, rkey).await?;
240+            let _ = state.event_tx.send(IndexEvent::PullCreated {
241+                repo_did: row.repo_did.clone(),
242+                repo_name: row.repo_name.clone(),
243+                pull_rkey: row.rkey.clone(),
244+                title: row.title.clone(),
245+                author_did: row.did.clone(),
246+            });
333247         }
334248 
335-        // ─── Pull State ─────────────────────────────────────────────
336-        ("dev.cospan.repo.pull.state", "create" | "update") => {
337-            if let Some(rec) = record {
338-                let pull_uri = rec
339-                    .get("pull")
340-                    .and_then(|v| v.as_str())
341-                    .unwrap_or("")
342-                    .to_string();
343-                let new_state = rec
344-                    .get("state")
345-                    .and_then(|v| v.as_str())
346-                    .unwrap_or("open")
347-                    .to_string();
348-
349-                let mut row: db::pull_state::PullStateRow =
350-                    serde_json::from_value(transform_record(state, collection, rec))?;
351-                row.did = did.to_string();
352-                row.rkey = rkey.to_string();
353-                row.indexed_at = Utc::now();
354-                db::pull_state::upsert(&state.db, &row).await?;
355-
356-                // Update the pull's state and repo counters
249+        // ─── Pull Comment (comment count increment) ─────────────────
250+        "dev.cospan.repo.pull.comment" | "sh.tangled.repo.pull.comment" => {
251+            let pull_uri = rec
252+                .get("pull")
253+                .and_then(|v| v.as_str())
254+                .unwrap_or("")
255+                .to_string();
256+
257+            let mut row: db::pull_comment::PullCommentRow =
258+                serde_json::from_value(transform_record(state, collection, rec))?;
259+            row.did = did.to_string();
260+            row.rkey = rkey.to_string();
261+            row.indexed_at = Utc::now();
262+
263+            let existing = db::pull_comment::get(&state.db, did, rkey).await?;
264+            db::pull_comment::upsert(&state.db, &row).await?;
265+
266+            if existing.is_none() {
357267                 let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
358-                if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
359-                    let old_state = &pull.state;
360-                    if old_state != &new_state {
361-                        db::pull::update_state(&state.db, &pull_did, &pull_rkey, &new_state)
362-                            .await?;
363-
364-                        // Adjust repo open_mr_count
365-                        if old_state == "open" && new_state != "open" {
366-                            decrement_repo_open_mr_count(
367-                                &state.db,
368-                                &pull.repo_did,
369-                                &pull.repo_name,
370-                            )
371-                            .await?;
372-                        } else if old_state != "open" && new_state == "open" {
373-                            increment_repo_open_mr_count(
374-                                &state.db,
375-                                &pull.repo_did,
376-                                &pull.repo_name,
377-                            )
378-                            .await?;
379-                        }
380-
381-                        // Publish SSE event
268+                db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
269+            }
270+        }
271+
272+        // ─── Pull State (state transitions + counter adjustments + SSE) ─
273+        "dev.cospan.repo.pull.state" | "sh.tangled.repo.pull.status" => {
274+            let pull_uri = rec
275+                .get("pull")
276+                .and_then(|v| v.as_str())
277+                .unwrap_or("")
278+                .to_string();
279+
280+            let mut row: db::pull_state::PullStateRow =
281+                serde_json::from_value(transform_record(state, collection, rec))?;
282+            row.did = did.to_string();
283+            row.rkey = rkey.to_string();
284+            row.indexed_at = Utc::now();
285+            let new_state = row.state.clone();
286+            db::pull_state::upsert(&state.db, &row).await?;
287+
288+            let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
289+            if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
290+                let old_state = &pull.state;
291+                if old_state != &new_state {
292+                    db::pull::update_state(&state.db, &pull_did, &pull_rkey, &new_state)
293+                        .await?;
294+
295+                    if old_state == "open" && new_state != "open" {
296+                        decrement_repo_open_mr_count(
297+                            &state.db,
298+                            &pull.repo_did,
299+                            &pull.repo_name,
300+                        )
301+                        .await?;
302+                    } else if old_state != "open" && new_state == "open" {
303+                        increment_repo_open_mr_count(
304+                            &state.db,
305+                            &pull.repo_did,
306+                            &pull.repo_name,
307+                        )
308+                        .await?;
309+                    }
310+
311+                    // SSE only for cospan-native events
312+                    if collection.starts_with("dev.cospan.") {
382313                         let _ = state.event_tx.send(IndexEvent::PullStateChanged {
383314                             repo_did: pull.repo_did.clone(),
384315                             repo_name: pull.repo_name.clone(),
@@ -390,268 +321,183 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
390321                 }
391322             }
392323         }
393-        ("dev.cospan.repo.pull.state", "delete") => {
394-            db::pull_state::delete(&state.db, did, rkey).await?;
395-        }
396324 
397-        // ─── Star ───────────────────────────────────────────────────
398-        ("dev.cospan.feed.star", "create" | "update") => {
399-            if let Some(rec) = record {
400-                let subject = rec
401-                    .get("subject")
402-                    .and_then(|v| v.as_str())
403-                    .unwrap_or("")
404-                    .to_string();
405-
406-                let mut row: db::star::StarRow =
407-                    serde_json::from_value(transform_record(state, collection, rec))?;
408-                row.did = did.to_string();
409-                row.rkey = rkey.to_string();
410-                row.indexed_at = Utc::now();
411-
412-                let existing = db::star::get(&state.db, did, rkey).await?;
413-                db::star::upsert(&state.db, &row).await?;
414-
415-                if existing.is_none() {
416-                    let (repo_did, repo_name) = parse_repo_at_uri(&subject);
417-                    db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
418-
419-                    // Publish SSE event
325+        // ─── Star (counter increment + SSE) ─────────────────────────
326+        "dev.cospan.feed.star" | "sh.tangled.feed.star" => {
327+            let mut row: db::star::StarRow =
328+                serde_json::from_value(transform_record(state, collection, rec))?;
329+            row.did = did.to_string();
330+            row.rkey = rkey.to_string();
331+            row.indexed_at = Utc::now();
332+
333+            let existing = db::star::get(&state.db, did, rkey).await?;
334+            db::star::upsert(&state.db, &row).await?;
335+
336+            if existing.is_none() {
337+                let (repo_did, repo_name) = parse_repo_at_uri(&row.subject);
338+                db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
339+
340+                // SSE only for cospan-native events
341+                if collection.starts_with("dev.cospan.") {
420342                     let _ = state.event_tx.send(IndexEvent::StarCreated {
421343                         did: did.to_string(),
422-                        subject: subject.clone(),
344+                        subject: row.subject.clone(),
423345                     });
424346                 }
425347             }
426348         }
427-        ("dev.cospan.feed.star", "delete") => {
428-            if let Some(star) = db::star::get(&state.db, did, rkey).await? {
429-                let (repo_did, repo_name) = parse_repo_at_uri(&star.subject);
430-                db::star::decrement_repo_star_count(&state.db, &repo_did, &repo_name).await?;
431-
432-                // Publish SSE event
433-                let _ = state.event_tx.send(IndexEvent::StarDeleted {
434-                    did: did.to_string(),
435-                    subject: star.subject.clone(),
436-                });
437-            }
438-            db::star::delete(&state.db, did, rkey).await?;
439-        }
440-
441-        // ─── Follow ─────────────────────────────────────────────────
442-        ("dev.cospan.graph.follow", "create" | "update") => {
443-            if let Some(rec) = record {
444-                let mut row: db::follow::FollowRow =
445-                    serde_json::from_value(transform_record(state, collection, rec))?;
446-                row.did = did.to_string();
447-                row.rkey = rkey.to_string();
448-                row.indexed_at = Utc::now();
449-                db::follow::upsert(&state.db, &row).await?;
450-            }
451-        }
452-        ("dev.cospan.graph.follow", "delete") => {
453-            db::follow::delete(&state.db, did, rkey).await?;
454-        }
455-
456-        // ─── Reaction ───────────────────────────────────────────────
457-        ("dev.cospan.feed.reaction", "create" | "update") => {
458-            if let Some(rec) = record {
459-                let mut row: db::reaction::ReactionRow =
460-                    serde_json::from_value(transform_record(state, collection, rec))?;
461-                row.did = did.to_string();
462-                row.rkey = rkey.to_string();
463-                row.indexed_at = Utc::now();
464-                db::reaction::upsert(&state.db, &row).await?;
465-            }
466-        }
467-        ("dev.cospan.feed.reaction", "delete") => {
468-            db::reaction::delete(&state.db, did, rkey).await?;
469-        }
470-
471-        // ─── Label Definition ───────────────────────────────────────
472-        ("dev.cospan.label.definition", "create" | "update") => {
473-            if let Some(rec) = record {
474-                let mut row: db::label::LabelRow =
475-                    serde_json::from_value(transform_record(state, collection, rec))?;
476-                row.did = did.to_string();
477-                row.rkey = rkey.to_string();
478-                row.indexed_at = Utc::now();
479-                db::label::upsert(&state.db, &row).await?;
480-            }
481-        }
482-        ("dev.cospan.label.definition", "delete") => {
483-            db::label::delete(&state.db, did, rkey).await?;
484-        }
485-
486-        // ─── Org ────────────────────────────────────────────────────
487-        ("dev.cospan.org", "create" | "update") => {
488-            if let Some(rec) = record {
489-                let mut row: db::org::OrgRow =
490-                    serde_json::from_value(transform_record(state, collection, rec))?;
491-                row.did = did.to_string();
492-                row.rkey = rkey.to_string();
493-                row.indexed_at = Utc::now();
494-                db::org::upsert(&state.db, &row).await?;
495-            }
496-        }
497-        ("dev.cospan.org", "delete") => {
498-            db::org::delete(&state.db, did, rkey).await?;
499-        }
500-
501-        // ─── Org Member ─────────────────────────────────────────────
502-        ("dev.cospan.org.member", "create" | "update") => {
503-            if let Some(rec) = record {
504-                let mut row: db::org_member::OrgMemberRow =
505-                    serde_json::from_value(transform_record(state, collection, rec))?;
506-                row.did = did.to_string();
507-                row.rkey = rkey.to_string();
508-                row.indexed_at = Utc::now();
509-                db::org_member::upsert(&state.db, &row).await?;
510-            }
511-        }
512-        ("dev.cospan.org.member", "delete") => {
513-            db::org_member::delete(&state.db, did, rkey).await?;
514-        }
515-
516-        // ─── Collaborator ───────────────────────────────────────────
517-        ("dev.cospan.repo.collaborator", "create" | "update") => {
518-            if let Some(rec) = record {
519-                let mut row: db::collaborator::CollaboratorRow =
520-                    serde_json::from_value(transform_record(state, collection, rec))?;
521-                row.did = did.to_string();
522-                row.rkey = rkey.to_string();
523-                row.indexed_at = Utc::now();
524-                db::collaborator::upsert(&state.db, &row).await?;
525-            }
526-        }
527-        ("dev.cospan.repo.collaborator", "delete") => {
528-            db::collaborator::delete(&state.db, did, rkey).await?;
529-        }
530349 
531-        // ─── Pipeline ───────────────────────────────────────────────
532-        ("dev.cospan.pipeline", "create" | "update") => {
533-            if let Some(rec) = record {
534-                let checks = rec.get("algebraicChecks");
535-
536-                let mut row: db::pipeline::PipelineRow =
537-                    serde_json::from_value(transform_record(state, collection, rec))?;
538-                row.did = did.to_string();
539-                row.rkey = rkey.to_string();
540-                row.indexed_at = Utc::now();
541-                row.gat_type_check = checks
542-                    .and_then(|c| c.get("gatTypeCheck"))
543-                    .and_then(|v| v.as_str())
544-                    .map(String::from);
545-                row.equation_verification = checks
546-                    .and_then(|c| c.get("equationVerification"))
547-                    .and_then(|v| v.as_str())
548-                    .map(String::from);
549-                row.lens_law_check = checks
550-                    .and_then(|c| c.get("lensLawCheck"))
551-                    .and_then(|v| v.as_str())
552-                    .map(String::from);
553-                row.breaking_change_check = checks
554-                    .and_then(|c| c.get("breakingChangeCheck"))
555-                    .and_then(|v| v.as_str())
556-                    .map(String::from);
557-                db::pipeline::upsert(&state.db, &row).await?;
558-            }
559-        }
560-        ("dev.cospan.pipeline", "delete") => {
561-            db::pipeline::delete(&state.db, did, rkey).await?;
350+        // ─── Pipeline (algebraicChecks extraction) ──────────────────
351+        "dev.cospan.pipeline" => {
352+            let checks = rec.get("algebraicChecks");
353+
354+            let mut row: db::pipeline::PipelineRow =
355+                serde_json::from_value(transform_record(state, collection, rec))?;
356+            row.did = did.to_string();
357+            row.rkey = rkey.to_string();
358+            row.indexed_at = Utc::now();
359+            row.gat_type_check = checks
360+                .and_then(|c| c.get("gatTypeCheck"))
361+                .and_then(|v| v.as_str())
362+                .map(String::from);
363+            row.equation_verification = checks
364+                .and_then(|c| c.get("equationVerification"))
365+                .and_then(|v| v.as_str())
366+                .map(String::from);
367+            row.lens_law_check = checks
368+                .and_then(|c| c.get("lensLawCheck"))
369+                .and_then(|v| v.as_str())
370+                .map(String::from);
371+            row.breaking_change_check = checks
372+                .and_then(|c| c.get("breakingChangeCheck"))
373+                .and_then(|v| v.as_str())
374+                .map(String::from);
375+            db::pipeline::upsert(&state.db, &row).await?;
376+        }
377+
378+        // ─── Tangled Spindle → Org (via panproto morphism) ───────────
379+        "sh.tangled.spindle" => {
380+            let mut row: db::org::OrgRow =
381+                serde_json::from_value(transform_record(state, collection, rec))?;
382+            row.did = did.to_string();
383+            row.rkey = rkey.to_string();
384+            // Tangled spindles don't carry a name field; use DID as fallback
385+            if row.name.is_empty() {
386+                row.name = did.to_string();
387+            }
388+            row.indexed_at = Utc::now();
389+            db::org::upsert(&state.db, &row).await?;
390+        }
391+
392+        // ─── Tangled Spindle Member → Org Member (via panproto morphism)
393+        "sh.tangled.spindle.member" => {
394+            let mut row: db::org_member::OrgMemberRow =
395+                serde_json::from_value(transform_record(state, collection, rec))?;
396+            row.did = did.to_string();
397+            row.rkey = rkey.to_string();
398+            // Synthesize the org AT-URI from the DID
399+            if row.org_uri.is_empty() {
400+                row.org_uri = format!("at://{did}/sh.tangled.spindle/self");
401+            }
402+            // Tangled has no role field; default to "member"
403+            if row.role.is_empty() {
404+                row.role = "member".to_string();
405+            }
406+            row.indexed_at = Utc::now();
407+            db::org_member::upsert(&state.db, &row).await?;
408+        }
409+
410+        // ─── Tangled Label Definition (via panproto morphism) ───────
411+        "sh.tangled.label.definition" => {
412+            let mut row: db::label::LabelRow =
413+                serde_json::from_value(transform_record(state, collection, rec))?;
414+            row.did = did.to_string();
415+            row.rkey = rkey.to_string();
416+            row.indexed_at = Utc::now();
417+            db::label::upsert(&state.db, &row).await?;
418+        }
419+
420+        // ─── Tangled Pipeline Status (SQL update) ───────────────────
421+        "sh.tangled.pipeline.status" => {
422+            let pipeline_uri = rec.get("pipeline").and_then(|v| v.as_str()).unwrap_or("");
423+            let (pipeline_did, pipeline_rkey) = parse_at_uri_did_rkey(pipeline_uri);
424+
425+            let raw_status = rec
426+                .get("status")
427+                .and_then(|v| v.as_str())
428+                .unwrap_or("pending");
429+            let mapped_status = match raw_status {
430+                "success" => "passed",
431+                "failed" => "failed",
432+                "cancelled" | "canceled" => "cancelled",
433+                "running" | "in_progress" => "running",
434+                other => other,
435+            };
436+
437+            if !pipeline_did.is_empty() && !pipeline_rkey.is_empty() {
438+                sqlx::query(
439+                    "UPDATE pipelines SET status = $1, indexed_at = NOW() \
440+                     WHERE did = $2 AND rkey = $3",
441+                )
442+                .bind(mapped_status)
443+                .bind(&pipeline_did)
444+                .bind(&pipeline_rkey)
445+                .execute(&state.db)
446+                .await?;
447+            }
448+        }
449+
450+        // ─── Tangled-only records (no Cospan equivalent) ────────────
451+        "sh.tangled.publicKey" | "sh.tangled.string" | "sh.tangled.repo.artifact"
452+        | "sh.tangled.label.op" => {
453+            tracing::debug!(
454+                collection,
455+                did,
456+                rkey,
457+                "tangled-only record skipped (no cospan equivalent)"
458+            );
562459         }
563460 
564-        // ─── Dependency ─────────────────────────────────────────────
565-        ("dev.cospan.repo.dependency", "create" | "update") => {
566-            if let Some(rec) = record {
567-                let mut row: db::dependency::DependencyRow =
568-                    serde_json::from_value(transform_record(state, collection, rec))?;
569-                row.did = did.to_string();
570-                row.rkey = rkey.to_string();
571-                row.indexed_at = Utc::now();
572-                db::dependency::upsert(&state.db, &row).await?;
573-            }
461+        // ─── Catch-alls ─────────────────────────────────────────────
462+        c if c.starts_with("sh.tangled.") => {
463+            tracing::debug!(
464+                collection = c,
465+                "tangled record received for unhandled collection"
466+            );
574467         }
575-        ("dev.cospan.repo.dependency", "delete") => {
576-            db::dependency::delete(&state.db, did, rkey).await?;
468+        c if c.starts_with("dev.cospan.") => {
469+            tracing::debug!(collection = c, "unhandled dev.cospan collection");
577470         }
578471 
579-        // ─── Tangled interop ────────────────────────────────────────
580-        // Translate sh.tangled.* records into dev.cospan.* equivalents and index
581-        // them with source="tangled" and source_uri set to the original AT-URI.
582-
583-        // ─── Tangled Star ──────────────────────────────────────────
584-        ("sh.tangled.feed.star", "create" | "update") => {
585-            if let Some(rec) = record {
586-                let mut row: db::star::StarRow =
587-                    serde_json::from_value(transform_record(state, collection, rec))?;
588-                row.did = did.to_string();
589-                row.rkey = rkey.to_string();
590-                row.indexed_at = Utc::now();
591-                let existing = db::star::get(&state.db, did, rkey).await?;
592-                db::star::upsert(&state.db, &row).await?;
593-
594-                if existing.is_none() {
595-                    let (repo_did, repo_name) = parse_repo_at_uri(&row.subject);
596-                    db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
597-                }
472+        _ => {}
473+    }
598474 
599-                tracing::debug!(did, rkey, "indexed tangled star");
600-            }
601-        }
602-        ("sh.tangled.feed.star", "delete") => {
603-            if let Some(star) = db::star::get(&state.db, did, rkey).await? {
604-                let (repo_did, repo_name) = parse_repo_at_uri(&star.subject);
605-                db::star::decrement_repo_star_count(&state.db, &repo_did, &repo_name).await?;
606-            }
607-            db::star::delete(&state.db, did, rkey).await?;
608-        }
475+    Ok(())
476+}
609477 
610-        // ─── Tangled Follow ────────────────────────────────────────
611-        ("sh.tangled.graph.follow", "create" | "update") => {
612-            if let Some(rec) = record {
613-                let mut row: db::follow::FollowRow =
614-                    serde_json::from_value(transform_record(state, collection, rec))?;
615-                row.did = did.to_string();
616-                row.rkey = rkey.to_string();
617-                row.indexed_at = Utc::now();
618-                db::follow::upsert(&state.db, &row).await?;
619-                tracing::debug!(did, rkey, "indexed tangled follow");
620-            }
621-        }
622-        ("sh.tangled.graph.follow", "delete") => {
623-            db::follow::delete(&state.db, did, rkey).await?;
624-        }
478+// ─── Special-case deletes (records with side effects) ──────────────────────
625479 
626-        // ─── Tangled Reaction ──────────────────────────────────────
627-        ("sh.tangled.feed.reaction", "create" | "update") => {
628-            if let Some(rec) = record {
629-                let mut row: db::reaction::ReactionRow =
630-                    serde_json::from_value(transform_record(state, collection, rec))?;
631-                row.did = did.to_string();
632-                row.rkey = rkey.to_string();
633-                row.indexed_at = Utc::now();
634-                db::reaction::upsert(&state.db, &row).await?;
635-                tracing::debug!(did, rkey, "indexed tangled reaction");
636-            }
480+async fn dispatch_special_delete(
481+    state: &Arc<AppState>,
482+    collection: &str,
483+    did: &str,
484+    rkey: &str,
485+) -> anyhow::Result<()> {
486+    match collection {
487+        // ─── Repo (incomplete implementation) ───────────────────────
488+        "dev.cospan.repo" => {
489+            tracing::warn!(
490+                did,
491+                rkey,
492+                "repo delete not fully implemented (need rkey->name lookup)"
493+            );
637494         }
638-        ("sh.tangled.feed.reaction", "delete") => {
639-            db::reaction::delete(&state.db, did, rkey).await?;
495+        "sh.tangled.repo" => {
496+            tracing::warn!(did, rkey, "tangled repo delete (need rkey->name lookup)");
640497         }
641498 
642-        // ─── Tangled Issue ─────────────────────────────────────────
643-        ("sh.tangled.repo.issue", "create" | "update") => {
644-            if let Some(rec) = record {
645-                let mut row: db::issue::IssueRow =
646-                    serde_json::from_value(transform_record(state, collection, rec))?;
647-                row.did = did.to_string();
648-                row.rkey = rkey.to_string();
649-                row.indexed_at = Utc::now();
650-                db::issue::upsert(&state.db, &row).await?;
651-                tracing::debug!(did, rkey, "indexed tangled issue");
652-            }
653-        }
654-        ("sh.tangled.repo.issue", "delete") => {
499+        // ─── Issue (counter decrement on delete) ────────────────────
500+        "dev.cospan.repo.issue" | "sh.tangled.repo.issue" => {
655501             if let Some(issue) = db::issue::get_by_pk(&state.db, did, rkey).await?
656502                 && issue.state == "open"
657503             {
@@ -661,87 +507,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
661507             db::issue::delete(&state.db, did, rkey).await?;
662508         }
663509 
664-        // ─── Tangled Issue State ───────────────────────────────────
665-        ("sh.tangled.repo.issue.state", "create" | "update") => {
666-            if let Some(rec) = record {
667-                let issue_uri = rec
668-                    .get("issue")
669-                    .and_then(|v| v.as_str())
670-                    .unwrap_or("")
671-                    .to_string();
672-                let new_state = rec
673-                    .get("state")
674-                    .and_then(|v| v.as_str())
675-                    .unwrap_or("open")
676-                    .to_string();
677-
678-                let mut row: db::issue_state::IssueStateRow =
679-                    serde_json::from_value(transform_record(state, collection, rec))?;
680-                row.did = did.to_string();
681-                row.rkey = rkey.to_string();
682-                row.indexed_at = Utc::now();
683-                db::issue_state::upsert(&state.db, &row).await?;
684-
685-                // Update the issue's state and repo counters
686-                let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
687-                if let Some(issue) =
688-                    db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await?
689-                {
690-                    let old_state = &issue.state;
691-                    if old_state != &new_state {
692-                        db::issue::update_state(&state.db, &issue_did, &issue_rkey, &new_state)
693-                            .await?;
694-
695-                        if old_state == "open" && new_state != "open" {
696-                            decrement_repo_open_issue_count(
697-                                &state.db,
698-                                &issue.repo_did,
699-                                &issue.repo_name,
700-                            )
701-                            .await?;
702-                        } else if old_state != "open" && new_state == "open" {
703-                            increment_repo_open_issue_count(
704-                                &state.db,
705-                                &issue.repo_did,
706-                                &issue.repo_name,
707-                            )
708-                            .await?;
709-                        }
710-                    }
711-                }
712-                tracing::debug!(did, rkey, "indexed tangled issue state");
713-            }
714-        }
715-        ("sh.tangled.repo.issue.state", "delete") => {
716-            db::issue_state::delete(&state.db, did, rkey).await?;
717-        }
718-
719-        // ─── Tangled Issue Comment ─────────────────────────────────
720-        ("sh.tangled.repo.issue.comment", "create" | "update") => {
721-            if let Some(rec) = record {
722-                let issue_uri = rec
723-                    .get("issue")
724-                    .and_then(|v| v.as_str())
725-                    .unwrap_or("")
726-                    .to_string();
727-
728-                let mut row: db::issue_comment::IssueCommentRow =
729-                    serde_json::from_value(transform_record(state, collection, rec))?;
730-                row.did = did.to_string();
731-                row.rkey = rkey.to_string();
732-                row.indexed_at = Utc::now();
733-
734-                let existing = db::issue_comment::get(&state.db, did, rkey).await?;
735-                db::issue_comment::upsert(&state.db, &row).await?;
736-
737-                if existing.is_none() {
738-                    let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&issue_uri);
739-                    db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
740-                }
741-                tracing::debug!(did, rkey, "indexed tangled issue comment");
742-            }
743-        }
744-        ("sh.tangled.repo.issue.comment", "delete") => {
510+        // ─── Issue Comment (counter decrement) ──────────────────────
511+        "dev.cospan.repo.issue.comment" | "sh.tangled.repo.issue.comment" => {
745512             if let Some(comment) = db::issue_comment::get(&state.db, did, rkey).await? {
746513                 let (issue_did, issue_rkey) = parse_at_uri_did_rkey(&comment.issue_uri);
747514                 db::issue::decrement_comment_count(&state.db, &issue_did, &issue_rkey).await?;
@@ -749,19 +516,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
749516             db::issue_comment::delete(&state.db, did, rkey).await?;
750517         }
751518 
752-        // ─── Tangled Pull Request ──────────────────────────────────
753-        ("sh.tangled.repo.pull", "create" | "update") => {
754-            if let Some(rec) = record {
755-                let mut row: db::pull::PullRow =
756-                    serde_json::from_value(transform_record(state, collection, rec))?;
757-                row.did = did.to_string();
758-                row.rkey = rkey.to_string();
759-                row.indexed_at = Utc::now();
760-                db::pull::upsert(&state.db, &row).await?;
761-                tracing::debug!(did, rkey, "indexed tangled pull");
762-            }
763-        }
764-        ("sh.tangled.repo.pull", "delete") => {
519+        // ─── Pull (counter decrement on delete) ─────────────────────
520+        "dev.cospan.repo.pull" | "sh.tangled.repo.pull" => {
765521             if let Some(pull) = db::pull::get_by_pk(&state.db, did, rkey).await?
766522                 && pull.state == "open"
767523             {
@@ -770,80 +526,8 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
770526             db::pull::delete(&state.db, did, rkey).await?;
771527         }
772528 
773-        // ─── Tangled Pull Status → Pull State ─────────────────────
774-        ("sh.tangled.repo.pull.status", "create" | "update") => {
775-            if let Some(rec) = record {
776-                let pull_uri = rec
777-                    .get("pull")
778-                    .and_then(|v| v.as_str())
779-                    .unwrap_or("")
780-                    .to_string();
781-                let mut row: db::pull_state::PullStateRow =
782-                    serde_json::from_value(transform_record(state, collection, rec))?;
783-                row.did = did.to_string();
784-                row.rkey = rkey.to_string();
785-                row.indexed_at = Utc::now();
786-                let new_state = row.state.clone();
787-                db::pull_state::upsert(&state.db, &row).await?;
788-
789-                // Update the pull's state and repo counters
790-                let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
791-                if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
792-                    let old_state = &pull.state;
793-                    if old_state != &new_state {
794-                        db::pull::update_state(&state.db, &pull_did, &pull_rkey, &new_state)
795-                            .await?;
796-
797-                        if old_state == "open" && new_state != "open" {
798-                            decrement_repo_open_mr_count(
799-                                &state.db,
800-                                &pull.repo_did,
801-                                &pull.repo_name,
802-                            )
803-                            .await?;
804-                        } else if old_state != "open" && new_state == "open" {
805-                            increment_repo_open_mr_count(
806-                                &state.db,
807-                                &pull.repo_did,
808-                                &pull.repo_name,
809-                            )
810-                            .await?;
811-                        }
812-                    }
813-                }
814-                tracing::debug!(did, rkey, "indexed tangled pull status as pull state");
815-            }
816-        }
817-        ("sh.tangled.repo.pull.status", "delete") => {
818-            db::pull_state::delete(&state.db, did, rkey).await?;
819-        }
820-
821-        // ─── Tangled Pull Comment ──────────────────────────────────
822-        ("sh.tangled.repo.pull.comment", "create" | "update") => {
823-            if let Some(rec) = record {
824-                let pull_uri = rec
825-                    .get("pull")
826-                    .and_then(|v| v.as_str())
827-                    .unwrap_or("")
828-                    .to_string();
829-
830-                let mut row: db::pull_comment::PullCommentRow =
831-                    serde_json::from_value(transform_record(state, collection, rec))?;
832-                row.did = did.to_string();
833-                row.rkey = rkey.to_string();
834-                row.indexed_at = Utc::now();
835-
836-                let existing = db::pull_comment::get(&state.db, did, rkey).await?;
837-                db::pull_comment::upsert(&state.db, &row).await?;
838-
839-                if existing.is_none() {
840-                    let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&pull_uri);
841-                    db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
842-                }
843-                tracing::debug!(did, rkey, "indexed tangled pull comment");
844-            }
845-        }
846-        ("sh.tangled.repo.pull.comment", "delete") => {
529+        // ─── Pull Comment (counter decrement) ───────────────────────
530+        "dev.cospan.repo.pull.comment" | "sh.tangled.repo.pull.comment" => {
847531             if let Some(comment) = db::pull_comment::get(&state.db, did, rkey).await? {
848532                 let (pull_did, pull_rkey) = parse_at_uri_did_rkey(&comment.pull_uri);
849533                 db::pull::decrement_comment_count(&state.db, &pull_did, &pull_rkey).await?;
@@ -851,265 +535,30 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
851535             db::pull_comment::delete(&state.db, did, rkey).await?;
852536         }
853537 
854-        // ─── Tangled Collaborator ──────────────────────────────────
855-        ("sh.tangled.repo.collaborator", "create" | "update") => {
856-            if let Some(rec) = record {
857-                let mut row: db::collaborator::CollaboratorRow =
858-                    serde_json::from_value(transform_record(state, collection, rec))?;
859-                row.did = did.to_string();
860-                row.rkey = rkey.to_string();
861-                row.indexed_at = Utc::now();
862-                db::collaborator::upsert(&state.db, &row).await?;
863-                tracing::debug!(did, rkey, "indexed tangled collaborator");
864-            }
865-        }
866-        ("sh.tangled.repo.collaborator", "delete") => {
867-            db::collaborator::delete(&state.db, did, rkey).await?;
868-        }
869-
870-        // ─── Tangled Knot → Node ──────────────────────────────────
871-        ("sh.tangled.knot", "create" | "update") => {
872-            if let Some(rec) = record {
873-                let mut row: db::node::NodeRow =
874-                    serde_json::from_value(transform_record(state, collection, rec))?;
875-                row.did = did.to_string();
876-                row.rkey = rkey.to_string();
877-                row.indexed_at = Utc::now();
878-                db::node::upsert(&state.db, &row).await?;
879-                tracing::debug!(did, rkey, "indexed tangled knot as node");
880-            }
881-        }
882-        ("sh.tangled.knot", "delete") => {
883-            db::node::delete(&state.db, did).await?;
884-        }
885-
886-        // ─── Tangled Spindle → Org ────────────────────────────────
887-        ("sh.tangled.spindle", "create" | "update") => {
888-            if let Some(rec) = record {
889-                // Use the DID as a fallback name; Tangled spindles don't carry a name field
890-                let name = rec
891-                    .get("name")
892-                    .and_then(|v| v.as_str())
893-                    .unwrap_or(did)
894-                    .to_string();
895-
896-                let row = db::org::OrgRow {
897-                    did: did.to_string(),
898-                    rkey: rkey.to_string(),
899-                    name,
900-                    description: rec
901-                        .get("description")
902-                        .and_then(|v| v.as_str())
903-                        .map(String::from),
904-                    avatar_cid: None,
905-                    created_at: parse_datetime(rec, "createdAt"),
906-                    indexed_at: Utc::now(),
907-                };
908-                db::org::upsert(&state.db, &row).await?;
909-                tracing::debug!(did, rkey, "indexed tangled spindle as org");
910-            }
911-        }
912-        ("sh.tangled.spindle", "delete") => {
913-            db::org::delete(&state.db, did, rkey).await?;
914-        }
915-
916-        // ─── Tangled Actor Profile ─────────────────────────────────
917-        ("sh.tangled.actor.profile", "create" | "update") => {
918-            if let Some(rec) = record {
919-                let mut row: db::actor_profile::ActorProfileRow =
920-                    serde_json::from_value(transform_record(state, collection, rec))?;
921-                row.did = did.to_string();
922-                row.indexed_at = Utc::now();
923-                db::actor_profile::upsert(&state.db, &row).await?;
924-                tracing::debug!(did, rkey, "indexed tangled actor profile");
925-            }
926-        }
927-        ("sh.tangled.actor.profile", "delete") => {
928-            db::actor_profile::delete(&state.db, did).await?;
929-        }
930-
931-        // ─── Tangled Repo ──────────────────────────────────────────
932-        ("sh.tangled.repo", "create" | "update") => {
933-            if let Some(rec) = record {
934-                let mut row: db::repo::RepoRow =
935-                    serde_json::from_value(transform_record(state, collection, rec))?;
936-                row.did = did.to_string();
937-                row.rkey = rkey.to_string();
938-                row.indexed_at = Utc::now();
939-                db::repo::upsert(&state.db, &row).await?;
940-                tracing::debug!(did, rkey, "indexed tangled repo");
941-            }
942-        }
943-        ("sh.tangled.repo", "delete") => {
944-            tracing::warn!(did, rkey, "tangled repo delete (need rkey->name lookup)");
945-        }
946-
947-        // ─── Tangled Knot Member → Org Member ─────────────────────
948-        ("sh.tangled.knot.member", "create" | "update") => {
949-            if let Some(rec) = record {
950-                let mut row: db::org_member::OrgMemberRow =
951-                    serde_json::from_value(transform_record(state, collection, rec))?;
952-                row.did = did.to_string();
953-                row.rkey = rkey.to_string();
954-                row.indexed_at = Utc::now();
955-                db::org_member::upsert(&state.db, &row).await?;
956-                tracing::debug!(did, rkey, "indexed tangled knot member as org member");
957-            }
958-        }
959-        ("sh.tangled.knot.member", "delete") => {
960-            db::org_member::delete(&state.db, did, rkey).await?;
961-        }
962-
963-        // ─── Tangled Spindle Member → Org Member ──────────────────
964-        ("sh.tangled.spindle.member", "create" | "update") => {
965-            if let Some(rec) = record {
966-                let org_uri = format!("at://{did}/sh.tangled.spindle/self");
967-
968-                let row = db::org_member::OrgMemberRow {
969-                    did: did.to_string(),
970-                    rkey: rkey.to_string(),
971-                    org_uri,
972-                    member_did: rec
973-                        .get("subject")
974-                        .and_then(|v| v.as_str())
975-                        .unwrap_or("")
976-                        .to_string(),
977-                    role: "member".to_string(),
978-                    created_at: parse_datetime(rec, "createdAt"),
979-                    indexed_at: Utc::now(),
980-                };
981-                db::org_member::upsert(&state.db, &row).await?;
982-                tracing::debug!(did, rkey, "indexed tangled spindle member as org member");
983-            }
984-        }
985-        ("sh.tangled.spindle.member", "delete") => {
986-            db::org_member::delete(&state.db, did, rkey).await?;
987-        }
988-
989-        // ─── Tangled Label Definition ──────────────────────────────
990-        ("sh.tangled.label.definition", "create" | "update") => {
991-            if let Some(rec) = record {
992-                let repo_uri = rec.get("repo").and_then(|v| v.as_str()).unwrap_or("");
993-                let (repo_did, repo_name) = parse_repo_at_uri(repo_uri);
994-
995-                let row = db::label::LabelRow {
996-                    did: did.to_string(),
997-                    rkey: rkey.to_string(),
998-                    repo_did,
999-                    repo_name,
1000-                    name: rec
1001-                        .get("name")
1002-                        .and_then(|v| v.as_str())
1003-                        .unwrap_or("")
1004-                        .to_string(),
1005-                    color: rec
1006-                        .get("color")
1007-                        .and_then(|v| v.as_str())
1008-                        .unwrap_or("")
1009-                        .to_string(),
1010-                    // Tangled may store description in valueType
1011-                    description: rec
1012-                        .get("description")
1013-                        .or_else(|| rec.get("valueType"))
1014-                        .and_then(|v| v.as_str())
1015-                        .map(String::from),
1016-                    created_at: parse_datetime(rec, "createdAt"),
1017-                    indexed_at: Utc::now(),
1018-                };
1019-                db::label::upsert(&state.db, &row).await?;
1020-                tracing::debug!(did, rkey, "indexed tangled label definition");
1021-            }
1022-        }
1023-        ("sh.tangled.label.definition", "delete") => {
1024-            db::label::delete(&state.db, did, rkey).await?;
1025-        }
1026-
1027-        // ─── Tangled Pipeline ──────────────────────────────────────
1028-        ("sh.tangled.pipeline", "create" | "update") => {
1029-            if let Some(rec) = record {
1030-                let mut row: db::pipeline::PipelineRow =
1031-                    serde_json::from_value(transform_record(state, collection, rec))?;
1032-                row.did = did.to_string();
1033-                row.rkey = rkey.to_string();
1034-                row.indexed_at = Utc::now();
1035-                db::pipeline::upsert(&state.db, &row).await?;
1036-                tracing::debug!(did, rkey, "indexed tangled pipeline");
1037-            }
1038-        }
1039-        ("sh.tangled.pipeline", "delete") => {
1040-            db::pipeline::delete(&state.db, did, rkey).await?;
1041-        }
538+        // ─── Star (counter decrement + SSE) ─────────────────────────
539+        "dev.cospan.feed.star" | "sh.tangled.feed.star" => {
540+            if let Some(star) = db::star::get(&state.db, did, rkey).await? {
541+                let (repo_did, repo_name) = parse_repo_at_uri(&star.subject);
542+                db::star::decrement_repo_star_count(&state.db, &repo_did, &repo_name).await?;
1042543 
1043-        // ─── Tangled Pipeline Status → Update pipeline ─────────────
1044-        ("sh.tangled.pipeline.status", "create" | "update") => {
1045-            if let Some(rec) = record {
1046-                // The pipeline AT-URI to look up
1047-                let pipeline_uri = rec.get("pipeline").and_then(|v| v.as_str()).unwrap_or("");
1048-                let (pipeline_did, pipeline_rkey) = parse_at_uri_did_rkey(pipeline_uri);
1049-
1050-                // Map Tangled status values to Cospan equivalents
1051-                let raw_status = rec
1052-                    .get("status")
1053-                    .and_then(|v| v.as_str())
1054-                    .unwrap_or("pending");
1055-                let mapped_status = match raw_status {
1056-                    "success" => "passed",
1057-                    "failed" => "failed",
1058-                    "cancelled" | "canceled" => "cancelled",
1059-                    "running" | "in_progress" => "running",
1060-                    other => other,
1061-                };
1062-
1063-                if !pipeline_did.is_empty() && !pipeline_rkey.is_empty() {
1064-                    // Update the pipeline status in-place
1065-                    sqlx::query(
1066-                        "UPDATE pipelines SET status = $1, indexed_at = NOW() \
1067-                         WHERE did = $2 AND rkey = $3",
1068-                    )
1069-                    .bind(mapped_status)
1070-                    .bind(&pipeline_did)
1071-                    .bind(&pipeline_rkey)
1072-                    .execute(&state.db)
1073-                    .await?;
544+                if collection.starts_with("dev.cospan.") {
545+                    let _ = state.event_tx.send(IndexEvent::StarDeleted {
546+                        did: did.to_string(),
547+                        subject: star.subject.clone(),
548+                    });
1074549                 }
1075-                tracing::debug!(
1076-                    did,
1077-                    rkey,
1078-                    status = mapped_status,
1079-                    "indexed tangled pipeline status"
1080-                );
1081550             }
1082-        }
1083-        ("sh.tangled.pipeline.status", "delete") => {
1084-            // Pipeline status deletes are no-ops (status is stored on the pipeline row)
1085-            tracing::debug!(did, rkey, "tangled pipeline status delete (no-op)");
551+            db::star::delete(&state.db, did, rkey).await?;
1086552         }
1087553 
1088-        // ─── Tangled Git RefUpdate ─────────────────────────────────
1089-        ("sh.tangled.git.refUpdate", "create" | "update") => {
1090-            if let Some(rec) = record {
1091-                let mut row: db::ref_update::RefUpdateRow =
1092-                    serde_json::from_value(transform_record(state, collection, rec))?;
1093-                row.rkey = rkey.to_string();
1094-                row.indexed_at = Utc::now();
1095-                db::ref_update::upsert(&state.db, &row).await?;
1096-                tracing::debug!(did, rkey, "indexed tangled git refUpdate");
1097-            }
1098-        }
1099-        ("sh.tangled.git.refUpdate", "delete") => {
1100-            db::ref_update::delete(&state.db, did, rkey).await?;
554+        // ─── Pipeline Status (no-op) ────────────────────────────────
555+        "sh.tangled.pipeline.status" => {
556+            tracing::debug!(did, rkey, "tangled pipeline status delete (no-op)");
1101557         }
1102558 
1103-        // ─── Tangled-only records (no Cospan equivalent) ──────────
1104-        // These are display-only features in Tangled with no Cospan schema.
1105-        // Log and skip.
1106-        (
1107-            "sh.tangled.publicKey"
1108-            | "sh.tangled.string"
1109-            | "sh.tangled.repo.artifact"
1110-            | "sh.tangled.label.op",
1111-            _,
1112-        ) => {
559+        // ─── Tangled-only records ───────────────────────────────────
560+        "sh.tangled.publicKey" | "sh.tangled.string" | "sh.tangled.repo.artifact"
561+        | "sh.tangled.label.op" => {
1113562             tracing::debug!(
1114563                 collection,
1115564                 did,
@@ -1118,16 +567,14 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1118567             );
1119568         }
1120569 
1121-        // ─── Catch-all for other tangled collections ───────────────
1122-        (c, _) if c.starts_with("sh.tangled.") => {
570+        // ─── Catch-alls ─────────────────────────────────────────────
571+        c if c.starts_with("sh.tangled.") => {
1123572             tracing::debug!(
1124573                 collection = c,
1125574                 "tangled record received for unhandled collection"
1126575             );
1127576         }
1128-
1129-        // ─── Catch-all for unhandled cospan collections ─────────────
1130-        (c, _) if c.starts_with("dev.cospan.") => {
577+        c if c.starts_with("dev.cospan.") => {
1131578             tracing::debug!(collection = c, "unhandled dev.cospan collection");
1132579         }
1133580 
@@ -1139,15 +586,6 @@ pub async fn process_event(state: &Arc<AppState>, event: &serde_json::Value) ->
1139586 
1140587 // ─── Helper functions ───────────────────────────────────────────────────────
1141588 
1142-fn parse_datetime(record: &serde_json::Value, field: &str) -> chrono::DateTime<Utc> {
1143-    record
1144-        .get(field)
1145-        .and_then(|v| v.as_str())
1146-        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
1147-        .map(|dt| dt.with_timezone(&Utc))
1148-        .unwrap_or_else(Utc::now)
1149-}
1150-
1151589 fn extract_did_from_at_uri(uri: &str) -> String {
1152590     // at://did:plc:abc123/collection/rkey -> did:plc:abc123
1153591     uri.strip_prefix("at://")
@@ -0,0 +1,145 @@
1+//! Generic dispatch table for simple record types.
2+//!
3+//! A "simple" record is one where the create/update handler is:
4+//!   1. Deserialize transformed JSON into the row type
5+//!   2. Set did, rkey, indexed_at
6+//!   3. Upsert
7+//!
8+//! And the delete handler is just a direct delete call.
9+//!
10+//! Records with side effects (counter updates, SSE events, state transitions,
11+//! custom field extraction) are handled as special cases in consumer.rs.
12+
13+use std::sync::Arc;
14+
15+use chrono::Utc;
16+
17+use crate::db;
18+use crate::state::AppState;
19+
20+use super::consumer::transform_record;
21+
22+/// Dispatch a create/update for a simple record type.
23+/// Returns `Ok(true)` if handled, `Ok(false)` if not a simple record.
24+pub async fn dispatch_simple_upsert(
25+    state: &Arc<AppState>,
26+    collection: &str,
27+    did: &str,
28+    rkey: &str,
29+    record: &serde_json::Value,
30+) -> anyhow::Result<bool> {
31+    // Macro to reduce boilerplate for the common pattern:
32+    //   deserialize -> set did+rkey+indexed_at -> upsert
33+    macro_rules! simple_upsert {
34+        // Standard: has both did and rkey
35+        ($mod:ident, $Row:ident) => {{
36+            let mut row: db::$mod::$Row =
37+                serde_json::from_value(transform_record(state, collection, record))?;
38+            row.did = did.to_string();
39+            row.rkey = rkey.to_string();
40+            row.indexed_at = Utc::now();
41+            db::$mod::upsert(&state.db, &row).await?;
42+            return Ok(true);
43+        }};
44+        // No rkey (e.g., actor.profile with literal:self key)
45+        ($mod:ident, $Row:ident, no_rkey) => {{
46+            let mut row: db::$mod::$Row =
47+                serde_json::from_value(transform_record(state, collection, record))?;
48+            row.did = did.to_string();
49+            row.indexed_at = Utc::now();
50+            db::$mod::upsert(&state.db, &row).await?;
51+            return Ok(true);
52+        }};
53+        // No did (e.g., ref_update where did is committer_did)
54+        ($mod:ident, $Row:ident, no_did) => {{
55+            let mut row: db::$mod::$Row =
56+                serde_json::from_value(transform_record(state, collection, record))?;
57+            row.rkey = rkey.to_string();
58+            row.indexed_at = Utc::now();
59+            db::$mod::upsert(&state.db, &row).await?;
60+            return Ok(true);
61+        }};
62+    }
63+
64+    match collection {
65+        // ─── dev.cospan simple records ──────────────────────────────
66+        "dev.cospan.node" => simple_upsert!(node, NodeRow),
67+        "dev.cospan.actor.profile" => simple_upsert!(actor_profile, ActorProfileRow, no_rkey),
68+        "dev.cospan.graph.follow" => simple_upsert!(follow, FollowRow),
69+        "dev.cospan.feed.reaction" => simple_upsert!(reaction, ReactionRow),
70+        "dev.cospan.label.definition" => simple_upsert!(label, LabelRow),
71+        "dev.cospan.org" => simple_upsert!(org, OrgRow),
72+        "dev.cospan.org.member" => simple_upsert!(org_member, OrgMemberRow),
73+        "dev.cospan.repo.collaborator" => simple_upsert!(collaborator, CollaboratorRow),
74+        "dev.cospan.repo.dependency" => simple_upsert!(dependency, DependencyRow),
75+
76+        // ─── sh.tangled simple records (same DB tables) ─────────────
77+        "sh.tangled.knot" => simple_upsert!(node, NodeRow),
78+        "sh.tangled.actor.profile" => simple_upsert!(actor_profile, ActorProfileRow, no_rkey),
79+        "sh.tangled.graph.follow" => simple_upsert!(follow, FollowRow),
80+        "sh.tangled.feed.reaction" => simple_upsert!(reaction, ReactionRow),
81+        "sh.tangled.repo.collaborator" => simple_upsert!(collaborator, CollaboratorRow),
82+        "sh.tangled.knot.member" => simple_upsert!(org_member, OrgMemberRow),
83+        "sh.tangled.repo" => simple_upsert!(repo, RepoRow),
84+        "sh.tangled.pipeline" => simple_upsert!(pipeline, PipelineRow),
85+        "sh.tangled.git.refUpdate" => simple_upsert!(ref_update, RefUpdateRow, no_did),
86+        "sh.tangled.repo.issue" => simple_upsert!(issue, IssueRow),
87+        "sh.tangled.repo.pull" => simple_upsert!(pull, PullRow),
88+
89+        _ => Ok(false),
90+    }
91+}
92+
93+/// Dispatch a delete for a simple record type.
94+/// Returns `Ok(true)` if handled, `Ok(false)` if not a simple record.
95+pub async fn dispatch_simple_delete(
96+    state: &Arc<AppState>,
97+    collection: &str,
98+    did: &str,
99+    rkey: &str,
100+) -> anyhow::Result<bool> {
101+    // Macro for delete: most use (did, rkey), some use (did) only
102+    macro_rules! simple_delete {
103+        ($mod:ident, did_rkey) => {{
104+            db::$mod::delete(&state.db, did, rkey).await?;
105+            return Ok(true);
106+        }};
107+        ($mod:ident, did_only) => {{
108+            db::$mod::delete(&state.db, did).await?;
109+            return Ok(true);
110+        }};
111+    }
112+
113+    match collection {
114+        // ─── dev.cospan simple deletes ──────────────────────────────
115+        "dev.cospan.node" => simple_delete!(node, did_only),
116+        "dev.cospan.actor.profile" => simple_delete!(actor_profile, did_only),
117+        "dev.cospan.graph.follow" => simple_delete!(follow, did_rkey),
118+        "dev.cospan.feed.reaction" => simple_delete!(reaction, did_rkey),
119+        "dev.cospan.label.definition" => simple_delete!(label, did_rkey),
120+        "dev.cospan.org" => simple_delete!(org, did_rkey),
121+        "dev.cospan.org.member" => simple_delete!(org_member, did_rkey),
122+        "dev.cospan.repo.collaborator" => simple_delete!(collaborator, did_rkey),
123+        "dev.cospan.repo.dependency" => simple_delete!(dependency, did_rkey),
124+        "dev.cospan.repo.issue.state" => simple_delete!(issue_state, did_rkey),
125+        "dev.cospan.repo.pull.state" => simple_delete!(pull_state, did_rkey),
126+        "dev.cospan.vcs.refUpdate" => simple_delete!(ref_update, did_rkey),
127+
128+        // ─── sh.tangled simple deletes ──────────────────────────────
129+        "sh.tangled.knot" => simple_delete!(node, did_only),
130+        "sh.tangled.actor.profile" => simple_delete!(actor_profile, did_only),
131+        "sh.tangled.graph.follow" => simple_delete!(follow, did_rkey),
132+        "sh.tangled.feed.reaction" => simple_delete!(reaction, did_rkey),
133+        "sh.tangled.repo.collaborator" => simple_delete!(collaborator, did_rkey),
134+        "sh.tangled.knot.member" => simple_delete!(org_member, did_rkey),
135+        "sh.tangled.spindle" => simple_delete!(org, did_rkey),
136+        "sh.tangled.spindle.member" => simple_delete!(org_member, did_rkey),
137+        "sh.tangled.label.definition" => simple_delete!(label, did_rkey),
138+        "sh.tangled.pipeline" => simple_delete!(pipeline, did_rkey),
139+        "sh.tangled.git.refUpdate" => simple_delete!(ref_update, did_rkey),
140+        "sh.tangled.repo.issue.state" => simple_delete!(issue_state, did_rkey),
141+        "sh.tangled.repo.pull.status" => simple_delete!(pull_state, did_rkey),
142+
143+        _ => Ok(false),
144+    }
145+}
@@ -1,4 +1,5 @@
11 mod consumer;
2+mod dispatch;
23 mod jetstream;
34 
45 use std::sync::Arc;
@@ -9,18 +9,17 @@ use panproto_protocols::emit::{IndentWriter, children_by_edge};
99 use crate::record_config::RecordConfig;
1010 
1111 /// Column info extracted from the panproto schema + record config.
12-#[allow(dead_code)]
13-struct Column {
14-    name: String,
15-    camel_name: String,
16-    rust_type: String,
17-    sql_type: String,
18-    optional: bool,
12+pub struct Column {
13+    pub name: String,
14+    pub camel_name: String,
15+    pub rust_type: String,
16+    pub sql_type: String,
17+    pub optional: bool,
1918     /// Whether this is a denormalized field (not from upsert input).
20-    is_counter: bool,
19+    pub is_counter: bool,
2120 }
2221 
23-fn columns_for_record(
22+pub fn columns_for_record(
2423     schema: &panproto_schema::Schema,
2524     nsid: &str,
2625     config: &RecordConfig,
@@ -0,0 +1,210 @@
1+//! Emit PostgreSQL CREATE TABLE DDL from panproto Schema + RecordConfig.
2+//!
3+//! This is the SQL codegen target, following the same pattern as Cassandra's
4+//! `emit_cql()` in panproto-protocols. It takes the projected column list
5+//! (from `columns_for_record`) and the RecordConfig metadata (indexes,
6+//! foreign keys, defaults, primary keys) to emit complete PostgreSQL DDL
7+//! that matches the hand-written migrations.
8+
9+use std::fmt::Write as _;
10+
11+use crate::emit_rows::columns_for_record;
12+use crate::record_config::RecordConfig;
13+
14+/// Emit a complete PostgreSQL CREATE TABLE statement (plus indexes) for one record type.
15+///
16+/// The generated DDL includes:
17+/// - Column definitions with types, NOT NULL, and DEFAULT constraints
18+/// - PRIMARY KEY (inline for single-column, table-level for composite)
19+/// - FOREIGN KEY constraints
20+/// - CREATE INDEX / CREATE UNIQUE INDEX statements
21+pub fn emit_create_table(
22+    schema: &panproto_schema::Schema,
23+    nsid: &str,
24+    config: &RecordConfig,
25+) -> String {
26+    let cols = columns_for_record(schema, nsid, config);
27+    let table = config.table_name;
28+    let mut out = String::new();
29+
30+    // -- Comment header
31+    let _ = writeln!(out, "-- Projected from {nsid} Lexicon");
32+
33+    // -- CREATE TABLE
34+    let _ = writeln!(out, "CREATE TABLE {table} (");
35+
36+    let col_count = cols.len();
37+
38+    // Determine PK strategy:
39+    // - has_serial_id: PK is `id BIGSERIAL PRIMARY KEY` (inline), conflict_keys are a UNIQUE index
40+    // - single conflict_key: inline PRIMARY KEY on that column
41+    // - multiple conflict_keys: table-level PRIMARY KEY (col1, col2)
42+    let pk_columns: Vec<&str> = if config.has_serial_id {
43+        vec!["id"]
44+    } else {
45+        config.conflict_keys.to_vec()
46+    };
47+    let pk_is_inline = pk_columns.len() == 1;
48+    let needs_table_pk = !pk_is_inline;
49+
50+    // Check if we need table-level constraints after the columns
51+    let composite_fks: Vec<_> = config
52+        .foreign_keys
53+        .iter()
54+        .filter(|fk| fk.columns.len() > 1)
55+        .collect();
56+    let has_table_constraints = needs_table_pk || !composite_fks.is_empty();
57+
58+    for (i, col) in cols.iter().enumerate() {
59+        let mut parts = Vec::new();
60+
61+        // Column name + type
62+        parts.push(format!("    {} {}", col.name, col.sql_type));
63+
64+        // PRIMARY KEY inline for single-column PKs
65+        if pk_is_inline && pk_columns.contains(&col.name.as_str()) {
66+            parts.push("PRIMARY KEY".into());
67+        }
68+
69+        // REFERENCES for single-column foreign keys
70+        for fk in config.foreign_keys {
71+            if fk.columns.len() == 1 && fk.columns[0] == col.name {
72+                parts.push(format!(
73+                    "REFERENCES {}({})",
74+                    fk.ref_table,
75+                    fk.ref_columns.join(", ")
76+                ));
77+            }
78+        }
79+
80+        // NOT NULL (skip for inline PRIMARY KEY columns since PK implies NOT NULL,
81+        // and skip for BIGSERIAL which is implicitly NOT NULL)
82+        let is_inline_pk = pk_is_inline && pk_columns.contains(&col.name.as_str());
83+        if !col.optional && col.sql_type != "BIGSERIAL" && !is_inline_pk {
84+            parts.push("NOT NULL".into());
85+        }
86+
87+        // DEFAULT
88+        if let Some(def) = find_default(config, &col.name) {
89+            parts.push(format!("DEFAULT {def}"));
90+        } else if col.name == "indexed_at" {
91+            parts.push("DEFAULT NOW()".into());
92+        }
93+
94+        // Trailing comma logic
95+        let is_last_col = i == col_count - 1;
96+        let has_trailing = !is_last_col || has_table_constraints;
97+
98+        let line = parts.join(" ");
99+        if has_trailing {
100+            let _ = writeln!(out, "{line},");
101+        } else {
102+            let _ = writeln!(out, "{line}");
103+        }
104+    }
105+
106+    // Composite PRIMARY KEY
107+    if config.conflict_keys.len() > 1 {
108+        let pk_cols = config.conflict_keys.join(", ");
109+        let has_more = !config.foreign_keys.is_empty();
110+        if has_more {
111+            let _ = writeln!(out, "    PRIMARY KEY ({pk_cols}),");
112+        } else {
113+            let _ = writeln!(out, "    PRIMARY KEY ({pk_cols})");
114+        }
115+    }
116+
117+    // FOREIGN KEY constraints (composite only; single-column FKs use REFERENCES inline)
118+    let composite_fks: Vec<_> = config
119+        .foreign_keys
120+        .iter()
121+        .filter(|fk| fk.columns.len() > 1)
122+        .collect();
123+    for (i, fk) in composite_fks.iter().enumerate() {
124+        let local = fk.columns.join(", ");
125+        let remote = fk.ref_columns.join(", ");
126+        let comma = if i + 1 < composite_fks.len() { "," } else { "" };
127+        let _ = writeln!(
128+            out,
129+            "    FOREIGN KEY ({local}) REFERENCES {}({remote}){comma}",
130+            fk.ref_table
131+        );
132+    }
133+
134+    let _ = writeln!(out, ");");
135+
136+    // -- CREATE INDEX statements
137+    if !config.indexes.is_empty() {
138+        let _ = writeln!(out);
139+    }
140+    for idx in config.indexes {
141+        emit_index(&mut out, table, idx);
142+    }
143+
144+    out
145+}
146+
147+/// Emit all record types' DDL into a single migration file.
148+pub fn emit_all_migrations(
149+    schemas: &[(String, panproto_schema::Schema)],
150+    configs: &[RecordConfig],
151+) -> String {
152+    let mut out = String::new();
153+    out.push_str("-- Generated by cospan-codegen from packages/lexicons/\n");
154+    out.push_str("-- Do not edit manually.\n\n");
155+
156+    for config in configs {
157+        if let Some((_, schema)) = schemas.iter().find(|(nsid, _)| nsid == config.nsid) {
158+            let ddl = emit_create_table(schema, config.nsid, config);
159+            out.push_str(&ddl);
160+            out.push('\n');
161+        }
162+    }
163+
164+    out
165+}
166+
167+// ---------------------------------------------------------------------------
168+// Helpers
169+// ---------------------------------------------------------------------------
170+
171+fn find_default<'a>(config: &'a RecordConfig, col_name: &str) -> Option<&'a str> {
172+    config
173+        .column_defaults
174+        .iter()
175+        .find(|d| d.column == col_name)
176+        .map(|d| d.expression)
177+}
178+
179+fn emit_index(out: &mut String, table: &str, idx: &crate::record_config::IndexConfig) {
180+    let unique = if idx.unique { "UNIQUE " } else { "" };
181+
182+    if let Some(using) = idx.using {
183+        // GIN or other non-btree index
184+        if let Some(raw) = idx.raw_expression {
185+            let _ = writeln!(
186+                out,
187+                "CREATE INDEX {name} ON {table} USING {using} (\n    {raw}\n);",
188+                name = idx.name,
189+            );
190+        } else {
191+            let cols = idx.columns.join(", ");
192+            let _ = writeln!(
193+                out,
194+                "CREATE {unique}INDEX {name} ON {table} USING {using} ({cols});",
195+                name = idx.name,
196+            );
197+        }
198+    } else {
199+        let cols = idx.columns.join(", ");
200+        let where_clause = idx
201+            .where_clause
202+            .map(|w| format!("\n    WHERE {w}"))
203+            .unwrap_or_default();
204+        let _ = writeln!(
205+            out,
206+            "CREATE {unique}INDEX {name} ON {table} ({cols}){where_clause};",
207+            name = idx.name,
208+        );
209+    }
210+}
cospan · schematic version control on atproto built on AT Protocol