Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.fs.s3native;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.PathsCopyingFileSystem;

Expand All @@ -37,15 +38,18 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Helper class for performing bulk S3 to local file system copies using S3TransferManager.
*
* <p><b>Concurrency Model:</b> Uses batch-based concurrency control with {@code
* maxConcurrentCopies} to limit parallel downloads. The effective concurrency is clamped to the
* HTTP connection pool size ({@code maxConnections}) to prevent connection pool exhaustion. The
* current implementation waits for each batch to complete before starting the next batch. A future
* enhancement could use a bounded thread pool (e.g., {@link java.util.concurrent.Semaphore} or
* bounded executor) to allow continuous submission of new downloads as slots become available,
* which would provide better throughput by avoiding the "slowest task in batch" bottleneck.
*
* <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry mechanism for
* transient failures. If a download fails after retries:
Expand All @@ -70,10 +74,39 @@ class NativeS3BulkCopyHelper {

private final S3TransferManager transferManager;
private final int maxConcurrentCopies;
private final int maxConnections;

/**
 * Creates a new bulk copy helper.
 *
 * @param transferManager the S3 transfer manager for async downloads
 * @param maxConcurrentCopies the requested maximum number of concurrent copy operations
 * @param maxConnections the HTTP connection pool size; if {@code maxConcurrentCopies} exceeds
 *     this value, it is clamped down to prevent connection pool exhaustion
 */
NativeS3BulkCopyHelper(
        S3TransferManager transferManager, int maxConcurrentCopies, int maxConnections) {
    checkArgument(maxConcurrentCopies > 0, "maxConcurrentCopies must be positive");
    checkArgument(maxConnections > 0, "maxConnections must be positive");
    this.transferManager = transferManager;
    this.maxConnections = maxConnections;
    // Clamp rather than fail: more in-flight downloads than pooled HTTP connections would
    // stall in the connection-acquire queue and eventually time out.
    if (maxConcurrentCopies > maxConnections) {
        LOG.warn(
                "{} ({}) exceeds {} ({}). "
                        + "Clamping concurrent copies to {} to prevent connection pool exhaustion.",
                NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
                maxConcurrentCopies,
                NativeS3FileSystemFactory.MAX_CONNECTIONS.key(),
                maxConnections,
                maxConnections);
        this.maxConcurrentCopies = maxConnections;
    } else {
        this.maxConcurrentCopies = maxConcurrentCopies;
    }
}

/**
 * Returns the effective maximum number of concurrent copies, i.e. the configured value after
 * being clamped to the HTTP connection pool size in the constructor.
 */
int getMaxConcurrentCopies() {
return maxConcurrentCopies;
}

/**
Expand All @@ -97,9 +130,17 @@ public void copyFiles(
return;
}

LOG.info("Starting bulk copy of {} files using S3TransferManager", requests.size());
int totalFiles = requests.size();
int totalBatches = (totalFiles + maxConcurrentCopies - 1) / maxConcurrentCopies;
LOG.info(
"Starting bulk copy of {} files using S3TransferManager "
+ "(batch size: {}, total batches: {})",
totalFiles,
maxConcurrentCopies,
totalBatches);

List<CompletableFuture<CompletedCopy>> copyFutures = new ArrayList<>();
int batchNumber = 0;

try {
for (int i = 0; i < requests.size(); i++) {
Expand All @@ -113,12 +154,18 @@ public void copyFiles(
}

if (copyFutures.size() >= maxConcurrentCopies || i == requests.size() - 1) {
batchNumber++;
LOG.debug(
"Waiting for batch {}/{} ({} files)",
batchNumber,
totalBatches,
copyFutures.size());
waitForCopies(copyFutures);
copyFutures.clear();
}
}

LOG.info("Completed bulk copy of {} files", requests.size());
LOG.info("Completed bulk copy of {} files", totalFiles);
Comment thread
Samrat002 marked this conversation as resolved.
} catch (Exception e) {
if (!copyFutures.isEmpty()) {
LOG.warn(
Expand Down Expand Up @@ -181,8 +228,42 @@ private void waitForCopies(List<CompletableFuture<CompletedCopy>> futures) throw
Thread.currentThread().interrupt();
throw new IOException("Bulk copy interrupted", e);
} catch (ExecutionException e) {
throw new IOException("Bulk copy failed", e.getCause());
Throwable cause = e.getCause();
if (isConnectionPoolExhausted(cause)) {
Comment thread
Samrat002 marked this conversation as resolved.
throw new IOException(
String.format(
"S3 connection pool exhausted during bulk copy. "
+ "The configured connection pool size (%d) could not serve "
+ "the concurrent download requests (%d). "
+ "Consider reducing '%s' or increasing '%s'.",
Comment thread
Samrat002 marked this conversation as resolved.
maxConnections,
maxConcurrentCopies,
NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
NativeS3FileSystemFactory.MAX_CONNECTIONS.key()),
cause);
}
throw new IOException("Bulk copy failed", cause);
}
}

/**
 * Checks whether a failure was caused by HTTP connection pool exhaustion.
 *
 * <p>Walks the causal chain looking for the SDK's characteristic message about connection
 * acquire timeouts. The match is deliberately loose (substring match on the message) so that
 * minor wording changes across SDK versions do not break the detection.
 */
@VisibleForTesting
static boolean isConnectionPoolExhausted(Throwable throwable) {
    // Follow getCause() until the chain ends; getCause() returns null at the root.
    for (Throwable t = throwable; t != null; t = t.getCause()) {
        String msg = t.getMessage();
        if (msg != null && msg.contains("Acquire operation took longer than")) {
            return true;
        }
    }
    return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ S3ClientProvider getClientProvider() {
return clientProvider;
}

/**
 * Returns the bulk copy helper, or {@code null} when bulk copy is not enabled (the factory only
 * constructs a helper when the bulk-copy option is set).
 */
@VisibleForTesting
@Nullable
NativeS3BulkCopyHelper getBulkCopyHelper() {
return bulkCopyHelper;
}

@Override
public URI getUri() {
return uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.defaultValue(true)
.withDescription("Enable bulk copy operations using S3TransferManager");

/**
 * HTTP connection pool size shared by the sync and async S3 clients. The bulk copy helper
 * clamps its concurrency to this value, so exceeding it in 's3.bulk-copy.max-concurrent'
 * merely triggers a warning and a clamp, not a failure.
 */
public static final ConfigOption<Integer> MAX_CONNECTIONS =
        ConfigOptions.key("s3.connection.max")
                .intType()
                .defaultValue(50)
                .withDescription(
                        "Maximum number of HTTP connections in the S3 client connection pool. "
                                + "Applies to both the sync client (Apache HTTP) and the async client (Netty). "
                                + "If 's3.bulk-copy.max-concurrent' exceeds this value, the number of "
                                + "concurrent bulk copies is clamped down to it.");

public static final ConfigOption<Integer> BULK_COPY_MAX_CONCURRENT =
ConfigOptions.key("s3.bulk-copy.max-concurrent")
.intType()
Expand Down Expand Up @@ -348,13 +357,21 @@ public FileSystem create(URI fsUri) throws IOException {
readBufferSize);
}

final int maxConnections = config.get(MAX_CONNECTIONS);
Preconditions.checkArgument(
maxConnections > 0,
"'%s' must be a positive integer, but was %s",
MAX_CONNECTIONS.key(),
maxConnections);

S3ClientProvider clientProvider =
S3ClientProvider.builder()
.accessKey(accessKey)
.secretKey(secretKey)
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
.maxConnections(maxConnections)
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
.socketTimeout(config.get(SOCKET_TIMEOUT))
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
Expand All @@ -371,10 +388,17 @@ public FileSystem create(URI fsUri) throws IOException {

NativeS3BulkCopyHelper bulkCopyHelper = null;
if (config.get(BULK_COPY_ENABLED)) {
final int bulkCopyMaxConcurrent = config.get(BULK_COPY_MAX_CONCURRENT);
Preconditions.checkArgument(
bulkCopyMaxConcurrent > 0,
"'%s' must be a positive integer, but was %s",
BULK_COPY_MAX_CONCURRENT.key(),
bulkCopyMaxConcurrent);
bulkCopyHelper =
new NativeS3BulkCopyHelper(
clientProvider.getTransferManager(),
config.get(BULK_COPY_MAX_CONCURRENT));
bulkCopyMaxConcurrent,
maxConnections);
}

return new NativeS3FileSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ public S3ClientProvider build() {
NettyNioAsyncHttpClient.builder()
.maxConcurrency(maxConnections)
.connectionTimeout(connectionTimeout)
.readTimeout(socketTimeout))
.readTimeout(socketTimeout)
.connectionAcquisitionTimeout(
connectionTimeout))
.overrideConfiguration(overrideConfig)
.endpointOverride(endpointUri)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.exception.SdkClientException;

import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

/** Tests for {@link NativeS3BulkCopyHelper}. */
class NativeS3BulkCopyHelperTest {

// Shared fixture for the pure URI-parsing tests; the transfer manager is never touched there,
// so null is safe. Both concurrency arguments must be positive (checked in the constructor).
private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1, 1);

// --- URI parsing tests ---

@ParameterizedTest
@CsvSource({
Expand Down Expand Up @@ -105,4 +114,46 @@ void testExtractKeyVeryLongPath() {
path.append("file.txt");
assertThat(helper.extractKey("s3://bucket/" + path)).isEqualTo(path.toString());
}

/**
 * Scenarios for {@code isConnectionPoolExhausted}: the marker message directly on the
 * exception, buried in a nested causal chain, absent, null, and a null throwable.
 */
private static Stream<Arguments> connectionPoolExhaustedCases() {
    Arguments directMatch =
            Arguments.of(
                    "direct message match",
                    SdkClientException.builder()
                            .message(
                                    "Unable to execute HTTP request: "
                                            + "Acquire operation took longer than the configured maximum time.")
                            .build(),
                    true);
    Arguments nestedChain =
            Arguments.of(
                    "nested causal chain",
                    SdkClientException.builder()
                            .message("Unable to execute HTTP request")
                            .cause(
                                    new RuntimeException(
                                            "channel acquisition failed",
                                            new TimeoutException(
                                                    "Acquire operation took longer than 10000 milliseconds.")))
                            .build(),
                    true);
    Arguments unrelatedError =
            Arguments.of(
                    "unrelated error",
                    SdkClientException.builder().message("Access Denied").build(),
                    false);
    Arguments nullMessage =
            Arguments.of("null message", new RuntimeException((String) null), false);
    Arguments nullThrowable = Arguments.of("null throwable", null, false);
    return Stream.of(directMatch, nestedChain, unrelatedError, nullMessage, nullThrowable);
}

/**
 * Verifies that {@code isConnectionPoolExhausted} classifies each scenario from
 * {@code connectionPoolExhaustedCases} correctly; the first argument only names the case.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("connectionPoolExhaustedCases")
void testConnectionPoolExhaustedDetection(
String description, Throwable throwable, boolean expected) {
assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(throwable)).isEqualTo(expected);
}

/**
 * An empty request list must return without doing any work, so passing {@code null} for both
 * the transfer manager and the closeable registry is safe — neither may be dereferenced.
 */
@Test
void testEmptyRequestListIsNoOp() throws Exception {
NativeS3BulkCopyHelper noOpHelper = new NativeS3BulkCopyHelper(null, 16, 50);
noOpHelper.copyFiles(Collections.emptyList(), null);
}
}
Loading