@@ -87,6 +87,7 @@ public List<TableTestProgram> programs() {
ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_CONSISTENT_WATERMARK_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
@@ -25,6 +25,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedReceivingFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedSendingFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ClearStateFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ConsistentWatermarkTimersFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms {
"INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();

public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS =
TableTestProgram.of(
"process-consistent-watermark-timers",
"test that multiple named timers registered at the same timestamp all see a consistent watermark")
.setupTemporarySystemFunction("f", ConsistentWatermarkTimersFunction.class)
.setupTableSource(TIMED_SOURCE)
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer timerA for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer timerB for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer timerC for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0}, 1970-01-01T00:00:00.002Z]",
"+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1}, 1970-01-01T00:00:00.003Z]",
"+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2}, 1970-01-01T00:00:00.004Z]",
"+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3}, 1970-01-01T00:00:00.005Z]",
"+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4}, 1970-01-01T00:00:00.006Z]",
"+I[Bob, {Timer timerA fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
"+I[Bob, {Timer timerB fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
"+I[Bob, {Timer timerC fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();
Comment thread
fhueske marked this conversation as resolved.
Contributor

Perhaps we need a dedicated HarnessTest to verify that interruptions are still handled correctly.

Contributor Author

I've added ProcessSetTableOperatorInterruptibleTimersTest to assert the call order of timers.


public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME =
TableTestProgram.of(
"process-scalar-args-time",
@@ -662,6 +662,28 @@ public void onTimer(OnTimerContext ctx) {
}
}

/**
* Testing function for validating watermark consistency across same-timestamp timer callbacks.
*/
public static class ConsistentWatermarkTimersFunction extends AppendProcessTableFunctionBase {
public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) {
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectEvalEvent(timeCtx, r);
if (timeCtx.time() == 0) {
// Register multiple named timers at the same time to validate that all timer
// callbacks see a consistent watermark, even when interrupted across mailbox
// iterations.
collectCreateTimer(timeCtx, "timerA", 5L);
collectCreateTimer(timeCtx, "timerB", 5L);
collectCreateTimer(timeCtx, "timerC", 5L);
}
}

public void onTimer(OnTimerContext ctx) {
collectOnTimerEvent(ctx);
}
}

/** Testing function. */
public static class ScalarArgsTimeFunction extends AppendProcessTableFunctionBase {
public void eval(Context ctx) {
@@ -31,6 +31,7 @@
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
@@ -145,11 +146,17 @@ public void open() throws Exception {
FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
}

@Override
public final boolean useInterruptibleTimers(ReadableConfig config) {
Contributor

@spuru9 spuru9 Apr 17, 2026

[nit] Could final be dropped on useInterruptibleTimers, for consistency? All seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) have a similar method.

Contributor Author

I added the final on purpose to prevent this method from being overridden by extending classes.
AbstractProcessTableOperator is an internal class. If it becomes necessary for a child class to override the method, we can always remove the final.
Until then, it documents that all child classes must be able to handle interruptible timers.
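
To illustrate the effect of final described above, here is a minimal standalone sketch. All class names are hypothetical stand-ins rather than Flink classes, the base default of false is an assumption of the sketch, and the real method takes a ReadableConfig argument that is omitted here.

```java
/** Standalone illustration; these classes are hypothetical, not Flink's. */
public class FinalOverrideSketch {

    /** Stand-in for a base operator; the false default is an assumption. */
    public static class BaseOperator {
        public boolean useInterruptibleTimers() {
            return false;
        }
    }

    /** Stand-in for the internal abstract operator: opts in and seals the choice. */
    public abstract static class AbstractProcessOperator extends BaseOperator {
        @Override
        public final boolean useInterruptibleTimers() {
            return true;
        }
    }

    /** A child class inherits the sealed behavior. */
    public static class SetSemanticOperator extends AbstractProcessOperator {
        // Declaring useInterruptibleTimers() here would not compile,
        // because the parent declares the method final.
    }

    public static boolean isInterruptible(BaseOperator op) {
        return op.useInterruptibleTimers();
    }

    public static void main(String[] args) {
        System.out.println(isInterruptible(new SetSemanticOperator()));
    }
}
```

If a subclass ever needs a different answer, removing final from the parent is a one-word change, which is the trade-off argued for in the comment.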

return true;
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
// TODO this line has issues with interruptible timers, see FLINK-39437
// Update the runner's watermark before firing timers to keep it consistent with the
// timer service watermark, which is also advanced before any timer fires.
processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
Contributor

I don't think that this line swap is sufficient. The docs in MailboxWatermarkProcessor state that there might be a chance that new rows enter the PTF while the current watermark has not fully been processed. We should call ingestCurrentWatermarkEvent only after the watermark has been fully processed. Otherwise a PTF that implements late data handling manually and relies on TimeContext.currentWatermark would not behave correctly. If you check MailboxPartialWatermarkProcessor it only updates the current watermark upon completion of processing.

Contributor Author

Before processing any timers, MailboxPartialWatermarkProcessor, MailboxWatermarkProcessor, and InternalTimerServiceImpl update their internal watermarks to the new watermark. So the operator's internal state is already at the new watermark before all timers have been processed. Only the WM emission is delayed until all timers are handled.

I think this makes sense, including for late data handling, because the operator has already received the WM (all following records with a ts < wm are de facto late) and started to operate on it. The first triggered timers might already clean up state and/or emit data. So even before all timers have been processed, data can (and IMO should) be treated as late.
Imagine a case with two timers for keys A and B. A's timer has fired and cleaned up A's state. Then there's an interrupt with a late record for key A. A's data is gone, so the record cannot be processed correctly. Receiving a late record for B might be OK because the data is still present, but it is also late.

Flipping the calls here just makes it more obvious that the internal WM state is updated before the timers are called. Even with interrupted timers, the ingestCurrentWatermarkEvent() call would happen before an intermediate record is processed.
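
The two-timer scenario above can be played through with a small toy model. This is only a sketch under stated assumptions: it is not Flink code, every name in it is hypothetical, the interruption is simulated by running a callback after the first timer, and the toy treats a record as late when its timestamp is at or below the watermark visible to user code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of interruptible timer firing; all names are hypothetical. */
public class InterruptibleTimerSketch {

    /** Watermark visible to user code; starts at 0 in this toy. */
    private long runnerWatermark = 0L;
    private final Deque<String> pendingTimers = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    void registerTimer(String key) {
        pendingTimers.add(key);
    }

    /** Toy lateness rule: late if the timestamp is at or below the watermark. */
    void processRecord(String key, long ts) {
        String status = ts <= runnerWatermark ? "late" : "on-time";
        log.add("record " + key + "@" + ts + " " + status + " wm=" + runnerWatermark);
    }

    /**
     * Fires all pending timers. The interruption runs once, after the first
     * timer, to mimic a record slipping in between two timer callbacks.
     */
    void processWatermark(long wm, boolean updateBeforeTimers, Runnable interruption) {
        if (updateBeforeTimers) {
            runnerWatermark = wm;
        }
        boolean interrupted = false;
        while (!pendingTimers.isEmpty()) {
            log.add("timer " + pendingTimers.poll() + " wm=" + runnerWatermark);
            if (!interrupted) {
                interrupted = true;
                interruption.run();
            }
        }
        if (!updateBeforeTimers) {
            runnerWatermark = wm;
        }
    }

    /** Runs the two-timer scenario with a record for key A arriving mid-way. */
    public static List<String> run(boolean updateBeforeTimers) {
        InterruptibleTimerSketch op = new InterruptibleTimerSketch();
        op.registerTimer("A");
        op.registerTimer("B");
        op.processWatermark(5L, updateBeforeTimers, () -> op.processRecord("A", 3L));
        return op.log;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);
        run(false).forEach(System.out::println);
    }
}
```

In this toy, updating the watermark first means both timers and the interleaved record observe the same value, and the record for key A is classified as late after A's timer has already fired. Updating it afterwards lets the same record pass as on-time even though A's timer has already run.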

super.processWatermark(mark);
}

@Override