Handle coordinator disconnect in TieringSourceEnumerator#3107
Handle coordinator disconnect in TieringSourceEnumerator#3107utafrali wants to merge 1 commit intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates Flink’s tiering source enumerator to treat coordinator disconnect-style heartbeat failures as fatal, so the job fails over and can refresh coordinator metadata after restart (addresses #3106).
Changes:
- Detect
NetworkException/DisconnectExceptioninTieringSourceEnumerator.generateAndAssignSplits()and throw aFlinkRuntimeExceptionto trigger failover instead of log-and-retry. - Expose
generateAndAssignSplits()for testing (@VisibleForTesting) and add a unit test covering the network-error failover path.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java | Convert heartbeat disconnect/network failures into a fatal exception to force failover. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java | Add a regression test asserting failover behavior on heartbeat network failures. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @Test | ||
| void testNetworkErrorInHeartbeatTriggersFailover() throws Exception { | ||
| try (FlussMockSplitEnumeratorContext<TieringSplit> context = | ||
| new FlussMockSplitEnumeratorContext<>(1)) { | ||
| TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); | ||
| FlinkRuntimeException networkError = | ||
| new FlinkRuntimeException( | ||
| "Failed to wait heartbeat response due to ", | ||
| new NetworkException("coordinator disconnected")); | ||
| assertThatThrownBy(() -> enumerator.generateAndAssignSplits(null, networkError)) | ||
| .isInstanceOf(FlinkRuntimeException.class) | ||
| .hasMessageContaining("failover"); | ||
| } |
There was a problem hiding this comment.
The production failure mode described in the PR involves DisconnectException as well as NetworkException, but this test only covers the NetworkException path. Please add a dedicated test for the DisconnectException case (ideally also covering the nested NetworkException -> DisconnectException chain) to ensure the new failover behavior is fully regression-tested.
Purpose
Closes #3106
Brief change log
Coordinator IP changes can leave stale heartbeat connections that just log errors and retry instead of triggering failover. Added exception checks for
NetworkExceptionandDisconnectException- when we hit one during heartbeat, we throw aFlinkRuntimeExceptionto force proper failover. Other error types keep the existing log and retry behavior.Tests
Added test cases to verify failover behavior when network or disconnect exceptions occur.
API and Format
N/A
Documentation
N/A