Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ impl ActorContext {
self.clear_preloaded_messages();

let config = self.config();
tracing::warn!(
name,
body_len = body.len(),
max_queue_size = config.max_queue_size,
"DEBUG_QUEUE enqueue_message: config"
);
if encoded_message.len() > config.max_queue_message_size as usize {
return Err(QueueMessageTooLarge {
size: encoded_message.len(),
Expand All @@ -283,6 +289,12 @@ impl ActorContext {
}

let mut metadata = self.0.queue_metadata.lock().await;
tracing::warn!(
metadata_size = metadata.size,
max_queue_size = config.max_queue_size,
will_reject = metadata.size >= config.max_queue_size,
"DEBUG_QUEUE enqueue_message: pre-enqueue check"
);
if metadata.size >= config.max_queue_size {
return Err(QueueFull {
limit: config.max_queue_size,
Expand Down Expand Up @@ -374,17 +386,30 @@ impl ActorContext {
let deadline = opts.timeout.map(|timeout| Instant::now() + timeout);
let names = normalize_names(opts.names);

tracing::warn!(
count,
timeout_ms = opts.timeout.map(|t| t.as_millis() as u64),
?names,
completable = opts.completable,
"DEBUG_QUEUE next_batch: enter"
);

loop {
let messages = self
.try_receive_batch(names.as_ref(), count, opts.completable)
.await?;
if !messages.is_empty() {
tracing::warn!(
received = messages.len(),
"DEBUG_QUEUE next_batch: returning messages"
);
return Ok(messages);
}

let remaining_timeout =
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
if matches!(remaining_timeout, Some(timeout) if timeout.is_zero()) {
tracing::warn!("DEBUG_QUEUE next_batch: returning empty, timeout zero");
return Ok(Vec::new());
}

Expand Down Expand Up @@ -560,6 +585,7 @@ impl ActorContext {
.queue_initialize
.get_or_try_init(|| async {
let preload = self.0.queue_preloaded_kv.lock().take();
let has_preload = preload.is_some();
let metadata = if let Some(preloaded) = preload.as_ref() {
self.configure_preloaded_messages(preloaded);
if let Some(metadata) = self.load_metadata_from_preload(preloaded).await? {
Expand All @@ -570,6 +596,12 @@ impl ActorContext {
} else {
self.load_or_create_metadata().await?
};
tracing::warn!(
has_preload,
size = metadata.size,
next_id = metadata.next_id,
"DEBUG_QUEUE ensure_initialized"
);
let mut state = self.0.queue_metadata.lock().await;
*state = metadata;
self.0.metrics.set_queue_depth(state.size);
Expand Down Expand Up @@ -667,6 +699,15 @@ impl ActorContext {
let _receive_guard = self.0.queue_receive_lock.lock().await;

let messages = self.list_messages().await?;
tracing::warn!(
kv_message_count = messages.len(),
?names,
count,
completable,
message_names = ?messages.iter().map(|m| &m.name).collect::<Vec<_>>(),
message_ids = ?messages.iter().map(|m| m.id).collect::<Vec<_>>(),
"DEBUG_QUEUE try_receive_batch: listed messages"
);
let mut selected = Vec::new();
for message in messages {
if let Some(names) = names
Expand Down Expand Up @@ -726,10 +767,15 @@ impl ActorContext {

async fn list_message_entries(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
if let Some(entries) = self.0.queue_preloaded_message_entries.lock().take() {
tracing::warn!(
entry_count = entries.len(),
"DEBUG_QUEUE list_message_entries: using preloaded"
);
return Ok(entries);
}

self.0
let entries = self
.0
.kv
.list_prefix(
&QUEUE_MESSAGES_PREFIX,
Expand All @@ -739,7 +785,12 @@ impl ActorContext {
},
)
.await
.context("list queue messages")
.context("list queue messages")?;
tracing::warn!(
entry_count = entries.len(),
"DEBUG_QUEUE list_message_entries: read from kv"
);
Ok(entries)
}

fn clear_preloaded_messages(&self) {
Expand Down
2 changes: 2 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3110,12 +3110,14 @@ class NativeWorkflowRuntimeAdapter {
_abortSignal,
completable,
) => {
logger().debug({ msg: "DEBUG_QUEUE native receive: calling nextBatch", names, count, timeout, completable, effectiveTimeout: timeout ?? 0 });
const messages = await this.#ctx.queue.nextBatch({
names,
count,
timeout: timeout ?? 0,
completable,
});
logger().debug({ msg: "DEBUG_QUEUE native receive: nextBatch returned", messageCount: messages.length });
return messages.map((message) =>
this.#wrapQueueMessage(message),
);
Expand Down
6 changes: 6 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {

if (existingCount && existingCount.kind.type === "message") {
const replayCount = existingCount.kind.data.data as number;
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: replaying from history", name, replayCount, completable });
return await this.readReplayQueueMessages<T>(
name,
replayCount,
Expand All @@ -1657,6 +1658,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {

const now = Date.now();
if (deadline !== undefined && now >= deadline) {
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: deadline already passed", name, deadline, now });
if (deadlineEntry && deadlineEntry.kind.type === "sleep") {
deadlineEntry.kind.data.state = "completed";
deadlineEntry.dirty = true;
Expand All @@ -1669,11 +1671,13 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
return [];
}

this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: calling receiveMessagesNow", name, messageNames, count, completable });
const received = await this.receiveMessagesNow(
messageNames,
count,
completable,
);
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: receiveMessagesNow returned", name, receivedCount: received.length, receivedNames: received.map(m => m.name) });
if (received.length > 0) {
const historyMessages = received.map((message) =>
this.toWorkflowQueueMessage<T>(message),
Expand Down Expand Up @@ -1701,8 +1705,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
}

if (deadline === undefined) {
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages and no deadline, throwing MessageWaitError", name, messageNames });
throw new MessageWaitError(messageNames);
}
this.log("debug", { msg: "DEBUG_QUEUE executeQueueNextBatch: no messages, throwing SleepError", name, messageNames, deadline });
throw new SleepError(deadline, messageNames);
}

Expand Down
Loading