Merged
178 changes: 168 additions & 10 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -932,22 +932,37 @@ table.

The timestamp of the firing timer currently being processed within the `onTimer()` method.

#### Table Watermark

`TimeContext#tableWatermark()` returns the event-time watermark of the input table currently being processed.

Watermarks are generated in sources and sent through the topology for advancing the logical clock. The current watermark
of an input table is the minimum watermark of all upstream Flink subtasks producing the table.

In multi-input scenarios, each input table can have its own independent watermark. This method returns the watermark
specific to the input table that is currently being processed in the `eval()` method, rather than the global minimum
watermark across all input tables (which is returned by `currentWatermark()`).

This is particularly useful for late event detection on a per-input basis.

It returns the current watermark of the input table being processed. The method returns `null` if it is called within
the `onTimer()` method or if a watermark has not yet been received from all upstream Flink subtasks producing the table.
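
The rule described above can be illustrated with a small plain-Java simulation. This is only a conceptual sketch and not the Flink implementation: the class `TableWatermarkSketch` and its helpers are hypothetical names, watermarks are modelled as epoch milliseconds, and the lateness rule (a row is late when its timestamp is not after the watermark) is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual sketch only; none of these names belong to the Flink API.
final class TableWatermarkSketch {

    // Minimum watermark over all upstream subtasks that produce one input table.
    // Returns null while any subtask has not reported a watermark yet.
    static Long tableWatermark(List<Long> upstreamWatermarks) {
        Long min = null;
        for (Long wm : upstreamWatermarks) {
            if (wm == null) {
                return null;
            }
            if (min == null || wm < min) {
                min = wm;
            }
        }
        return min;
    }

    // Assumed lateness rule: a row is late if its timestamp is not after the watermark.
    // Without a watermark (null), no row is treated as late.
    static boolean isLate(long rowTime, Long tableWatermark) {
        return tableWatermark != null && rowTime <= tableWatermark;
    }

    public static void main(String[] args) {
        // Two upstream subtasks per table; the second table is still missing one watermark.
        Long orders = tableWatermark(Arrays.asList(1_000L, 2_500L));
        Long payments = tableWatermark(Arrays.asList(4_000L, null));

        System.out.println("orders watermark: " + orders);
        System.out.println("payments watermark: " + payments);
        System.out.println("row at 900 late for orders: " + isLate(900L, orders));
        System.out.println("row at 900 late for payments: " + isLate(900L, payments));
    }
}
```

In a real PTF, the same null check would be applied to the value returned by `TimeContext#tableWatermark()` before comparing it with the row's timestamp.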

#### Current Watermark

`TimeContext#currentWatermark()` returns the current event-time watermark.
`TimeContext#currentWatermark()` returns the current event-time watermark at this PTF instance.

Watermarks are generated in sources and sent through the topology for advancing the logical clock in each Flink subtask.
The current watermark of a Flink subtask is the global minimum watermark of all inputs (i.e. across all parallel inputs
and table partitions).
Watermarks are generated in sources and sent through the topology for advancing the logical clock. The current watermark
of a PTF instance is the global minimum watermark of all input tables (i.e., across all upstream Flink subtasks and
table partitions).

This method returns the current watermark of the Flink subtask that evaluates the PTF. Thus, the returned timestamp
represents the entire Flink subtask, independent of the currently processed partition. This behavior is similar to a call
to `SELECT CURRENT_WATERMARK(...)` in SQL.
represents the entire Flink subtask, independent of the currently processed input table and partition. This behavior is
similar to a call to `SELECT CURRENT_WATERMARK(...)` in SQL.

If a watermark was not received from all inputs, the method returns `null`.

In case this method is called within the `onTimer()` method, the returned watermark is the triggering watermark
that currently fires the timer.
It returns the current watermark at the PTF instance across all upstream Flink subtasks and table partitions. A `null`
value is returned if no minimum logical time could be calculated across all inputs; this happens during startup
or recovery when one or more active (i.e., not idle) inputs have not sent a watermark yet.
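
As a rough illustration of the global minimum, the following plain-Java sketch combines per-table watermarks into a single value. It is not Flink code: `CurrentWatermarkSketch`, the map of table watermarks, and the set of idle inputs are hypothetical, and returning `null` when every input is idle is an assumption of this sketch rather than documented behavior.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual sketch only; none of these names belong to the Flink API.
final class CurrentWatermarkSketch {

    // Global minimum over the watermarks of all active input tables.
    // Idle inputs are skipped; an active input without a watermark yields null.
    static Long currentWatermark(Map<String, Long> tableWatermarks, Set<String> idleInputs) {
        Long min = null;
        for (Map.Entry<String, Long> entry : tableWatermarks.entrySet()) {
            if (idleInputs.contains(entry.getKey())) {
                continue;
            }
            Long wm = entry.getValue();
            if (wm == null) {
                return null;
            }
            if (min == null || wm < min) {
                min = wm;
            }
        }
        // Assumption: stays null if there is no active input at all.
        return min;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = new HashMap<>();
        watermarks.put("orders", 1_000L);
        watermarks.put("payments", 4_000L);
        watermarks.put("refunds", null); // no watermark received yet

        Set<String> idle = new HashSet<>();
        System.out.println("refunds active: " + currentWatermark(watermarks, idle));

        idle.add("refunds");
        System.out.println("refunds idle: " + currentWatermark(watermarks, idle));
    }
}
```

The per-table value in this sketch corresponds to what `tableWatermark()` reports for each input, while the combined result corresponds to `currentWatermark()`.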

### Timers

@@ -1015,6 +1030,149 @@ not needed anymore via `Context#clearAllTimers()` or `TimeContext#clearTimer(Str

{{< top >}}

Ordering
--------

A PTF that takes a table with set semantics can optionally specify an ORDER BY clause in the
function call to define the order in which rows are processed within each partition. The ORDER BY
clause guarantees that rows are delivered to the `eval()` method in the specified order.

The ORDER BY clause requires that the first column is a time attribute column (i.e., a
`TIMESTAMP` or `TIMESTAMP_LTZ` column with a watermark declaration). The first ORDER BY column must
be specified in ascending order. This ensures that rows are processed in event-time order.
Additional columns can be specified as secondary sort keys to define the ordering of
rows with the same timestamp.

{{< tabs "2137eeed-order-by-syntax" >}}
{{< tab "SQL" >}}
```sql
SELECT * FROM my_ptf(
  input_table => TABLE source_table
    PARTITION BY user_id
    ORDER BY (event_time ASC, priority DESC NULLS FIRST)
)
```
{{< /tab >}}
{{< tab "Java" >}}
```java
env.from("source_table")
  .partitionBy($("user_id"))
  .orderBy($("event_time").asc(), $("priority").desc())
  .process(MyPTF.class, descriptor("event_time").asArgument("on_time"));
```
{{< /tab >}}
{{< /tabs >}}

### Difference Between ORDER BY and on_time Argument

While both ORDER BY and the `on_time` argument relate to time attributes, they serve
different purposes:

- **on_time**: Declares which time attribute column powers the time context (`TimeContext#time()`) and
output timestamp. It does NOT affect the processing order of rows.
- **ORDER BY**: Physically buffers and sorts rows within each partition to guarantee ordered delivery
  to the `eval()` method. If both ORDER BY and `on_time` are specified for the same table argument, they
  must reference the same time attribute column.

### Ordering Guarantees and Late Events

When ORDER BY is specified on a time attribute column, the framework maintains a sort buffer
per partition and input table to reorder out-of-order events. The sort buffer is flushed when the
watermark for the given input table advances, at which point all buffered rows with timestamps
less than or equal to the watermark are delivered to the `eval()` method in sorted order. Late
events (rows that arrive after the watermark has already passed their timestamp) are dropped to
maintain the ordering guarantee.
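
The buffering behavior described above can be sketched in plain Java. This is a simplified, single-partition simulation and not the framework's implementation: `SortBufferSketch` and its methods are hypothetical names, rows carry only a timestamp and a score, and the lateness check (timestamp less than or equal to the last seen watermark) is an assumption chosen to match the flush condition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Conceptual sketch only; none of these names belong to the Flink API.
final class SortBufferSketch {

    static final class Event {
        final long ts;
        final int score;

        Event(long ts, int score) {
            this.ts = ts;
            this.score = score;
        }
    }

    // Mirrors ORDER BY (ts ASC, score DESC).
    private final PriorityQueue<Event> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Event e) -> e.ts)
                    .thenComparing(Comparator.comparingInt((Event e) -> e.score).reversed()));

    private long watermark = Long.MIN_VALUE;

    // Buffers a row; returns false if the row is late and therefore dropped.
    boolean add(long ts, int score) {
        if (ts <= watermark) {
            return false;
        }
        buffer.add(new Event(ts, score));
        return true;
    }

    // Advances the watermark and returns all rows with ts <= watermark in sorted order.
    List<Event> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        SortBufferSketch sortBuffer = new SortBufferSketch();
        sortBuffer.add(3_000L, 5);
        sortBuffer.add(1_000L, 1);
        sortBuffer.add(1_000L, 9);
        sortBuffer.add(2_000L, 4);

        for (Event e : sortBuffer.advanceWatermark(2_000L)) {
            System.out.println("deliver ts=" + e.ts + " score=" + e.score);
        }
        System.out.println("late row accepted: " + sortBuffer.add(1_500L, 7));
    }
}
```

Rows with a timestamp beyond the watermark stay in the buffer until a later watermark releases them, which is why the delivered order can be guaranteed only up to the current watermark.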

The following example demonstrates ordered processing with secondary sorting. First, the function implementation:

```java
// Function that processes events in order and captures the ordering
public static class OrderedProcessor extends ProcessTableFunction<List<Event>> {
  public record Event(Integer score, Instant ts) {}

  public static class BufferState {
    // Stores all input events that enter the eval() after sorting
    public List<Event> events = new ArrayList<>();
  }

  public void eval(
    Context ctx,
    @StateHint BufferState state,
    @ArgumentHint(SET_SEMANTIC_TABLE) Row input
  ) {
    // Optional: Access ordering information at runtime
    TableSemantics semantics = ctx.tableSemanticsFor("input");
    int[] orderColumns = semantics.orderByColumns();
    SortDirection[] directions = semantics.orderByDirections();

    // Buffer the incoming row
    state.events.add(new Event(input.getFieldAs("score"), input.getFieldAs("ts")));

    // Emit current buffer state
    collect(state.events);
  }
}
```

The function can be called using either SQL or Table API:

{{< tabs "2237eeed-order-by-example" >}}
{{< tab "SQL" >}}
```sql
-- Create a watermarked table
CREATE TABLE Events (
  name STRING,
  score INT,
  ts TIMESTAMP_LTZ(3),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen'
);

-- Register the function
CREATE FUNCTION OrderedProcessor AS 'org.example.OrderedProcessor';

-- Use ORDER BY with primary and secondary sort columns
SELECT * FROM OrderedProcessor(
  input => TABLE Events
    PARTITION BY name
    ORDER BY (ts ASC, score DESC)
);
```
{{< /tab >}}
{{< tab "Java" >}}
```java
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Create a watermarked table
env.executeSql(
  "CREATE TABLE Events (" +
  "name STRING, " +
  "score INT, " +
  "ts TIMESTAMP_LTZ(3), " +
  "WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
  ") WITH ('connector' = 'datagen')"
);

// Use orderBy() with primary and secondary sort columns
env.from("Events")
  .partitionBy($("name"))
  .orderBy($("ts").asc(), $("score").desc())
  .process(OrderedProcessor.class)
  .execute()
  .print();
```
{{< /tab >}}
{{< /tabs >}}

In this example:
- Events are first sorted by `ts` (ascending), ensuring event-time order
- Events with the same timestamp are then sorted by `score` (descending)
- Late events (with a timestamp less than the watermark) are automatically dropped
- The `TableSemantics` API provides runtime access to the ordering configuration
- The output is an ever-growing list of sorted input events

{{< top >}}

Multiple Tables
---------------
