[FLINK-39484][filesystem/s3] NativeS3InputStream: graceful abort on early close; close() first for connection reuse#27965
Conversation
…tObject on seek/reopen

Fix `ConnectionClosedException: Premature end of Content-Length delimited message body` when reading large Parquet objects through flink-s3-fs-native against S3-compatible endpoints (e.g. MinIO), especially after seek()/skip() or when the stream is closed before all bytes are consumed.

When reopenStreamAtCurrentPosition() discards a partially-read GetObject body after seek/skip, ResponseInputStream.abort() is used immediately so the SDK does not attempt to read and drain the remainder of the body.

For the close() path: attempt a normal close() first, preserving HTTP connection reuse for well-behaved S3 servers. If the server closed the connection early (a pattern seen on MinIO, which terminates the TCP connection before all Content-Length bytes are sent), the ConnectionClosedException is non-fatal and logged at WARN; the connection is already broken and cannot be reused, so falling back to abort() carries no additional performance penalty.

This is the optimal approach:
- No connection-pool penalty for correct S3/genuine AWS behavior (close() reuses the connection)
- No task failures when MinIO closes early (exception caught, WARN logged)
- Seek/skip still aborts immediately (correct semantics regardless of server)

See: https://issues.apache.org/jira/browse/FLINK-39484
Made-with: Cursor
…euse; handle MinIO early close (release-2.3)

Backport of apache#27965 behavior: on seek/skip, abort the partial GetObject; on close(), attempt a normal BufferedInputStream.close() first for HTTP connection reuse, catch the premature Content-Length / connection-closed error from S3-compatible storage, log a WARN, and abort as non-fatal.

https://issues.apache.org/jira/browse/FLINK-39484
Made-with: Cursor
… PR apache#27965)

Align release-2.3 with apache#27965: try close() before abort() on the normal close path when bytes remain; detect premature Content-Length / connection-closed; seek() still aborts immediately. Preserves the bucket-root fix in 0a01cbd.
Made-with: Cursor
@macdoor Spotless is failing for the build; there are also some changes in the license header which I believe are unintended.
…veS3InputStream

Align the ASF license block with the Flink convention; apply Spotless (Javadoc wrap).
Made-with: Cursor
@spuru9 Thanks. Spotless is fixed (spotless:apply + spotless:check), and the license header in NativeS3InputStream.java is aligned with the standard ASF template.
@flinkbot run azure
Thank you @macdoor for raising the patch. I have a fundamental query associated with this change: is the problem true for AWS S3? As per my analysis, "Premature end of Content-Length delimited message body" can only occur when the server closes the TCP connection before delivering all Content-Length bytes, which is an HTTP/1.1 protocol violation. The only valid concern for real S3 is performance; the probable improvement I see is avoiding draining large amounts of unread data on close() just to reuse a connection. The SDK JavaDoc says it plainly:
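The drain-on-close cost raised in this comment can be illustrated with a small stand-in stream. This is a hypothetical sketch: the real draining happens inside the AWS SDK's Apache HTTP client, and `DrainingInputStream` below is not an SDK class, just a simulation of connection-reuse semantics.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical illustration of the drain-on-close cost: an HTTP client
 * that wants to reuse a connection must consume every remaining byte of
 * the Content-Length delimited body before releasing it. This stand-in
 * stream counts how many unread bytes its close() has to drain.
 */
public class DrainOnCloseDemo {
    static final class DrainingInputStream extends FilterInputStream {
        long drainedOnClose;

        DrainingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            // Simulate connection-reuse semantics: read (drain) every
            // remaining byte before releasing the underlying connection.
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                drainedOnClose += n;
            }
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] object = new byte[1_000_000]; // a 1 MB "GetObject" body
        DrainingInputStream s =
                new DrainingInputStream(new ByteArrayInputStream(object));
        s.read(new byte[1024]); // the caller reads only the first 1 KB...
        s.close();              // ...so close() must drain the other ~999 KB
        System.out.println("bytes drained on close: " + s.drainedOnClose);
        // prints: bytes drained on close: 998976
    }
}
```

An `abort()` in the SDK skips this drain by discarding the connection instead of returning it to the pool, which is exactly the performance trade-off discussed here.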
What is the purpose of the change
Fix `ConnectionClosedException: Premature end of Content-Length delimited message body` when reading large objects (e.g. Parquet) through `flink-s3-fs-native` against S3-compatible endpoints (e.g. MinIO), especially after `seek()`, `skip()`, or when the stream is closed before all bytes are consumed.
Brief change log
- When `reopenStreamAtCurrentPosition()` discards a partially-read `GetObject` body after `seek()` or `skip()`, call `ResponseInputStream.abort()` immediately so the SDK does not attempt to read and drain the remainder of the response.
- On the normal close path, try `close()` first to preserve HTTP connection reuse for well-behaved S3 servers. If the server closes the connection early (MinIO pattern), the `ConnectionClosedException` is caught, treated as non-fatal, logged at WARN, and `abort()` is called as a fallback: the connection is already broken and cannot be reused anyway, so aborting carries no additional performance penalty.
- `isPrematureEndOfMessage(IOException)`: helper that identifies MinIO/S3-compatible early connection close by checking for `Premature end of Content-Length`, `Connection closed`, or `ConnectionClosed` in the exception message.

This is the optimal approach:
- No connection-pool penalty for correct S3/genuine AWS behavior (`close()` reuses the connection)
- No task failures when MinIO closes early (exception caught, WARN logged)
- Seek/skip still aborts immediately (correct semantics regardless of server)
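The close-path behavior in the change log above can be sketched roughly as follows. This is a minimal, self-contained sketch, not the actual `NativeS3InputStream` code: the method name `closeGracefully` is illustrative, and the AWS SDK's `ResponseInputStream` is replaced by a plain `InputStream` plus an abort callback.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/**
 * Sketch of the graceful-close logic: try close() first so a well-behaved
 * server's connection can be reused; if the server already closed the
 * connection early (MinIO pattern), fall back to aborting, which costs
 * nothing extra on a connection that is broken anyway.
 */
public class GracefulCloseSketch {

    /** Matches the MinIO/S3-compatible early-connection-close message patterns. */
    static boolean isPrematureEndOfMessage(IOException e) {
        String msg = e.getMessage();
        if (msg == null) {
            return false;
        }
        return msg.contains("Premature end of Content-Length")
                || msg.contains("Connection closed")
                || msg.contains("ConnectionClosed");
    }

    static void closeGracefully(InputStream in, Runnable abort) {
        try {
            // Normal close: lets the HTTP client drain and reuse the connection.
            in.close();
        } catch (IOException e) {
            if (isPrematureEndOfMessage(e)) {
                // Logged at WARN in the real code; the connection is already
                // unusable, so aborting carries no extra penalty.
                abort.run();
            } else {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

On the seek/skip path, by contrast, `abort()` is called unconditionally, since the partially-read body is being discarded on purpose.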
Verifying this change
Verified against MinIO with large Parquet reads and repeated seek() operations: the
ConnectionClosedException is replaced by a clean WARN log and the job completes
successfully. Azure Pipelines for flink-s3-fs-native (see CI on this PR).
Does this pull request potentially affect one of the following parts?
Documentation
JIRA
https://issues.apache.org/jira/browse/FLINK-39484