[FLINK-39436][table] Allow late data in PTFs#27935
Conversation
gustavodemorais
left a comment
There was a problem hiding this comment.
The PR looks good in general to me!
Two nits: can you rename the PR title and commit title to use this format [FLINK-39436][table]?
| "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]", | ||
| "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]", | ||
| "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]") | ||
| "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]", |
There was a problem hiding this comment.
Nice, this is basically the bugfix, right? We were incorrectly firing timeout2 at 2 and now we fire corrently at 5
There was a problem hiding this comment.
It's not really a bug fix. It's an artifact of allowing late timer registration.
| "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark null}, 1970-01-01T00:00:00.001Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark null}, 1970-01-01T00:00:00.002Z]") | ||
| .consumedAfterRestore( | ||
| "+I[Bob, {Timer timeout1 fired at time 1 watermark 9223372036854775807}, 1970-01-01T00:00:00.001Z]", |
There was a problem hiding this comment.
One thing that was a bit confusing for me is why we have watermark 9223372036854775807 here? Do we emit this max watermark only for testing or do our sources actually emit this when all values have been read? I wondered if that basically makes the user not know if something is late anymore since
There was a problem hiding this comment.
Sources emit this when all values have been read. It will flush all remaining timers and windows and mark the job as complete.
There was a problem hiding this comment.
Makes sense. This is an unrelated topic to the PR so definitely not a blocker: but I guess it makes sense for users to be aware of this so they can write their logic accordingly as well? Maybe it's already documented somewhere and then there's nothing to do 🙂
There was a problem hiding this comment.
And for this PR: maybe add a short comment Sources emit 9223372036854775807 as a MAX watermark after all values have been read. It makes understanding the test a bit easier
There was a problem hiding this comment.
AFAICT this is a property of the testing framework.
I can see that this looks surprising, but I'm not sure if it makes sense to put a comment everywhere the framework is being used.
| public static final TableTestProgram PROCESS_ROW_LATE_EVENTS = | ||
| TableTestProgram.of( | ||
| "process-row-late-events", | ||
| "test that late events enter a row-level PTF") |
There was a problem hiding this comment.
| "test that late events enter a row-level PTF") | |
| "test that late events enter a PTF with row semantics") |
|
One thing worth noting: removing the watermark guard means past-time timers registered from onTimer() fire immediately (within the same advanceWatermark loop). But it also means something like this lead to infinite loops: AI says this is the same behavior as standard Flink KeyedProcessFunction, so the design is consistent - just wanted to flag that PTF users were previously protected from this and they may shoot themselves in the foot more easily now. Maybe worth a short note in the timers section of ptfs.md along the lines of "timers registered at or below the current watermark fire immediately at the next watermark advance; avoid unconditional re-registration of past-time timers from onTimer() as this will cause an infinite loop." |
twalthr
left a comment
There was a problem hiding this comment.
Thanks for the PR @fhueske. I added some minor comments. Could we update the JavaDoc in ProcessTableFunction and the documentation for late events. Late events deserve a dedicated section, before * <h2>Efficiency and Design Principles</h2>.
| // Remove the fired timer's state entry immediately to prevent stale entries from | ||
| // accumulating. Without this, entries for fired timers would persist until the same | ||
| // timer name is re-registered or the state is explicitly cleared. | ||
| if (namedTimersMapState != null && namedTimer != VoidNamespace.INSTANCE) { | ||
| namedTimersMapState.remove((StringData) namedTimer); | ||
| } | ||
| processTableRunner.ingestTimerEvent( | ||
| timer.getKey(), | ||
| namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer, |
There was a problem hiding this comment.
good catch! some nit comments
| // Remove the fired timer's state entry immediately to prevent stale entries from | |
| // accumulating. Without this, entries for fired timers would persist until the same | |
| // timer name is re-registered or the state is explicitly cleared. | |
| if (namedTimersMapState != null && namedTimer != VoidNamespace.INSTANCE) { | |
| namedTimersMapState.remove((StringData) namedTimer); | |
| } | |
| processTableRunner.ingestTimerEvent( | |
| timer.getKey(), | |
| namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer, | |
| boolean isNamedTimer = namedTimer != VoidNamespace.INSTANCE; | |
| // Remove the fired timer's state entry immediately to prevent stale entries from | |
| // accumulating. Without this, entries for fired timers would persist until the same | |
| // timer name is re-registered or the state is explicitly cleared. | |
| if (isNamedTimer) { | |
| namedTimersMapState.remove((StringData) namedTimer); | |
| } | |
| processTableRunner.ingestTimerEvent( | |
| timer.getKey(), | |
| isNamedTimer ? (StringData) namedTimer: null, |
| // Register at wm+1 to always target the immediate next watermark: the timer fires | ||
| // exactly once per watermark advance, and each new row re-registers the timer for the | ||
| // following watermark step, demonstrating repeated timer re-registration. | ||
| long timer = wm == null || wm < 0 ? 1 : wm + 1; |
There was a problem hiding this comment.
wouldn't this be more correct and allowing for watermarks before epoch.
| long timer = wm == null || wm < 0 ? 1 : wm + 1; | |
| long timer = wm == null ? Long.MIN_VALUE + 1: wm + 1; |
There was a problem hiding this comment.
This is just a function for testing purposes and we assert against the behavior that we implement.
So I think it doesn't really matter.
Or is there an (edge) case that would be covered with your suggestion that isn't with the current version?
857e77c to
a08f393
Compare
|
Thanks for your reviews @gustavodemorais and @twalthr. |
gustavodemorais
left a comment
There was a problem hiding this comment.
The PR looks good to me! Thanks, @fhueske
- I think you have to run spotless to make the CI green
a08f393 to
57b157d
Compare
| {{< /tab >}} | ||
| {{< /tabs >}} | ||
|
|
||
| ### Handling of Late Records |
There was a problem hiding this comment.
copy this to the Chinese docs as well
| ### Handling of Late Records | ||
|
|
||
| A late record is a record with a time attribute value that is less than or equal to the current | ||
| watermark. PTFs handle late records just like non-late records by calling the `eval()` method. |
There was a problem hiding this comment.
keep docs in sync:
| watermark. PTFs handle late records just like non-late records by calling the `eval()` method. | |
| watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If the {@code on_time} argument is specified, the late timestamp is preserved in | |
| * the output |
Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions: - ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method. - WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire immediately at the next watermark advance, including when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone. - AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state. Tests are updated to reflect the new semantics: - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire immediately at the next watermark advance. - PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics PTFs. - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires immediately.
57b157d to
1781123
Compare
What is the purpose of the change
Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions.
These are breaking changes that are part of FLIP-565 which was approved.
I checked that the docs don't need to be adjusted because the earlier behavior of PTFs dropping late data was not documented. The new behavior is aligned with other functions in Flink.
Brief change log
ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method.
WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire immediately at the next watermark advance, including when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone.
AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state.
Verifying this change
Tests are updated to reflect the new semantics:
PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire immediately at the next watermark advance.
PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics PTFs.
PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires immediately.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation