[FLINK-39481][tests] Fix flaky WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets #27954
Open
featzhang wants to merge 1 commit into apache:master
Collaborator
spuru9
reviewed
Apr 17, 2026
…lateWindow_GroupingSets

The FailingCollectionSource had a race condition where the artificial failure could be triggered with lastCheckpointedEmittedNum >= 1, meaning the failure could occur after as few as 1 element was checkpointed. When restarting from such an early checkpoint, the source would re-emit elements starting from position 1, but windows relying on the final watermark-advancing elements (e.g., timestamps 00:00:32 and 00:00:34 in windowDataWithTimestamp) could still be processed correctly if all data was re-emitted. However, in practice this leads to non-deterministic behavior depending on when the checkpoint barrier arrives relative to the source's emit loop.

Fix by changing the failure trigger condition from lastCheckpointedEmittedNum >= 1 to lastCheckpointedEmittedNum >= failureAfterNumElements. This ensures the failure only occurs after at least failureAfterNumElements elements have been durably checkpointed, so the source always restarts from a consistent position and can emit all remaining elements (including those needed to advance the watermark past window boundaries).
spuru9
approved these changes
Apr 17, 2026
What is the purpose of the change

Fixes a race condition in `FailingCollectionSource` that causes `WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets` (and related CUBE/ROLLUP variants) to fail intermittently.

Brief change log
- Change the failure trigger condition in `FailingCollectionSource.run()` from `lastCheckpointedEmittedNum >= 1` to `lastCheckpointedEmittedNum >= failureAfterNumElements`.

Root Cause
The `FailingCollectionSource` is used in window aggregate IT cases to test the checkpoint + restore path. It artificially fails after emitting the first `failureAfterNumElements` elements (= `numElements / 2`), then restarts from the checkpoint and emits the remaining elements.

The previous trigger condition `lastCheckpointedEmittedNum >= 1` allowed the failure to occur after as few as 1 element was checkpointed. When `windowDataWithTimestamp` (11 elements, `failureAfterNumElements = 5`) is used, the checkpoint could be taken at position 1–4, causing the source to restart from that early position. After restart, only `numElements - checkpointPosition` elements are re-emitted.

For `testCumulateWindow_GroupingSets`, the CUMULATE windows `[00:00:30, 00:00:35/40/45]` can only be triggered by `MAX_WATERMARK` (emitted when the source finishes), since the last event timestamp `00:00:34` only advances the watermark to `00:00:33`, which is not enough to close those windows on its own. If the source restarts from an early checkpoint position and emits all remaining data correctly, `MAX_WATERMARK` is emitted and windows close properly.

However, due to the non-determinism of checkpoint timing, in some runs the failure is triggered before all `failureAfterNumElements` elements are checkpointed, leading to non-reproducible behavior in the restore phase.

Fix
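The difference between the old and new trigger conditions can be illustrated in isolation. The following is a minimal sketch, not the actual `FailingCollectionSource` code: the class and method names are hypothetical, and it models only the comparison against `lastCheckpointedEmittedNum`, ignoring any other conditions the real `run()` method may check. The values 11 and 5 are taken from this description.

```java
// Standalone illustration; TriggerConditionSketch, oldTrigger and newTrigger
// are hypothetical names and do not exist in the Flink code base.
public class TriggerConditionSketch {

    // Old predicate: one checkpointed element is enough to allow the failure.
    public static boolean oldTrigger(long lastCheckpointedEmittedNum) {
        return lastCheckpointedEmittedNum >= 1;
    }

    // New predicate: at least failureAfterNumElements elements must be checkpointed.
    public static boolean newTrigger(
            long lastCheckpointedEmittedNum, long failureAfterNumElements) {
        return lastCheckpointedEmittedNum >= failureAfterNumElements;
    }

    public static void main(String[] args) {
        int numElements = 11;                          // windowDataWithTimestamp, per this description
        int failureAfterNumElements = numElements / 2; // integer division, yields 5

        // Print both predicates for every possible checkpointed position.
        for (int checkpointed = 0; checkpointed <= numElements; checkpointed++) {
            System.out.printf(
                    "checkpointed=%d old=%b new=%b%n",
                    checkpointed,
                    oldTrigger(checkpointed),
                    newTrigger(checkpointed, failureAfterNumElements));
        }
    }
}
```

For checkpointed positions 1 through 4 the old predicate is already true while the new one is still false, which is the range of early restart positions described above.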
Change the trigger condition to `lastCheckpointedEmittedNum >= failureAfterNumElements`. This ensures:

- The failure only occurs after at least `failureAfterNumElements` elements have been durably snapshotted.
- … `failureAfterNumElements`, guaranteeing that the remaining elements (including the watermark-advancing tail data) are re-emitted in the restore run.
- `MAX_WATERMARK` is emitted when the source finishes, closing all pending windows deterministically.

Verifying this change
This is a test-only change. The modified `FailingCollectionSource` is used by:

- `WindowDistinctAggregateITCase` (the target of this fix)
- `WindowAggregateITCase`
- `WindowJoinITCase`
- `WindowRankITCase`
- `WindowTableFunctionITCase`
- `WindowDeduplicateITCase`
- … `failing-source = true`

Run the flaky test with repeated retries:
Does this pull request potentially affect one of the following areas?