-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39437][table] Support interruptible timers in PTFs #27962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedReceivingFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedSendingFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ClearStateFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ConsistentWatermarkTimersFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction; | ||
| import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction; | ||
|
|
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms { | |
| "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") | ||
| .build(); | ||
|
|
||
| public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS = | ||
| TableTestProgram.of( | ||
| "process-consistent-watermark-timers", | ||
| "test that multiple named timers registered at the same timestamp all see a consistent watermark") | ||
| .setupTemporarySystemFunction("f", ConsistentWatermarkTimersFunction.class) | ||
| .setupTableSource(TIMED_SOURCE) | ||
| .setupTableSink( | ||
| SinkTestStep.newBuilder("sink") | ||
| .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) | ||
| .consumedValues( | ||
| "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", | ||
| "+I[Bob, {Registering timer timerA for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", | ||
| "+I[Bob, {Registering timer timerB for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", | ||
| "+I[Bob, {Registering timer timerC for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", | ||
| "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0}, 1970-01-01T00:00:00.002Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1}, 1970-01-01T00:00:00.003Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2}, 1970-01-01T00:00:00.004Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3}, 1970-01-01T00:00:00.005Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4}, 1970-01-01T00:00:00.006Z]", | ||
| "+I[Bob, {Timer timerA fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]", | ||
| "+I[Bob, {Timer timerB fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]", | ||
| "+I[Bob, {Timer timerC fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]") | ||
| .build()) | ||
| .runSql( | ||
| "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") | ||
| .build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we need to create a dedicated HarnessTest to test whether interruptions are still correct
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added |
||
|
|
||
| public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME = | ||
| TableTestProgram.of( | ||
| "process-scalar-args-time", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| import org.apache.flink.api.common.state.StateTtlConfig; | ||
| import org.apache.flink.api.common.state.ValueStateDescriptor; | ||
| import org.apache.flink.api.common.typeutils.base.LongSerializer; | ||
| import org.apache.flink.configuration.ReadableConfig; | ||
| import org.apache.flink.runtime.state.VoidNamespace; | ||
| import org.apache.flink.runtime.state.VoidNamespaceSerializer; | ||
| import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; | ||
|
|
@@ -145,11 +146,17 @@ public void open() throws Exception { | |
| FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE); | ||
| } | ||
|
|
||
| @Override | ||
| public final boolean useInterruptibleTimers(ReadableConfig config) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] final could be dropped on useInterruptibleTimers? For consistency, all seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) have the similar function.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the final on purpose to prevent this function from being overwritten by extending classes. |
||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void processWatermark(Watermark mark) throws Exception { | ||
| super.processWatermark(mark); | ||
| // TODO this line has issues with interruptible timers, see FLINK-39437 | ||
| // Update the runner's watermark before firing timers to keep it consistent with the | ||
| // timer service watermark, which is also advanced before any timer fires. | ||
| processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that this line swap is sufficient. The docs in MailboxWatermarkProcessor state that there might be a chance that new rows enter the PTF while the current watermark has not fully been processed. We should call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before processing any timers, I think this also makes sense, also in the context of late data handling, because the operator has already received the WM (all following records with a ts < wm are de-facto late) and started to operate on it. The first triggered timers might already cleanup state and/or emit data. So even before all timers have been processed, data can (and IMO should) be treated as late. Flipping the calls here just makes it more obvious that the internal WM state is updated before the timers are called. Even with interrupted timers, the |
||
| super.processWatermark(mark); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.