diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 09d42cd5cc..0278026988 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -21,7 +21,7 @@ use scc::HashMap; use universaldb::prelude::*; use vbare::OwnedVersionedData; -use crate::{actor_lifecycle, errors, metrics, utils::UrlData}; +use crate::{actor_lifecycle, errors, hibernating_requests, metrics, utils::UrlData}; pub type RemoteSqliteExecutors = HashMap<(String, u64), Arc>>; @@ -333,6 +333,7 @@ pub async fn init_conn( if !missed_commands.is_empty() { let replay_result: Result<()> = async { for cmd_wrapper in &mut missed_commands { + hibernating_requests::hydrate_command_wrapper(ctx, cmd_wrapper).await?; if let protocol::Command::CommandStopActor(_) = cmd_wrapper.inner { actor_lifecycle::stop_actor(&conn, &cmd_wrapper.checkpoint).await?; } diff --git a/engine/packages/pegboard-envoy/src/hibernating_requests.rs b/engine/packages/pegboard-envoy/src/hibernating_requests.rs new file mode 100644 index 0000000000..31cf8e6cb8 --- /dev/null +++ b/engine/packages/pegboard-envoy/src/hibernating_requests.rs @@ -0,0 +1,27 @@ +use anyhow::Context; +use gas::prelude::*; +use rivet_envoy_protocol as protocol; + +/// Hydrates ephemeral hibernating request ids before a start command reaches envoy. +/// Stored serverful commands keep this empty because request ids can change while +/// queued commands wait for envoy reconnect. +pub(crate) async fn hydrate_command_wrapper( + ctx: &StandaloneCtx, + command_wrapper: &mut protocol::CommandWrapper, +) -> Result<()> { + if let protocol::Command::CommandStartActor(start) = &mut command_wrapper.inner { + let actor_id = + Id::parse(&command_wrapper.checkpoint.actor_id).context("invalid command actor id")?; + start.hibernating_requests = ctx + .op(pegboard::ops::actor::hibernating_request::list::Input { actor_id }) + .await? + .into_iter() + .map(|request| protocol::HibernatingRequest { + gateway_id: request.gateway_id, + request_id: request.request_id, + }) + .collect(); + } + + Ok(()) +} diff --git a/engine/packages/pegboard-envoy/src/lib.rs b/engine/packages/pegboard-envoy/src/lib.rs index affdd440df..b25d3c131f 100644 --- a/engine/packages/pegboard-envoy/src/lib.rs +++ b/engine/packages/pegboard-envoy/src/lib.rs @@ -15,6 +15,7 @@ mod actor_event_demuxer; mod actor_lifecycle; mod conn; mod errors; +mod hibernating_requests; mod metrics; mod ping_task; pub mod sqlite_runtime; diff --git a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs index 67a4319a69..16673dc05b 100644 --- a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs @@ -9,7 +9,7 @@ use universalpubsub as ups; use universalpubsub::{NextOutput, PublishOpts, Subscriber}; use vbare::OwnedVersionedData; -use crate::{LifecycleResult, actor_lifecycle, conn::Conn, metrics}; +use crate::{LifecycleResult, actor_lifecycle, conn::Conn, hibernating_requests, metrics}; #[tracing::instrument(name="tunnel_to_ws_task", skip_all, fields(ray_id=?ctx.ray_id(), req_id=?ctx.req_id(), envoy_key=%conn.envoy_key, protocol_version=%conn.protocol_version))] pub async fn task( @@ -126,6 +126,7 @@ async fn handle_message( protocol::ToEnvoyConn::ToEnvoyCommands(mut command_wrappers) => { // TODO: Parallelize for command_wrapper in &mut command_wrappers { + hibernating_requests::hydrate_command_wrapper(ctx, command_wrapper).await?; if let protocol::Command::CommandStopActor(_) = &command_wrapper.inner { actor_lifecycle::stop_actor(conn, &command_wrapper.checkpoint).await?; } diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index 39c685484f..250d51c364 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -368,8 +368,8 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu .as_ref() .and_then(|x| BASE64_STANDARD.decode(x).ok()), }, - // Empty because request ids are ephemeral. This is intercepted by guard and - // populated before it reaches the runner + // Request ids are ephemeral. Pegboard-envoy refreshes this on the + // WebSocket send path immediately before the actor start reaches envoy. hibernating_requests: Vec::new(), preloaded_kv: None, });