From af28f00ea8cbd99510721295025ea4c6b3175aa6 Mon Sep 17 00:00:00 2001 From: ugurtafrali Date: Fri, 17 Apr 2026 12:16:47 +0300 Subject: [PATCH] [flink] Handle coordinator disconnect in TieringSourceEnumerator (#3106) --- .../enumerator/TieringSourceEnumerator.java | 13 ++++++++++++- .../TieringSourceEnumeratorTest.java | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index e81362fa84..3e7387311b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -23,6 +23,8 @@ import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.DisconnectException; +import org.apache.fluss.exception.NetworkException; import org.apache.fluss.flink.metrics.FlinkMetricRegistry; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; @@ -42,6 +44,7 @@ import org.apache.fluss.rpc.messages.PbLakeTieringStats; import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.MapUtils; import org.apache.flink.api.connector.source.ReaderInfo; @@ -352,9 +355,17 @@ protected void handleTableTieringReachMaxDuration( } } - private void generateAndAssignSplits( + @VisibleForTesting + void generateAndAssignSplits( @Nullable Tuple3 tieringTable, Throwable throwable) { if (throwable != null) { + if (ExceptionUtils.findThrowable(throwable, NetworkException.class).isPresent() + || ExceptionUtils.findThrowable(throwable, DisconnectException.class) + .isPresent()) { + throw new FlinkRuntimeException( + "Coordinator disconnected during heartbeat, triggering failover", + throwable); + } LOG.warn("Failed to request tiering table, will retry later.", throwable); } if (tieringTable != null) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index b725f99e7e..5b18b3b3d7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -19,6 +19,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NetworkException; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; @@ -37,6 +38,7 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,6 +57,7 @@ import static org.apache.fluss.config.ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for {@link TieringSourceEnumerator} and {@link TieringSplitGenerator}. */ class TieringSourceEnumeratorTest extends TieringTestBase { @@ -730,6 +733,21 @@ private TieringSourceEnumerator createTieringSourceEnumerator( return new TieringSourceEnumerator(flussConf, context, 500); } + @Test + void testNetworkErrorInHeartbeatTriggersFailover() throws Exception { + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(1)) { + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); + FlinkRuntimeException networkError = + new FlinkRuntimeException( + "Failed to wait heartbeat response due to ", + new NetworkException("coordinator disconnected")); + assertThatThrownBy(() -> enumerator.generateAndAssignSplits(null, networkError)) + .isInstanceOf(FlinkRuntimeException.class) + .hasMessageContaining("failover"); + } + } + @Test void testTableReachMaxTieringDuration() throws Throwable { TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table");