From 9e6576abb006e00c8b2ec8f5807b925b543fd443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sat, 18 Apr 2026 11:48:09 +0800 Subject: [PATCH 1/2] [FLINK-39484][filesystem/s3] NativeS3InputStream: abort unfinished GetObject on seek/reopen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix ConnectionClosedException: Premature end of Content-Length delimited message body when reading large Parquet objects through flink-s3-fs-native against S3-compatible endpoints (e.g. MinIO), especially after seek() / skip() or when the stream is closed before all bytes are consumed. When reopenStreamAtCurrentPosition() discards a partially-read GetObject body after seek/skip, ResponseInputStream.abort() is used immediately so the SDK does not attempt to read and drain the remainder of the body. For the close() path: attempt normal close() first (preserving HTTP connection reuse for well-behaved S3 servers). If the server closed the connection early (a pattern seen on MinIO where it terminates the TCP connection before all Content-Length bytes are sent), the ConnectionClosedException is non-fatal and escalated to WARN — the connection is already broken and cannot be reused, so falling back to abort() carries no additional performance penalty. This is the optimal approach: - No connection-pool penalty for correct S3/Genuine AWS behavior (close reuses) - No task failures when MinIO closes early (exception caught, WARN logged) - Seek/skip still aborts immediately (correct semantics regardless of server) See: https://issues.apache.org/jira/browse/FLINK-39484 Made-with: Cursor --- .../fs/s3native/NativeS3InputStream.java | 195 +++++++++++------- 1 file changed, 125 insertions(+), 70 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java index f6dca047f648d..ff0f09d869b2e 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java @@ -1,13 +1,13 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,8 +35,20 @@ * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations, * automatic stream reopening on errors, and lazy initialization to minimize memory footprint. * - *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access and - * resource cleanup. + *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access + * and resource cleanup. + * + *

