Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.DisconnectException;
import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
Expand All @@ -42,6 +44,7 @@
import org.apache.fluss.rpc.messages.PbLakeTieringStats;
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.MapUtils;

import org.apache.flink.api.connector.source.ReaderInfo;
Expand Down Expand Up @@ -352,9 +355,17 @@ protected void handleTableTieringReachMaxDuration(
}
}

private void generateAndAssignSplits(
@VisibleForTesting
void generateAndAssignSplits(
@Nullable Tuple3<Long, Long, TablePath> tieringTable, Throwable throwable) {
if (throwable != null) {
if (ExceptionUtils.findThrowable(throwable, NetworkException.class).isPresent()
|| ExceptionUtils.findThrowable(throwable, DisconnectException.class)
.isPresent()) {
throw new FlinkRuntimeException(
"Coordinator disconnected during heartbeat, triggering failover",
throwable);
}
LOG.warn("Failed to request tiering table, will retry later.", throwable);
}
if (tieringTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -55,6 +57,7 @@
import static org.apache.fluss.config.ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Unit tests for {@link TieringSourceEnumerator} and {@link TieringSplitGenerator}. */
class TieringSourceEnumeratorTest extends TieringTestBase {
Expand Down Expand Up @@ -730,6 +733,21 @@ private TieringSourceEnumerator createTieringSourceEnumerator(
return new TieringSourceEnumerator(flussConf, context, 500);
}

@Test
void testNetworkErrorInHeartbeatTriggersFailover() throws Exception {
try (FlussMockSplitEnumeratorContext<TieringSplit> context =
new FlussMockSplitEnumeratorContext<>(1)) {
TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context);
FlinkRuntimeException networkError =
new FlinkRuntimeException(
"Failed to wait heartbeat response due to ",
new NetworkException("coordinator disconnected"));
assertThatThrownBy(() -> enumerator.generateAndAssignSplits(null, networkError))
.isInstanceOf(FlinkRuntimeException.class)
.hasMessageContaining("failover");
}
Comment on lines +736 to +748
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The production failure mode described in the PR involves DisconnectException as well as NetworkException, but this test only covers the NetworkException path. Please add a dedicated test for the DisconnectException case (ideally also covering the nested NetworkException -> DisconnectException chain) to ensure the new failover behavior is fully regression-tested.

Copilot uses AI. Check for mistakes.
}

@Test
void testTableReachMaxTieringDuration() throws Throwable {
TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table");
Expand Down
Loading