[FLINK-39437][table] Support interruptible timers in PTFs#27962
[FLINK-39437][table] Support interruptible timers in PTFs#27962fhueske wants to merge 3 commits intoapache:masterfrom
Conversation
807ebc4 to
2c38910
Compare
There was a problem hiding this comment.
Pull request overview
Enables interruptible timer processing for Process Table Function (PTF) operators and adjusts watermark handling so timer callbacks observe a consistent watermark when timers are fired incrementally via the mailbox.
Changes:
- Enable interruptible timers for PTF operators by overriding
useInterruptibleTimers(...)inAbstractProcessTableOperator. - Update
processWatermark(...)ordering to ingest the new watermark into the PTF runner before delegating to the superclass (which advances the timer service and may fire timers). - Add a new planner semantic test program and test function intended to validate watermark consistency across multiple same-timestamp named timers.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java | Enables interruptible timers for PTF operators and reorders watermark ingestion vs. timer firing. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java | Adds a new PTF test function that registers multiple same-timestamp named timers. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java | Adds a new TableTestProgram with expected outputs for consistent watermark observation in timer callbacks. |
| flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java | Registers the new semantic test program in the PTF semantic test suite. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| @Override | ||
| public final boolean useInterruptibleTimers(ReadableConfig config) { |
There was a problem hiding this comment.
[nit] final could be dropped on useInterruptibleTimers? For consistency, all seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) have the similar function.
There was a problem hiding this comment.
I added the final on purpose to prevent this function from being overwritten by extending classes.
AbstractProcessTableOperator is an internal class. If it becomes necessary for a child class to overwrite the the method, we can always remove the final.
Until then, it documents that all child classes must be able to handle interruptible timers.
c12100e to
82c8ca6
Compare
| .build()) | ||
| .runSql( | ||
| "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") | ||
| .build(); |
There was a problem hiding this comment.
Perhaps we need to create a dedicated HarnessTest to test whether interruptions are still correct
There was a problem hiding this comment.
I've added ProcessSetTableOperatorInterruptibleTimersTest to assert the call order or timers.
Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to return `true`, activating the `MailboxWatermarkProcessor` for PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches. Also reorder `processWatermark()` to call `ingestCurrentWatermarkEvent()` before `super.processWatermark()` to keep the runner's watermark consistent with the timer service watermark, which is advanced before any timer fires. Add a test (`PROCESS_CONSISTENT_WATERMARK_TIMERS`) that validates multiple named timers registered at the same timestamp all see a consistent watermark in their callbacks.
…nterruptible timers
3efca16 to
869e474
Compare
What is the purpose of the change
Enable interruptible timers for PTFs.
Brief change log
Override
useInterruptibleTimers()inAbstractProcessTableOperatorto returntrue, activating theMailboxWatermarkProcessorfor PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches.Also reorder
processWatermark()to callingestWatermarkEvent()beforesuper.processWatermark(), ensuring all timer callbacks (including those deferred across mailbox iterations) see a consistent watermark in the runner. This matches the behavior ofWritableInternalTimeContext.currentWatermark(), which reads from the timer service and already sees the new watermark before any timer fires.Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation