
[FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider #27970

Open
Samrat002 wants to merge 3 commits into apache:master from Samrat002:FLINK-39482

Conversation

@Samrat002
Contributor

@Samrat002 Samrat002 commented Apr 19, 2026

What is the purpose of the change

This pull request prevents S3 connection pool exhaustion during RocksDB state restore when using the Native S3 filesystem. When NativeS3BulkCopyHelper fires concurrent downloads via S3TransferManager, each multipart download can consume multiple HTTP connections. With the default pool size of 50 and a batch concurrency of 16, the pool can be exhausted, causing downloads to hang until the SDK's acquire timeout expires. This results in opaque SdkClientException failures during checkpoint restore.

The fix introduces a configurable s3.connection.max option, clamps s3.bulk-copy.max-concurrent to the connection pool size, and raises the connection acquisition timeout from the SDK default to the user-configured connection timeout.
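The clamping part of the fix can be sketched in a few lines. This is a minimal illustration only; the class and method names here are hypothetical, not the actual code in the patch:

```java
// Minimal sketch of the clamping logic described above. Names are
// illustrative; the real change lives in the S3 filesystem factory.
public class ClampSketch {

    /** Clamp bulk-copy concurrency so it can never exceed the HTTP connection pool size. */
    static int clampBulkCopyConcurrency(int requestedConcurrency, int maxConnections) {
        if (requestedConcurrency <= 0) {
            throw new IllegalArgumentException("s3.bulk-copy.max-concurrent must be > 0");
        }
        if (maxConnections <= 0) {
            throw new IllegalArgumentException("s3.connection.max must be > 0");
        }
        return Math.min(requestedConcurrency, maxConnections);
    }

    public static void main(String[] args) {
        // 32 requested against a pool of 10 is clamped down to 10.
        System.out.println(clampBulkCopyConcurrency(32, 10)); // prints 10
        // Within the pool size, the requested value is preserved.
        System.out.println(clampBulkCopyConcurrency(8, 50));  // prints 8
    }
}
```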

Brief change log

Added a configurable maxConnections option.
Added unit tests for connection pool exhaustion.

Verifying this change

  • Added testConnectionPoolExhaustedDetection in NativeS3BulkCopyHelperTest to verify detection of the SDK's connection acquire timeout message in both direct and nested causal chains, as well as false-positive resistance
  • Added testEmptyRequestListIsNoOp in NativeS3BulkCopyHelperTest to verify no NPE when no files are requested
  • Added testInvalidMaxConnectionsThrowsException and testInvalidBulkCopyMaxConcurrentThrowsException in NativeS3FileSystemFactoryTest to verify config validation through the
    factory
  • Added testBulkCopyMaxConcurrentClampedToMaxConnections and testBulkCopyMaxConcurrentPreservedWithinMaxConnections in NativeS3FileSystemFactoryTest to verify the clamping logic end-to-end through factory → filesystem → bulk copy helper
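The causal-chain detection these tests exercise can be sketched roughly as below. Both the helper name and the matched message fragment are assumptions for illustration (the fragment resembles Netty's pool-acquire timeout wording), not lifted from the actual patch:

```java
// Rough sketch of detecting a connection-acquire timeout anywhere in an
// exception's cause chain. Method name and matched message text are
// assumptions, not the actual NativeS3BulkCopyHelper code.
public class PoolExhaustionSketch {

    static boolean isConnectionPoolExhausted(Throwable error) {
        // Walk the cause chain with a depth bound to guard against cycles.
        Throwable current = error;
        for (int depth = 0; current != null && depth < 10; depth++) {
            String message = current.getMessage();
            if (message != null && message.contains("Acquire operation took longer")) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable nested = new RuntimeException("request failed",
                new java.util.concurrent.TimeoutException(
                        "Acquire operation took longer than the configured maximum time"));
        System.out.println(isConnectionPoolExhausted(nested));                    // prints true
        System.out.println(isConnectionPoolExhausted(new RuntimeException("x"))); // prints false
    }
}
```

Checking the whole chain matters because the SDK often wraps the timeout several layers deep inside a SdkClientException.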

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: yes (affects state restore from S3 checkpoints)
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Apr 19, 2026

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@Samrat002
Contributor Author

@gaborgsomogyi PTAL

Comment on lines +399 to +400
config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
Contributor


Just for my own understanding. BULK_COPY_MAX_CONCURRENT drives bulk-operation concurrency, but then what exactly does MAX_CONNECTIONS drive (I read the config explanation, but that's a bit cloudy)?

Contributor Author


They operate at different layers:

  1. s3.connection.max - the HTTP connection pool size in the underlying HTTP clients (Apache for sync ops, Netty for async ops). This is a shared pool: every S3 API call (GetObject, PutObject, HeadObject, ListObjectsV2, etc.) borrows a connection from it and returns it when done. This maps directly to https://github.com/apache/flink/blob/FLINK-39482/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java#L391 on the Netty async client and maxConnections on the Apache sync client.

  2. s3.bulk-copy.max-concurrent - how many S3 download operations NativeS3BulkCopyHelper fires in parallel during state restore (the batch size in copyFiles).

The root cause of FLINK-39482: these two layers interact because S3TransferManager uses multipart downloads for files larger than 8 MB, and each part consumes a separate HTTP connection from the shared pool. So maxConcurrentCopies=16 files × ~4 parts each ≈ 64 HTTP connections needed, but the pool only has 50 → acquire timeout → opaque SdkClientException.
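The arithmetic in that root-cause description can be checked in a few lines (the numbers are the illustrative ones from the comment above; ~4 parts per file assumes roughly 32 MB files split into 8 MB multipart ranges):

```java
// Back-of-the-envelope check of the pool-exhaustion arithmetic above.
public class PoolMathSketch {
    public static void main(String[] args) {
        int maxConcurrentCopies = 16; // s3.bulk-copy.max-concurrent
        int partsPerFile = 4;         // e.g. a ~32 MB file split into 8 MB multipart ranges
        int defaultPoolSize = 50;     // SDK default connection pool size

        int connectionsNeeded = maxConcurrentCopies * partsPerFile;
        System.out.println(connectionsNeeded);                   // prints 64
        System.out.println(connectionsNeeded > defaultPoolSize); // prints true -> acquire timeout
    }
}
```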

Does this configuration make sense, or are there any suggestions to simplify it?

@gaborgsomogyi
Contributor

After the nit fix + other comments resolution it's good to go from my perspective.

@Samrat002 Samrat002 requested a review from gaborgsomogyi April 20, 2026 18:16
@gaborgsomogyi
Contributor

Intending to merge this unless comments arrive

