Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,15 @@ public void testAsyncPollFailed()
@Test
public void testSuspendPolling()
throws InterruptedException, ExecutionException, AsyncPoller.PollTaskAsyncAbort {
CountingSlotSupplier<SlotInfo> slotSupplierInner = new CountingSlotSupplier<>(1);
AtomicInteger reserveCalls = new AtomicInteger();
CountingSlotSupplier<SlotInfo> slotSupplierInner =
new CountingSlotSupplier<SlotInfo>(1) {
@Override
public SlotSupplierFuture reserveSlot(SlotReserveContext<SlotInfo> ctx) throws Exception {
reserveCalls.incrementAndGet();
return super.reserveSlot(ctx);
}
};
TrackingSlotSupplier<?> slotSupplier =
new TrackingSlotSupplier<>(slotSupplierInner, new NoopScope());
DummyTaskExecutor executor = new DummyTaskExecutor(slotSupplier);
Expand Down Expand Up @@ -370,18 +378,21 @@ public void testSuspendPolling()
poller.resumePolling();
assertFalse(poller.isSuspended());
pollLatch.await();
assertEventually(
Duration.ofSeconds(1),
() -> {
assertEquals(0, executor.processed.get());
assertEquals(1, slotSupplierInner.reservedCount.get());
assertEquals(0, slotSupplier.getUsedSlots().size());
});
// Suspend polling again, this will not affect the already issued poll request
assertEquals(0, executor.processed.get());
assertEquals(1, slotSupplierInner.reservedCount.get());
assertEquals(0, slotSupplier.getUsedSlots().size());
// Wait for iter 2 of the poll loop to invoke reserveSlot. Completion of that reservation
// requires iter 1's slot to be released (via completePoll below) since the supplier has
// only one slot — but the *call* to reserveSlot happens as soon as iter 2 passes the
// suspend-latch check. Waiting on that call here removes a race where the suspendPolling
// below could otherwise strand iter 2 at the suspend latch before it reserves.
assertEventually(Duration.ofSeconds(5), () -> assertEquals(2, reserveCalls.get()));
// Suspend polling again, this will not affect the already issued poll request or the
// iter-2 reserveSlot call above (which is now waiting for the slot to free up).
poller.suspendPolling();
completePoll.get().apply();
assertEventually(
Duration.ofSeconds(1),
Duration.ofSeconds(5),
() -> {
assertEquals(1, executor.processed.get());
assertEquals(2, slotSupplierInner.reservedCount.get());
Expand Down
Loading