Partial reads and {@code seek}: Apache HttpClient tries to drain the remainder of the + * response body on {@link java.io.InputStream#close()} to reuse connections. After a {@link + * #seek} or early close, draining can fail against S3-compatible endpoints (for example MinIO) + * with {@code ConnectionClosedException: Premature end of Content-Length delimited message body}. + * + *

The {@code close()} path handles this gracefully: it attempts normal connection cleanup first + * (which preserves HTTP connection reuse for well-behaved servers), and only falls back to {@link + * ResponseInputStream#abort()} when the connection was already closed early by the server — in + * which case the connection is not reusable anyway, so aborting carries no performance penalty. + * Abandoned streams after {@link #seek} always use {@code abort()} immediately since the + * connection is being discarded regardless. */ class NativeS3InputStream extends FSDataInputStream { @@ -100,7 +112,7 @@ private void lazyInitialize() throws IOException { *

This method: * *

@@ -108,24 +120,7 @@ private void lazyInitialize() throws IOException { private void openStreamAtCurrentPosition() throws IOException { lock.lock(); try { - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException e) { - LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); - } finally { - bufferedStream = null; - } - } - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException e) { - LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); - } finally { - currentStream = null; - } - } + releaseCurrentObjectStream(true); try { GetObjectRequest.Builder requestBuilder = @@ -143,20 +138,7 @@ private void openStreamAtCurrentPosition() throws IOException { currentStream = s3Client.getObject(requestBuilder.build()); bufferedStream = new BufferedInputStream(currentStream, readBufferSize); } catch (Exception e) { - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException ignored) { - } - bufferedStream = null; - } - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException ignored) { - } - currentStream = null; - } + releaseCurrentObjectStream(true); throw new IOException("Failed to open S3 stream for " + bucketName + "/" + key, e); } } finally { @@ -164,6 +146,104 @@ private void openStreamAtCurrentPosition() throws IOException { } } + /** + * Releases the in-flight {@code GetObject} HTTP response. + * + *

When {@code abandon} is {@code true}, the stream is being discarded (e.g. after a {@link + * #seek}) and {@link ResponseInputStream#abort()} is called immediately, bypassing any + * attempt to drain the response body. This is necessary for correctness because the caller + * has already moved on and will not consume the rest of the body. + * + *

When {@code abandon} is {@code false}, normal {@code close()} is attempted first. This + * preserves HTTP connection reuse for well-behaved S3 servers. If the server closes the + * connection early (a pattern seen on MinIO and other S3-compatible storage), the resulting + * {@code ConnectionClosedException} is caught, treated as non-fatal, and escalated to a WARN + * log. The connection is then aborted since it is no longer usable anyway. + * + * @param abandon {@code true} to use {@code abort()} immediately (stream is being abandoned + * after seek); {@code false} to try {@code close()} first, falling back to {@code abort()} + * on early-connection-close from MinIO + */ + private void releaseCurrentObjectStream(boolean abandon) { + if (currentStream == null && bufferedStream == null) { + return; + } + if (abandon && currentStream != null) { + try { + currentStream.abort(); + } catch (RuntimeException e) { + LOG.warn("Error aborting S3 response stream for {}/{}", bucketName, key, e); + } finally { + bufferedStream = null; + currentStream = null; + } + return; + } + + // Normal close path: attempt graceful cleanup to preserve HTTP connection reuse. + // S3-compatible storage (MinIO) may close the connection before all bytes are drained. + // In that case the ConnectionClosedException is non-fatal — abort the stream and + // log a WARN so the task does not fail. + if (bufferedStream != null) { + try { + bufferedStream.close(); + } catch (IOException e) { + if (isPrematureEndOfMessage(e)) { + LOG.warn( + "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + + "-- aborting and treating as non-fatal", + bucketName, key, contentLength, e); + abortSafely(); + } else { + LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); + } + } finally { + bufferedStream = null; + currentStream = null; + } + } else if (currentStream != null) { + try { + currentStream.close(); + } catch (IOException e) { + if (isPrematureEndOfMessage(e)) { + LOG.warn( + "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + + "-- aborting and treating as non-fatal", + bucketName, key, contentLength, e); + abortSafely(); + } else { + LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); + } + } finally { + currentStream = null; + } + } + } + + /** Abort the underlying {@code ResponseInputStream} safely, logging any exception as a WARN. */ + private void abortSafely() { + if (currentStream == null) { + return; + } + try { + currentStream.abort(); + } catch (RuntimeException e) { + LOG.warn("Error aborting S3 response stream for {}/{}", bucketName, key, e); + } + } + + /** + * Returns true if the given I/O exception represents a connection closed before all bytes + * were received — a pattern seen on S3-compatible storage (MinIO) when it closes a + * connection early and Apache HttpClient tries to drain the remainder. + */ + private static boolean isPrematureEndOfMessage(IOException e) { + String msg = e.getMessage() != null ? e.getMessage() : ""; + return msg.contains("Premature end of Content-Length") + || msg.contains("Connection closed") + || msg.contains("ConnectionClosed"); + } + @Override public void seek(long desired) throws IOException { lock(); @@ -268,35 +348,13 @@ public void close() throws IOException { if (closed) { return; } - closed = true; - IOException exception = null; - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException e) { - exception = e; - LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); - } finally { - bufferedStream = null; - } - } - - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException e) { - if (exception == null) { - exception = e; - } else { - exception.addSuppressed(e); - } - LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); - } finally { - currentStream = null; - } - } + // Only skip normal close and go straight to abort if the stream was fully read. + // Otherwise we try close() first (to preserve connection reuse), falling back to + // abort() when MinIO closed the connection early (which makes reuse impossible anyway). + boolean discardRemaining = position >= contentLength; + releaseCurrentObjectStream(discardRemaining); LOG.debug( "Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}", @@ -304,9 +362,6 @@ public void close() throws IOException { key, position, contentLength); - if (exception != null) { - throw exception; - } } finally { lock.unlock(); } @@ -359,4 +414,4 @@ public long skip(long n) throws IOException { lock.unlock(); } } -} +} \ No newline at end of file From a1e4c2bdbf9493797f61ffce669f3a936e7ae4e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sun, 19 Apr 2026 22:16:00 +0800 Subject: [PATCH 2/2] [FLINK-39484][filesystem/s3] Fix license header and Spotless for NativeS3InputStream Align ASF license block with Flink convention; apply Spotless (Javadoc wrap). Made-with: Cursor --- .../fs/s3native/NativeS3InputStream.java | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java index ff0f09d869b2e..d7490e4c93e8b 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java @@ -1,13 +1,13 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,20 +35,20 @@ * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations, * automatic stream reopening on errors, and lazy initialization to minimize memory footprint. * - *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access - * and resource cleanup. + *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access and + * resource cleanup. * *

Partial reads and {@code seek}: Apache HttpClient tries to drain the remainder of the - * response body on {@link java.io.InputStream#close()} to reuse connections. After a {@link - * #seek} or early close, draining can fail against S3-compatible endpoints (for example MinIO) - * with {@code ConnectionClosedException: Premature end of Content-Length delimited message body}. + * response body on {@link java.io.InputStream#close()} to reuse connections. After a {@link #seek} + * or early close, draining can fail against S3-compatible endpoints (for example MinIO) with {@code + * ConnectionClosedException: Premature end of Content-Length delimited message body}. * *

The {@code close()} path handles this gracefully: it attempts normal connection cleanup first * (which preserves HTTP connection reuse for well-behaved servers), and only falls back to {@link * ResponseInputStream#abort()} when the connection was already closed early by the server — in * which case the connection is not reusable anyway, so aborting carries no performance penalty. - * Abandoned streams after {@link #seek} always use {@code abort()} immediately since the - * connection is being discarded regardless. + * Abandoned streams after {@link #seek} always use {@code abort()} immediately since the connection + * is being discarded regardless. */ class NativeS3InputStream extends FSDataInputStream { @@ -150,9 +150,9 @@ private void openStreamAtCurrentPosition() throws IOException { * Releases the in-flight {@code GetObject} HTTP response. * *

When {@code abandon} is {@code true}, the stream is being discarded (e.g. after a {@link - * #seek}) and {@link ResponseInputStream#abort()} is called immediately, bypassing any - * attempt to drain the response body. This is necessary for correctness because the caller - * has already moved on and will not consume the rest of the body. + * #seek}) and {@link ResponseInputStream#abort()} is called immediately, bypassing any attempt + * to drain the response body. This is necessary for correctness because the caller has already + * moved on and will not consume the rest of the body. * *

When {@code abandon} is {@code false}, normal {@code close()} is attempted first. This * preserves HTTP connection reuse for well-behaved S3 servers. If the server closes the @@ -192,7 +192,10 @@ private void releaseCurrentObjectStream(boolean abandon) { LOG.warn( "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + "-- aborting and treating as non-fatal", - bucketName, key, contentLength, e); + bucketName, + key, + contentLength, + e); abortSafely(); } else { LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); @@ -209,7 +212,10 @@ private void releaseCurrentObjectStream(boolean abandon) { LOG.warn( "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + "-- aborting and treating as non-fatal", - bucketName, key, contentLength, e); + bucketName, + key, + contentLength, + e); abortSafely(); } else { LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); @@ -233,9 +239,9 @@ private void abortSafely() { } /** - * Returns true if the given I/O exception represents a connection closed before all bytes - * were received — a pattern seen on S3-compatible storage (MinIO) when it closes a - * connection early and Apache HttpClient tries to drain the remainder. + * Returns true if the given I/O exception represents a connection closed before all bytes were + * received — a pattern seen on S3-compatible storage (MinIO) when it closes a connection early + * and Apache HttpClient tries to drain the remainder. */ private static boolean isPrematureEndOfMessage(IOException e) { String msg = e.getMessage() != null ? e.getMessage() : ""; @@ -414,4 +420,4 @@ public long skip(long n) throws IOException { lock.unlock(); } } -} \ No newline at end of file +}