Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rivetkit-rust/packages/client/src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ pub enum DriverStopReason {
#[derive(Debug)]
pub struct DriverHandle {
abort_handle: AbortHandle,
sender: mpsc::Sender<MessageToServer>,
sender: mpsc::UnboundedSender<MessageToServer>,
}

impl DriverHandle {
pub fn new(sender: mpsc::Sender<MessageToServer>, abort_handle: AbortHandle) -> Self {
pub fn new(sender: mpsc::UnboundedSender<MessageToServer>, abort_handle: AbortHandle) -> Self {
Self {
sender,
abort_handle,
}
}

pub async fn send(&self, msg: Arc<to_server::ToServer>) -> Result<()> {
self.sender.send(msg).await?;
self.sender.send(msg)?;

Ok(())
}
Expand All @@ -61,7 +61,7 @@ impl Drop for DriverHandle {

pub type DriverConnection = (
DriverHandle,
mpsc::Receiver<MessageToClient>,
mpsc::UnboundedReceiver<MessageToClient>,
JoinHandle<DriverStopReason>,
);

Expand Down
12 changes: 6 additions & 6 deletions rivetkit-rust/packages/client/src/drivers/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub(crate) async fn connect(args: DriverConnectArgs) -> Result<DriverConnection>
.await
.context("Failed to connect to WebSocket via gateway")?;

let (in_tx, in_rx) = mpsc::channel::<MessageToClient>(32);
let (out_tx, out_rx) = mpsc::channel::<MessageToServer>(32);
let (in_tx, in_rx) = mpsc::unbounded_channel::<MessageToClient>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<MessageToServer>();

let task = tokio::spawn(start(ws, args.encoding_kind, in_tx, out_rx));
let handle = DriverHandle::new(out_tx, task.abort_handle());
Expand All @@ -51,8 +51,8 @@ async fn start(
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
encoding_kind: EncodingKind,
in_tx: mpsc::Sender<MessageToClient>,
mut out_rx: mpsc::Receiver<MessageToServer>,
in_tx: mpsc::UnboundedSender<MessageToClient>,
mut out_rx: mpsc::UnboundedReceiver<MessageToServer>,
) -> DriverStopReason {
let (mut ws_sink, mut ws_stream) = ws.split();

Expand Down Expand Up @@ -85,7 +85,7 @@ async fn start(
// Handle ws incoming
msg = ws_stream.next() => {
let Some(msg) = msg else {
println!("Receiver dropped");
debug!("Receiver dropped");
return DriverStopReason::ServerDisconnect;
};

Expand All @@ -97,7 +97,7 @@ async fn start(
continue;
};

if let Err(e) = in_tx.send(Arc::new(msg)).await {
if let Err(e) = in_tx.send(Arc::new(msg)) {
debug!("Failed to send text message: {}", e);
// failure to send means user dropped incoming receiver
return DriverStopReason::UserAborted;
Expand Down
9 changes: 0 additions & 9 deletions rivetkit-rust/packages/rivetkit-core/src/actor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ const DEFAULT_MAX_QUEUE_SIZE: u32 = 1000;
const DEFAULT_MAX_QUEUE_MESSAGE_SIZE: u32 = 65_536;
const DEFAULT_MAX_INCOMING_MESSAGE_SIZE: u32 = 65_536;
const DEFAULT_MAX_OUTGOING_MESSAGE_SIZE: u32 = 1_048_576;
const DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY: usize = 64;
const DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY: usize = 1024;
const DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY: usize = 4096;

#[derive(Clone)]
pub enum CanHibernateWebSocket {
Expand Down Expand Up @@ -83,9 +80,6 @@ pub struct ActorConfig {
pub max_queue_message_size: u32,
pub max_incoming_message_size: u32,
pub max_outgoing_message_size: u32,
pub lifecycle_command_inbox_capacity: usize,
pub dispatch_command_inbox_capacity: usize,
pub lifecycle_event_inbox_capacity: usize,
pub preload_max_workflow_bytes: Option<u64>,
pub preload_max_connections_bytes: Option<u64>,
pub overrides: Option<ActorConfigOverrides>,
Expand Down Expand Up @@ -233,9 +227,6 @@ impl Default for ActorConfig {
max_queue_message_size: DEFAULT_MAX_QUEUE_MESSAGE_SIZE,
max_incoming_message_size: DEFAULT_MAX_INCOMING_MESSAGE_SIZE,
max_outgoing_message_size: DEFAULT_MAX_OUTGOING_MESSAGE_SIZE,
lifecycle_command_inbox_capacity: DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY,
dispatch_command_inbox_capacity: DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY,
lifecycle_event_inbox_capacity: DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY,
preload_max_workflow_bytes: None,
preload_max_connections_bytes: None,
overrides: None,
Expand Down
32 changes: 7 additions & 25 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
use crate::actor::sleep::{CanSleep, SleepState};
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
use crate::actor::task::LifecycleEvent;
#[cfg(not(target_arch = "wasm32"))]
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
use crate::actor::task_types::UserTaskKind;
use crate::actor::work_registry::{ActorWorkKind, CountGuard, RegionGuard};
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
Expand Down Expand Up @@ -140,9 +138,8 @@ pub(crate) struct ActorContextInner {
inspector_attach_count: RwLock<Option<Arc<AtomicU32>>>,
inspector_overlay_tx: RwLock<Option<broadcast::Sender<Arc<Vec<u8>>>>>,
actor_events: RwLock<Option<mpsc::UnboundedSender<ActorEvent>>>,
pub(super) lifecycle_events: RwLock<Option<mpsc::Sender<LifecycleEvent>>>,
pub(super) lifecycle_events: RwLock<Option<mpsc::UnboundedSender<LifecycleEvent>>>,
hibernated_connection_liveness_override: RwLock<Option<BTreeSet<(Vec<u8>, Vec<u8>)>>>,
pub(super) lifecycle_event_inbox_capacity: usize,
pub(super) metrics: ActorMetrics,
diagnostics: ActorDiagnostics,
actor_id: String,
Expand Down Expand Up @@ -230,7 +227,6 @@ impl ActorContext {
#[cfg(feature = "sqlite-local")]
sql.set_vfs_metrics(Arc::new(metrics.clone()));
let diagnostics = ActorDiagnostics::new(actor_id.clone());
let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity;
let state_save_interval = config.state_save_interval;
let abort_signal = CancellationToken::new();
let shutdown_deadline = CancellationToken::new();
Expand Down Expand Up @@ -307,7 +303,6 @@ impl ActorContext {
actor_events: RwLock::new(None),
lifecycle_events: RwLock::new(None),
hibernated_connection_liveness_override: RwLock::new(None),
lifecycle_event_inbox_capacity,
metrics,
diagnostics,
actor_id,
Expand Down Expand Up @@ -1243,7 +1238,10 @@ impl ActorContext {
.await
}

pub(crate) fn configure_lifecycle_events(&self, sender: Option<mpsc::Sender<LifecycleEvent>>) {
pub(crate) fn configure_lifecycle_events(
&self,
sender: Option<mpsc::UnboundedSender<LifecycleEvent>>,
) {
*self.0.lifecycle_events.write() = sender;
}

Expand Down Expand Up @@ -1466,28 +1464,12 @@ impl ActorContext {
}

fn try_send_lifecycle_event(&self, event: LifecycleEvent, operation: &'static str) {
#[cfg(target_arch = "wasm32")]
let _ = operation;

let Some(sender) = self.0.lifecycle_events.read().clone() else {
return;
};

match sender.try_reserve() {
Ok(permit) => {
permit.send(event);
}
#[cfg(target_arch = "wasm32")]
Err(_) => {}
#[cfg(not(target_arch = "wasm32"))]
Err(_) => {
let _ = actor_channel_overloaded_error(
LIFECYCLE_EVENT_INBOX_CHANNEL,
self.0.lifecycle_event_inbox_capacity,
operation,
Some(&self.0.metrics),
);
}
if sender.send(event).is_err() {
tracing::warn!(operation, "failed to enqueue actor lifecycle event");
}
}
}
Expand Down
24 changes: 0 additions & 24 deletions rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const WARNING_LIMIT: usize = 3;

// Forced-sync: warning windows are updated from synchronous diagnostics paths.
static GLOBAL_WARNINGS: OnceLock<SccHashMap<String, Arc<Mutex<WarningWindow>>>> = OnceLock::new();
static ACTOR_WARNINGS: OnceLock<SccHashMap<String, Arc<Mutex<WarningWindow>>>> = OnceLock::new();

#[derive(Debug)]
pub(crate) struct ActorDiagnostics {
Expand Down Expand Up @@ -43,25 +42,6 @@ impl ActorDiagnostics {
}
}

pub(crate) fn record_actor_warning(
actor_id: &str,
kind: &'static str,
) -> Option<WarningSuppression> {
let actor_key = format!("{actor_id}:{kind}");
let per_actor = record_limited_warning(actor_warnings(), actor_key, Instant::now());
let global = record_limited_warning(global_warnings(), kind.to_owned(), Instant::now());

if per_actor.emit && global.emit {
Some(WarningSuppression {
actor_id: actor_id.to_owned(),
per_actor_suppressed: per_actor.suppressed,
global_suppressed: global.suppressed,
})
} else {
None
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct WarningSuppression {
pub(crate) actor_id: String,
Expand Down Expand Up @@ -138,7 +118,3 @@ fn record_limited_warning(
fn global_warnings() -> &'static SccHashMap<String, Arc<Mutex<WarningWindow>>> {
GLOBAL_WARNINGS.get_or_init(SccHashMap::new)
}

fn actor_warnings() -> &'static SccHashMap<String, Arc<Mutex<WarningWindow>>> {
ACTOR_WARNINGS.get_or_init(SccHashMap::new)
}
73 changes: 1 addition & 72 deletions rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#[derive(Clone)]
pub(crate) struct ActorMetrics {
actor_id: Arc<str>,
inner: Arc<Option<ActorMetricsInner>>,
}

Expand All @@ -29,11 +28,8 @@
active_connections: IntGauge,
connections_total: IntCounter,
lifecycle_inbox_depth: IntGauge,
lifecycle_inbox_overload_total: CounterVec,
dispatch_inbox_depth: IntGauge,
dispatch_inbox_overload_total: CounterVec,
lifecycle_event_inbox_depth: IntGauge,
lifecycle_event_overload_total: CounterVec,
user_tasks_active: IntGaugeVec,
user_task_duration_seconds: HistogramVec,
shutdown_wait_seconds: HistogramVec,
Expand Down Expand Up @@ -97,13 +93,10 @@
"actor metrics disabled after initialization failure"
);
None
}

Check warning on line 96 in rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs
};

Self {
actor_id: Arc::from(actor_id),
inner: Arc::new(inner),
}
Self { inner: Arc::new(inner) }
}

fn try_new_inner(actor_id: &str, actor_name: String) -> Result<ActorMetricsInner> {
Expand Down Expand Up @@ -154,40 +147,16 @@
"current actor lifecycle command inbox depth",
))
.context("create lifecycle_inbox_depth gauge")?;
let lifecycle_inbox_overload_total = CounterVec::new(
Opts::new(
"lifecycle_inbox_overload_total",
"total actor lifecycle command inbox overloads",
),
&["command"],
)
.context("create lifecycle_inbox_overload_total counter")?;
let dispatch_inbox_depth = IntGauge::with_opts(Opts::new(
"dispatch_inbox_depth",
"current actor dispatch command inbox depth",
))
.context("create dispatch_inbox_depth gauge")?;
let dispatch_inbox_overload_total = CounterVec::new(
Opts::new(
"dispatch_inbox_overload_total",
"total actor dispatch command inbox overloads",
),
&["command"],
)
.context("create dispatch_inbox_overload_total counter")?;
let lifecycle_event_inbox_depth = IntGauge::with_opts(Opts::new(
"lifecycle_event_inbox_depth",
"current actor lifecycle event inbox depth",
))
.context("create lifecycle_event_inbox_depth gauge")?;
let lifecycle_event_overload_total = CounterVec::new(
Opts::new(
"lifecycle_event_overload_total",
"total actor lifecycle event inbox overloads",
),
&["event"],
)
.context("create lifecycle_event_overload_total counter")?;
let user_tasks_active = IntGaugeVec::new(
Opts::new("user_tasks_active", "current active actor user tasks"),
&["kind"],
Expand Down Expand Up @@ -385,11 +354,8 @@
register_metric(&registry, active_connections.clone());
register_metric(&registry, connections_total.clone());
register_metric(&registry, lifecycle_inbox_depth.clone());
register_metric(&registry, lifecycle_inbox_overload_total.clone());
register_metric(&registry, dispatch_inbox_depth.clone());
register_metric(&registry, dispatch_inbox_overload_total.clone());
register_metric(&registry, lifecycle_event_inbox_depth.clone());
register_metric(&registry, lifecycle_event_overload_total.clone());
register_metric(&registry, user_tasks_active.clone());
register_metric(&registry, user_task_duration_seconds.clone());
register_metric(&registry, shutdown_wait_seconds.clone());
Expand Down Expand Up @@ -464,11 +430,8 @@
active_connections,
connections_total,
lifecycle_inbox_depth,
lifecycle_inbox_overload_total,
dispatch_inbox_depth,
dispatch_inbox_overload_total,
lifecycle_event_inbox_depth,
lifecycle_event_overload_total,
user_tasks_active,
user_task_duration_seconds,
shutdown_wait_seconds,
Expand Down Expand Up @@ -520,10 +483,6 @@
})
}

pub(crate) fn actor_id(&self) -> &str {
&self.actor_id
}

pub(crate) fn render(&self) -> Result<String> {
let Some(inner) = self.inner.as_ref().as_ref() else {
return Ok(String::new());
Expand Down Expand Up @@ -600,16 +559,6 @@
.set(depth.try_into().unwrap_or(i64::MAX));
}

pub(crate) fn inc_lifecycle_inbox_overload(&self, command: &str) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
};
inner
.lifecycle_inbox_overload_total
.with_label_values(&[command])
.inc();
}

pub(crate) fn set_dispatch_inbox_depth(&self, depth: usize) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
Expand All @@ -619,16 +568,6 @@
.set(depth.try_into().unwrap_or(i64::MAX));
}

pub(crate) fn inc_dispatch_inbox_overload(&self, command: &str) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
};
inner
.dispatch_inbox_overload_total
.with_label_values(&[command])
.inc();
}

pub(crate) fn set_lifecycle_event_inbox_depth(&self, depth: usize) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
Expand All @@ -638,16 +577,6 @@
.set(depth.try_into().unwrap_or(i64::MAX));
}

pub(crate) fn inc_lifecycle_event_overload(&self, event: &str) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
};
inner
.lifecycle_event_overload_total
.with_label_values(&[event])
.inc();
}

pub(crate) fn begin_user_task(&self, kind: UserTaskKind) {
let Some(inner) = self.inner.as_ref().as_ref() else {
return;
Expand Down
Loading
Loading