diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java index 70f2f66ef35ed..7500151805591 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java @@ -144,6 +144,10 @@ private RefCountedFile downloadToCacheFile(FileStateHandle fileHandle) { return new RefCountedFile(file); } catch (IOException e) { + LOG.warn("Failed to download cache file. Deleting partially downloaded file {}", file.getPath(), e); + if (!file.delete()) { + LOG.warn("Could not delete partially downloaded cache file {}", file.getPath()); + } ExceptionUtils.rethrow(e); return null; } @@ -152,11 +156,16 @@ private RefCountedFile downloadToCacheFile(FileStateHandle fileHandle) { private FileInputStream openAndSeek(RefCountedFile refCountedFile, long offset) throws IOException { FileInputStream fin = new FileInputStream(refCountedFile.getFile()); - if (offset != 0) { - LOG.trace("seek to {}", offset); - fin.getChannel().position(offset); + try { + if (offset != 0) { + LOG.trace("seek to {}", offset); + fin.getChannel().position(offset); + } + return fin; + } catch (Exception e) { + IOUtils.closeQuietly(fin); + throw e; } - return fin; } private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) {