From 13a297c1610b8540e4502f17c4fa603b77a8a803 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Mon, 27 Apr 2026 14:42:06 +0530 Subject: [PATCH 1/3] HDDS-14942. Implement manifest selection logic for rewrite based on snapshot delta --- .../iceberg/RewriteTablePathOzoneAction.java | 131 +++++++++++++++++- .../iceberg/RewriteTablePathOzoneUtils.java | 7 + 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index 09bea8b4ab1..5c3e9c86474 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -22,10 +22,21 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.GenericPartitionFieldSummary; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; @@ -37,6 +48,7 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.actions.ImmutableRewriteTablePath; import org.apache.iceberg.actions.RewriteTablePath; +import 
org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.Pair; /** @@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath { private String stagingDir; private int parallelism; + private ExecutorService executorService; + private static final int MAX_INFLIGHT_MULTIPLIER = 4; + private static final int DEFAULT_THREAD_COUNT = 10; + private final Table table; public RewriteTablePathOzoneAction(Table table) { this.table = table; - this.parallelism = Runtime.getRuntime().availableProcessors(); + this.parallelism = DEFAULT_THREAD_COUNT; } public RewriteTablePathOzoneAction(Table table, int parallelism) { @@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) { @Override public Result execute() { validateInputs(); - // TODO: should use for parallel manifest and position delete file rewriting. - ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + executorService = Executors.newFixedThreadPool(parallelism); try { return doExecute(); } finally { @@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) { private String rebuildMetadata() { //TODO need to implement rewrite of manifest list , manifest files and position delete files. + TableMetadata startMetadata = startVersionName != null + ? 
new StaticTableOperations(startVersionName, table.io()).current() + : null; TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current(); List partitionStats = endMetadata.partitionStatisticsFiles(); @@ -205,6 +223,8 @@ private String rebuildMetadata() { } RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); + Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); @@ -251,4 +271,109 @@ private Set> rewriteVersionFile(TableMetadata metadata, Str return result; } + + private Set manifestsToRewrite(Set deltaSnapshots, TableMetadata startMetadata) { + Table endStaticTable = RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io()); + + final Set deltaSnapshotIds; + if (startMetadata != null) { + deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + } else { + deltaSnapshotIds = null; + } + + Set manifestPaths = ConcurrentHashMap.newKeySet(); + int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; + Semaphore semaphore = new Semaphore(maxInFlight); + + ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); + + int submittedTasks = 0; + int completedTasks = 0; + + try { + for (Snapshot snapshot : endStaticTable.snapshots()) { + semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight + + final long snapshotId = snapshot.snapshotId(); + final String manifestListLocation = snapshot.manifestListLocation(); + + boolean taskSubmitted = false; + try { + completionService.submit(() -> { + try (CloseableIterable manifests = + InternalData.read( + FileFormat.AVRO, + table.io().newInputFile(manifestListLocation)) + .setRootType(GenericManifestFile.class) + .setCustomType( + ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, + 
GenericPartitionFieldSummary.class) + .project(ManifestFile.schema()) + .build()) { + + for (ManifestFile manifest : manifests) { + if (deltaSnapshotIds == null) { + manifestPaths.add(manifest.path()); + } else if (manifest.snapshotId() != null + && deltaSnapshotIds.contains(manifest.snapshotId())) { + manifestPaths.add(manifest.path()); + } + } + + } catch (Exception e) { + throw new RuntimeException( + "Failed to read manifests for snapshot " + snapshotId, e); + } finally { + semaphore.release(); + } + return null; + }); + taskSubmitted = true; + submittedTasks++; + } finally { + if (!taskSubmitted) { + semaphore.release(); + } + } + Future done; + while ((done = completionService.poll()) != null) { + done.get(); + completedTasks++; + } + } + + while (completedTasks < submittedTasks) { + completionService.take().get(); + completedTasks++; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + throw new RuntimeException("Interrupted while processing manifests", e); + + } catch (ExecutionException e) { + executorService.shutdownNow(); + throw new RuntimeException( + "Failed to collect manifests to rewrite. " + + "The end version may contain invalid snapshots. 
" + + "Please choose an earlier version.", + e.getCause()); + } + + return manifestPaths; + } + + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { + if (startMetadata == null) { + return allSnapshots; + } else { + Set startSnapshotIds = + startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return allSnapshots.stream() + .filter(s -> !startSnapshotIds.contains(s.snapshotId())) + .collect(Collectors.toSet()); + } + } } diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java index e7a36da95f7..0507e55c3e9 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.RewriteTablePathUtil; +import org.apache.iceberg.StaticTableOperations; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -113,4 +115,9 @@ static void writeAsCsv(Set> rows, OutputFile outputFile) { throw new RuntimeIOException(e); } } + + static Table newStaticTable(String metadataFileLocation, FileIO io) { + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); + return new BaseTable(ops, metadataFileLocation); + } } From 91394c9f448c72ca1cd6e78b0b6f1cda4d102eb8 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Wed, 29 Apr 2026 15:39:01 +0530 Subject: [PATCH 2/3] Minor code updates --- .../ozone/iceberg/RewriteTablePathOzoneAction.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git 
a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index 5c3e9c86474..c010cc237ce 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -224,7 +224,8 @@ private String rebuildMetadata() { RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); - Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata); + //TODO: manifestsToRewrite will be used while re-write of manifest-list files. + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata, endMetadata); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); @@ -272,8 +273,8 @@ private Set> rewriteVersionFile(TableMetadata metadata, Str return result; } - private Set manifestsToRewrite(Set deltaSnapshots, TableMetadata startMetadata) { - Table endStaticTable = RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io()); + private Set manifestsToRewrite(Set deltaSnapshots, TableMetadata startMetadata, + TableMetadata endMetadata) { final Set deltaSnapshotIds; if (startMetadata != null) { @@ -292,7 +293,7 @@ private Set manifestsToRewrite(Set deltaSnapshots, TableMetada int completedTasks = 0; try { - for (Snapshot snapshot : endStaticTable.snapshots()) { + for (Snapshot snapshot : endMetadata.snapshots()) { semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight final long snapshotId = snapshot.snapshotId(); From 2e7b64f9b6b8d858f926e6f3727afd66abbcb6f6 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 1 May 2026 15:12:26 +0530 Subject: [PATCH 3/3] Updated code to use validSnapshots --- 
.../ozone/iceberg/RewriteTablePathOzoneAction.java | 10 ++++++---- .../ozone/iceberg/RewriteTablePathOzoneUtils.java | 13 ++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index c010cc237ce..02d35e30646 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -224,8 +224,10 @@ private String rebuildMetadata() { RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); + Set validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata)); + validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata)); //TODO: manifestsToRewrite will be used while re-write of manifest-list files. 
- Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata, endMetadata); + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, validSnapshots, startMetadata); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); @@ -273,8 +275,8 @@ private Set> rewriteVersionFile(TableMetadata metadata, Str return result; } - private Set manifestsToRewrite(Set deltaSnapshots, TableMetadata startMetadata, - TableMetadata endMetadata) { + private Set manifestsToRewrite(Set deltaSnapshots, Set validSnapshots, + TableMetadata startMetadata) { final Set deltaSnapshotIds; if (startMetadata != null) { @@ -293,7 +295,7 @@ private Set manifestsToRewrite(Set deltaSnapshots, TableMetada int completedTasks = 0; try { - for (Snapshot snapshot : endMetadata.snapshots()) { + for (Snapshot snapshot : validSnapshots) { semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight final long snapshotId = snapshot.snapshotId(); diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java index 0507e55c3e9..97f129e05d5 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java @@ -25,12 +25,12 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.RewriteTablePathUtil; -import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; 
@@ -116,8 +116,11 @@ static void writeAsCsv(Set> rows, OutputFile outputFile) {
     }
   }
 
-  static Table newStaticTable(String metadataFileLocation, FileIO io) {
-    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
-    return new BaseTable(ops, metadataFileLocation);
+  static Set snapshotSet(TableMetadata metadata) {
+    if (metadata != null) {
+      return new HashSet<>(metadata.snapshots());
+    }
+    // Null metadata contributes no snapshots: hand back a fresh empty set.
+    return new HashSet<>();
   }
 }