From 41a559cd205cf48d20649b1776714ba8259a3108 Mon Sep 17 00:00:00 2001 From: Purushottam Sinha Date: Wed, 15 Apr 2026 22:54:00 +0530 Subject: [PATCH] [FLINK-39061][example] Roll back StateMachineExample to v1 State API StateMachineExample was using the State V2 async API (enableAsyncState(), asyncValue(), asyncUpdate(), asyncClear()), which requires the ForSt backend. ForSt is still in development and does not support CANONICAL savepoints, causing an IllegalStateException when one is triggered. Roll back to the v1 synchronous State API, which works with all GA backends and supports all savepoint formats. --- .../statemachine/StateMachineExample.java | 50 ++++++++----------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index a36efae9b552e..265760dc38d9d 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.api.common.state.v2.ValueState; -import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -163,7 +163,6 @@ public static void main(String[] args) throws Exception { // partition on the address to make sure equal addresses // end up in the same state machine flatMap function .keyBy(Event::sourceAddress) - .enableAsyncState() // the function that evaluates the state machine over the sequence of events .flatMap(new StateMachineMapper()); @@ -214,32 +213,25 @@ public void open(OpenContext openContext) { @Override public void flatMap(Event evt, Collector out) throws Exception { // get the current state for the key (source address) - currentState - .asyncValue() - .thenAccept( - state -> { - // if no state exists, yet, the state must be the state machine's - // initial state - if (state == null) { - state = State.Initial; - } - - // ask the state machine what state we should go to based on the - // given event - - State nextState = state.transition(evt.type()); - if (nextState == State.InvalidTransition) { - // the current event resulted in an invalid transition - // raise an alert! - out.collect(new Alert(evt.sourceAddress(), state, evt.type())); - } else if (nextState.isTerminal()) { - // we reached a terminal state, clean up the current state - currentState.asyncClear(); - } else { - // remember the new state - currentState.asyncUpdate(nextState); - } - }); + State state = currentState.value(); + + // if no state exists, yet, the state must be the state machine's initial state + if (state == null) { + state = State.Initial; + } + + // ask the state machine what state we should go to based on the given event + State nextState = state.transition(evt.type()); + if (nextState == State.InvalidTransition) { + // the current event resulted in an invalid transition — raise an alert! + out.collect(new Alert(evt.sourceAddress(), state, evt.type())); + } else if (nextState.isTerminal()) { + // we reached a terminal state, clean up the current state + currentState.clear(); + } else { + // remember the new state + currentState.update(nextState); + } } }