diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java index 78fbbb0a58d53..af51aa777987b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java @@ -77,9 +77,6 @@ public class FailingCollectionSource /** A failure will occur when the given number of elements have been processed. */ private final int failureAfterNumElements; - /** The number of completed checkpoints. */ - private volatile int numSuccessfulCheckpoints; - /** The checkpointed number of emitted elements. */ private final Map checkpointedEmittedNums; @@ -166,9 +163,8 @@ public void run(SourceContext ctx) throws Exception { if (!failedBefore) { // delay a bit, if we have not failed before Thread.sleep(1); - if (numSuccessfulCheckpoints >= 1 && lastCheckpointedEmittedNum >= 1) { - // cause a failure if we have not failed before and have a completed checkpoint - // and have processed at least one element + if (lastCheckpointedEmittedNum >= failureAfterNumElements) { + // trigger failure after enough elements are durably checkpointed failedBefore = true; throw new Exception("Artificial Failure"); } @@ -238,7 +234,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - numSuccessfulCheckpoints++; lastCheckpointedEmittedNum = checkpointedEmittedNums.get(checkpointId); }