Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions engine/packages/engine/src/commands/udb/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ impl SubCommand {
}

let (tx_entries, rx_entries) =
mpsc::channel::<protocol::ChangelogEntry>(10_000);
mpsc::unbounded_channel::<protocol::ChangelogEntry>();

// Writer task: re-applies v2 entries in txn-bounded batches.
// recv_many is called inside each txn iteration when the local buffer
Expand Down Expand Up @@ -970,7 +970,6 @@ impl SubCommand {

tx_entries
.send(v3_entry)
.await
.context("writer task closed")?;
}
}
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/guard-core/tests/streaming_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ async fn test_streaming_response_should_timeout() {
drop(message_sender);
}

async fn start_streaming_server() -> (SocketAddr, mpsc::Sender<String>) {
async fn start_streaming_server() -> (SocketAddr, mpsc::UnboundedSender<String>) {
// Create a channel for sending messages to the streaming endpoint
let (message_tx, _message_rx) = mpsc::channel::<String>(100);
let (message_tx, _message_rx) = mpsc::unbounded_channel::<String>();

// Bind to a random port
let listener = TcpListener::bind("127.0.0.1:0")
Expand Down
7 changes: 3 additions & 4 deletions engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::{WebsocketPendingLimitReached, metrics};

pub struct InFlightRequestHandle {
pub msg_rx: mpsc::Receiver<protocol::mk2::ToServerTunnelMessageKind>,
pub msg_rx: mpsc::UnboundedReceiver<protocol::mk2::ToServerTunnelMessageKind>,
/// Used to check if the request handler has been dropped.
///
/// This is separate from `msg_rx` there may still be messages that need to be sent to the
Expand Down Expand Up @@ -81,7 +81,7 @@
protocol_version: u16,
state: InFlightRequestState,
/// Sender for incoming messages to this request.
msg_tx: mpsc::Sender<protocol::mk2::ToServerTunnelMessageKind>,
msg_tx: mpsc::UnboundedSender<protocol::mk2::ToServerTunnelMessageKind>,
/// Used to check if the request handler has been dropped.
drop_tx: watch::Sender<Option<MsgGcReason>>,
/// True once first message for this request has been sent (so runner learned reply_to).
Expand Down Expand Up @@ -184,7 +184,7 @@
request_id: protocol::mk2::RequestId,
state: InFlightRequestState,
) -> InFlightRequestHandle {
let (msg_tx, msg_rx) = mpsc::channel(128);
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let (drop_tx, drop_rx) = watch::channel(None);

let new = match self.in_flight_requests.entry_async(request_id).await {
Expand Down Expand Up @@ -475,13 +475,12 @@
gateway_id=%protocol::util::id_to_string(&message_id.gateway_id),
request_id=%protocol::util::id_to_string(&message_id.request_id),
message_index=message_id.message_index,
inner_size,

Check warning on line 478 in engine/packages/pegboard-gateway/src/shared_state.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-gateway/src/shared_state.rs
"forwarding message to request handler"
);
if in_flight
.msg_tx
.send(msg.message_kind.clone())
.await
.is_err()
{
tracing::warn!(
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn task(
client_ws: WebSocketHandle,
request_id: protocol::RequestId,
mut stopped_sub: message::SubscriptionHandle<pegboard::workflows::actor::Stopped>,
mut msg_rx: mpsc::Receiver<protocol::mk2::ToServerTunnelMessageKind>,
mut msg_rx: mpsc::UnboundedReceiver<protocol::mk2::ToServerTunnelMessageKind>,
mut drop_rx: watch::Receiver<Option<MsgGcReason>>,
can_hibernate: bool,
egress_bytes: Arc<AtomicU64>,
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/hibernation_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn task(
in_flight_req: InFlightRequestHandle,
ctx: StandaloneCtx,
actor_id: Id,
mut msg_rx: mpsc::Receiver<protocol::ToRivetTunnelMessageKind>,
mut msg_rx: mpsc::UnboundedReceiver<protocol::ToRivetTunnelMessageKind>,
mut drop_rx: watch::Receiver<Option<MsgGcReason>>,
egress_bytes: Arc<AtomicU64>,
mut hibernation_abort_rx: watch::Receiver<()>,
Expand Down
18 changes: 9 additions & 9 deletions engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl SharedState {
request_id: protocol::RequestId,
after_hibernation: bool,
) -> Result<InFlightRequestCtx> {
let (msg_tx, msg_rx) = mpsc::channel(128);
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let (drop_tx, drop_rx) = watch::channel(None);

let new = match self.in_flight_requests.entry_async(request_id).await {
Expand Down Expand Up @@ -206,7 +206,7 @@ impl SharedState {
.await
.context("request not in flight")?;

let (msg_tx, msg_rx) = mpsc::channel(128);
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let (drop_tx, drop_rx) = watch::channel(None);

req.hibernate(msg_tx, drop_tx).await;
Expand Down Expand Up @@ -288,7 +288,7 @@ impl Deref for SharedState {
}

pub struct InFlightRequestCtx {
pub msg_rx: mpsc::Receiver<protocol::ToRivetTunnelMessageKind>,
pub msg_rx: mpsc::UnboundedReceiver<protocol::ToRivetTunnelMessageKind>,
/// Used to check if the request handler has been dropped.
///
/// This is separate from `msg_rx` there may still be messages that need to be sent to the
Expand Down Expand Up @@ -676,7 +676,7 @@ impl InFlightRequest {
async fn wake(
&mut self,
receiver_subject: String,
msg_tx: mpsc::Sender<protocol::ToRivetTunnelMessageKind>,
msg_tx: mpsc::UnboundedSender<protocol::ToRivetTunnelMessageKind>,
drop_tx: watch::Sender<Option<MsgGcReason>>,
) {
self.receiver_subject = receiver_subject;
Expand Down Expand Up @@ -722,7 +722,7 @@ impl InFlightRequest {
#[tracing::instrument(skip_all)]
async fn hibernate(
&mut self,
msg_tx: mpsc::Sender<protocol::ToRivetTunnelMessageKind>,
msg_tx: mpsc::UnboundedSender<protocol::ToRivetTunnelMessageKind>,
drop_tx: watch::Sender<Option<MsgGcReason>>,
) {
let mut pending_tunnel_msgs = Vec::new();
Expand Down Expand Up @@ -816,7 +816,7 @@ impl InFlightRequest {
enum InFlightRequestState {
Active {
/// Sender for incoming messages to this request.
msg_tx: mpsc::Sender<protocol::ToRivetTunnelMessageKind>,
msg_tx: mpsc::UnboundedSender<protocol::ToRivetTunnelMessageKind>,
/// Used to check if the request handler has been dropped.
drop_tx: watch::Sender<Option<MsgGcReason>>,
last_pong: i64,
Expand All @@ -826,7 +826,7 @@ enum InFlightRequestState {
PendingHibernation { hibernation_state: HibernationState },
Hibernating {
/// Sender for incoming messages to this request.
msg_tx: mpsc::Sender<protocol::ToRivetTunnelMessageKind>,
msg_tx: mpsc::UnboundedSender<protocol::ToRivetTunnelMessageKind>,
/// Used to check if the hibernation handler has been dropped.
drop_tx: watch::Sender<Option<MsgGcReason>>,
hibernation_state: HibernationState,
Expand All @@ -853,7 +853,7 @@ pub struct PendingWebsocketMessage {
#[tracing::instrument(skip_all)]
async fn forward_tunnel_message(
receiver_subject: &str,
msg_tx: &mpsc::Sender<protocol::ToRivetTunnelMessageKind>,
msg_tx: &mpsc::UnboundedSender<protocol::ToRivetTunnelMessageKind>,
mut msg: protocol::ToRivetTunnelMessage,
) -> Option<protocol::ToRivetTunnelMessage> {
// Send message to the request handler to emulate the real network action
Expand All @@ -869,7 +869,7 @@ async fn forward_tunnel_message(
"forwarding message to request handler"
);

if let Err(send_err) = msg_tx.send(msg.message_kind).await {
if let Err(send_err) = msg_tx.send(msg.message_kind) {
tracing::debug!(
gateway_id=%protocol::util::id_to_string(&msg.message_id.gateway_id),
request_id=%protocol::util::id_to_string(&msg.message_id.request_id),
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn task(
in_flight_req: InFlightRequestHandle,
client_ws: WebSocketHandle,
mut stopped_sub: message::SubscriptionHandle<pegboard::workflows::actor2::Stopped>,
mut msg_rx: mpsc::Receiver<protocol::ToRivetTunnelMessageKind>,
mut msg_rx: mpsc::UnboundedReceiver<protocol::ToRivetTunnelMessageKind>,
mut drop_rx: watch::Receiver<Option<MsgGcReason>>,
can_hibernate: bool,
egress_bytes: Arc<AtomicU64>,
Expand Down
12 changes: 3 additions & 9 deletions engine/packages/universaldb/src/driver/postgres/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct PostgresTransactionDriver {
pool: Pool,
operations: TransactionOperations,
committed: AtomicBool,
tx_sender: OnceCell<mpsc::Sender<TransactionCommand>>,
tx_sender: OnceCell<mpsc::UnboundedSender<TransactionCommand>>,
}

impl PostgresTransactionDriver {
Expand All @@ -38,10 +38,10 @@ impl PostgresTransactionDriver {
}

/// Get or create the transaction task
async fn ensure_transaction(&self) -> Result<&mpsc::Sender<TransactionCommand>> {
async fn ensure_transaction(&self) -> Result<&mpsc::UnboundedSender<TransactionCommand>> {
self.tx_sender
.get_or_try_init(|| async {
let (sender, receiver) = mpsc::channel(100);
let (sender, receiver) = mpsc::unbounded_channel();

// Spawn the transaction task with serializable isolation
let task = TransactionTask::new(self.pool.clone(), receiver);
Expand Down Expand Up @@ -78,7 +78,6 @@ impl TransactionDriver for PostgresTransactionDriver {
key: key.clone(),
response: response_tx,
})
.await
.context("failed to send postgres transaction command")?;

// Wait for response
Expand Down Expand Up @@ -115,7 +114,6 @@ impl TransactionDriver for PostgresTransactionDriver {
offset,
response: response_tx,
})
.await
.context("failed to send postgres transaction command")?;

// Wait for response
Expand Down Expand Up @@ -166,7 +164,6 @@ impl TransactionDriver for PostgresTransactionDriver {
reverse,
response: response_tx,
})
.await
.context("failed to send postgres transaction command")?;

// Wait for response
Expand Down Expand Up @@ -230,7 +227,6 @@ impl TransactionDriver for PostgresTransactionDriver {
conflict_ranges,
response: response_tx,
})
.await
.context("failed to send postgres transaction command")?;

// Wait for commit response
Expand Down Expand Up @@ -288,7 +284,6 @@ impl TransactionDriver for PostgresTransactionDriver {
end,
response: response_tx,
})
.await
.context("failed to send postgres command")?;

// Wait for response
Expand Down Expand Up @@ -320,7 +315,6 @@ impl TransactionDriver for PostgresTransactionDriver {
conflict_ranges,
response: response_tx,
})
.await
.context("failed to send postgres transaction command")?;

// Wait for commit response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ pub enum TransactionCommand {
/// issues while maintaining a single serializable transaction for all operations.
pub struct TransactionTask {
pool: Pool,
receiver: mpsc::Receiver<TransactionCommand>,
receiver: mpsc::UnboundedReceiver<TransactionCommand>,
}

impl TransactionTask {
pub fn new(pool: Pool, receiver: mpsc::Receiver<TransactionCommand>) -> Self {
pub fn new(pool: Pool, receiver: mpsc::UnboundedReceiver<TransactionCommand>) -> Self {
Self { pool, receiver }
}

Expand Down
12 changes: 3 additions & 9 deletions engine/packages/universaldb/src/driver/rocksdb/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct RocksDbTransactionDriver {
db: Arc<OptimisticTransactionDB>,
operations: TransactionOperations,
committed: AtomicBool,
tx_sender: OnceCell<mpsc::Sender<TransactionCommand>>,
tx_sender: OnceCell<mpsc::UnboundedSender<TransactionCommand>>,
txn_conflict_tracker: TransactionConflictTracker,
start_version: u64,
}
Expand All @@ -53,10 +53,10 @@ impl RocksDbTransactionDriver {
}

/// Get or create the transaction task for non-snapshot operations
async fn ensure_transaction(&self) -> Result<&mpsc::Sender<TransactionCommand>> {
async fn ensure_transaction(&self) -> Result<&mpsc::UnboundedSender<TransactionCommand>> {
self.tx_sender
.get_or_try_init(|| async {
let (sender, receiver) = mpsc::channel(100);
let (sender, receiver) = mpsc::unbounded_channel();

// Spawn the transaction task
let task = TransactionTask::new(
Expand Down Expand Up @@ -97,7 +97,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
key: key.clone(),
response: response_tx,
})
.await
.context("failed to send rocksdb transaction command")?;

// Wait for response
Expand Down Expand Up @@ -134,7 +133,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
offset,
response: response_tx,
})
.await
.context("failed to send rocksdb transaction command")?;

// Wait for response
Expand Down Expand Up @@ -185,7 +183,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
reverse,
response: response_tx,
})
.await
.context("failed to send rocksdb transaction command")?;

// Wait for response
Expand Down Expand Up @@ -250,7 +247,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
conflict_ranges,
response: response_tx,
})
.await
.context("failed to send rocksdb transaction command")?;

// Wait for commit response
Expand Down Expand Up @@ -307,7 +303,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
end,
response: response_tx,
})
.await
.context("failed to send rocksdb command")?;

// Wait for response
Expand Down Expand Up @@ -340,7 +335,6 @@ impl TransactionDriver for RocksDbTransactionDriver {
conflict_ranges,
response: response_tx,
})
.await
.context("failed to send rocksdb transaction command")?;

// Wait for commit response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ pub enum TransactionCommand {
pub struct TransactionTask {
db: Arc<OptimisticTransactionDB>,
txn_conflict_tracker: TransactionConflictTracker,
receiver: mpsc::Receiver<TransactionCommand>,
receiver: mpsc::UnboundedReceiver<TransactionCommand>,
}

impl TransactionTask {
pub fn new(
db: Arc<OptimisticTransactionDB>,
txn_conflict_tracker: TransactionConflictTracker,
receiver: mpsc::Receiver<TransactionCommand>,
receiver: mpsc::UnboundedReceiver<TransactionCommand>,
) -> Self {
TransactionTask {
db,
Expand Down
10 changes: 4 additions & 6 deletions rivetkit-python/client/src/simple/async/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use tokio::sync::mpsc;

use crate::util;

const EVENT_BUFFER_SIZE: usize = 100;

struct ActorEvent {
name: String,
args: Vec<serde_json::Value>,
Expand All @@ -15,13 +13,13 @@ struct ActorEvent {
#[pyclass]
pub struct ActorHandle {
handle: rivetkit_rs::connection::ActorHandle,
event_rx: Option<mpsc::Receiver<ActorEvent>>,
event_tx: mpsc::Sender<ActorEvent>,
event_rx: Option<mpsc::UnboundedReceiver<ActorEvent>>,
event_tx: mpsc::UnboundedSender<ActorEvent>,
}

impl ActorHandle {
pub fn new(handle: rivetkit_rs::connection::ActorHandle) -> Self {
let (event_tx, event_rx) = mpsc::channel(EVENT_BUFFER_SIZE);
let (event_tx, event_rx) = mpsc::unbounded_channel();

Self {
handle,
Expand Down Expand Up @@ -100,7 +98,7 @@ impl ActorHandle {
args: args.clone(),
};
// Send this upstream(?)
tx.send(event).await.map_err(|e| {
tx.send(event).map_err(|e| {
py_runtime_err!(
"Failed to send via inner tx: {}",
e
Expand Down
Loading
Loading