diff --git a/engine/packages/pegboard-envoy/src/actor_lifecycle.rs b/engine/packages/pegboard-envoy/src/actor_lifecycle.rs index 53c6b3f1f3..963654b29b 100644 --- a/engine/packages/pegboard-envoy/src/actor_lifecycle.rs +++ b/engine/packages/pegboard-envoy/src/actor_lifecycle.rs @@ -235,12 +235,23 @@ pub async fn stop_actor(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> pub async fn actor_stopped(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> Result<()> { let actor_id = checkpoint.actor_id.clone(); - let active = conn + let active = match conn .active_actors .get_async(&actor_id) .await .map(|entry| entry.get().clone()) - .context("actor stopped without active sqlite state")?; + { + Some(active) => active, + None if conn.is_serverless => { + conn.sqlite_engine.force_close(&actor_id).await; + conn.serverless_sqlite_actors.remove_async(&actor_id).await; + return Ok(()); + } + None => { + ensure!(false, "actor stopped without active sqlite state"); + unreachable!(); + } + }; ensure!( active.actor_generation == checkpoint.generation, "stopped actor generation {} did not match active generation {}", @@ -288,6 +299,22 @@ pub async fn shutdown_conn_actors(conn: &Conn) { .buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM) .for_each(|_| async {}) .await; + + let mut serverless_sqlite_actors = Vec::new(); + conn.serverless_sqlite_actors + .retain_sync(|actor_id, _generation| { + serverless_sqlite_actors.push(actor_id.clone()); + false + }); + stream::iter(serverless_sqlite_actors.into_iter().map(|actor_id| { + let sqlite_engine = conn.sqlite_engine.clone(); + async move { + sqlite_engine.force_close(&actor_id).await; + } + })) + .buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM) + .for_each(|_| async {}) + .await; } async fn close_actor_on_shutdown( diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 71f7ee5a7b..3e527f3a1f 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -30,6 +30,7 @@ pub struct Conn { pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>, pub sqlite_engine: Arc, pub active_actors: HashMap, + pub serverless_sqlite_actors: HashMap, pub is_serverless: bool, pub last_rtt: AtomicU32, /// Timestamp (epoch ms) of the last pong received from the envoy. @@ -306,6 +307,7 @@ pub async fn init_conn( authorized_tunnel_routes: HashMap::new(), sqlite_engine, active_actors: HashMap::new(), + serverless_sqlite_actors: HashMap::new(), is_serverless, last_rtt: AtomicU32::new(0), last_ping_ts: AtomicI64::new(util::timestamp::now()), diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index b12a28ee5b..bc1b3c3b7d 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -703,6 +703,7 @@ async fn handle_sqlite_get_pages( ) -> Result { validate_sqlite_get_pages_request(&request)?; validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; match conn .sqlite_engine @@ -747,6 +748,7 @@ async fn handle_sqlite_commit( let decode_request_start = Instant::now(); validate_sqlite_dirty_pages("sqlite commit", &request.dirty_pages)?; validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; let decode_request_duration = decode_request_start.elapsed(); conn.sqlite_engine.metrics().observe_commit_phase( "fast", @@ -813,6 +815,7 @@ async fn handle_sqlite_commit_stage( request: protocol::SqliteCommitStageRequest, ) -> Result { validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; match conn .sqlite_engine @@ -850,6 +853,7 @@ async fn handle_sqlite_commit_stage_begin( request: protocol::SqliteCommitStageBeginRequest, ) -> Result { validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; match conn .sqlite_engine @@ -884,6 +888,7 @@ async fn handle_sqlite_commit_finalize( ) -> Result { let decode_request_start = Instant::now(); validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; conn.sqlite_engine.metrics().observe_commit_phase( "slow", "decode_request", @@ -952,6 +957,22 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str) Ok(()) } +async fn ensure_serverless_sqlite_open(conn: &Conn, actor_id: &str, generation: u64) -> Result<()> { + if !conn.is_serverless { + return Ok(()); + } + + conn.sqlite_engine + .ensure_local_open(actor_id, generation) + .await?; + + conn.serverless_sqlite_actors + .upsert_async(actor_id.to_string(), generation) + .await; + + Ok(()) +} + async fn sqlite_fence_mismatch( conn: &Conn, actor_id: &str, diff --git a/engine/packages/sqlite-storage/src/open.rs b/engine/packages/sqlite-storage/src/open.rs index a63b705b95..4ea8a8b9f4 100644 --- a/engine/packages/sqlite-storage/src/open.rs +++ b/engine/packages/sqlite-storage/src/open.rs @@ -393,6 +393,39 @@ impl SqliteEngine { Ok(()) } + pub async fn ensure_local_open(&self, actor_id: &str, generation: u64) -> Result<()> { + let head = self.load_head(actor_id).await?; + ensure!( + head.generation == generation, + SqliteStorageError::FenceMismatch { + reason: format!( + "ensure_local_open generation {} did not match current generation {}", + generation, head.generation + ), + }, + ); + + match self.open_dbs.entry_async(actor_id.to_string()).await { + scc::hash_map::Entry::Occupied(entry) => { + ensure!( + entry.get().generation == generation, + SqliteStorageError::FenceMismatch { + reason: format!( + "ensure_local_open generation {} did not match open generation {}", + generation, + entry.get().generation + ), + }, + ); + } + scc::hash_map::Entry::Vacant(entry) => { + entry.insert_entry(OpenDb { generation }); + } + } + + Ok(()) + } + // Unconditionally evict the actor's open-db / page-index / pending-stage caches without // generation fencing. Use only on shutdown paths where keeping a stale entry would block // future opens of the same actor on this process-wide engine.