From 0d2bc04ae70222026e5b75bdc9d9670e8d363268 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Fri, 17 Apr 2026 19:34:18 +0200 Subject: [PATCH 1/3] [FLINK-39437][table] Support interruptible timers in PTFs Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to return `true`, activating the `MailboxWatermarkProcessor` for PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches. Also reorder `processWatermark()` to call `ingestCurrentWatermarkEvent()` before `super.processWatermark()` to keep the runner's watermark consistent with the timer service watermark, which is advanced before any timer fires. Add a test (`PROCESS_CONSISTENT_WATERMARK_TIMERS`) that validates multiple named timers registered at the same timestamp all see a consistent watermark in their callbacks. --- .../ProcessTableFunctionSemanticTests.java | 1 + .../ProcessTableFunctionTestPrograms.java | 29 +++++++++++++++++++ .../stream/ProcessTableFunctionTestUtils.java | 20 +++++++++++++ .../process/AbstractProcessTableOperator.java | 11 +++++-- 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java index 7903b93bc5d1f..bbf44018dc07c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java @@ -87,6 +87,7 @@ public List programs() { ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS, + ProcessTableFunctionTestPrograms.PROCESS_CONSISTENT_WATERMARK_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index 7248ff1755a7f..5360f8a7264e7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ConsistentWatermarkTimersFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidPassThroughTimersFunction; @@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms { "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") .build(); + public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS = + TableTestProgram.of( + "process-consistent-watermark-timers", + "test that multiple named timers registered at the same timestamp all see a consistent watermark") + .setupTemporarySystemFunction("f", ConsistentWatermarkTimersFunction.class) + .setupTableSource(TIMED_SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) + .consumedValues( + "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer timerA for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer timerB for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer timerC for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0}, 1970-01-01T00:00:00.002Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4}, 1970-01-01T00:00:00.006Z]", + "+I[Bob, {Timer timerA fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Timer timerB fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Timer timerC fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") + .build(); + public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME = TableTestProgram.of( "process-scalar-args-time", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index be6902a0b17ea..af2f088e51ae7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -662,6 +662,26 @@ public void onTimer(OnTimerContext ctx) { } } + /** Testing function for validating watermark consistency across same-timestamp timer callbacks. */ + public static class ConsistentWatermarkTimersFunction extends AppendProcessTableFunctionBase { + public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { + final TimeContext timeCtx = ctx.timeContext(Long.class); + collectEvalEvent(timeCtx, r); + if (timeCtx.time() == 0) { + // Register multiple named timers at the same time to validate that all timer + // callbacks see a consistent watermark, even when interrupted across mailbox + // iterations. + collectCreateTimer(timeCtx, "timerA", 5L); + collectCreateTimer(timeCtx, "timerB", 5L); + collectCreateTimer(timeCtx, "timerC", 5L); + } + } + + public void onTimer(OnTimerContext ctx) { + collectOnTimerEvent(ctx); + } + } + /** Testing function. */ public static class ScalarArgsTimeFunction extends AppendProcessTableFunctionBase { public void eval(Context ctx) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java index 93eee5e8e4f00..22626f5e075b4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; @@ -145,11 +146,17 @@ public void open() throws Exception { FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE); } + @Override + public final boolean useInterruptibleTimers(ReadableConfig config) { + return true; + } + @Override public void processWatermark(Watermark mark) throws Exception { - super.processWatermark(mark); - // TODO this line has issues with interruptible timers, see FLINK-39437 + // Update the runner's watermark before firing timers to keep it consistent with the + // timer service watermark, which is also advanced before any timer fires. processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp()); + super.processWatermark(mark); } @Override From 1e56e43b621e141692c6fc1bc7a577e51bb0098a Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Mon, 20 Apr 2026 09:34:02 +0200 Subject: [PATCH 2/3] [FLINK-39437][table] Apply formatter --- .../nodes/exec/stream/ProcessTableFunctionTestPrograms.java | 2 +- .../plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index 5360f8a7264e7..ed113cf64f147 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -25,10 +25,10 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedReceivingFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ChainedSendingFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ClearStateFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ConsistentWatermarkTimersFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction; -import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ConsistentWatermarkTimersFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidPassThroughTimersFunction; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index af2f088e51ae7..8ac499ea76ea8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -662,7 +662,9 @@ public void onTimer(OnTimerContext ctx) { } } - /** Testing function for validating watermark consistency across same-timestamp timer callbacks. */ + /** + * Testing function for validating watermark consistency across same-timestamp timer callbacks. + */ public static class ConsistentWatermarkTimersFunction extends AppendProcessTableFunctionBase { public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { final TimeContext timeCtx = ctx.timeContext(Long.class); From 869e4748d52c374442d8502f9e8697f27c2a4853 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Mon, 20 Apr 2026 19:15:16 +0200 Subject: [PATCH 3/3] [FLINK-39437][table] Add unit test for ProcessSetTableOperator with interruptible timers --- ...tTableOperatorInterruptibleTimersTest.java | 344 ++++++++++++++++++ 1 file changed, 344 insertions(+) create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java new file mode 100644 index 0000000000000..f0580a89595ba --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.process; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.ProcessTableFunction.TimeContext; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.ProcessTableRunner; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that {@link ProcessSetTableOperator} drives {@link ProcessTableRunner} correctly when user + * timers are fired under unaligned checkpoints, both with and without interruptible timers. + * + *

