From 0f38996fb58d6cd3ddea280c05fc943ead156f6d Mon Sep 17 00:00:00 2001
From: yunhong <337361684@qq.com>
Date: Tue, 7 Apr 2026 09:55:53 +0800
Subject: [PATCH] [client] Implement adaptive fetch rate control for LogScanner
---
.../log/BucketFetchRateController.java | 154 ++++++++++++++
.../client/table/scanner/log/LogFetcher.java | 39 +++-
.../log/BucketFetchRateControllerTest.java | 192 ++++++++++++++++++
.../apache/fluss/config/ConfigOptions.java | 23 +++
4 files changed, 406 insertions(+), 2 deletions(-)
create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java
create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java
new file mode 100644
index 0000000000..dfa641b592
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Controls the fetch rate for individual buckets based on the amount of data returned in recent
+ * fetches.
+ *
+ *
For buckets that consistently return data, they are fetched every round. For buckets that
+ * return no data for consecutive fetches, the fetch frequency is progressively reduced using
+ * exponential backoff. This is particularly useful for partitioned tables where only the latest
+ * partitions have active data, avoiding wasted CPU and network resources on empty partitions.
+ *
+ *
The backoff schedule is: after 1 empty fetch, skip 1 round; after 2 empty fetches, skip 2
+ * rounds; after 3, skip 4 rounds; and so on (powers of 2), up to {@code maxSkipRounds}. Any fetch
+ * that returns data immediately resets the backoff to zero.
+ *
+ *
This class is NOT thread-safe. Callers must ensure proper synchronization.
+ */
+@Internal
+class BucketFetchRateController {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BucketFetchRateController.class);
+
+ /** Maximum exponent for the exponential backoff (2^5 = 32). */
+ private static final int MAX_BACKOFF_SHIFT = 5;
+
+ private final int maxSkipRounds;
+ private final Map bucketStates;
+
+ BucketFetchRateController(int maxSkipRounds) {
+ this.maxSkipRounds = maxSkipRounds;
+ this.bucketStates = new HashMap<>();
+ }
+
+ /**
+ * Determines whether the given bucket should be included in the current fetch round.
+ *
+ * If the bucket is in a cool down period (i.e., it has been returning empty results), this
+ * method decrements the remaining skip count and returns {@code false}. Otherwise, it returns
+ * {@code true} indicating the bucket should be fetched.
+ *
+ * @param tableBucket the bucket to check
+ * @return {@code true} if the bucket should be fetched in this round
+ */
+ boolean shouldFetch(TableBucket tableBucket) {
+ BucketFetchState state = bucketStates.get(tableBucket);
+ if (state == null) {
+ return true;
+ }
+ if (state.remainingSkipRounds > 0) {
+ state.remainingSkipRounds--;
+ LOG.trace(
+ "Adaptive fetch: skipping bucket {} (consecutive empty fetches: {}, "
+ + "remaining skip rounds: {})",
+ tableBucket,
+ state.consecutiveEmptyFetches,
+ state.remainingSkipRounds);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Records the result of a fetch for the given bucket, adjusting future fetch frequency.
+ *
+ *
If the fetch returned data, the backoff is immediately reset to zero. If the fetch
+ * returned no data, the consecutive empty count is incremented and a new skip interval is
+ * calculated using exponential backoff.
+ *
+ * @param tableBucket the bucket that was fetched
+ * @param hasRecords {@code true} if the fetch returned any records
+ */
+ void recordFetchResult(TableBucket tableBucket, boolean hasRecords) {
+ BucketFetchState state =
+ bucketStates.computeIfAbsent(tableBucket, k -> new BucketFetchState());
+ if (hasRecords) {
+ if (state.consecutiveEmptyFetches > 0) {
+ LOG.info(
+ "Adaptive fetch: bucket {} recovered from cool down "
+ + "(was throttled after {} consecutive empty fetches)",
+ tableBucket,
+ state.consecutiveEmptyFetches);
+ }
+ state.consecutiveEmptyFetches = 0;
+ state.remainingSkipRounds = 0;
+ } else {
+ state.consecutiveEmptyFetches++;
+ int shift = Math.min(state.consecutiveEmptyFetches - 1, MAX_BACKOFF_SHIFT);
+ state.remainingSkipRounds = Math.min(1 << shift, maxSkipRounds);
+ LOG.info(
+ "Adaptive fetch: bucket {} has no data, entering cool down "
+ + "(consecutive empty fetches: {}, will skip {} rounds)",
+ tableBucket,
+ state.consecutiveEmptyFetches,
+ state.remainingSkipRounds);
+ }
+ }
+
+ /** Removes the tracking state for the given bucket. */
+ void removeBucket(TableBucket tableBucket) {
+ bucketStates.remove(tableBucket);
+ }
+
+ /** Resets all tracking state. */
+ void reset() {
+ bucketStates.clear();
+ }
+
+ /** Returns the current number of remaining skip rounds for the given bucket, for testing. */
+ @VisibleForTesting
+ int getRemainingSkipRounds(TableBucket tableBucket) {
+ BucketFetchState state = bucketStates.get(tableBucket);
+ return state == null ? 0 : state.remainingSkipRounds;
+ }
+
+ /** Returns the number of consecutive empty fetches for the given bucket, for testing. */
+ @VisibleForTesting
+ int getConsecutiveEmptyFetches(TableBucket tableBucket) {
+ BucketFetchState state = bucketStates.get(tableBucket);
+ return state == null ? 0 : state.consecutiveEmptyFetches;
+ }
+
+ /** Per-bucket fetch state tracking consecutive empty fetches and cool down. */
+ private static class BucketFetchState {
+ int consecutiveEmptyFetches = 0;
+ int remainingSkipRounds = 0;
+ }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index 14a2a3d313..ff6d91632d 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -102,6 +102,7 @@ public class LogFetcher implements Closeable {
private final LogFetchBuffer logFetchBuffer;
private final LogFetchCollector logFetchCollector;
private final RemoteLogDownloader remoteLogDownloader;
+ @Nullable private final BucketFetchRateController fetchRateController;
@GuardedBy("this")
private final Set nodesWithPendingFetchRequests;
@@ -150,6 +151,15 @@ public LogFetcher(
this.remoteLogDownloader =
new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
remoteLogDownloader.start();
+ if (conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_ENABLED)) {
+ this.fetchRateController =
+ new BucketFetchRateController(
+ conf.getInt(
+ ConfigOptions
+ .CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_MAX_SKIP_ROUNDS));
+ } else {
+ this.fetchRateController = null;
+ }
}
/**
@@ -383,15 +393,17 @@ private synchronized void handleFetchLogResponse(
+ "unsubscribed.",
tb);
} else {
+ boolean hasData;
if (fetchResultForBucket.fetchFromRemote()) {
+ hasData = true;
pendRemoteFetches(
fetchResultForBucket.remoteLogFetchInfo(),
fetchOffset,
fetchResultForBucket.getHighWatermark());
} else {
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
- boolean hasRecords = !MemoryLogRecords.EMPTY.equals(logRecords);
- if (hasRecords) {
+ hasData = !MemoryLogRecords.EMPTY.equals(logRecords);
+ if (hasData) {
// Retain the parsed buffer so it stays alive while
// this CompletedFetch's records are being consumed.
if (parsedByteBuf != null) {
@@ -422,6 +434,11 @@ private synchronized void handleFetchLogResponse(
null));
}
}
+ // Track adaptive fetch rate for successful fetches
+ if (fetchRateController != null
+ && fetchResultForBucket.getErrorCode() == Errors.NONE.code()) {
+ fetchRateController.recordFetchResult(tb, hasData);
+ }
}
}
}
@@ -494,11 +511,17 @@ private void pendRemoteFetches(
Map prepareFetchLogRequests() {
Map> fetchLogReqForBuckets = new HashMap<>();
int readyForFetchCount = 0;
+ int skippedByAdaptiveFetch = 0;
Long tableId = null;
for (TableBucket tb : fetchableBuckets()) {
if (tableId == null) {
tableId = tb.getTableId();
}
+ // Adaptive fetch: skip buckets in cool down period
+ if (fetchRateController != null && !fetchRateController.shouldFetch(tb)) {
+ skippedByAdaptiveFetch++;
+ continue;
+ }
Long offset = logScannerStatus.getBucketOffset(tb);
if (offset == null) {
LOG.debug(
@@ -538,8 +561,20 @@ Map prepareFetchLogRequests() {
}
if (readyForFetchCount == 0) {
+ if (skippedByAdaptiveFetch > 0) {
+ LOG.info(
+ "No fetch requests prepared, {} buckets skipped by adaptive fetch",
+ skippedByAdaptiveFetch);
+ }
return Collections.emptyMap();
} else {
+ if (skippedByAdaptiveFetch > 0) {
+ LOG.info(
+ "Preparing fetch requests for {} buckets, "
+ + "{} buckets skipped by adaptive fetch",
+ readyForFetchCount,
+ skippedByAdaptiveFetch);
+ }
Map fetchLogRequests = new HashMap<>();
long finalTableId = tableId;
fetchLogReqForBuckets.forEach(
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java
new file mode 100644
index 0000000000..8a445f2f39
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BucketFetchRateController}. */
+class BucketFetchRateControllerTest {
+
+ private static final int MAX_SKIP_ROUNDS = 32;
+ private BucketFetchRateController controller;
+
+ @BeforeEach
+ void setup() {
+ controller = new BucketFetchRateController(MAX_SKIP_ROUNDS);
+ }
+
+ @Test
+ void testNewBucketShouldAlwaysFetch() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+ assertThat(controller.shouldFetch(tb)).isTrue();
+ }
+
+ @Test
+ void testBucketWithDataResetsCoolDown() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ // Record multiple empty fetches to build up cool down
+ controller.recordFetchResult(tb, false);
+ controller.recordFetchResult(tb, false);
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(3);
+ assertThat(controller.getRemainingSkipRounds(tb)).isGreaterThan(0);
+
+ // Now record a fetch with data
+ controller.recordFetchResult(tb, true);
+ assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0);
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0);
+ assertThat(controller.shouldFetch(tb)).isTrue();
+ }
+
+ @Test
+ void testExponentialBackoff() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ // 1st empty fetch: skip 1 round (2^0 = 1)
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1);
+
+ // Consume the skip round
+ assertThat(controller.shouldFetch(tb)).isFalse();
+ assertThat(controller.shouldFetch(tb)).isTrue();
+
+ // 2nd empty fetch: skip 2 rounds (2^1 = 2)
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(2);
+
+ // Consume the 2 skip rounds
+ assertThat(controller.shouldFetch(tb)).isFalse();
+ assertThat(controller.shouldFetch(tb)).isFalse();
+ assertThat(controller.shouldFetch(tb)).isTrue();
+
+ // 3rd empty fetch: skip 4 rounds (2^2 = 4)
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(4);
+
+ for (int i = 0; i < 4; i++) {
+ assertThat(controller.shouldFetch(tb)).isFalse();
+ }
+ assertThat(controller.shouldFetch(tb)).isTrue();
+ }
+
+ @Test
+ void testMaxSkipRoundsCap() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ // Record many empty fetches to exceed the max backoff shift
+ for (int i = 0; i < 20; i++) {
+ controller.recordFetchResult(tb, false);
+ // Consume all skip rounds
+ while (!controller.shouldFetch(tb)) {
+ // draining
+ }
+ }
+
+ // After 20 empty fetches, skip rounds should be capped at MAX_SKIP_ROUNDS
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(MAX_SKIP_ROUNDS);
+ }
+
+ @Test
+ void testMultipleBucketsIndependent() {
+ TableBucket tb1 = new TableBucket(1L, 0L, 0);
+ TableBucket tb2 = new TableBucket(1L, 1L, 0);
+
+ // tb1 gets empty fetches, tb2 gets data
+ controller.recordFetchResult(tb1, false);
+ controller.recordFetchResult(tb1, false);
+ controller.recordFetchResult(tb2, true);
+
+ // tb1 should be in cool down
+ assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(2);
+ assertThat(controller.getRemainingSkipRounds(tb1)).isGreaterThan(0);
+
+ // tb2 should be fetch able immediately
+ assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0);
+ assertThat(controller.shouldFetch(tb2)).isTrue();
+ }
+
+ @Test
+ void testRemoveBucket() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(1);
+
+ controller.removeBucket(tb);
+ assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0);
+ assertThat(controller.shouldFetch(tb)).isTrue();
+ }
+
+ @Test
+ void testReset() {
+ TableBucket tb1 = new TableBucket(1L, 0L, 0);
+ TableBucket tb2 = new TableBucket(1L, 1L, 0);
+
+ controller.recordFetchResult(tb1, false);
+ controller.recordFetchResult(tb2, false);
+
+ controller.reset();
+
+ assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(0);
+ assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0);
+ assertThat(controller.shouldFetch(tb1)).isTrue();
+ assertThat(controller.shouldFetch(tb2)).isTrue();
+ }
+
+ @Test
+ void testShouldFetchDecrementsCounter() {
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ // 1 empty fetch -> 1 skip round
+ controller.recordFetchResult(tb, false);
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1);
+
+ // shouldFetch returns false and decrements to 0
+ assertThat(controller.shouldFetch(tb)).isFalse();
+ assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0);
+
+ // Now it should be fetch able
+ assertThat(controller.shouldFetch(tb)).isTrue();
+ }
+
+ @Test
+ void testSmallMaxSkipRounds() {
+ BucketFetchRateController smallController = new BucketFetchRateController(4);
+ TableBucket tb = new TableBucket(1L, 0L, 0);
+
+ // Record many empty fetches
+ for (int i = 0; i < 10; i++) {
+ smallController.recordFetchResult(tb, false);
+ // Drain skip rounds
+ while (!smallController.shouldFetch(tb)) {
+ // draining
+ }
+ }
+
+ // Even after many empty fetches, skip rounds should be capped at 4
+ smallController.recordFetchResult(tb, false);
+ assertThat(smallController.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(4);
+ }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 8da2246b0a..41ca891ffe 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1206,6 +1206,29 @@ public class ConfigOptions {
+ CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");
+    public static final ConfigOption<Boolean> CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_ENABLED =
+ key("client.scanner.log.adaptive-fetch.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to enable adaptive fetch rate control for LogScanner. "
+ + "When enabled, the scanner dynamically adjusts the fetch frequency "
+ + "per bucket based on the amount of data returned. Buckets that "
+ + "consistently return no data are fetched less frequently, reducing "
+ + "CPU and network overhead. This is particularly useful for partitioned "
+ + "tables where only the latest partitions have active data.");
+
+    public static final ConfigOption<Integer> CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_MAX_SKIP_ROUNDS =
+ key("client.scanner.log.adaptive-fetch.max-skip-rounds")
+ .intType()
+ .defaultValue(32)
+ .withDescription(
+ "The maximum number of fetch rounds to skip for a bucket that returns "
+ + "no data when adaptive fetch is enabled. Higher values save more "
+ + "resources for inactive buckets but increase the latency to detect "
+ + "new data arrival. The backoff increases exponentially: 1, 2, 4, 8, "
+ + "..., up to this limit.");
+
public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE =
key("client.lookup.queue-size")
.intType()