diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java index 2ade97762..f19eb94b1 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java @@ -336,7 +336,15 @@ public void testAsyncPollFailed() @Test public void testSuspendPolling() throws InterruptedException, ExecutionException, AsyncPoller.PollTaskAsyncAbort { - CountingSlotSupplier slotSupplierInner = new CountingSlotSupplier<>(1); + AtomicInteger reserveCalls = new AtomicInteger(); + CountingSlotSupplier slotSupplierInner = + new CountingSlotSupplier(1) { + @Override + public SlotSupplierFuture reserveSlot(SlotReserveContext ctx) throws Exception { + reserveCalls.incrementAndGet(); + return super.reserveSlot(ctx); + } + }; TrackingSlotSupplier slotSupplier = new TrackingSlotSupplier<>(slotSupplierInner, new NoopScope()); DummyTaskExecutor executor = new DummyTaskExecutor(slotSupplier); @@ -370,18 +378,21 @@ public void testSuspendPolling() poller.resumePolling(); assertFalse(poller.isSuspended()); pollLatch.await(); - assertEventually( - Duration.ofSeconds(1), - () -> { - assertEquals(0, executor.processed.get()); - assertEquals(1, slotSupplierInner.reservedCount.get()); - assertEquals(0, slotSupplier.getUsedSlots().size()); - }); - // Suspend polling again, this will not affect the already issued poll request + assertEquals(0, executor.processed.get()); + assertEquals(1, slotSupplierInner.reservedCount.get()); + assertEquals(0, slotSupplier.getUsedSlots().size()); + // Wait for iter 2 of the poll loop to invoke reserveSlot. Completion of that reservation + // requires iter 1's slot to be released (via completePoll below) since the supplier has + // only one slot — but the *call* to reserveSlot happens as soon as iter 2 passes the + // suspend-latch check. Waiting on that call here removes a race where the suspendPolling + // below could otherwise strand iter 2 at the suspend latch before it reserves. + assertEventually(Duration.ofSeconds(5), () -> assertEquals(2, reserveCalls.get())); + // Suspend polling again, this will not affect the already issued poll request or the + // iter-2 reserveSlot call above (which is now waiting for the slot to free up). poller.suspendPolling(); completePoll.get().apply(); assertEventually( - Duration.ofSeconds(1), + Duration.ofSeconds(5), () -> { assertEquals(1, executor.processed.get()); assertEquals(2, slotSupplierInner.reservedCount.get());