Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -163,7 +163,6 @@ public static void main(String[] args) throws Exception {
// partition on the address to make sure equal addresses
// end up in the same state machine flatMap function
.keyBy(Event::sourceAddress)
.enableAsyncState()
// the function that evaluates the state machine over the sequence of events
.flatMap(new StateMachineMapper());

Expand Down Expand Up @@ -214,32 +213,25 @@ public void open(OpenContext openContext) {
@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// get the current state for the key (source address)
currentState
.asyncValue()
.thenAccept(
state -> {
// if no state exists, yet, the state must be the state machine's
// initial state
if (state == null) {
state = State.Initial;
}

// ask the state machine what state we should go to based on the
// given event

State nextState = state.transition(evt.type());
if (nextState == State.InvalidTransition) {
// the current event resulted in an invalid transition
// raise an alert!
out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
} else if (nextState.isTerminal()) {
// we reached a terminal state, clean up the current state
currentState.asyncClear();
} else {
// remember the new state
currentState.asyncUpdate(nextState);
}
});
State state = currentState.value();

// if no state exists, yet, the state must be the state machine's initial state
if (state == null) {
state = State.Initial;
}

// ask the state machine what state we should go to based on the given event
State nextState = state.transition(evt.type());
if (nextState == State.InvalidTransition) {
// the current event resulted in an invalid transition — raise an alert!
out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
} else if (nextState.isTerminal()) {
// we reached a terminal state, clean up the current state
currentState.clear();
} else {
// remember the new state
currentState.update(nextState);
}
}
}

Expand Down