diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java new file mode 100644 index 0000000000..dfa641b592 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateController.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Controls the fetch rate for individual buckets based on the amount of data returned in recent + * fetches. + * + *
<p>
For buckets that consistently return data, they are fetched every round. For buckets that + * return no data for consecutive fetches, the fetch frequency is progressively reduced using + * exponential backoff. This is particularly useful for partitioned tables where only the latest + * partitions have active data, avoiding wasted CPU and network resources on empty partitions. + * + *
<p>
The backoff schedule is: after 1 empty fetch, skip 1 round; after 2 empty fetches, skip 2 + * rounds; after 3, skip 4 rounds; and so on (powers of 2), up to {@code maxSkipRounds}. Any fetch + * that returns data immediately resets the backoff to zero. + * + *
<p>
This class is NOT thread-safe. Callers must ensure proper synchronization. + */ +@Internal +class BucketFetchRateController { + + private static final Logger LOG = LoggerFactory.getLogger(BucketFetchRateController.class); + + /** Maximum exponent for the exponential backoff (2^5 = 32). */ + private static final int MAX_BACKOFF_SHIFT = 5; + + private final int maxSkipRounds; + private final Map bucketStates; + + BucketFetchRateController(int maxSkipRounds) { + this.maxSkipRounds = maxSkipRounds; + this.bucketStates = new HashMap<>(); + } + + /** + * Determines whether the given bucket should be included in the current fetch round. + * + *
<p>
If the bucket is in a cool down period (i.e., it has been returning empty results), this + * method decrements the remaining skip count and returns {@code false}. Otherwise, it returns + * {@code true} indicating the bucket should be fetched. + * + * @param tableBucket the bucket to check + * @return {@code true} if the bucket should be fetched in this round + */ + boolean shouldFetch(TableBucket tableBucket) { + BucketFetchState state = bucketStates.get(tableBucket); + if (state == null) { + return true; + } + if (state.remainingSkipRounds > 0) { + state.remainingSkipRounds--; + LOG.trace( + "Adaptive fetch: skipping bucket {} (consecutive empty fetches: {}, " + + "remaining skip rounds: {})", + tableBucket, + state.consecutiveEmptyFetches, + state.remainingSkipRounds); + return false; + } + return true; + } + + /** + * Records the result of a fetch for the given bucket, adjusting future fetch frequency. + * + *
<p>
If the fetch returned data, the backoff is immediately reset to zero. If the fetch + * returned no data, the consecutive empty count is incremented and a new skip interval is + * calculated using exponential backoff. + * + * @param tableBucket the bucket that was fetched + * @param hasRecords {@code true} if the fetch returned any records + */ + void recordFetchResult(TableBucket tableBucket, boolean hasRecords) { + BucketFetchState state = + bucketStates.computeIfAbsent(tableBucket, k -> new BucketFetchState()); + if (hasRecords) { + if (state.consecutiveEmptyFetches > 0) { + LOG.info( + "Adaptive fetch: bucket {} recovered from cool down " + + "(was throttled after {} consecutive empty fetches)", + tableBucket, + state.consecutiveEmptyFetches); + } + state.consecutiveEmptyFetches = 0; + state.remainingSkipRounds = 0; + } else { + state.consecutiveEmptyFetches++; + int shift = Math.min(state.consecutiveEmptyFetches - 1, MAX_BACKOFF_SHIFT); + state.remainingSkipRounds = Math.min(1 << shift, maxSkipRounds); + LOG.info( + "Adaptive fetch: bucket {} has no data, entering cool down " + + "(consecutive empty fetches: {}, will skip {} rounds)", + tableBucket, + state.consecutiveEmptyFetches, + state.remainingSkipRounds); + } + } + + /** Removes the tracking state for the given bucket. */ + void removeBucket(TableBucket tableBucket) { + bucketStates.remove(tableBucket); + } + + /** Resets all tracking state. */ + void reset() { + bucketStates.clear(); + } + + /** Returns the current number of remaining skip rounds for the given bucket, for testing. */ + @VisibleForTesting + int getRemainingSkipRounds(TableBucket tableBucket) { + BucketFetchState state = bucketStates.get(tableBucket); + return state == null ? 0 : state.remainingSkipRounds; + } + + /** Returns the number of consecutive empty fetches for the given bucket, for testing. 
*/ + @VisibleForTesting + int getConsecutiveEmptyFetches(TableBucket tableBucket) { + BucketFetchState state = bucketStates.get(tableBucket); + return state == null ? 0 : state.consecutiveEmptyFetches; + } + + /** Per-bucket fetch state tracking consecutive empty fetches and cool down. */ + private static class BucketFetchState { + int consecutiveEmptyFetches = 0; + int remainingSkipRounds = 0; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 14a2a3d313..ff6d91632d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -102,6 +102,7 @@ public class LogFetcher implements Closeable { private final LogFetchBuffer logFetchBuffer; private final LogFetchCollector logFetchCollector; private final RemoteLogDownloader remoteLogDownloader; + @Nullable private final BucketFetchRateController fetchRateController; @GuardedBy("this") private final Set nodesWithPendingFetchRequests; @@ -150,6 +151,15 @@ public LogFetcher( this.remoteLogDownloader = new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup); remoteLogDownloader.start(); + if (conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_ENABLED)) { + this.fetchRateController = + new BucketFetchRateController( + conf.getInt( + ConfigOptions + .CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_MAX_SKIP_ROUNDS)); + } else { + this.fetchRateController = null; + } } /** @@ -383,15 +393,17 @@ private synchronized void handleFetchLogResponse( + "unsubscribed.", tb); } else { + boolean hasData; if (fetchResultForBucket.fetchFromRemote()) { + hasData = true; pendRemoteFetches( fetchResultForBucket.remoteLogFetchInfo(), fetchOffset, fetchResultForBucket.getHighWatermark()); } else { LogRecords logRecords = 
fetchResultForBucket.recordsOrEmpty(); - boolean hasRecords = !MemoryLogRecords.EMPTY.equals(logRecords); - if (hasRecords) { + hasData = !MemoryLogRecords.EMPTY.equals(logRecords); + if (hasData) { // Retain the parsed buffer so it stays alive while // this CompletedFetch's records are being consumed. if (parsedByteBuf != null) { @@ -422,6 +434,11 @@ private synchronized void handleFetchLogResponse( null)); } } + // Track adaptive fetch rate for successful fetches + if (fetchRateController != null + && fetchResultForBucket.getErrorCode() == Errors.NONE.code()) { + fetchRateController.recordFetchResult(tb, hasData); + } } } } @@ -494,11 +511,17 @@ private void pendRemoteFetches( Map prepareFetchLogRequests() { Map> fetchLogReqForBuckets = new HashMap<>(); int readyForFetchCount = 0; + int skippedByAdaptiveFetch = 0; Long tableId = null; for (TableBucket tb : fetchableBuckets()) { if (tableId == null) { tableId = tb.getTableId(); } + // Adaptive fetch: skip buckets in cool down period + if (fetchRateController != null && !fetchRateController.shouldFetch(tb)) { + skippedByAdaptiveFetch++; + continue; + } Long offset = logScannerStatus.getBucketOffset(tb); if (offset == null) { LOG.debug( @@ -538,8 +561,20 @@ Map prepareFetchLogRequests() { } if (readyForFetchCount == 0) { + if (skippedByAdaptiveFetch > 0) { + LOG.info( + "No fetch requests prepared, {} buckets skipped by adaptive fetch", + skippedByAdaptiveFetch); + } return Collections.emptyMap(); } else { + if (skippedByAdaptiveFetch > 0) { + LOG.info( + "Preparing fetch requests for {} buckets, " + + "{} buckets skipped by adaptive fetch", + readyForFetchCount, + skippedByAdaptiveFetch); + } Map fetchLogRequests = new HashMap<>(); long finalTableId = tableId; fetchLogReqForBuckets.forEach( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java new 
file mode 100644 index 0000000000..8a445f2f39 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/BucketFetchRateControllerTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.metadata.TableBucket; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BucketFetchRateController}. 
*/ +class BucketFetchRateControllerTest { + + private static final int MAX_SKIP_ROUNDS = 32; + private BucketFetchRateController controller; + + @BeforeEach + void setup() { + controller = new BucketFetchRateController(MAX_SKIP_ROUNDS); + } + + @Test + void testNewBucketShouldAlwaysFetch() { + TableBucket tb = new TableBucket(1L, 0L, 0); + assertThat(controller.shouldFetch(tb)).isTrue(); + } + + @Test + void testBucketWithDataResetsCoolDown() { + TableBucket tb = new TableBucket(1L, 0L, 0); + + // Record multiple empty fetches to build up cool down + controller.recordFetchResult(tb, false); + controller.recordFetchResult(tb, false); + controller.recordFetchResult(tb, false); + assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(3); + assertThat(controller.getRemainingSkipRounds(tb)).isGreaterThan(0); + + // Now record a fetch with data + controller.recordFetchResult(tb, true); + assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0); + assertThat(controller.shouldFetch(tb)).isTrue(); + } + + @Test + void testExponentialBackoff() { + TableBucket tb = new TableBucket(1L, 0L, 0); + + // 1st empty fetch: skip 1 round (2^0 = 1) + controller.recordFetchResult(tb, false); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1); + + // Consume the skip round + assertThat(controller.shouldFetch(tb)).isFalse(); + assertThat(controller.shouldFetch(tb)).isTrue(); + + // 2nd empty fetch: skip 2 rounds (2^1 = 2) + controller.recordFetchResult(tb, false); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(2); + + // Consume the 2 skip rounds + assertThat(controller.shouldFetch(tb)).isFalse(); + assertThat(controller.shouldFetch(tb)).isFalse(); + assertThat(controller.shouldFetch(tb)).isTrue(); + + // 3rd empty fetch: skip 4 rounds (2^2 = 4) + controller.recordFetchResult(tb, false); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(4); + + for (int i = 0; i < 
4; i++) { + assertThat(controller.shouldFetch(tb)).isFalse(); + } + assertThat(controller.shouldFetch(tb)).isTrue(); + } + + @Test + void testMaxSkipRoundsCap() { + TableBucket tb = new TableBucket(1L, 0L, 0); + + // Record many empty fetches to exceed the max backoff shift + for (int i = 0; i < 20; i++) { + controller.recordFetchResult(tb, false); + // Consume all skip rounds + while (!controller.shouldFetch(tb)) { + // draining + } + } + + // After 20 empty fetches, skip rounds should be capped at MAX_SKIP_ROUNDS + controller.recordFetchResult(tb, false); + assertThat(controller.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(MAX_SKIP_ROUNDS); + } + + @Test + void testMultipleBucketsIndependent() { + TableBucket tb1 = new TableBucket(1L, 0L, 0); + TableBucket tb2 = new TableBucket(1L, 1L, 0); + + // tb1 gets empty fetches, tb2 gets data + controller.recordFetchResult(tb1, false); + controller.recordFetchResult(tb1, false); + controller.recordFetchResult(tb2, true); + + // tb1 should be in cool down + assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(2); + assertThat(controller.getRemainingSkipRounds(tb1)).isGreaterThan(0); + + // tb2 should be fetch able immediately + assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0); + assertThat(controller.shouldFetch(tb2)).isTrue(); + } + + @Test + void testRemoveBucket() { + TableBucket tb = new TableBucket(1L, 0L, 0); + + controller.recordFetchResult(tb, false); + assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(1); + + controller.removeBucket(tb); + assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0); + assertThat(controller.shouldFetch(tb)).isTrue(); + } + + @Test + void testReset() { + TableBucket tb1 = new TableBucket(1L, 0L, 0); + TableBucket tb2 = new TableBucket(1L, 1L, 0); + + controller.recordFetchResult(tb1, false); + controller.recordFetchResult(tb2, false); + + controller.reset(); + + 
assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(0); + assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0); + assertThat(controller.shouldFetch(tb1)).isTrue(); + assertThat(controller.shouldFetch(tb2)).isTrue(); + } + + @Test + void testShouldFetchDecrementsCounter() { + TableBucket tb = new TableBucket(1L, 0L, 0); + + // 1 empty fetch -> 1 skip round + controller.recordFetchResult(tb, false); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1); + + // shouldFetch returns false and decrements to 0 + assertThat(controller.shouldFetch(tb)).isFalse(); + assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0); + + // Now it should be fetch able + assertThat(controller.shouldFetch(tb)).isTrue(); + } + + @Test + void testSmallMaxSkipRounds() { + BucketFetchRateController smallController = new BucketFetchRateController(4); + TableBucket tb = new TableBucket(1L, 0L, 0); + + // Record many empty fetches + for (int i = 0; i < 10; i++) { + smallController.recordFetchResult(tb, false); + // Drain skip rounds + while (!smallController.shouldFetch(tb)) { + // draining + } + } + + // Even after many empty fetches, skip rounds should be capped at 4 + smallController.recordFetchResult(tb, false); + assertThat(smallController.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(4); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 8da2246b0a..41ca891ffe 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1206,6 +1206,29 @@ public class ConfigOptions { + CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key() + " time to return."); + public static final ConfigOption CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_ENABLED = + key("client.scanner.log.adaptive-fetch.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + 
"Whether to enable adaptive fetch rate control for LogScanner. " + + "When enabled, the scanner dynamically adjusts the fetch frequency " + + "per bucket based on the amount of data returned. Buckets that " + + "consistently return no data are fetched less frequently, reducing " + + "CPU and network overhead. This is particularly useful for partitioned " + + "tables where only the latest partitions have active data."); + + public static final ConfigOption CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_MAX_SKIP_ROUNDS = + key("client.scanner.log.adaptive-fetch.max-skip-rounds") + .intType() + .defaultValue(32) + .withDescription( + "The maximum number of fetch rounds to skip for a bucket that returns " + + "no data when adaptive fetch is enabled. Higher values save more " + + "resources for inactive buckets but increase the latency to detect " + + "new data arrival. The backoff increases exponentially: 1, 2, 4, 8, " + + "..., up to this limit."); + public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType()