Skip to content

[BP-2.3][FLINK-39436][FLINK-39437][table] Backport of two PTF fixes#27998

Merged
fhueske merged 2 commits intoapache:release-2.3from
fhueske:fhueske-backport-2.3-FLINK-39436-FLINK-39437
Apr 22, 2026
Merged

[BP-2.3][FLINK-39436][FLINK-39437][table] Backport of two PTF fixes#27998
fhueske merged 2 commits intoapache:release-2.3from
fhueske:fhueske-backport-2.3-FLINK-39436-FLINK-39437

Conversation

@fhueske
Copy link
Copy Markdown
Contributor

@fhueske fhueske commented Apr 22, 2026

Backport of [FLINK-39436] (PR #27935) and [FLINK-39437] (PR #27962) to release-2.3.

These changes fix limitation of the PTF framework.

fhueske added 2 commits April 22, 2026 16:05
Previously, late events (rowtime <= watermark) were silently dropped
before reaching PTF eval(), and timer registrations for times <=
watermark were also silently dropped. This change removes both
restrictions:

- ProcessTableRunner: remove the early-return guard in processEval()
  so that late events are passed to the PTF's eval() method.

- WritableInternalTimeContext: remove the watermark check in
  registerOnTimeInternal() so that timers can be registered for past
  times. Such timers fire immediately at the next watermark advance,
  including when registered from within onTimer(). The previous guard
  also had an unintended side effect: any call to replaceNamedTimer()
  with a past time would delete the existing timer entry but then
  silently drop the new registration, leaving the named timer in a
  state where it appeared un-registered but the old timer was gone.

- AbstractProcessTableOperator: remove the fired named timer's state
  entry before invoking onTimer() to prevent stale entries from
  accumulating in the named timers map state.

Tests are updated to reflect the new semantics:

- PROCESS_LATE_EVENTS: demonstrates that late events enter eval(),
  can register timers (including for past times), and that such timers
  fire immediately at the next watermark advance.

- PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics
  PTFs.

- PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect
  that timer registrations for past times are no longer dropped.
  Previously, once the watermark passed the registered time, the timer
  was silently discarded; now it fires immediately.
Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to
return `true`, activating the `MailboxWatermarkProcessor` for PTF operators.
This allows timer firing to be interrupted between mailbox iterations,
improving throughput by not blocking mailbox processing during large timer
batches.

Also removes `currentWatermark` from `ProcessTableRunner` and redirect reads to 
`AbstractStreamOperatorV2.combinedWatermark`.

Add a test (`ProcessSetTableOperatorInterruptibleTimersTest`) to assert correct behavior
of PTF framework when timer processing is interrupted.
@fhueske fhueske marked this pull request as ready for review April 22, 2026 14:12
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 22, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@fhueske fhueske merged commit 62b37ed into apache:release-2.3 Apr 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants