diff --git a/engine/packages/engine/src/commands/udb/cli.rs b/engine/packages/engine/src/commands/udb/cli.rs index dbc5ea438e..d777f55c64 100644 --- a/engine/packages/engine/src/commands/udb/cli.rs +++ b/engine/packages/engine/src/commands/udb/cli.rs @@ -660,7 +660,7 @@ impl SubCommand { } let (tx_entries, rx_entries) = - mpsc::channel::(10_000); + mpsc::unbounded_channel::(); // Writer task: re-applies v2 entries in txn-bounded batches. // recv_many is called inside each txn iteration when the local buffer @@ -970,7 +970,6 @@ impl SubCommand { tx_entries .send(v3_entry) - .await .context("writer task closed")?; } } diff --git a/engine/packages/guard-core/tests/streaming_response.rs b/engine/packages/guard-core/tests/streaming_response.rs index 41b59c148c..7cbde21a50 100644 --- a/engine/packages/guard-core/tests/streaming_response.rs +++ b/engine/packages/guard-core/tests/streaming_response.rs @@ -128,9 +128,9 @@ async fn test_streaming_response_should_timeout() { drop(message_sender); } -async fn start_streaming_server() -> (SocketAddr, mpsc::Sender) { +async fn start_streaming_server() -> (SocketAddr, mpsc::UnboundedSender) { // Create a channel for sending messages to the streaming endpoint - let (message_tx, _message_rx) = mpsc::channel::(100); + let (message_tx, _message_rx) = mpsc::unbounded_channel::(); // Bind to a random port let listener = TcpListener::bind("127.0.0.1:0") diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 96e31bb288..1cb64a9869 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -17,7 +17,7 @@ use vbare::OwnedVersionedData; use crate::{WebsocketPendingLimitReached, metrics}; pub struct InFlightRequestHandle { - pub msg_rx: mpsc::Receiver, + pub msg_rx: mpsc::UnboundedReceiver, /// Used to check if the request handler has been dropped. /// /// This is separate from `msg_rx` there may still be messages that need to be sent to the @@ -81,7 +81,7 @@ struct InFlightRequest { protocol_version: u16, state: InFlightRequestState, /// Sender for incoming messages to this request. - msg_tx: mpsc::Sender, + msg_tx: mpsc::UnboundedSender, /// Used to check if the request handler has been dropped. drop_tx: watch::Sender>, /// True once first message for this request has been sent (so runner learned reply_to). @@ -184,7 +184,7 @@ impl SharedState { request_id: protocol::mk2::RequestId, state: InFlightRequestState, ) -> InFlightRequestHandle { - let (msg_tx, msg_rx) = mpsc::channel(128); + let (msg_tx, msg_rx) = mpsc::unbounded_channel(); let (drop_tx, drop_rx) = watch::channel(None); let new = match self.in_flight_requests.entry_async(request_id).await { @@ -481,7 +481,6 @@ impl SharedState { if in_flight .msg_tx .send(msg.message_kind.clone()) - .await .is_err() { tracing::warn!( diff --git a/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs index 2f7d710afe..d7829e7a5f 100644 --- a/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs @@ -21,7 +21,7 @@ pub async fn task( client_ws: WebSocketHandle, request_id: protocol::RequestId, mut stopped_sub: message::SubscriptionHandle, - mut msg_rx: mpsc::Receiver, + mut msg_rx: mpsc::UnboundedReceiver, mut drop_rx: watch::Receiver>, can_hibernate: bool, egress_bytes: Arc, diff --git a/engine/packages/pegboard-gateway2/src/hibernation_task.rs b/engine/packages/pegboard-gateway2/src/hibernation_task.rs index 040db1c39c..447296f1c6 100644 --- a/engine/packages/pegboard-gateway2/src/hibernation_task.rs +++ b/engine/packages/pegboard-gateway2/src/hibernation_task.rs @@ -26,7 +26,7 @@ pub async fn task( in_flight_req: InFlightRequestHandle, ctx: StandaloneCtx, actor_id: Id, - mut msg_rx: mpsc::Receiver, + mut msg_rx: mpsc::UnboundedReceiver, mut drop_rx: watch::Receiver>, egress_bytes: Arc, mut hibernation_abort_rx: watch::Receiver<()>, diff --git a/engine/packages/pegboard-gateway2/src/shared_state.rs b/engine/packages/pegboard-gateway2/src/shared_state.rs index f8dd70a925..2b6f2fee72 100644 --- a/engine/packages/pegboard-gateway2/src/shared_state.rs +++ b/engine/packages/pegboard-gateway2/src/shared_state.rs @@ -154,7 +154,7 @@ impl SharedState { request_id: protocol::RequestId, after_hibernation: bool, ) -> Result { - let (msg_tx, msg_rx) = mpsc::channel(128); + let (msg_tx, msg_rx) = mpsc::unbounded_channel(); let (drop_tx, drop_rx) = watch::channel(None); let new = match self.in_flight_requests.entry_async(request_id).await { @@ -206,7 +206,7 @@ impl SharedState { .await .context("request not in flight")?; - let (msg_tx, msg_rx) = mpsc::channel(128); + let (msg_tx, msg_rx) = mpsc::unbounded_channel(); let (drop_tx, drop_rx) = watch::channel(None); req.hibernate(msg_tx, drop_tx).await; @@ -288,7 +288,7 @@ impl Deref for SharedState { } pub struct InFlightRequestCtx { - pub msg_rx: mpsc::Receiver, + pub msg_rx: mpsc::UnboundedReceiver, /// Used to check if the request handler has been dropped. /// /// This is separate from `msg_rx` there may still be messages that need to be sent to the @@ -676,7 +676,7 @@ impl InFlightRequest { async fn wake( &mut self, receiver_subject: String, - msg_tx: mpsc::Sender, + msg_tx: mpsc::UnboundedSender, drop_tx: watch::Sender>, ) { self.receiver_subject = receiver_subject; @@ -722,7 +722,7 @@ impl InFlightRequest { #[tracing::instrument(skip_all)] async fn hibernate( &mut self, - msg_tx: mpsc::Sender, + msg_tx: mpsc::UnboundedSender, drop_tx: watch::Sender>, ) { let mut pending_tunnel_msgs = Vec::new(); @@ -816,7 +816,7 @@ impl InFlightRequest { enum InFlightRequestState { Active { /// Sender for incoming messages to this request. - msg_tx: mpsc::Sender, + msg_tx: mpsc::UnboundedSender, /// Used to check if the request handler has been dropped. drop_tx: watch::Sender>, last_pong: i64, @@ -826,7 +826,7 @@ enum InFlightRequestState { PendingHibernation { hibernation_state: HibernationState }, Hibernating { /// Sender for incoming messages to this request. - msg_tx: mpsc::Sender, + msg_tx: mpsc::UnboundedSender, /// Used to check if the hibernation handler has been dropped. drop_tx: watch::Sender>, hibernation_state: HibernationState, @@ -853,7 +853,7 @@ pub struct PendingWebsocketMessage { #[tracing::instrument(skip_all)] async fn forward_tunnel_message( receiver_subject: &str, - msg_tx: &mpsc::Sender, + msg_tx: &mpsc::UnboundedSender, mut msg: protocol::ToRivetTunnelMessage, ) -> Option { // Send message to the request handler to emulate the real network action @@ -869,7 +869,7 @@ async fn forward_tunnel_message( "forwarding message to request handler" ); - if let Err(send_err) = msg_tx.send(msg.message_kind).await { + if let Err(send_err) = msg_tx.send(msg.message_kind) { tracing::debug!( gateway_id=%protocol::util::id_to_string(&msg.message_id.gateway_id), request_id=%protocol::util::id_to_string(&msg.message_id.request_id), diff --git a/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs index 0c0993ffc4..c815056e3a 100644 --- a/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs @@ -21,7 +21,7 @@ pub async fn task( in_flight_req: InFlightRequestHandle, client_ws: WebSocketHandle, mut stopped_sub: message::SubscriptionHandle, - mut msg_rx: mpsc::Receiver, + mut msg_rx: mpsc::UnboundedReceiver, mut drop_rx: watch::Receiver>, can_hibernate: bool, egress_bytes: Arc, diff --git a/engine/packages/universaldb/src/driver/postgres/transaction.rs b/engine/packages/universaldb/src/driver/postgres/transaction.rs index 3d85dd8bae..f80cbba393 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction.rs @@ -24,7 +24,7 @@ pub struct PostgresTransactionDriver { pool: Pool, operations: TransactionOperations, committed: AtomicBool, - tx_sender: OnceCell>, + tx_sender: OnceCell>, } impl PostgresTransactionDriver { @@ -38,10 +38,10 @@ impl PostgresTransactionDriver { } /// Get or create the transaction task - async fn ensure_transaction(&self) -> Result<&mpsc::Sender> { + async fn ensure_transaction(&self) -> Result<&mpsc::UnboundedSender> { self.tx_sender .get_or_try_init(|| async { - let (sender, receiver) = mpsc::channel(100); + let (sender, receiver) = mpsc::unbounded_channel(); // Spawn the transaction task with serializable isolation let task = TransactionTask::new(self.pool.clone(), receiver); @@ -78,7 +78,6 @@ impl TransactionDriver for PostgresTransactionDriver { key: key.clone(), response: response_tx, }) - .await .context("failed to send postgres transaction command")?; // Wait for response @@ -115,7 +114,6 @@ impl TransactionDriver for PostgresTransactionDriver { offset, response: response_tx, }) - .await .context("failed to send postgres transaction command")?; // Wait for response @@ -166,7 +164,6 @@ impl TransactionDriver for PostgresTransactionDriver { reverse, response: response_tx, }) - .await .context("failed to send postgres transaction command")?; // Wait for response @@ -230,7 +227,6 @@ impl TransactionDriver for PostgresTransactionDriver { conflict_ranges, response: response_tx, }) - .await .context("failed to send postgres transaction command")?; // Wait for commit response @@ -288,7 +284,6 @@ impl TransactionDriver for PostgresTransactionDriver { end, response: response_tx, }) - .await .context("failed to send postgres command")?; // Wait for response @@ -320,7 +315,6 @@ impl TransactionDriver for PostgresTransactionDriver { conflict_ranges, response: response_tx, }) - .await .context("failed to send postgres transaction command")?; // Wait for commit response diff --git a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs index edb2febcb3..297751f608 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs @@ -59,11 +59,11 @@ pub enum TransactionCommand { /// issues while maintaining a single serializable transaction for all operations. pub struct TransactionTask { pool: Pool, - receiver: mpsc::Receiver, + receiver: mpsc::UnboundedReceiver, } impl TransactionTask { - pub fn new(pool: Pool, receiver: mpsc::Receiver) -> Self { + pub fn new(pool: Pool, receiver: mpsc::UnboundedReceiver) -> Self { Self { pool, receiver } } diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction.rs index 6090266b88..e62c3eda79 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction.rs @@ -30,7 +30,7 @@ pub struct RocksDbTransactionDriver { db: Arc, operations: TransactionOperations, committed: AtomicBool, - tx_sender: OnceCell>, + tx_sender: OnceCell>, txn_conflict_tracker: TransactionConflictTracker, start_version: u64, } @@ -53,10 +53,10 @@ impl RocksDbTransactionDriver { } /// Get or create the transaction task for non-snapshot operations - async fn ensure_transaction(&self) -> Result<&mpsc::Sender> { + async fn ensure_transaction(&self) -> Result<&mpsc::UnboundedSender> { self.tx_sender .get_or_try_init(|| async { - let (sender, receiver) = mpsc::channel(100); + let (sender, receiver) = mpsc::unbounded_channel(); // Spawn the transaction task let task = TransactionTask::new( @@ -97,7 +97,6 @@ impl TransactionDriver for RocksDbTransactionDriver { key: key.clone(), response: response_tx, }) - .await .context("failed to send rocksdb transaction command")?; // Wait for response @@ -134,7 +133,6 @@ impl TransactionDriver for RocksDbTransactionDriver { offset, response: response_tx, }) - .await .context("failed to send rocksdb transaction command")?; // Wait for response @@ -185,7 +183,6 @@ impl TransactionDriver for RocksDbTransactionDriver { reverse, response: response_tx, }) - .await .context("failed to send rocksdb transaction command")?; // Wait for response @@ -250,7 +247,6 @@ impl TransactionDriver for RocksDbTransactionDriver { conflict_ranges, response: response_tx, }) - .await .context("failed to send rocksdb transaction command")?; // Wait for commit response @@ -307,7 +303,6 @@ impl TransactionDriver for RocksDbTransactionDriver { end, response: response_tx, }) - .await .context("failed to send rocksdb command")?; // Wait for response @@ -340,7 +335,6 @@ impl TransactionDriver for RocksDbTransactionDriver { conflict_ranges, response: response_tx, }) - .await .context("failed to send rocksdb transaction command")?; // Wait for commit response diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs index 5c8157f6a9..b0608e81b6 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs @@ -57,14 +57,14 @@ pub enum TransactionCommand { pub struct TransactionTask { db: Arc, txn_conflict_tracker: TransactionConflictTracker, - receiver: mpsc::Receiver, + receiver: mpsc::UnboundedReceiver, } impl TransactionTask { pub fn new( db: Arc, txn_conflict_tracker: TransactionConflictTracker, - receiver: mpsc::Receiver, + receiver: mpsc::UnboundedReceiver, ) -> Self { TransactionTask { db, diff --git a/rivetkit-python/client/src/simple/async/handle.rs b/rivetkit-python/client/src/simple/async/handle.rs index 2daae6fc5e..f2f6459bb6 100644 --- a/rivetkit-python/client/src/simple/async/handle.rs +++ b/rivetkit-python/client/src/simple/async/handle.rs @@ -5,8 +5,6 @@ use tokio::sync::mpsc; use crate::util; -const EVENT_BUFFER_SIZE: usize = 100; - struct ActorEvent { name: String, args: Vec, @@ -15,13 +13,13 @@ struct ActorEvent { #[pyclass] pub struct ActorHandle { handle: rivetkit_rs::connection::ActorHandle, - event_rx: Option>, - event_tx: mpsc::Sender, + event_rx: Option>, + event_tx: mpsc::UnboundedSender, } impl ActorHandle { pub fn new(handle: rivetkit_rs::connection::ActorHandle) -> Self { - let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE); + let (event_tx, event_rx) = mpsc::unbounded_channel(); Self { handle, @@ -100,7 +98,7 @@ impl ActorHandle { args: args.clone(), }; // Send this upstream(?) - tx.send(event).await.map_err(|e| { + tx.send(event).map_err(|e| { py_runtime_err!( "Failed to send via inner tx: {}", e diff --git a/rivetkit-python/client/src/simple/sync/handle.rs b/rivetkit-python/client/src/simple/sync/handle.rs index c75c5fe77e..f623576f07 100644 --- a/rivetkit-python/client/src/simple/sync/handle.rs +++ b/rivetkit-python/client/src/simple/sync/handle.rs @@ -5,8 +5,6 @@ use tokio::sync::mpsc; use crate::util::{self, SYNC_RUNTIME}; -const EVENT_BUFFER_SIZE: usize = 100; - struct ActorEvent { name: String, args: Vec, @@ -15,13 +13,13 @@ struct ActorEvent { #[pyclass] pub struct ActorHandle { handle: rivetkit_rs::connection::ActorHandle, - event_rx: Option>, - event_tx: mpsc::Sender, + event_rx: Option>, + event_tx: mpsc::UnboundedSender, } impl ActorHandle { pub fn new(handle: rivetkit_rs::connection::ActorHandle) -> Self { - let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE); + let (event_tx, event_rx) = mpsc::unbounded_channel(); Self { handle, @@ -90,7 +88,7 @@ impl ActorHandle { args: args.clone(), }; // Send this upstream(?) - tx.send(event).await.map_err(|e| { + tx.send(event).map_err(|e| { py_runtime_err!( "Failed to send via inner tx: {}", e