diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
index 2dc8fb10d19b8..5fde18815d391 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
@@ -19,6 +19,7 @@
package org.apache.flink.fs.s3native;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
@@ -37,15 +38,18 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Helper class for performing bulk S3 to local file system copies using S3TransferManager.
*
 *
 * <p>Concurrency Model: Uses batch-based concurrency control with {@code
- * maxConcurrentCopies} to limit parallel downloads. The current implementation waits for each batch
- * to complete before starting the next batch. A future enhancement could use a bounded thread pool
- * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission
- * of new downloads as slots become available, which would provide better throughput by avoiding the
- * "slowest task in batch" bottleneck.
+ * maxConcurrentCopies} to limit parallel downloads. The effective concurrency is clamped to the
+ * HTTP connection pool size ({@code maxConnections}) to prevent connection pool exhaustion. The
+ * current implementation waits for each batch to complete before starting the next batch. A future
+ * enhancement could use a bounded thread pool (e.g., {@link java.util.concurrent.Semaphore} or
+ * bounded executor) to allow continuous submission of new downloads as slots become available,
+ * which would provide better throughput by avoiding the "slowest task in batch" bottleneck.
*
 *
 * <p>Retry Handling: Relies on the S3TransferManager's built-in retry mechanism for
* transient failures. If a download fails after retries:
@@ -70,10 +74,39 @@ class NativeS3BulkCopyHelper {
private final S3TransferManager transferManager;
private final int maxConcurrentCopies;
+ private final int maxConnections;
- public NativeS3BulkCopyHelper(S3TransferManager transferManager, int maxConcurrentCopies) {
+ /**
+ * Creates a new bulk copy helper.
+ *
+ * @param transferManager the S3 transfer manager for async downloads
+ * @param maxConcurrentCopies the requested maximum number of concurrent copy operations
+ * @param maxConnections the HTTP connection pool size; if {@code maxConcurrentCopies} exceeds
+ * this value, it is clamped down to prevent connection pool exhaustion
+ */
+ NativeS3BulkCopyHelper(
+ S3TransferManager transferManager, int maxConcurrentCopies, int maxConnections) {
+ checkArgument(maxConcurrentCopies > 0, "maxConcurrentCopies must be positive");
+ checkArgument(maxConnections > 0, "maxConnections must be positive");
this.transferManager = transferManager;
- this.maxConcurrentCopies = maxConcurrentCopies;
+ this.maxConnections = maxConnections;
+ if (maxConcurrentCopies > maxConnections) {
+ LOG.warn(
+ "{} ({}) exceeds {} ({}). "
+ + "Clamping concurrent copies to {} to prevent connection pool exhaustion.",
+ NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+ maxConcurrentCopies,
+ NativeS3FileSystemFactory.MAX_CONNECTIONS.key(),
+ maxConnections,
+ maxConnections);
+ this.maxConcurrentCopies = maxConnections;
+ } else {
+ this.maxConcurrentCopies = maxConcurrentCopies;
+ }
+ }
+
+ int getMaxConcurrentCopies() {
+ return maxConcurrentCopies;
}
/**
@@ -97,9 +130,17 @@ public void copyFiles(
return;
}
- LOG.info("Starting bulk copy of {} files using S3TransferManager", requests.size());
+ int totalFiles = requests.size();
+ int totalBatches = (totalFiles + maxConcurrentCopies - 1) / maxConcurrentCopies;
+ LOG.info(
+ "Starting bulk copy of {} files using S3TransferManager "
+ + "(batch size: {}, total batches: {})",
+ totalFiles,
+ maxConcurrentCopies,
+ totalBatches);
List<CompletableFuture<?>> copyFutures = new ArrayList<>();
+ int batchNumber = 0;
try {
for (int i = 0; i < requests.size(); i++) {
@@ -113,12 +154,18 @@ public void copyFiles(
}
if (copyFutures.size() >= maxConcurrentCopies || i == requests.size() - 1) {
+ batchNumber++;
+ LOG.debug(
+ "Waiting for batch {}/{} ({} files)",
+ batchNumber,
+ totalBatches,
+ copyFutures.size());
waitForCopies(copyFutures);
copyFutures.clear();
}
}
- LOG.info("Completed bulk copy of {} files", requests.size());
+ LOG.info("Completed bulk copy of {} files", totalFiles);
} catch (Exception e) {
if (!copyFutures.isEmpty()) {
LOG.warn(
@@ -181,8 +228,42 @@ private void waitForCopies(List<CompletableFuture<?>> futures) throw
Thread.currentThread().interrupt();
throw new IOException("Bulk copy interrupted", e);
} catch (ExecutionException e) {
- throw new IOException("Bulk copy failed", e.getCause());
+ Throwable cause = e.getCause();
+ if (isConnectionPoolExhausted(cause)) {
+ throw new IOException(
+ String.format(
+ "S3 connection pool exhausted during bulk copy. "
+ + "The configured connection pool size (%d) could not serve "
+ + "the concurrent download requests (%d). "
+ + "Consider reducing '%s' or increasing '%s'.",
+ maxConnections,
+ maxConcurrentCopies,
+ NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+ NativeS3FileSystemFactory.MAX_CONNECTIONS.key()),
+ cause);
+ }
+ throw new IOException("Bulk copy failed", cause);
+ }
+ }
+
+ /**
+ * Checks whether a failure was caused by HTTP connection pool exhaustion.
+ *
+ * <p>Walks the causal chain looking for the SDK's characteristic message about connection
+ * acquire timeouts. This detection is deliberately broad (substring match on the message) to
+ * remain resilient to minor SDK wording changes across versions.
+ */
+ @VisibleForTesting
+ static boolean isConnectionPoolExhausted(Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ String message = current.getMessage();
+ if (message != null && message.contains("Acquire operation took longer than")) {
+ return true;
+ }
+ current = current.getCause();
}
+ return false;
}
/**
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 40e88d8ae5a13..64795cc8c76fe 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -170,6 +170,12 @@ S3ClientProvider getClientProvider() {
return clientProvider;
}
+ @VisibleForTesting
+ @Nullable
+ NativeS3BulkCopyHelper getBulkCopyHelper() {
+ return bulkCopyHelper;
+ }
+
@Override
public URI getUri() {
return uri;
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 60b151035cbb0..81d92dc762bea 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -122,6 +122,15 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.defaultValue(true)
.withDescription("Enable bulk copy operations using S3TransferManager");
+ public static final ConfigOption<Integer> MAX_CONNECTIONS =
+ ConfigOptions.key("s3.connection.max")
+ .intType()
+ .defaultValue(50)
+ .withDescription(
+ "Maximum number of HTTP connections in the S3 client connection pool. "
+ + "Applies to both the sync client (Apache HTTP) and the async client (Netty). "
+ + "Must be at least as large as 's3.bulk-copy.max-concurrent'.");
+
public static final ConfigOption<Integer> BULK_COPY_MAX_CONCURRENT =
ConfigOptions.key("s3.bulk-copy.max-concurrent")
.intType()
@@ -348,6 +357,13 @@ public FileSystem create(URI fsUri) throws IOException {
readBufferSize);
}
+ final int maxConnections = config.get(MAX_CONNECTIONS);
+ Preconditions.checkArgument(
+ maxConnections > 0,
+ "'%s' must be a positive integer, but was %s",
+ MAX_CONNECTIONS.key(),
+ maxConnections);
+
S3ClientProvider clientProvider =
S3ClientProvider.builder()
.accessKey(accessKey)
@@ -355,6 +371,7 @@ public FileSystem create(URI fsUri) throws IOException {
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
+ .maxConnections(maxConnections)
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
.socketTimeout(config.get(SOCKET_TIMEOUT))
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
@@ -371,10 +388,17 @@ public FileSystem create(URI fsUri) throws IOException {
NativeS3BulkCopyHelper bulkCopyHelper = null;
if (config.get(BULK_COPY_ENABLED)) {
+ final int bulkCopyMaxConcurrent = config.get(BULK_COPY_MAX_CONCURRENT);
+ Preconditions.checkArgument(
+ bulkCopyMaxConcurrent > 0,
+ "'%s' must be a positive integer, but was %s",
+ BULK_COPY_MAX_CONCURRENT.key(),
+ bulkCopyMaxConcurrent);
bulkCopyHelper =
new NativeS3BulkCopyHelper(
clientProvider.getTransferManager(),
- config.get(BULK_COPY_MAX_CONCURRENT));
+ bulkCopyMaxConcurrent,
+ maxConnections);
}
return new NativeS3FileSystem(
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cee92d1ce5d50..daccd6247cdb8 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -389,7 +389,9 @@ public S3ClientProvider build() {
NettyNioAsyncHttpClient.builder()
.maxConcurrency(maxConnections)
.connectionTimeout(connectionTimeout)
- .readTimeout(socketTimeout))
+ .readTimeout(socketTimeout)
+ .connectionAcquisitionTimeout(
+ connectionTimeout))
.overrideConfiguration(overrideConfig)
.endpointOverride(endpointUri)
.build())
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
index 37b35f88acae8..5b244850dcae6 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
@@ -20,7 +20,14 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.core.exception.SdkClientException;
+
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
@@ -28,7 +35,9 @@
/** Tests for {@link NativeS3BulkCopyHelper}. */
class NativeS3BulkCopyHelperTest {
- private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1);
+ private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1, 1);
+
+ // --- URI parsing tests ---
@ParameterizedTest
@CsvSource({
@@ -105,4 +114,46 @@ void testExtractKeyVeryLongPath() {
path.append("file.txt");
assertThat(helper.extractKey("s3://bucket/" + path)).isEqualTo(path.toString());
}
+
+ private static Stream<Arguments> connectionPoolExhaustedCases() {
+ return Stream.of(
+ Arguments.of(
+ "direct message match",
+ SdkClientException.builder()
+ .message(
+ "Unable to execute HTTP request: "
+ + "Acquire operation took longer than the configured maximum time.")
+ .build(),
+ true),
+ Arguments.of(
+ "nested causal chain",
+ SdkClientException.builder()
+ .message("Unable to execute HTTP request")
+ .cause(
+ new RuntimeException(
+ "channel acquisition failed",
+ new TimeoutException(
+ "Acquire operation took longer than 10000 milliseconds.")))
+ .build(),
+ true),
+ Arguments.of(
+ "unrelated error",
+ SdkClientException.builder().message("Access Denied").build(),
+ false),
+ Arguments.of("null message", new RuntimeException((String) null), false),
+ Arguments.of("null throwable", null, false));
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("connectionPoolExhaustedCases")
+ void testConnectionPoolExhaustedDetection(
+ String description, Throwable throwable, boolean expected) {
+ assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(throwable)).isEqualTo(expected);
+ }
+
+ @Test
+ void testEmptyRequestListIsNoOp() throws Exception {
+ NativeS3BulkCopyHelper noOpHelper = new NativeS3BulkCopyHelper(null, 16, 50);
+ noOpHelper.copyFiles(Collections.emptyList(), null);
+ }
}
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index 31e6814ab1b0e..f9e57b55858be 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -353,6 +353,90 @@ void testEmptyRegionFallsBackToAutodiscovery() throws Exception {
}
}
+ @Test
+ void testInvalidMaxConnectionsThrowsException() {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ assertThatThrownBy(() -> factory.create(fsUri))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("s3.connection.max")
+ .hasMessageContaining("must be a positive integer");
+ }
+
+ @Test
+ void testInvalidBulkCopyMaxConcurrentThrowsException() {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ assertThatThrownBy(() -> factory.create(fsUri))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("s3.bulk-copy.max-concurrent")
+ .hasMessageContaining("must be a positive integer");
+ }
+
+ @Test
+ void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+ config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
+ config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ FileSystem fs = factory.create(fsUri);
+
+ assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+ assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
+ assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+ }
+
+ @Test
+ void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+ config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10);
+ config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50);
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ FileSystem fs = factory.create(fsUri);
+
+ assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+ assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
+ assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+ }
+
@Test
void testS3ASchemeReturnsS3A() {
NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();