feat: Redis-backed session store for production auth persistence Sessions now survive container restarts and deploys. Uses REDIS_URL env var (falls back to in-memory for local dev). Sessions expire after 7 days, auth flows after 10 minutes.

Author: Aaron Steven White
Commit 2cba37ec4cabba2e93e69b58a8a1d44bc0699b0d
Parent: 686bdf559e
Structural diff unavailable

These commits were pushed via plain git push, so no pre-parsed schemas are available. Install git-remote-cospan and re-push via panproto:// to see scope-level changes, breaking change detection, and semantic diffs.

brew install panproto/tap/git-remote-cospan
3 files changed +100 -6
@@ -61,8 +61,6 @@ impl SessionStore for InMemorySessionStore {
6161     async fn get_session(&self, session_id: &str) -> anyhow::Result<Option<Session>> {
6262         let sessions = self.sessions.read().await;
6363         let session = sessions.get(session_id).cloned();
64-        // Check expiration: if the access token has expired, the session still exists
65-        // but the caller should trigger a refresh. We don't auto-delete here.
6664         Ok(session)
6765     }
6866 
@@ -80,10 +78,8 @@ impl SessionStore for InMemorySessionStore {
8078     }
8179 
8280     async fn take_auth_flow(&self, state: &str) -> anyhow::Result<Option<AuthFlowState>> {
83-        // Remove and return (single use)
8481         let flow = self.auth_flows.write().await.remove(state);
8582 
86-        // Check expiry
8783         if let Some(ref f) = flow
8884             && chrono::Utc::now() > f.expires_at
8985         {
@@ -94,3 +90,92 @@ impl SessionStore for InMemorySessionStore {
9490         Ok(flow)
9591     }
9692 }
93+
94+/// Redis-backed session store for production.
95+/// Sessions survive container restarts and deploys.
96+#[derive(Clone)]
97+pub struct RedisSessionStore {
98+    client: redis::Client,
99+}
100+
101+impl RedisSessionStore {
102+    pub fn new(redis_url: &str) -> anyhow::Result<Self> {
103+        let client = redis::Client::open(redis_url)?;
104+        Ok(Self { client })
105+    }
106+}
107+
108+const SESSION_TTL: i64 = 7 * 24 * 3600; // 7 days
109+const AUTH_FLOW_TTL: i64 = 600; // 10 minutes
110+
111+#[async_trait]
112+impl SessionStore for RedisSessionStore {
113+    async fn put_session(&self, session_id: &str, session: Session) -> anyhow::Result<()> {
114+        let mut conn = self.client.get_multiplexed_async_connection().await?;
115+        let key = format!("session:{session_id}");
116+        let value = serde_json::to_string(&session)?;
117+        redis::cmd("SET")
118+            .arg(&key)
119+            .arg(&value)
120+            .arg("EX")
121+            .arg(SESSION_TTL)
122+            .query_async::<()>(&mut conn)
123+            .await?;
124+        Ok(())
125+    }
126+
127+    async fn get_session(&self, session_id: &str) -> anyhow::Result<Option<Session>> {
128+        let mut conn = self.client.get_multiplexed_async_connection().await?;
129+        let key = format!("session:{session_id}");
130+        let value: Option<String> = redis::cmd("GET")
131+            .arg(&key)
132+            .query_async(&mut conn)
133+            .await?;
134+        match value {
135+            Some(v) => Ok(Some(serde_json::from_str(&v)?)),
136+            None => Ok(None),
137+        }
138+    }
139+
140+    async fn delete_session(&self, session_id: &str) -> anyhow::Result<()> {
141+        let mut conn = self.client.get_multiplexed_async_connection().await?;
142+        let key = format!("session:{session_id}");
143+        redis::cmd("DEL").arg(&key).query_async::<()>(&mut conn).await?;
144+        Ok(())
145+    }
146+
147+    async fn put_auth_flow(&self, state: &str, flow: AuthFlowState) -> anyhow::Result<()> {
148+        let mut conn = self.client.get_multiplexed_async_connection().await?;
149+        let key = format!("authflow:{state}");
150+        let value = serde_json::to_string(&flow)?;
151+        redis::cmd("SET")
152+            .arg(&key)
153+            .arg(&value)
154+            .arg("EX")
155+            .arg(AUTH_FLOW_TTL)
156+            .query_async::<()>(&mut conn)
157+            .await?;
158+        Ok(())
159+    }
160+
161+    async fn take_auth_flow(&self, state: &str) -> anyhow::Result<Option<AuthFlowState>> {
162+        let mut conn = self.client.get_multiplexed_async_connection().await?;
163+        let key = format!("authflow:{state}");
164+        // GET then DEL atomically
165+        let value: Option<String> = redis::cmd("GETDEL")
166+            .arg(&key)
167+            .query_async(&mut conn)
168+            .await?;
169+        match value {
170+            Some(v) => {
171+                let flow: AuthFlowState = serde_json::from_str(&v)?;
172+                if chrono::Utc::now() > flow.expires_at {
173+                    tracing::warn!(state = state, "auth flow state expired");
174+                    return Ok(None);
175+                }
176+                Ok(Some(flow))
177+            }
178+            None => Ok(None),
179+        }
180+    }
181+}
@@ -5,7 +5,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
55 
66 use cospan_appview::auth::OAuthConfig;
77 use cospan_appview::auth::dpop::DpopKey;
8-use cospan_appview::auth::session::InMemorySessionStore;
8+use cospan_appview::auth::session::{InMemorySessionStore, RedisSessionStore, SessionStore};
99 use cospan_appview::config::AppConfig;
1010 use cospan_appview::state::AppState;
1111 
@@ -56,7 +56,15 @@ async fn main() -> anyhow::Result<()> {
5656     let dpop_key = DpopKey::generate();
5757     tracing::info!(kid = %dpop_key.kid, "DPoP signing key generated");
5858 
59-    let session_store = Arc::new(InMemorySessionStore::new());
59+    let session_store: Arc<dyn SessionStore> =
60+        if let Ok(redis_url) = std::env::var("REDIS_URL") {
61+            let store = RedisSessionStore::new(&redis_url)?;
62+            tracing::info!("using Redis session store");
63+            Arc::new(store)
64+        } else {
65+            tracing::warn!("REDIS_URL not set, using in-memory session store (sessions lost on restart)");
66+            Arc::new(InMemorySessionStore::new())
67+        };
6068 
6169     let state =
6270         Arc::new(AppState::new(config.clone(), pool, oauth_config, session_store, dpop_key).await?);
@@ -133,6 +133,7 @@ services:
133133     restart: unless-stopped
134134     environment:
135135       DATABASE_URL: postgres://cospan:${POSTGRES_PASSWORD}@db:5432/cospan
136+      REDIS_URL: redis://redis:6379
136137       JETSTREAM_URL: wss://jetstream2.us-east.bsky.network/subscribe
137138       TAP_URL: ws://tap:2480/channel
138139       APPVIEW_LISTEN: "0.0.0.0:3000"
cospan · schematic version control on atproto built on AT Protocol