diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index 09bea8b4ab1..02d35e30646 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -22,10 +22,21 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.GenericPartitionFieldSummary; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; @@ -37,6 +48,7 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.actions.ImmutableRewriteTablePath; import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.Pair; /** @@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements RewriteTablePath { private String stagingDir; private int parallelism; + private ExecutorService executorService; + private static final int MAX_INFLIGHT_MULTIPLIER = 4; + private static final int DEFAULT_THREAD_COUNT = 10; + private final Table table; public RewriteTablePathOzoneAction(Table table) { this.table = table; - this.parallelism = Runtime.getRuntime().availableProcessors(); + this.parallelism = DEFAULT_THREAD_COUNT; } public RewriteTablePathOzoneAction(Table table, int parallelism) { @@ -102,8 +118,7 @@ public RewriteTablePath stagingLocation(String stagingLocation) { @Override public Result execute() { validateInputs(); - // TODO: should use for parallel manifest and position delete file rewriting. - ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + executorService = Executors.newFixedThreadPool(parallelism); try { return doExecute(); } finally { @@ -197,6 +212,9 @@ private boolean versionInFilePath(String path, String version) { private String rebuildMetadata() { //TODO need to implement rewrite of manifest list , manifest files and position delete files. + TableMetadata startMetadata = startVersionName != null + ? new StaticTableOperations(startVersionName, table.io()).current() + : null; TableMetadata endMetadata = new StaticTableOperations(endVersionName, table.io()).current(); List partitionStats = endMetadata.partitionStatisticsFiles(); @@ -205,6 +223,11 @@ private String rebuildMetadata() { } RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); + Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); + Set validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata)); + validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata)); + //TODO: manifestsToRewrite will be used while re-write of manifest-list files. + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, validSnapshots, startMetadata); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); @@ -251,4 +274,109 @@ private Set> rewriteVersionFile(TableMetadata metadata, Str return result; } + + private Set manifestsToRewrite(Set deltaSnapshots, Set validSnapshots, + TableMetadata startMetadata) { + + final Set deltaSnapshotIds; + if (startMetadata != null) { + deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + } else { + deltaSnapshotIds = null; + } + + Set manifestPaths = ConcurrentHashMap.newKeySet(); + int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; + Semaphore semaphore = new Semaphore(maxInFlight); + + ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); + + int submittedTasks = 0; + int completedTasks = 0; + + try { + for (Snapshot snapshot : validSnapshots) { + semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight + + final long snapshotId = snapshot.snapshotId(); + final String manifestListLocation = snapshot.manifestListLocation(); + + boolean taskSubmitted = false; + try { + completionService.submit(() -> { + try (CloseableIterable manifests = + InternalData.read( + FileFormat.AVRO, + table.io().newInputFile(manifestListLocation)) + .setRootType(GenericManifestFile.class) + .setCustomType( + ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, + GenericPartitionFieldSummary.class) + .project(ManifestFile.schema()) + .build()) { + + for (ManifestFile manifest : manifests) { + if (deltaSnapshotIds == null) { + manifestPaths.add(manifest.path()); + } else if (manifest.snapshotId() != null + && deltaSnapshotIds.contains(manifest.snapshotId())) { + manifestPaths.add(manifest.path()); + } + } + + } catch (Exception e) { + throw new RuntimeException( + "Failed to read manifests for snapshot " + snapshotId, e); + } finally { + semaphore.release(); + } + return null; + }); + taskSubmitted = true; + submittedTasks++; + } finally { + if (!taskSubmitted) { + semaphore.release(); + } + } + Future done; + while ((done = completionService.poll()) != null) { + done.get(); + completedTasks++; + } + } + + while (completedTasks < submittedTasks) { + completionService.take().get(); + completedTasks++; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + throw new RuntimeException("Interrupted while processing manifests", e); + + } catch (ExecutionException e) { + executorService.shutdownNow(); + throw new RuntimeException( + "Failed to collect manifests to rewrite. " + + "The end version may contain invalid snapshots. " + + "Please choose an earlier version.", + e.getCause()); + } + + return manifestPaths; + } + + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { + if (startMetadata == null) { + return allSnapshots; + } else { + Set startSnapshotIds = + startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return allSnapshots.stream() + .filter(s -> !startSnapshotIds.contains(s.snapshotId())) + .collect(Collectors.toSet()); + } + } } diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java index e7a36da95f7..97f129e05d5 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneUtils.java @@ -27,8 +27,10 @@ import java.util.Set; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.RewriteTablePathUtil; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -113,4 +115,12 @@ static void writeAsCsv(Set> rows, OutputFile outputFile) { throw new RuntimeIOException(e); } } + + static Set snapshotSet(TableMetadata metadata) { + if (metadata == null) { + return new HashSet<>(); + } else { + return new HashSet<>(metadata.snapshots()); + } + } }