diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs index a6a4533318..2d45f37bfc 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs @@ -274,6 +274,12 @@ impl ActorContext { self.clear_preloaded_messages(); let config = self.config(); + tracing::warn!( + name, + body_len = body.len(), + max_queue_size = config.max_queue_size, + "DEBUG_QUEUE enqueue_message: config" + ); if encoded_message.len() > config.max_queue_message_size as usize { return Err(QueueMessageTooLarge { size: encoded_message.len(), @@ -283,6 +289,12 @@ impl ActorContext { } let mut metadata = self.0.queue_metadata.lock().await; + tracing::warn!( + metadata_size = metadata.size, + max_queue_size = config.max_queue_size, + will_reject = metadata.size >= config.max_queue_size, + "DEBUG_QUEUE enqueue_message: pre-enqueue check" + ); if metadata.size >= config.max_queue_size { return Err(QueueFull { limit: config.max_queue_size, @@ -374,17 +386,30 @@ impl ActorContext { let deadline = opts.timeout.map(|timeout| Instant::now() + timeout); let names = normalize_names(opts.names); + tracing::warn!( + count, + timeout_ms = opts.timeout.map(|t| t.as_millis() as u64), + ?names, + completable = opts.completable, + "DEBUG_QUEUE next_batch: enter" + ); + loop { let messages = self .try_receive_batch(names.as_ref(), count, opts.completable) .await?; if !messages.is_empty() { + tracing::warn!( + received = messages.len(), + "DEBUG_QUEUE next_batch: returning messages" + ); return Ok(messages); } let remaining_timeout = deadline.map(|deadline| deadline.saturating_duration_since(Instant::now())); if matches!(remaining_timeout, Some(timeout) if timeout.is_zero()) { + tracing::warn!("DEBUG_QUEUE next_batch: returning empty, timeout zero"); return Ok(Vec::new()); } @@ -560,6 +585,7 @@ impl ActorContext { .queue_initialize .get_or_try_init(|| async { let preload = self.0.queue_preloaded_kv.lock().take(); + let has_preload = preload.is_some(); let metadata = if let Some(preloaded) = preload.as_ref() { self.configure_preloaded_messages(preloaded); if let Some(metadata) = self.load_metadata_from_preload(preloaded).await? { @@ -570,6 +596,12 @@ impl ActorContext { } else { self.load_or_create_metadata().await? }; + tracing::warn!( + has_preload, + size = metadata.size, + next_id = metadata.next_id, + "DEBUG_QUEUE ensure_initialized" + ); let mut state = self.0.queue_metadata.lock().await; *state = metadata; self.0.metrics.set_queue_depth(state.size); @@ -667,6 +699,15 @@ impl ActorContext { let _receive_guard = self.0.queue_receive_lock.lock().await; let messages = self.list_messages().await?; + tracing::warn!( + kv_message_count = messages.len(), + ?names, + count, + completable, + message_names = ?messages.iter().map(|m| &m.name).collect::>(), + message_ids = ?messages.iter().map(|m| m.id).collect::>(), + "DEBUG_QUEUE try_receive_batch: listed messages" + ); let mut selected = Vec::new(); for message in messages { if let Some(names) = names @@ -726,10 +767,15 @@ impl ActorContext { async fn list_message_entries(&self) -> Result, Vec)>> { if let Some(entries) = self.0.queue_preloaded_message_entries.lock().take() { + tracing::warn!( + entry_count = entries.len(), + "DEBUG_QUEUE list_message_entries: using preloaded" + ); return Ok(entries); } - self.0 + let entries = self + .0 .kv .list_prefix( &QUEUE_MESSAGES_PREFIX, @@ -739,7 +785,12 @@ impl ActorContext { }, ) .await - .context("list queue messages") + .context("list queue messages")?; + tracing::warn!( + entry_count = entries.len(), + "DEBUG_QUEUE list_message_entries: read from kv" + ); + Ok(entries) } fn clear_preloaded_messages(&self) { diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 3059e3f37d..65d863ba73 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -3110,12 +3110,14 @@ class NativeWorkflowRuntimeAdapter { _abortSignal, completable, ) => { + logger().debug({ msg: "DEBUG_QUEUE native receive: calling nextBatch", names, count, timeout, completable, effectiveTimeout: timeout ?? 0 }); const messages = await this.#ctx.queue.nextBatch({ names, count, timeout: timeout ?? 0, completable, }); + logger().debug({ msg: "DEBUG_QUEUE native receive: nextBatch returned", messageCount: messages.length }); return messages.map((message) => this.#wrapQueueMessage(message), ); diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index d23f820bc7..f37aa534fd 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -1648,6 +1648,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { if (existingCount && existingCount.kind.type === "message") { const replayCount = existingCount.kind.data.data as number; + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: replaying from history", name, replayCount, completable }); return await this.readReplayQueueMessages( name, replayCount, @@ -1657,6 +1658,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { const now = Date.now(); if (deadline !== undefined && now >= deadline) { + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: deadline already passed", name, deadline, now }); if (deadlineEntry && deadlineEntry.kind.type === "sleep") { deadlineEntry.kind.data.state = "completed"; deadlineEntry.dirty = true; @@ -1669,11 +1671,13 @@ export class WorkflowContextImpl implements WorkflowContextInterface { return []; } + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: calling receiveMessagesNow", name, messageNames, count, completable }); const received = await this.receiveMessagesNow( messageNames, count, completable, ); + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: receiveMessagesNow returned", name, receivedCount: received.length, receivedNames: received.map(m => m.name) }); if (received.length > 0) { const historyMessages = received.map((message) => this.toWorkflowQueueMessage(message), @@ -1701,8 +1705,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface { } if (deadline === undefined) { + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages and no deadline, throwing MessageWaitError", name, messageNames }); throw new MessageWaitError(messageNames); } + this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages, throwing SleepError", name, messageNames, deadline }); throw new SleepError(deadline, messageNames); }