Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Handling of Late Records

A late record is a record with a time attribute value that is less than or equal to the current
watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
the same for PTFs with row and set semantics.

Registering a timer for a time that is less than or equal to the current watermark is allowed.
If registered from within `eval()`, the timer fires on the next watermark advance. If registered
from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicitly warning in Javadoc that 'unconditionally re-registering a past-time timer from within onTimer() causes an infinite loop';
Is it necessary to consider adding protection, such as adding detection logic.
Alternatively, it may be necessary to add this infinite loop error test in the test, such as adding the timer for 'alice-again' in LateTimersFunction # eval

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same behavior as for other functions with access to timers such as KeyedProcessFunction.
Before this change, we ignored the registrations of timers with a timestamp < current_watermark, which is inconsistent with Flink's other functions and surprising for Flink users.

We might be able to introduce guards against such behavior, but I don't think it is necessary. I don't think this is a common problem and adding logic to detect and prevent such logic would cause more problems than it solves.


### Efficiency and Design Principles

Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down
12 changes: 12 additions & 0 deletions docs/content/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Handling of Late Records
Comment thread
fhueske marked this conversation as resolved.

A late record is a record with a time attribute value that is less than or equal to the current
watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
the same for PTFs with row and set semantics.

Registering a timer for a time that is less than or equal to the current watermark is allowed.
If registered from within `eval()`, the timer fires on the next watermark advance. If registered
from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.

### Efficiency and Design Principles

Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,19 @@
* }
* }</pre>
*
* <h2>Handling of Late Records</h2>
*
* <p>A late record is a record with a time attribute value that is less than or equal to the
* current watermark. PTFs handle late records just like non-late records by calling the {@code
* eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in
* the output. This behavior is the same for PTFs with row and set semantics.
*
* <p>Registering a timer for a time that is less than or equal to the current watermark is allowed.
* If registered from within {@code eval()}, the timer fires on the next watermark advance. If
* registered from within {@code onTimer()}, the timer fires immediately after the current timer
* finishes. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <h2>Efficiency and Design Principles</h2>
*
* <p>Registering too many timers might affect performance. An ever-growing timer state can happen
Expand Down Expand Up @@ -660,6 +673,12 @@ public interface TimeContext<TimeType> {
* timer only fires if a watermark was received from all inputs and the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
* <p>If the timestamp of the registered timer is already less than or equal to the current
* watermark, the timer fires on the next watermark advance if registered from within {@code
* eval()}, or immediately after the current timer finishes if registered from within {@code
* onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <p>Timers can be named for distinguishing them in the {@code onTimer()} method.
* Registering a timer under the same name twice will replace an existing timer.
*
Expand All @@ -680,6 +699,12 @@ public interface TimeContext<TimeType> {
* timer only fires if a watermark was received from all inputs and the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
* <p>If the timestamp of the registered timer is already less than or equal to the current
* watermark, the timer fires on the next watermark advance if registered from within {@code
* eval()}, or immediately after the current timer finishes if registered from within {@code
* onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
* onTimer()} causes an infinite loop.
*
* <p>Only one timer can be registered for a given time.
*
* <p>Note: Because only PTFs taking set semantic tables support state, and timers are a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public List<TableTestProgram> programs() {
ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_OUTPUT_UPSERT_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE);
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS_RESTORE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public List<TableTestProgram> programs() {
ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
Expand Down
Loading
Loading