Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use scc::HashMap;
use universaldb::prelude::*;
use vbare::OwnedVersionedData;

use crate::{actor_lifecycle, errors, metrics, utils::UrlData};
use crate::{actor_lifecycle, errors, hibernating_requests, metrics, utils::UrlData};

pub type RemoteSqliteExecutors =
HashMap<(String, u64), Arc<tokio::sync::OnceCell<NativeDatabaseHandle>>>;
Expand Down Expand Up @@ -333,6 +333,7 @@ pub async fn init_conn(
if !missed_commands.is_empty() {
let replay_result: Result<()> = async {
for cmd_wrapper in &mut missed_commands {
hibernating_requests::hydrate_command_wrapper(ctx, cmd_wrapper).await?;
if let protocol::Command::CommandStopActor(_) = cmd_wrapper.inner {
actor_lifecycle::stop_actor(&conn, &cmd_wrapper.checkpoint).await?;
}
Expand Down
27 changes: 27 additions & 0 deletions engine/packages/pegboard-envoy/src/hibernating_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Context;
use gas::prelude::*;
use rivet_envoy_protocol as protocol;

/// Hydrates ephemeral hibernating request ids before a start command reaches envoy.
/// Stored serverful commands keep this empty because request ids can change while
/// queued commands wait for envoy reconnect.
pub(crate) async fn hydrate_command_wrapper(
ctx: &StandaloneCtx,
command_wrapper: &mut protocol::CommandWrapper,
) -> Result<()> {
if let protocol::Command::CommandStartActor(start) = &mut command_wrapper.inner {
let actor_id =
Id::parse(&command_wrapper.checkpoint.actor_id).context("invalid command actor id")?;
start.hibernating_requests = ctx
.op(pegboard::ops::actor::hibernating_request::list::Input { actor_id })
.await?
.into_iter()
.map(|request| protocol::HibernatingRequest {
gateway_id: request.gateway_id,
request_id: request.request_id,
})
.collect();
}

Ok(())
}
1 change: 1 addition & 0 deletions engine/packages/pegboard-envoy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod actor_event_demuxer;
mod actor_lifecycle;
mod conn;
mod errors;
mod hibernating_requests;
mod metrics;
mod ping_task;
pub mod sqlite_runtime;
Expand Down
3 changes: 2 additions & 1 deletion engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use universalpubsub as ups;
use universalpubsub::{NextOutput, PublishOpts, Subscriber};
use vbare::OwnedVersionedData;

use crate::{LifecycleResult, actor_lifecycle, conn::Conn, metrics};
use crate::{LifecycleResult, actor_lifecycle, conn::Conn, hibernating_requests, metrics};

#[tracing::instrument(name="tunnel_to_ws_task", skip_all, fields(ray_id=?ctx.ray_id(), req_id=?ctx.req_id(), envoy_key=%conn.envoy_key, protocol_version=%conn.protocol_version))]
pub async fn task(
Expand Down Expand Up @@ -126,6 +126,7 @@ async fn handle_message(
protocol::ToEnvoyConn::ToEnvoyCommands(mut command_wrappers) => {
// TODO: Parallelize
for command_wrapper in &mut command_wrappers {
hibernating_requests::hydrate_command_wrapper(ctx, command_wrapper).await?;
if let protocol::Command::CommandStopActor(_) = &command_wrapper.inner {
actor_lifecycle::stop_actor(conn, &command_wrapper.checkpoint).await?;
}
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/pegboard/src/workflows/actor2/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ pub async fn send_outbound(ctx: &ActivityCtx, input: &SendOutboundInput) -> Resu
.as_ref()
.and_then(|x| BASE64_STANDARD.decode(x).ok()),
},
// Empty because request ids are ephemeral. This is intercepted by guard and
// populated before it reaches the runner
// Request ids are ephemeral. Pegboard-envoy refreshes this on the
// WebSocket send path immediately before the actor start reaches envoy.
hibernating_requests: Vec::new(),
preloaded_kv: None,
});
Expand Down
Loading