Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ public class FailingCollectionSource<T>
/** A failure will occur when the given number of elements have been processed. */
private final int failureAfterNumElements;

/** The number of completed checkpoints. */
private volatile int numSuccessfulCheckpoints;

/** The checkpointed number of emitted elements. */
private final Map<Long, Integer> checkpointedEmittedNums;

Expand Down Expand Up @@ -166,9 +163,8 @@ public void run(SourceContext<T> ctx) throws Exception {
if (!failedBefore) {
// delay a bit, if we have not failed before
Thread.sleep(1);
if (numSuccessfulCheckpoints >= 1 && lastCheckpointedEmittedNum >= 1) {
// cause a failure if we have not failed before and have a completed checkpoint
// and have processed at least one element
if (lastCheckpointedEmittedNum >= failureAfterNumElements) {
// trigger failure after enough elements are durably checkpointed
failedBefore = true;
throw new Exception("Artificial Failure");
}
Expand Down Expand Up @@ -238,7 +234,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
numSuccessfulCheckpoints++;
lastCheckpointedEmittedNum = checkpointedEmittedNums.get(checkpointId);
}

Expand Down