-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider #27970
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
b0ea378
28ef8d9
d342900
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -353,6 +353,89 @@ void testEmptyRegionFallsBackToAutodiscovery() throws Exception { | |
| } | ||
| } | ||
|
|
||
| @Test | ||
| void testInvalidMaxConnectionsThrowsException() { | ||
| NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); | ||
| Configuration config = new Configuration(); | ||
| config.setString("s3.access-key", "test-access-key"); | ||
| config.setString("s3.secret-key", "test-secret-key"); | ||
| config.setString("s3.region", "us-east-1"); | ||
| config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0); | ||
| config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); | ||
|
|
||
| factory.configure(config); | ||
|
|
||
| URI fsUri = URI.create("s3://test-bucket/"); | ||
| assertThatThrownBy(() -> factory.create(fsUri)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessageContaining("s3.connection.max") | ||
| .hasMessageContaining("must be a positive integer"); | ||
| } | ||
|
|
||
| @Test | ||
| void testInvalidBulkCopyMaxConcurrentThrowsException() { | ||
| NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); | ||
| Configuration config = new Configuration(); | ||
| config.setString("s3.access-key", "test-access-key"); | ||
| config.setString("s3.secret-key", "test-secret-key"); | ||
| config.setString("s3.region", "us-east-1"); | ||
| config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0); | ||
| config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); | ||
|
|
||
| factory.configure(config); | ||
|
|
||
| URI fsUri = URI.create("s3://test-bucket/"); | ||
| assertThatThrownBy(() -> factory.create(fsUri)) | ||
| .isInstanceOf(IllegalArgumentException.class) | ||
| .hasMessageContaining("maxConcurrentCopies must be positive"); | ||
|
Samrat002 marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| @Test | ||
| void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception { | ||
| NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); | ||
| Configuration config = new Configuration(); | ||
| config.setString("s3.access-key", "test-access-key"); | ||
| config.setString("s3.secret-key", "test-secret-key"); | ||
| config.setString("s3.region", "us-east-1"); | ||
| config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); | ||
| config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32); | ||
| config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10); | ||
|
Comment on lines
+402
to
+403
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for my own understanding. BULK_COPY_MAX_CONCURRENT drives bulk operation concurrency but then what exactly MAX_CONNECTIONS drives (I read the config explanation but that's a bit cloudy)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They operate at different layers:
The root cause of FLINK-39482: these two layers interact because S3TransferManager uses multipart downloads for files > 8MB, each part is a separate HTTP connection from the shared pool. So Does this configuration make sense, or are there any suggestions to simplify it? |
||
| config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); | ||
|
|
||
| factory.configure(config); | ||
|
|
||
| URI fsUri = URI.create("s3://test-bucket/"); | ||
| FileSystem fs = factory.create(fsUri); | ||
|
|
||
| assertThat(fs).isInstanceOf(NativeS3FileSystem.class); | ||
| NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; | ||
| assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); | ||
| assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); | ||
| } | ||
|
|
||
| @Test | ||
| void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception { | ||
| NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); | ||
| Configuration config = new Configuration(); | ||
| config.setString("s3.access-key", "test-access-key"); | ||
| config.setString("s3.secret-key", "test-secret-key"); | ||
| config.setString("s3.region", "us-east-1"); | ||
| config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true); | ||
| config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10); | ||
| config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50); | ||
| config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir")); | ||
|
|
||
| factory.configure(config); | ||
|
|
||
| URI fsUri = URI.create("s3://test-bucket/"); | ||
| FileSystem fs = factory.create(fsUri); | ||
|
|
||
| assertThat(fs).isInstanceOf(NativeS3FileSystem.class); | ||
| NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs; | ||
| assertThat(nativeFs.getBulkCopyHelper()).isNotNull(); | ||
| assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10); | ||
| } | ||
|
|
||
| @Test | ||
| void testS3ASchemeReturnsS3A() { | ||
| NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.