diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md b/docs/content.zh/docs/dev/table/functions/ptfs.md index 9a077a6473c31..fc89ab76f6c35 100644 --- a/docs/content.zh/docs/dev/table/functions/ptfs.md +++ b/docs/content.zh/docs/dev/table/functions/ptfs.md @@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction { {{< /tab >}} {{< /tabs >}} +### Handling of Late Records + +A late record is a record with a time attribute value that is less than or equal to the current +watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If +the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is +the same for PTFs with row and set semantics. + +Registering a timer for a time that is less than or equal to the current watermark is allowed. +If registered from within `eval()`, the timer fires on the next watermark advance. If registered +from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that +unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop. + ### Efficiency and Design Principles Registering too many timers might affect performance. An ever-growing timer state can happen diff --git a/docs/content/docs/dev/table/functions/ptfs.md b/docs/content/docs/dev/table/functions/ptfs.md index 1b919dbc0d3f5..0b392c56d1141 100644 --- a/docs/content/docs/dev/table/functions/ptfs.md +++ b/docs/content/docs/dev/table/functions/ptfs.md @@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction { {{< /tab >}} {{< /tabs >}} +### Handling of Late Records + +A late record is a record with a time attribute value that is less than or equal to the current +watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If +the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is +the same for PTFs with row and set semantics. 
+ +Registering a timer for a time that is less than or equal to the current watermark is allowed. +If registered from within `eval()`, the timer fires on the next watermark advance. If registered +from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that +unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop. + ### Efficiency and Design Principles Registering too many timers might affect performance. An ever-growing timer state can happen diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java index bec4bcc85938d..19e5bdee501dd 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java @@ -403,6 +403,19 @@ * } * } * + *

Handling of Late Records

+ * + *

A late record is a record with a time attribute value that is less than or equal to the + * current watermark. PTFs handle late records just like non-late records by calling the {@code + * eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in + * the output. This behavior is the same for PTFs with row and set semantics. + * + *

Registering a timer for a time that is less than or equal to the current watermark is allowed. + * If registered from within {@code eval()}, the timer fires on the next watermark advance. If + * registered from within {@code onTimer()}, the timer fires immediately after the current timer + * finishes. Note that unconditionally re-registering a past-time timer from within {@code + * onTimer()} causes an infinite loop. + * *

Efficiency and Design Principles

* *

Registering too many timers might affect performance. An ever-growing timer state can happen @@ -660,6 +673,12 @@ public interface TimeContext { * timer only fires if a watermark was received from all inputs and the timestamp is smaller * or equal to the minimum of all received watermarks. * + *

A timer whose timestamp is already less than or equal to the current watermark is + * still registered. When registered from within {@code eval()}, it fires on the next + * watermark advance. When registered from within {@code onTimer()}, it fires immediately + * after the current {@code onTimer()} call finishes. Note that unconditionally + * re-registering a past-time timer from within {@code onTimer()} causes an infinite loop. + * *

Timers can be named for distinguishing them in the {@code onTimer()} method. * Registering a timer under the same name twice will replace an existing timer. * @@ -680,6 +699,12 @@ public interface TimeContext { * timer only fires if a watermark was received from all inputs and the timestamp is smaller * or equal to the minimum of all received watermarks. * + *

A timer whose timestamp is already less than or equal to the current watermark is + * still registered. When registered from within {@code eval()}, it fires on the next + * watermark advance. When registered from within {@code onTimer()}, it fires immediately + * after the current {@code onTimer()} call finishes. Note that unconditionally + * re-registering a past-time timer from within {@code onTimer()} causes an infinite loop. + * *

Only one timer can be registered for a given time. * *

Note: Because only PTFs taking set semantic tables support state, and timers are a diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java index d95c4a7dd732a..03ef42de0e142 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java @@ -40,6 +40,7 @@ public List programs() { ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE_RESTORE, ProcessTableFunctionTestPrograms.PROCESS_UPDATING_OUTPUT_UPSERT_RESTORE, ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE, - ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE); + ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE, + ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS_RESTORE); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java index 7903b93bc5d1f..aa547037a13e9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java @@ -87,6 +87,7 @@ public List programs() { ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS, + 
ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS, ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index 7248ff1755a7f..062de5c241604 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -50,6 +50,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction; @@ -1075,8 +1076,8 @@ public class ProcessTableFunctionTestPrograms { "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 
1970-01-01T00:00:00.005Z]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]", - "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]", - "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]") + "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Clearing all timers at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") @@ -1115,7 +1116,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_LATE_EVENTS = TableTestProgram.of( "process-late-events", - "test that late events are dropped in both input and when registering timers") + "test that late events enter PTF (eval and timer registration)") .setupTemporarySystemFunction("f", LateTimersFunction.class) .setupTableSource(TIMED_SOURCE_LATE_EVENTS) .setupTableSink( @@ -1123,27 +1124,102 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer t for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice for 1 at time 1 
watermark -1}, 1970-01-01T00:00:00.001Z]", "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]") + "+I[Bob, {Registering timer bob for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 
watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") .build(); + public static final TableTestProgram PROCESS_LATE_EVENTS_RESTORE = + TableTestProgram.of( + "process-late-events-restore", + "test that late events and their past-time timers work correctly after restore") + .setupTemporarySystemFunction("f", LateTimersFunction.class) + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema(TIMED_SOURCE_SCHEMA) + .producedBeforeRestore( + Row.of("Bob", 1, Instant.ofEpochMilli(0)), + Row.of("Alice", 1, Instant.ofEpochMilli(1))) + .producedAfterRestore( + Row.of("Bob", 2, Instant.ofEpochMilli(99999)), + Row.of("Bob", 3, Instant.ofEpochMilli(3)), + Row.of("Bob", 4, Instant.ofEpochMilli(4))) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) + .consumedBeforeRestore( + "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Bob, {Timer null fired at 
time 0 watermark 0}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]") + .consumedAfterRestore( + "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer bob for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") + .build(); + + public static final TableTestProgram PROCESS_ROW_LATE_EVENTS = + TableTestProgram.of( + "process-row-late-events", + "test that late 
events enter a PTF with row semantics") + .setupTemporarySystemFunction("f", RequiredTimeFunction.class) + .setupTableSource(TIMED_SOURCE_LATE_EVENTS) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(TIMED_BASE_SINK_SCHEMA) + .consumedValues( + "+I[{+I[Bob, 1, 1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]", + "+I[{+I[Alice, 1, 1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]", + "+I[{+I[Bob, 2, 1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]", + "+I[{+I[Bob, 3, 1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]", + "+I[{+I[Bob, 4, 1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t, on_time => DESCRIPTOR(ts))") + .build(); + public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME = TableTestProgram.of( "process-scalar-args-time", @@ -1187,7 +1263,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME = TableTestProgram.of( "process-optional-on-time", - "test optional time attribute, fire once for constant timer") + "test optional time attribute, re-fires timer for constant timer registration after time passed") .setupTemporarySystemFunction("f", OptionalOnTimeFunction.class) .setupTableSource(TIMED_SOURCE) .setupTableSink( @@ -1195,21 +1271,25 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]", - "+I[Bob, {Registering timer t for 2 at time null watermark null}]", + "+I[Bob, {Registering timer t for 1 at time null watermark null}]", "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]", - "+I[Alice, {Registering timer t for 2 at time null watermark -1}]", + "+I[Alice, {Registering timer t for 1 at time null watermark -1}]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null 
watermark 0}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 0}]", + "+I[Bob, {Registering timer t for 1 at time null watermark 0}]", + "+I[Alice, {Timer t fired at time 1 watermark 1}]", + "+I[Bob, {Timer t fired at time 1 watermark 1}]", "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]", "+I[Bob, {Registering timer t for 2 at time null watermark 1}]", - "+I[Alice, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 2}]", + "+I[Bob, {Registering timer t for 3 at time null watermark 2}]", + "+I[Bob, {Timer t fired at time 3 watermark 3}]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 3}]", + "+I[Bob, {Registering timer t for 4 at time null watermark 3}]", + "+I[Bob, {Timer t fired at time 4 watermark 4}]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 4}]") + "+I[Bob, {Registering timer t for 5 at time null watermark 4}]", + "+I[Bob, {Timer t fired at time 5 watermark 5}]") .build()) .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)") .build(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index be6902a0b17ea..495293c924498 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -645,20 +645,23 @@ public static class LateTimersFunction extends AppendProcessTableFunctionBase { public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { final TimeContext timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - // all timers should be executed once - if (timeCtx.time() == 99998) { - // will never be fired because it's late - collectCreateTimer(timeCtx, "late", 1L); + if (r.getFieldAs("name").equals("Bob")) { + // Bob registers timers at 0 + collectCreateTimer(timeCtx, "bob", 0L); + collectCreateTimer(timeCtx, 0L); + } else { + // Alice registers timer at 1 + collectCreateTimer(timeCtx, "alice", 1L); } - collectCreateTimer(timeCtx, "t", 0L); - collectCreateTimer(timeCtx, 0L); } public void onTimer(OnTimerContext ctx) { final TimeContext timeCtx = ctx.timeContext(Long.class); collectOnTimerEvent(ctx); - // will never be fired because it's late - collectCreateTimer(timeCtx, "again", timeCtx.time()); + if (ctx.currentTimer() != null && ctx.currentTimer().equals("alice")) { + // register a timer in the past, which should fire immediately + collectCreateTimer(timeCtx, "alice-again", 0); + } } } @@ -699,7 +702,12 @@ public static class OptionalOnTimeFunction extends AppendProcessTableFunctionBas public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) { final TimeContext timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - collectCreateTimer(timeCtx, "t", 2); + Long wm = timeCtx.currentWatermark(); + // Register at wm+1 to always target the immediate next watermark: the timer fires + // exactly once per watermark advance, and each new row re-registers the timer for the + // following watermark step, demonstrating repeated timer re-registration. + long timer = wm == null || wm < 0 ? 
1 : wm + 1; + collectCreateTimer(timeCtx, "t", timer); } public void onTimer(OnTimerContext ctx) { diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json new file mode 100644 index 0000000000000..8d22a24a156a8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json @@ -0,0 +1,320 @@ +{ + "flinkVersion" : "2.4", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + }, { + "name" : "ts", + "dataType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "ts", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "serializableString" : "`ts` - INTERVAL '0.001' SECOND" + } + } ] + } + } + }, + "abilities" : [ { + "type" : "WatermarkPushDown", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, { + "kind" : "LITERAL", + "value" : 
"1", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "rowtimeExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "idleTimeoutMillis" : -1, + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "watermarkParams" : { + "emitStrategy" : "ON_EVENT", + "alignGroupName" : null, + "alignMaxDrift" : "PT0S", + "alignUpdateInterval" : "PT1S", + "sourceIdleTimeout" : -1 + } + } ] + }, + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, t, watermark=[-(ts, 1:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[name, score, ts])" + }, { + "id" : 2, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 3, + "type" : "stream-exec-process-table-function_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + 
"priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'ts'), DEFAULT())], uid=[f], select=[name,out,rowtime], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* rowtime)])", + "uid" : "f", + "functionCall" : { + "kind" : "CALL", + "systemName" : "f", + "operands" : [ { + "kind" : "TABLE_ARG_CALL", + "inputIndex" : 0, + "partitionKeys" : [ 0 ], + "orderKeys" : [ ], + "orderDirections" : [ ], + "type" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + } + }, { + "kind" : "CALL", + "internalName" : "$DESCRIPTOR$1", + "operands" : [ { + "kind" : "LITERAL", + "value" : "ts", + "type" : "CHAR(2) NOT NULL" + } ], + "type" : "DESCRIPTOR NOT NULL" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$DEFAULT$1", + "type" : "VARCHAR(2147483647)" + } ], + "type" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + } + }, + "inputChangelogModes" : [ [ "INSERT" ] ], + "outputChangelogMode" : [ "INSERT" ] + }, { + "id" : 4, + "type" : "stream-exec-sink_2", + 
"configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + } ] + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "ADAPTIVE", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink], fields=[name, out, rowtime])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata new file mode 100644 index 0000000000000..90af4c3530896 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java index 318dd4ae0d358..b90a201649ed0 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java @@ -160,10 +160,6 @@ public long getTableWatermark() { } public void processEval() throws Exception { - // Drop late events - if (rowtime != null && rowtime <= currentWatermark) { - return; - } processMethod(this::callEval); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java index 93eee5e8e4f00..cf3a6149af653 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java @@ -155,9 +155,16 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void onEventTime(InternalTimer timer) throws Exception { final Object namedTimer = timer.getNamespace(); + boolean isNamedTimer = namedTimer != VoidNamespace.INSTANCE; + // 
Remove the fired timer's state entry immediately to prevent stale entries from + // accumulating. Without this, entries for fired timers would persist until the same + // timer name is re-registered or the state is explicitly cleared. + if (isNamedTimer) { + namedTimersMapState.remove((StringData) namedTimer); + } processTableRunner.ingestTimerEvent( timer.getKey(), - namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer, + isNamedTimer ? (StringData) namedTimer : null, timer.getTimestamp()); processTableRunner.processOnTimer(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java index 5fd996fecaa15..ce97c630c80c7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java @@ -87,11 +87,6 @@ public void clearAllTimers() { } private void registerOnTimeInternal(@Nullable String name, long newTime) { - if (newTime <= unnamedTimerService.currentWatermark()) { - // Do not register timers for late events. - // Otherwise, the next watermark would trigger an onTimer() that emits late events. - return; - } if (name != null) { replaceNamedTimer(StringData.fromString(name), newTime); } else {