Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Handling of Late Records

A late record is a record with a time attribute value that is less than or equal to the current
watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
the same for PTFs with row and set semantics.

Registering a timer for a time that is less than or equal to the current watermark is allowed.
If registered from within `eval()`, the timer fires on the next watermark advance. If registered
from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.

### Efficiency and Design Principles

Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down
12 changes: 12 additions & 0 deletions docs/content/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Handling of Late Records
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy this to the Chinese docs as well


A late record is a record with a time attribute value that is less than or equal to the current
watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
the same for PTFs with row and set semantics.

Registering a timer for a time that is less than or equal to the current watermark is allowed.
If registered from within `eval()`, the timer fires on the next watermark advance. If registered
from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.

### Efficiency and Design Principles

Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,19 @@
* }
* }</pre>
*
* <h2>Handling of Late Records</h2>
*
* <p>A late record is a record with a time attribute value that is less than or equal to the
* current watermark. PTFs handle late records just like non-late records by calling the {@code
* eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in
* the output. This behavior is the same for PTFs with row and set semantics.
*
* <p>Registering a timer for a time that is less than or equal to the current watermark is allowed.
* If registered from within {@code eval()}, the timer fires on the next watermark advance. If
* registered from within {@code onTimer()}, the timer fires immediately after the current timer
* finishes. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <h2>Efficiency and Design Principles</h2>
*
* <p>Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down Expand Up @@ -660,6 +673,12 @@ public interface TimeContext<TimeType> {
* timer only fires if a watermark was received from all inputs and the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
* <p>If the timestamp of the registered timer is already less than or equal to the current
* watermark, the timer fires on the next watermark advance if registered from within {@code
* eval()}, or immediately after the current timer finishes if registered from within {@code
* onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <p>Timers can be named for distinguishing them in the {@code onTimer()} method.
* Registering a timer under the same name twice will replace an existing timer.
*
Expand All @@ -680,6 +699,12 @@ public interface TimeContext<TimeType> {
* timer only fires if a watermark was received from all inputs and the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
* <p>If the timestamp of the registered timer is already less than or equal to the current
* watermark, the timer fires on the next watermark advance if registered from within {@code
* eval()}, or immediately after the current timer finishes if registered from within {@code
* onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <p>Only one timer can be registered for a given time.
*
* <p>Note: Because only PTFs taking set semantic tables support state, and timers are a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public List<TableTestProgram> programs() {
ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
Expand Down Expand Up @@ -1075,8 +1076,8 @@ public class ProcessTableFunctionTestPrograms {
"+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]",
"+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 1970-01-01T00:00:00.005Z]",
"+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]",
"+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]",
"+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]")
"+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is basically the bugfix, right? We were incorrectly firing timeout2 at 2 and now we fire corrently at 5

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a bug fix. It's an artifact of allowing late timer registration.

"+I[Bob, {Clearing all timers at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
Expand Down Expand Up @@ -1115,35 +1116,61 @@ public class ProcessTableFunctionTestPrograms {
public static final TableTestProgram PROCESS_LATE_EVENTS =
TableTestProgram.of(
"process-late-events",
"test that late events are dropped in both input and when registering timers")
"test that late events enter PTF (eval and timer registration)")
.setupTemporarySystemFunction("f", LateTimersFunction.class)
.setupTableSource(TIMED_SOURCE_LATE_EVENTS)
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer t for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
"+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
"+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]")
"+I[Bob, {Registering timer bob for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
"+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
"+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
"+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
"+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
"+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
"+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
"+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
"+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
"+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
"+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();

public static final TableTestProgram PROCESS_ROW_LATE_EVENTS =
TableTestProgram.of(
"process-row-late-events",
"test that late events enter a PTF with row semantics")
.setupTemporarySystemFunction("f", RequiredTimeFunction.class)
.setupTableSource(TIMED_SOURCE_LATE_EVENTS)
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema(TIMED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[{+I[Bob, 1, 1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]",
"+I[{+I[Alice, 1, 1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]",
"+I[{+I[Bob, 2, 1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]",
"+I[{+I[Bob, 3, 1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]",
"+I[{+I[Bob, 4, 1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t, on_time => DESCRIPTOR(ts))")
.build();

public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME =
TableTestProgram.of(
"process-scalar-args-time",
Expand Down Expand Up @@ -1187,29 +1214,33 @@ public class ProcessTableFunctionTestPrograms {
public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME =
TableTestProgram.of(
"process-optional-on-time",
"test optional time attribute, fire once for constant timer")
"test optional time attribute, re-fires timer for constant timer registration after time passed")
.setupTemporarySystemFunction("f", OptionalOnTimeFunction.class)
.setupTableSource(TIMED_SOURCE)
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema(KEYED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]",
"+I[Bob, {Registering timer t for 2 at time null watermark null}]",
"+I[Bob, {Registering timer t for 1 at time null watermark null}]",
"+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]",
"+I[Alice, {Registering timer t for 2 at time null watermark -1}]",
"+I[Alice, {Registering timer t for 1 at time null watermark -1}]",
"+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]",
"+I[Bob, {Registering timer t for 2 at time null watermark 0}]",
"+I[Bob, {Registering timer t for 1 at time null watermark 0}]",
"+I[Alice, {Timer t fired at time 1 watermark 1}]",
"+I[Bob, {Timer t fired at time 1 watermark 1}]",
"+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]",
"+I[Bob, {Registering timer t for 2 at time null watermark 1}]",
"+I[Alice, {Timer t fired at time 2 watermark 2}]",
"+I[Bob, {Timer t fired at time 2 watermark 2}]",
"+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]",
"+I[Bob, {Registering timer t for 2 at time null watermark 2}]",
"+I[Bob, {Registering timer t for 3 at time null watermark 2}]",
"+I[Bob, {Timer t fired at time 3 watermark 3}]",
"+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]",
"+I[Bob, {Registering timer t for 2 at time null watermark 3}]",
"+I[Bob, {Registering timer t for 4 at time null watermark 3}]",
"+I[Bob, {Timer t fired at time 4 watermark 4}]",
"+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]",
"+I[Bob, {Registering timer t for 2 at time null watermark 4}]")
"+I[Bob, {Registering timer t for 5 at time null watermark 4}]",
"+I[Bob, {Timer t fired at time 5 watermark 5}]")
.build())
.runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)")
.build();
Expand Down
Loading
Loading