A minimal {@link ProcessTableRunner} is used: {@link ProcessTableRunner#callEval()} emits a + * "record-$ts@wm$wm" row via {@code evalCollector} and registers an unnamed event-time timer at the + * row's event time (plus an additional named timer for the row at ts=1000); {@link + * ProcessTableRunner#callOnTimer()} emits a "fired-[name-]$ts@wm$wm" row via {@code + * onTimerCollector}. The operator is wrapped so that after each timer firing a non-deferrable + * "mail-[name-]$ts" mailbox mail is scheduled — the PTF user API does not expose the mailbox, so + * this is the only way to observe externally whether the mailbox is drained between timer firings. + * + *

With interruptible timers enabled, the mailbox is drained between successive timer firings, so + * each "fired" row is immediately followed by its "mail" row. With interruptible timers disabled, + * all timers in a batch fire first (and the watermark is emitted downstream) before any of the + * scheduled mails are processed. The watermark values on "record" and "fired" rows also validate + * that {@link TimeContext#currentWatermark()} advances consistently across input processing and + * timer firing. + */ +class ProcessSetTableOperatorInterruptibleTimersTest { + + private static final InternalTypeInfo INPUT_TYPE = + InternalTypeInfo.ofFields(new BigIntType(), new TimestampType(3)); + + // Output layout is driven by PassPartitionKeysCollector / PassAllCollector in + // AbstractProcessTableOperator: [partition-key, PTF-emitted label, rowtime]. + private static final InternalTypeInfo OUTPUT_TYPE = + InternalTypeInfo.ofFields( + new BigIntType(), VarCharType.STRING_TYPE, new TimestampType(3)); + + private static final RowDataKeySelector KEY_SELECTOR = + HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, INPUT_TYPE.toRowFieldTypes()); + + private static final long KEY = 42L; + + private static final String NAMED_TIMER = "n"; + + @ParameterizedTest(name = "interruptibleTimers={0}") + @ValueSource(booleans = {true, false}) + void testTimersThroughProcessTableRunner(boolean interruptibleTimers) throws Exception { + try (StreamTaskMailboxTestHarness harness = createHarness(interruptibleTimers)) { + // Each input row registers one event-time timer at its own timestamp. The row at + // ts=1000 additionally registers a named timer at the same timestamp, so the first + // batch contains three unnamed firings and one named firing at identical timestamps + // (ts=1000) — exercising both timer services advancing together. + for (long ts = 1000L; ts <= 3000L; ts += 1000L) { + harness.processElement( + new StreamRecord<>( + GenericRowData.of(KEY, TimestampData.fromEpochMillis(ts)), ts)); + } + // Advance watermark past registered timers (first batch: three unnamed + one named). + harness.processElement(new Watermark(5000L)); + + // One more input row followed by a second watermark advance (second batch: one + // unnamed timer only). + harness.processElement( + new StreamRecord<>( + GenericRowData.of(KEY, TimestampData.fromEpochMillis(6000L)), 6000L)); + harness.processElement(new Watermark(7000L)); + + List labels = + harness.getOutput().stream() + .map(ProcessSetTableOperatorInterruptibleTimersTest::describe) + .collect(Collectors.toList()); + + // Records emitted from callEval carry the watermark visible during eval, which is null + // before any watermark has arrived and 5000 for the record processed between the two + // watermarks. + if (interruptibleTimers) { + // With interruptible timers, the mailbox is drained between successive timer + // firings, so each "fired-[name-]$ts" is immediately followed by its matching + // "mail-[name-]$ts". The watermark is only emitted downstream once all firings in + // the batch have completed. + assertThat(labels) + .containsExactly( + recordLabel(1000L, null), + recordLabel(2000L, null), + recordLabel(3000L, null), + firedLabel(null, 1000L, 5000L), + mailLabel(null, 1000L), + firedLabel(null, 2000L, 5000L), + mailLabel(null, 2000L), + firedLabel(null, 3000L, 5000L), + mailLabel(null, 3000L), + firedLabel(NAMED_TIMER, 1000L, 5000L), + mailLabel(NAMED_TIMER, 1000L), + watermarkLabel(5000L), + recordLabel(6000L, 5000L), + firedLabel(null, 6000L, 7000L), + mailLabel(null, 6000L), + watermarkLabel(7000L)); + } else { + // Without interruptible timers, all timers in a batch fire synchronously (emitting + // their "fired" rows) before the watermark is emitted downstream. Only afterwards + // does the mailbox drain the scheduled "mail" rows in FIFO order. + assertThat(labels) + .containsExactly( + recordLabel(1000L, null), + recordLabel(2000L, null), + recordLabel(3000L, null), + firedLabel(null, 1000L, 5000L), + firedLabel(null, 2000L, 5000L), + firedLabel(null, 3000L, 5000L), + firedLabel(NAMED_TIMER, 1000L, 5000L), + watermarkLabel(5000L), + mailLabel(null, 1000L), + mailLabel(null, 2000L), + mailLabel(null, 3000L), + mailLabel(NAMED_TIMER, 1000L), + recordLabel(6000L, 5000L), + firedLabel(null, 6000L, 7000L), + watermarkLabel(7000L), + mailLabel(null, 6000L)); + } + } + } + + private StreamTaskMailboxTestHarness createHarness(boolean interruptibleTimers) + throws Exception { + return new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, OUTPUT_TYPE) + .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1)) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, + interruptibleTimers) + .setKeyType(KEY_SELECTOR.getProducedType()) + .addInput(INPUT_TYPE, 1, KEY_SELECTOR) + .setupOperatorChain(new TestOperatorFactory()) + .name("ptf") + .finishForSingletonOperatorChain( + OUTPUT_TYPE.createSerializer(new SerializerConfigImpl())) + .build(); + } + + private static String describe(Object obj) { + if (obj instanceof Watermark) { + return watermarkLabel(((Watermark) obj).getTimestamp()); + } else if (obj instanceof StreamRecord) { + Object value = ((StreamRecord) obj).getValue(); + if (value instanceof RowData) { + // Field 1 is the PTF-emitted label (see OUTPUT_TYPE); fields 0 and 2 are the + // pass-through partition key and rowtime. + return ((RowData) value).getString(1).toString(); + } + return value.toString(); + } + return obj.toString(); + } + + private static String recordLabel(long ts, @Nullable Long watermark) { + return "record-" + ts + "@wm" + (watermark == null ? "null" : watermark); + } + + private static String firedLabel(@Nullable String name, long ts, long watermark) { + return "fired-" + (name == null ? "" : name + "-") + ts + "@wm" + watermark; + } + + private static String mailLabel(@Nullable String name, long ts) { + return "mail-" + (name == null ? "" : name + "-") + ts; + } + + private static String watermarkLabel(long ts) { + return "Watermark@" + ts; + } + + private static RuntimeTableSemantics tableSemantics() { + return new RuntimeTableSemantics( + "r", + 0, + DataTypes.ROW( + DataTypes.FIELD("k", DataTypes.BIGINT()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))), + new int[] {0}, + new int[0], + new RuntimeTableSemantics.SortDirection[0], + RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), + /* passColumnsThrough */ false, + /* hasSetSemantics */ true, + /* timeColumn */ 1); + } + + // -------------------------------------------------------------------------------------------- + // Factory and operator + // -------------------------------------------------------------------------------------------- + + private static class TestOperatorFactory extends AbstractStreamOperatorFactory { + private static final long serialVersionUID = 1L; + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) new TestProcessSetTableOperator(parameters); + } + + @Override + @SuppressWarnings("rawtypes") + public Class getStreamOperatorClass(ClassLoader classLoader) { + return TestProcessSetTableOperator.class; + } + } + + /** + * {@link ProcessSetTableOperator} with a test-only override of {@link #onEventTime} that + * schedules a non-deferrable "mail-[name-]$ts" emission after each timer firing. The PTF user + * API does not expose the mailbox; this test-only shortcut is the only way to observe whether + * interruption happens between timer firings driven by {@link + * ProcessTableRunner#callOnTimer()}. + */ + private static class TestProcessSetTableOperator extends ProcessSetTableOperator { + + private final MailboxExecutor mailboxExecutor; + + TestProcessSetTableOperator(StreamOperatorParameters parameters) { + super( + parameters, + List.of(tableSemantics()), + List.of(), + // No ORDER BY on the input, so the comparator slot is null (the operator + // short-circuits sort-buffer setup when orderByColumns is empty). The array + // size must still match the number of inputs. + new RecordComparator[] {null}, + new TestProcessTableRunner(), + new HashFunction[0], + new RecordEqualiser[0], + RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), + List.of()); + this.mailboxExecutor = parameters.getMailboxExecutor(); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // Runs the full PTF path: ingestTimerEvent -> processOnTimer -> callOnTimer, emitting + // "fired-[name-]$ts" via onTimerCollector. + super.onEventTime(timer); + long ts = timer.getTimestamp(); + long keyValue = timer.getKey().getLong(0); + final Object ns = timer.getNamespace(); + final String name = (ns instanceof StringData) ? ((StringData) ns).toString() : null; + mailboxExecutor.execute( + () -> + output.collect( + new StreamRecord<>( + GenericRowData.of( + keyValue, + StringData.fromString(mailLabel(name, ts)), + TimestampData.fromEpochMillis(ts)))), + mailLabel(name, ts)); + } + } + + /** + * Minimal {@link ProcessTableRunner}: {@link #callEval} emits a "record-$ts@wm$wm" row and + * registers an unnamed event-time timer at the input row's event time (plus a named timer for + * the row at ts=1000); {@link #callOnTimer} emits a "fired-[name-]$ts@wm$wm" row. + */ + private static class TestProcessTableRunner extends ProcessTableRunner { + + @Override + public void callEval() { + final TimeContext tc = runnerContext.timeContext(Long.class); + final long ts = Objects.requireNonNull(tc.time(), "input row event time"); + final Long wm = tc.currentWatermark(); + tc.registerOnTime(ts); + if (ts == 1000L) { + tc.registerOnTime(NAMED_TIMER, ts); + } + evalCollector.collect(GenericRowData.of(StringData.fromString(recordLabel(ts, wm)))); + } + + @Override + public void callOnTimer() { + final TimeContext tc = runnerOnTimerContext.timeContext(Long.class); + final long ts = Objects.requireNonNull(tc.time(), "timer fire time"); + final long wm = + Objects.requireNonNull(tc.currentWatermark(), "watermark at timer fire"); + final String name = runnerOnTimerContext.currentTimer(); + onTimerCollector.collect( + GenericRowData.of(StringData.fromString(firedLabel(name, ts, wm)))); + } + } +}