Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ use rivet_envoy_client::handle::EnvoyHandle;
use crate::error::ActorRuntime;
use crate::types::ListOpts;

/// Maximum keys per `apply_batch` put or delete list. Mirrors the engine-side
/// `MAX_KEYS` limit in `engine/packages/pegboard/src/actor_kv/mod.rs`; the
/// envoy backend rejects requests above this.
pub(crate) const APPLY_BATCH_CHUNK_SIZE: usize = 128;

#[derive(Clone)]
pub struct Kv {
backend: KvBackend,
Expand Down
21 changes: 16 additions & 5 deletions rivetkit-rust/packages/rivetkit-core/src/actor/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tracing::Instrument;

use crate::actor::context::ActorContext;
use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, make_connection_key};
use crate::actor::kv::APPLY_BATCH_CHUNK_SIZE;
use crate::actor::messages::StateDelta;
use crate::actor::persist::{
decode_latest_with_embedded_version, encode_latest_with_embedded_version,
Expand Down Expand Up @@ -339,11 +340,21 @@ impl ActorContext {
(puts, deletes, next_state, revision, self.begin_write())
};

self.0
.kv
.apply_batch(&puts, &deletes)
.await
.context("persist actor state deltas to kv")?;
// TODO: Make this atomic. The ideal path is to store these deltas in SQLite.
let mut put_chunks = puts.chunks(APPLY_BATCH_CHUNK_SIZE);
let mut delete_chunks = deletes.chunks(APPLY_BATCH_CHUNK_SIZE);
loop {
let put_chunk = put_chunks.next().unwrap_or(&[]);
let delete_chunk = delete_chunks.next().unwrap_or(&[]);
if put_chunk.is_empty() && delete_chunk.is_empty() {
break;
}
self.0
.kv
.apply_batch(put_chunk, delete_chunk)
.await
.context("persist actor state deltas to kv")?;
}

if let Some(state) = next_state {
*self.0.current_state.write() = state;
Expand Down
Loading