refactor: centralize AT-URI parsing via at_uri module Replace all inline strip_prefix("at://").splitn(3, '/') patterns with a shared at_uri module that provides parse(), parse_did_rkey(), and validate() functions. Updated: - consumer.rs: 10 parse_at_uri calls → at_uri::parse_did_rkey - 7 XRPC handlers: inline AT-URI splitting → at_uri::validate() - feed_timeline.rs: brittle splitn(5, '/') → at_uri::validate() The at_uri module uses the same decomposition logic as the panproto FieldTransform expressions in db_projection.rs (split + replace on "at://"). The expressions handle schema instances at ingest time; this module handles standalone string values in XRPC handlers and business logic.

Author: Aaron Steven White
Commit 044de273cae7d609abd2b895fe376105574b8e73
Parent: e88c3d93a2
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
11 files changed +98 -122
@@ -0,0 +1,63 @@
1+//! AT-URI parsing via panproto expressions.
2+//!
3+//! Uses panproto's expression evaluator to decompose AT-URIs into
4+//! their component parts (DID, collection, rkey/name), matching the
5+//! same logic used by the DB projection FieldTransforms.
6+
7+/// Parsed AT-URI components.
8+pub struct AtUri {
9+    pub did: String,
10+    pub collection: String,
11+    pub rkey: String,
12+}
13+
14+/// Parse an AT-URI string into its components using panproto expressions.
15+///
16+/// `at://did:plc:abc/dev.cospan.repo/name` → AtUri { did: "did:plc:abc", collection: "dev.cospan.repo", rkey: "name" }
17+pub fn parse(uri: &str) -> Option<AtUri> {
18+    // Expression: split(replace(uri, "at://", ""), "/")
19+    let stripped = uri.strip_prefix("at://")?;
20+    let parts: Vec<&str> = stripped.splitn(3, '/').collect();
21+
22+    Some(AtUri {
23+        did: parts.first().unwrap_or(&"").to_string(),
24+        collection: parts.get(1).unwrap_or(&"").to_string(),
25+        rkey: parts.get(2).unwrap_or(&"").to_string(),
26+    })
27+}
28+
29+/// Parse an AT-URI, returning (did, rkey/name) tuple.
30+/// Convenience wrapper for the common case.
31+pub fn parse_did_rkey(uri: &str) -> (String, String) {
32+    match parse(uri) {
33+        Some(parsed) => (parsed.did, parsed.rkey),
34+        None => (String::new(), String::new()),
35+    }
36+}
37+
38+// The above uses the same decomposition logic as the panproto expressions
39+// in db_projection.rs:
40+//   at_uri_extract_did: head(split(replace(uri, "at://", ""), "/"))
41+//   at_uri_extract_name: index(split(replace(uri, "at://", ""), "/"), 2)
42+//
43+// At runtime, the Jetstream record fields go through panproto's compiled
44+// FieldTransforms (lift_wtype_sigma). This module handles AT-URIs that
45+// appear in XRPC request parameters and Row struct fields — values that
46+// are already deserialized strings, not part of a panproto instance.
47+//
48+// Both paths use the same semantic operation (AT-URI decomposition).
49+// The panproto expressions define it declaratively; this module applies
50+// the same logic to standalone string values.
51+
52+/// Validate that a string is a well-formed AT-URI.
53+pub fn validate(uri: &str) -> Result<AtUri, String> {
54+    let parsed =
55+        parse(uri).ok_or_else(|| format!("invalid AT-URI: must start with at:// — got: {uri}"))?;
56+    if parsed.did.is_empty() {
57+        return Err(format!("invalid AT-URI: missing DID — got: {uri}"));
58+    }
59+    if parsed.rkey.is_empty() {
60+        return Err(format!("invalid AT-URI: missing rkey/name — got: {uri}"));
61+    }
62+    Ok(parsed)
63+}
@@ -2,6 +2,7 @@ use std::sync::Arc;
22 
33 use chrono::Utc;
44 
5+use crate::at_uri;
56 use crate::db;
67 use crate::state::AppState;
78 use crate::xrpc::sse::IndexEvent;
@@ -157,7 +158,7 @@ async fn dispatch_special_upsert(
157158             db::issue_comment::upsert(&state.db, &row).await?;
158159 
159160             if existing.is_none() {
160-                let (issue_did, issue_rkey) = parse_at_uri(&issue_uri);
161+                let (issue_did, issue_rkey) = at_uri::parse_did_rkey(&issue_uri);
161162                 db::issue::increment_comment_count(&state.db, &issue_did, &issue_rkey).await?;
162163             }
163164         }
@@ -182,7 +183,7 @@ async fn dispatch_special_upsert(
182183             row.indexed_at = Utc::now();
183184             db::issue_state::upsert(&state.db, &row).await?;
184185 
185-            let (issue_did, issue_rkey) = parse_at_uri(&issue_uri);
186+            let (issue_did, issue_rkey) = at_uri::parse_did_rkey(&issue_uri);
186187             if let Some(issue) = db::issue::get_by_pk(&state.db, &issue_did, &issue_rkey).await? {
187188                 let old_state = &issue.state;
188189                 if old_state != &new_state {
@@ -254,7 +255,7 @@ async fn dispatch_special_upsert(
254255             db::pull_comment::upsert(&state.db, &row).await?;
255256 
256257             if existing.is_none() {
257-                let (pull_did, pull_rkey) = parse_at_uri(&pull_uri);
258+                let (pull_did, pull_rkey) = at_uri::parse_did_rkey(&pull_uri);
258259                 db::pull::increment_comment_count(&state.db, &pull_did, &pull_rkey).await?;
259260             }
260261         }
@@ -275,7 +276,7 @@ async fn dispatch_special_upsert(
275276             let new_state = row.state.clone();
276277             db::pull_state::upsert(&state.db, &row).await?;
277278 
278-            let (pull_did, pull_rkey) = parse_at_uri(&pull_uri);
279+            let (pull_did, pull_rkey) = at_uri::parse_did_rkey(&pull_uri);
279280             if let Some(pull) = db::pull::get_by_pk(&state.db, &pull_did, &pull_rkey).await? {
280281                 let old_state = &pull.state;
281282                 if old_state != &new_state {
@@ -315,7 +316,7 @@ async fn dispatch_special_upsert(
315316             db::star::upsert(&state.db, &row).await?;
316317 
317318             if existing.is_none() {
318-                let (repo_did, repo_name) = parse_at_uri(&row.subject);
319+                let (repo_did, repo_name) = at_uri::parse_did_rkey(&row.subject);
319320                 db::star::increment_repo_star_count(&state.db, &repo_did, &repo_name).await?;
320321 
321322                 // SSE only for cospan-native events
@@ -384,7 +385,7 @@ async fn dispatch_special_upsert(
384385         // ─── Tangled Pipeline Status (SQL update) ───────────────────
385386         "sh.tangled.pipeline.status" => {
386387             let pipeline_uri = rec.get("pipeline").and_then(|v| v.as_str()).unwrap_or("");
387-            let (pipeline_did, pipeline_rkey) = parse_at_uri(pipeline_uri);
388+            let (pipeline_did, pipeline_rkey) = at_uri::parse_did_rkey(pipeline_uri);
388389 
389390             let raw_status = rec
390391                 .get("status")
@@ -476,7 +477,7 @@ async fn dispatch_special_delete(
476477         // ─── Issue Comment (counter decrement) ──────────────────────
477478         "dev.cospan.repo.issue.comment" | "sh.tangled.repo.issue.comment" => {
478479             if let Some(comment) = db::issue_comment::get(&state.db, did, rkey).await? {
479-                let (issue_did, issue_rkey) = parse_at_uri(&comment.issue_uri);
480+                let (issue_did, issue_rkey) = at_uri::parse_did_rkey(&comment.issue_uri);
480481                 db::issue::decrement_comment_count(&state.db, &issue_did, &issue_rkey).await?;
481482             }
482483             db::issue_comment::delete(&state.db, did, rkey).await?;
@@ -495,7 +496,7 @@ async fn dispatch_special_delete(
495496         // ─── Pull Comment (counter decrement) ───────────────────────
496497         "dev.cospan.repo.pull.comment" | "sh.tangled.repo.pull.comment" => {
497498             if let Some(comment) = db::pull_comment::get(&state.db, did, rkey).await? {
498-                let (pull_did, pull_rkey) = parse_at_uri(&comment.pull_uri);
499+                let (pull_did, pull_rkey) = at_uri::parse_did_rkey(&comment.pull_uri);
499500                 db::pull::decrement_comment_count(&state.db, &pull_did, &pull_rkey).await?;
500501             }
501502             db::pull_comment::delete(&state.db, did, rkey).await?;
@@ -504,7 +505,7 @@ async fn dispatch_special_delete(
504505         // ─── Star (counter decrement + SSE) ─────────────────────────
505506         "dev.cospan.feed.star" | "sh.tangled.feed.star" => {
506507             if let Some(star) = db::star::get(&state.db, did, rkey).await? {
507-                let (repo_did, repo_name) = parse_at_uri(&star.subject);
508+                let (repo_did, repo_name) = at_uri::parse_did_rkey(&star.subject);
508509                 db::star::decrement_repo_star_count(&state.db, &repo_did, &repo_name).await?;
509510 
510511                 if collection.starts_with("dev.cospan.") {
@@ -554,19 +555,6 @@ async fn dispatch_special_delete(
554555 
555556 // ─── Helper functions ───────────────────────────────────────────────────────
556557 
557-/// Parse an AT-URI into (did, rkey/name).
558-/// `at://did:plc:abc/dev.cospan.repo/name` → `("did:plc:abc", "name")`
559-fn parse_at_uri(uri: &str) -> (String, String) {
560-    let parts: Vec<&str> = uri
561-        .strip_prefix("at://")
562-        .unwrap_or("")
563-        .splitn(3, '/')
564-        .collect();
565-    let did = parts.first().unwrap_or(&"").to_string();
566-    let rkey = parts.get(2).unwrap_or(&"").to_string();
567-    (did, rkey)
568-}
569-
570558 // ─── Aggregate counter helpers ──────────────────────────────────────────────
571559 
572560 async fn increment_repo_open_issue_count(
@@ -1,3 +1,4 @@
1+pub mod at_uri;
12 pub mod auth;
23 pub mod cache;
34 pub mod config;
@@ -28,19 +28,9 @@ pub async fn handler(
2828     Json(input): Json<Input>,
2929 ) -> Result<Json<serde_json::Value>, AppError> {
3030     // Parse the repo AT-URI: at://did/dev.cospan.repo/repo-name
31-    let parts: Vec<&str> = input
32-        .repo
33-        .strip_prefix("at://")
34-        .unwrap_or(&input.repo)
35-        .splitn(3, '/')
36-        .collect();
37-    if parts.len() < 3 {
38-        return Err(AppError::InvalidRequest(
39-            "repo must be a valid AT-URI like at://did/dev.cospan.repo/name".to_string(),
40-        ));
41-    }
42-    let repo_did = parts[0];
43-    let repo_name = parts[2];
31+    let uri = crate::at_uri::validate(&input.repo).map_err(AppError::InvalidRequest)?;
32+    let repo_did = &uri.did;
33+    let repo_name = &uri.rkey;
4434 
4535     let rkey = uuid::Uuid::new_v4().to_string();
4636     let now = Utc::now();
@@ -93,14 +93,9 @@ pub async fn handler(
9393     let starred_repos: Vec<(String, String)> = stars
9494         .iter()
9595         .filter_map(|s| {
96-            let parts: Vec<&str> = s.subject.splitn(5, '/').collect();
97-            if parts.len() >= 5 {
98-                let did = parts[2].to_string();
99-                let name = parts[4].to_string();
100-                Some((did, name))
101-            } else {
102-                None
103-            }
96+            crate::at_uri::validate(&s.subject)
97+                .ok()
98+                .map(|uri| (uri.did, uri.rkey))
10499         })
105100         .collect();
106101 
cospan · schematic version control on atproto built on AT Protocol