diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index b8214882f12e..c72790746686 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -220,9 +220,17 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
       }
       throw ex;
     } catch (FileAlreadyExistsException ex) {
-      throw new StorageContainerException("Container creation failed " +
-          "because ContainerFile already exists", ex,
-          CONTAINER_ALREADY_EXISTS);
+      containerData.releaseCommitSpace();
+      volumes.remove(containerVolume);
+      LOG.warn("Container {} already exists on selected volume {}, " +
+              "trying another volume. Remaining volumes: {}",
+          containerData.getContainerID(), containerVolume.getHddsRootDir(),
+          volumes.size(), ex);
+      if (volumes.isEmpty()) {
+        throw new StorageContainerException("Container creation failed " +
+            "because ContainerFile already exists on all candidate volumes",
+            ex, CONTAINER_ALREADY_EXISTS);
+      }
     } catch (IOException ex) {
       // This is a general catch all - no space left of device, which should
       // not happen as the volume Choosing policy should filter out full
@@ -233,6 +241,7 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
           containerMetaDataPath.getParentFile().exists()) {
         FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
       }
+      containerData.releaseCommitSpace();
       volumes.remove(containerVolume);
       LOG.error("Failed to create {} on volume {}, remaining volumes: {}]",
           containerData, containerVolume.getHddsRootDir(), volumes.size(), ex);
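
A note on the create() change above: the previous code failed the whole create as soon as the container file already existed on the chosen volume. The new catch block instead releases the commit-space reservation, drops that volume from the candidate list, and lets the surrounding volume-selection loop try the next candidate, surfacing CONTAINER_ALREADY_EXISTS only once every candidate is exhausted. The LOG.warn call passes four arguments for three {} placeholders; SLF4J treats a trailing Throwable specially and prints its stack trace, so the extra ex is intentional. Below is a minimal, self-contained sketch of the reserve/try/release shape this relies on (illustrative only, not the Ozone API):

    import java.util.Deque;

    final class VolumeRetrySketch {
      interface Volume {
        void reserveCommitSpace(long bytes);
        void releaseCommitSpace(long bytes);
        boolean tryCreate(long containerId); // false: container dir already present
      }

      static Volume createOnAnyVolume(Deque<Volume> candidates, long containerId,
          long maxContainerSize) {
        while (!candidates.isEmpty()) {
          Volume volume = candidates.removeFirst();
          volume.reserveCommitSpace(maxContainerSize);
          if (volume.tryCreate(containerId)) {
            return volume; // success: the reservation is consumed by the container
          }
          volume.releaseCommitSpace(maxContainerSize); // mirrors releaseCommitSpace() above
        }
        throw new IllegalStateException("Container " + containerId
            + " already exists on all candidate volumes");
      }
    }
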
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 7b42006b2293..bf0393685e73 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -23,9 +23,12 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -42,7 +45,9 @@
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,62 +95,178 @@ public boolean isAllowedContainerImport(long containerID) {
   public void importContainer(long containerID, Path tarFilePath,
       HddsVolume targetVolume, CopyContainerCompression compression) throws
       IOException {
+    markContainerImportInProgress(containerID, tarFilePath);
+
+    try {
+      checkContainerCanBeImported(containerID);
+      doImportContainer(containerID, tarFilePath, targetVolume, compression);
+    } finally {
+      importContainerProgress.remove(containerID);
+      FileUtils.deleteQuietly(tarFilePath.toFile());
+    }
+  }
+
+  /**
+   * Imports a container and retries on alternate volumes only when the selected
+   * volume already has the container directory.
+   *
+   * The caller is responsible for releasing committed bytes on
+   * initialTargetVolume. This method releases committed bytes only for
+   * retry-selected volumes.
+   */
+  public void importContainerWithVolumeRetry(long containerID, Path tarFilePath,
+      HddsVolume initialTargetVolume, CopyContainerCompression compression,
+      long spaceToReserve) throws IOException {
+    markContainerImportInProgress(containerID, tarFilePath);
+
+    try {
+      checkContainerCanBeImported(containerID);
+      List<HddsVolume> remainingVolumes =
+          getCandidateVolumesExcluding(initialTargetVolume);
+
+      IOException lastException = null;
+      if (initialTargetVolume != null) {
+        lastException = tryImportContainerToVolume(containerID, tarFilePath,
+            initialTargetVolume, compression);
+        if (lastException == null) {
+          return;
+        }
+      }
+
+      while (!remainingVolumes.isEmpty()) {
+        HddsVolume targetVolume = chooseNextVolume(remainingVolumes,
+            spaceToReserve);
+        try {
+          lastException = tryImportContainerToVolume(containerID, tarFilePath,
+              targetVolume, compression);
+          if (lastException == null) {
+            return;
+          }
+        } finally {
+          targetVolume.incCommittedBytes(-spaceToReserve);
+        }
+        remainingVolumes.remove(targetVolume);
+      }
+
+      throw new StorageContainerException(
+          "Container import failed because container " + containerID +
+              " already exists on all candidate volumes",
+          lastException, ContainerProtos.Result.CONTAINER_ALREADY_EXISTS);
+    } finally {
+      importContainerProgress.remove(containerID);
+      FileUtils.deleteQuietly(tarFilePath.toFile());
+    }
+  }
+
+  private void markContainerImportInProgress(long containerID, Path tarFilePath)
+      throws StorageContainerException {
     if (!importContainerProgress.add(containerID)) {
-      deleteFileQuietely(tarFilePath);
+      FileUtils.deleteQuietly(tarFilePath.toFile());
       String log = "Container import in progress with container Id " + containerID;
       LOG.warn(log);
       throw new StorageContainerException(log,
           ContainerProtos.Result.CONTAINER_EXISTS);
     }
+  }
+
+  private List<HddsVolume> getCandidateVolumesExcluding(
+      HddsVolume excludedVolume) {
+    volumeSet.readLock();
+    try {
+      List<HddsVolume> volumes =
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
+      volumes.remove(excludedVolume);
+      return volumes;
+    } finally {
+      volumeSet.readUnlock();
+    }
+  }
 
+  private IOException tryImportContainerToVolume(long containerID,
+      Path tarFilePath, HddsVolume targetVolume,
+      CopyContainerCompression compression) throws IOException {
     try {
-      if (containerSet.getContainer(containerID) != null) {
-        String log = "Container already exists with container Id " + containerID;
-        LOG.warn(log);
-        throw new StorageContainerException(log,
-            ContainerProtos.Result.CONTAINER_EXISTS);
+      importContainerToSelectedVolume(containerID, tarFilePath, targetVolume,
+          compression);
+      return null;
+    } catch (IOException ex) {
+      if (!isContainerAlreadyExistsException(ex)) {
+        throw ex;
       }
+      return ex;
+    }
+  }
 
-      KeyValueContainerData containerData;
-      TarContainerPacker packer = getPacker(compression);
+  private void importContainerToSelectedVolume(long containerID,
+      Path tarFilePath, HddsVolume targetVolume,
+      CopyContainerCompression compression) throws IOException {
+    if (containerDirExists(targetVolume, containerID)) {
+      throw new StorageContainerException(
+          "Container " + containerID + " already exists on selected volume " +
+              targetVolume.getHddsRootDir(),
+          ContainerProtos.Result.CONTAINER_ALREADY_EXISTS);
+    }
+    doImportContainer(containerID, tarFilePath, targetVolume, compression);
+  }
 
-      try (InputStream input = Files.newInputStream(tarFilePath)) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(input);
-        containerData = getKeyValueContainerData(containerDescriptorYaml);
-      }
-      ContainerUtils.verifyContainerFileChecksum(containerData, conf);
-      containerData.setVolume(targetVolume);
-      // lastDataScanTime should be cleared for an imported container
-      containerData.setDataScanTimestamp(null);
-
-      try (InputStream input = Files.newInputStream(tarFilePath)) {
-        Container container = controller.importContainer(
-            containerData, input, packer);
-        // After container import is successful, increase used space for the volume and schedule an OnDemand scan for it
-        targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
-        containerSet.addContainerByOverwriteMissingContainer(container);
-        containerSet.scanContainer(containerID, "Imported container");
-      } catch (Exception e) {
+  private void doImportContainer(long containerID, Path tarFilePath,
+      HddsVolume targetVolume, CopyContainerCompression compression)
+      throws IOException {
+    KeyValueContainerData containerData;
+    TarContainerPacker packer = getPacker(compression);
+
+    try (InputStream input = Files.newInputStream(tarFilePath)) {
+      byte[] containerDescriptorYaml =
+          packer.unpackContainerDescriptor(input);
+      containerData = getKeyValueContainerData(containerDescriptorYaml);
+    }
+    ContainerUtils.verifyContainerFileChecksum(containerData, conf);
+    containerData.setVolume(targetVolume);
+    // lastDataScanTime should be cleared for an imported container
+    containerData.setDataScanTimestamp(null);
+
+    try (InputStream input = Files.newInputStream(tarFilePath)) {
+      Container container = controller.importContainer(
+          containerData, input, packer);
+      // After container import is successful, increase used space for the volume and schedule an OnDemand scan for it
+      targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
+      containerSet.addContainerByOverwriteMissingContainer(container);
+      containerSet.scanContainer(containerID, "Imported container");
+    } catch (Exception e) {
+      if (!isContainerAlreadyExistsException(e)) {
         // Trigger a volume scan if the import failed.
         StorageVolumeUtil.onFailure(containerData.getVolume());
-        throw e;
       }
-    } finally {
-      importContainerProgress.remove(containerID);
-      deleteFileQuietely(tarFilePath);
+      throw e;
     }
   }
 
-  private static void deleteFileQuietely(Path tarFilePath) {
-    try {
-      Files.delete(tarFilePath);
-    } catch (Exception ex) {
-      LOG.error("Got exception while deleting temporary container file: "
-          + tarFilePath.toAbsolutePath(), ex);
+  private void checkContainerCanBeImported(long containerID)
+      throws StorageContainerException {
+    if (containerSet.getContainer(containerID) != null) {
+      String log = "Container already exists with container Id " + containerID;
+      LOG.warn(log);
+      throw new StorageContainerException(log,
+          ContainerProtos.Result.CONTAINER_EXISTS);
     }
   }
 
+  private boolean containerDirExists(HddsVolume targetVolume, long containerID)
+      throws IOException {
+    String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
+        targetVolume, targetVolume.getClusterID());
+    Path containerPath = Paths.get(KeyValueContainerLocationUtil
+        .getBaseContainerLocation(targetVolume.getHddsRootDir().toString(),
+            idDir, containerID));
+    return Files.exists(containerPath);
+  }
+
+  private boolean isContainerAlreadyExistsException(Throwable ex) {
+    return ex instanceof StorageContainerException &&
+        ((StorageContainerException) ex).getResult() ==
+            ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+  }
+
   HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
     // Choose volume that can hold both container in tmp and dest directory
     LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
@@ -154,6 +275,14 @@ HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
         spaceToReserve);
   }
 
+  HddsVolume chooseNextVolume(List<HddsVolume> volumes, long spaceToReserve)
+      throws IOException {
+    // Choose volume that can hold both container in tmp and dest directory
+    LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
+    return volumeChoosingPolicy.chooseVolume(new ArrayList<>(volumes),
+        spaceToReserve);
+  }
+
   public static Path getUntarDirectory(HddsVolume hddsVolume)
       throws IOException {
     return Paths.get(hddsVolume.getVolumeRootDir())
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 2457b592b141..0ae52d7c2be6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -90,8 +90,9 @@ public void replicate(ReplicationTask task) {
           containerID, bytes);
       task.setTransferredBytes(bytes);
 
-      containerImporter.importContainer(containerID, tarFilePath, targetVolume,
-          compression);
+      containerImporter.importContainerWithVolumeRetry(containerID, tarFilePath,
+          targetVolume, compression,
+          containerImporter.getDefaultReplicationSpace());
 
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
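
The ContainerImporter refactoring above splits the old monolithic importContainer() into small steps and adds importContainerWithVolumeRetry(), whose javadoc fixes an ownership rule for committed space: the caller keeps responsibility for the reservation it holds on the initial target volume (both callers below pass in a volume they have already reserved space on), while the retry loop pairs every reservation it makes itself with a release in a finally block. The release also runs on success, because incrementUsedSpace() has already accounted for the imported bytes by then. A self-contained sketch of that rule (illustrative only, not Ozone code):

    final class ReservationOwnershipSketch {
      static final class Volume {
        private long committedBytes;

        void incCommittedBytes(long delta) {
          committedBytes += delta;
        }
      }

      /** The caller reserved space on {@code initial} and must release it itself. */
      static void importWithRetry(Volume initial, Volume[] others, long space) {
        if (tryImport(initial)) {
          return;
        }
        for (Volume volume : others) {
          volume.incCommittedBytes(space); // callee-owned reservation
          try {
            if (tryImport(volume)) {
              return; // finally still runs: used space is already accounted for
            }
          } finally {
            volume.incCommittedBytes(-space); // always undone, success or failure
          }
        }
        throw new IllegalStateException("exists on all candidate volumes");
      }

      static boolean tryImport(Volume volume) {
        return false; // placeholder for the real unpack-and-import step
      }
    }
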
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 0824341127c3..806dca5cef87 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -142,7 +142,8 @@ public void onCompleted() {
     closeOutput();
 
     try {
-      importer.importContainer(containerId, path, volume, compression);
+      importer.importContainerWithVolumeRetry(containerId, path, volume,
+          compression, spaceToReserve);
       LOG.info("Container {} is replicated successfully", containerId);
       responseObserver.onNext(SendContainerResponse.newBuilder().build());
       responseObserver.onCompleted();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 9e66aaeb067a..b64317a28a6c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -90,10 +90,12 @@
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
 import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.util.DiskChecker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -260,6 +262,43 @@ protected void createContainerMetaData(File containerMetaDataPath,
     assertEquals(2, callCount.get());
   }
 
+  @ContainerTestVersionInfo.ContainerTest
+  public void testNextVolumeTriedWhenSelectedVolumeHasContainerDir(
+      ContainerTestVersionInfo versionInfo) throws Exception {
+    init(versionInfo);
+    String volumeDirPath =
+        Files.createDirectory(folder.toPath().resolve("volumeDir"))
+            .toFile().getAbsolutePath();
+    HddsVolume newVolume = new HddsVolume.Builder(volumeDirPath)
+        .conf(CONF).datanodeUuid(datanodeId.toString()).build();
+    StorageVolumeUtil.checkVolume(newVolume, scmId, scmId, CONF, null, null);
+    hddsVolumes.add(newVolume);
+    reset(volumeChoosingPolicy);
+    when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+        .thenAnswer(invocation -> {
+          List<HddsVolume> volumes = invocation.getArgument(0);
+          long spaceToReserve = invocation.getArgument(1);
+          HddsVolume volume = volumes.get(0);
+          volume.incCommittedBytes(spaceToReserve);
+          return volume;
+        });
+
+    HddsVolume firstVolume = hddsVolumes.get(0);
+    long initialCommittedBytes = firstVolume.getCommittedBytes();
+    String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
+        firstVolume, scmId);
+    File staleContainerDir = new File(KeyValueContainerLocationUtil
+        .getBaseContainerLocation(firstVolume.getHddsRootDir().toString(),
+            idDir, keyValueContainerData.getContainerID()));
+    assertTrue(staleContainerDir.mkdirs());
+
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+    assertEquals(initialCommittedBytes, firstVolume.getCommittedBytes());
+    assertEquals(newVolume, keyValueContainer.getContainerData().getVolume());
+    assertTrue(staleContainerDir.exists());
+  }
+
   @ContainerTestVersionInfo.ContainerTest
   public void testEmptyContainerImportExport(
       ContainerTestVersionInfo versionInfo) throws Exception {
@@ -610,7 +649,8 @@ public void testDuplicateContainer(ContainerTestVersionInfo versionInfo) throws
     StorageContainerException exception = assertThrows(StorageContainerException.class,
         () -> keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId));
     assertEquals(ContainerProtos.Result.CONTAINER_ALREADY_EXISTS, exception.getResult());
-    assertThat(exception).hasMessage("Container creation failed because ContainerFile already exists");
+    assertThat(exception).hasMessage(
+        "Container creation failed because ContainerFile already exists on all candidate volumes");
   }
 
   @ContainerTestVersionInfo.ContainerTest
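
The TestContainerImporter changes below reuse the simulation recipe from the TestKeyValueContainer test above: pre-create a stale container directory on the first volume, then stub the volume-choosing policy with a deterministic Mockito Answer that returns the first candidate and reproduces the reservation side effect the code under test expects from chooseVolume. The stubbing pattern in isolation (types and calls as used in the tests; the surrounding test scaffolding is assumed, and the argument matchers vary slightly between the two test classes):

    when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
        .thenAnswer(invocation -> {
          List<HddsVolume> candidates = invocation.getArgument(0);
          long spaceToReserve = invocation.getArgument(1);
          HddsVolume chosen = candidates.get(0); // deterministic: first candidate
          chosen.incCommittedBytes(spaceToReserve); // mimic the real policy's reservation
          return chosen;
        });

Returning candidates.get(0) keeps the tests deterministic: with a stale directory planted on the first volume, a successful retry must land on the second.
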
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
index e594dad3e58b..5f3e921e7d0b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
@@ -41,6 +41,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -66,7 +67,9 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Assertions;
@@ -110,7 +113,7 @@ void setup() throws IOException {
       return container;
     });
     containerSet = spy(newContainerSet(0));
-    volumeSet = new MutableVolumeSet("test", conf, null,
+    volumeSet = new MutableVolumeSet("test", "test", conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
     // create containerImporter object
     containerImporter = new ContainerImporter(conf,
@@ -234,6 +237,127 @@ public void testImportContainerResetsLastScanTime() throws Exception {
     assertEquals(Optional.empty(), containerData.lastDataScanTime());
   }
 
+  @Test
+  public void testImportWithVolumeRetrySkipsSelectedVolumeWithContainerDir()
+      throws Exception {
+    List<HddsVolume> volumes = setupTwoVolumeImporterWithMockPolicy();
+    HddsVolume firstVolume = volumes.get(0);
+    HddsVolume secondVolume = volumes.get(1);
+    long secondInitialCommittedBytes = secondVolume.getCommittedBytes();
+
+    File staleContainerDir = createStaleContainerDir(firstVolume, containerId);
+
+    File tarFile = containerTarFile(containerId, containerData);
+    containerImporter.importContainerWithVolumeRetry(containerId,
+        tarFile.toPath(), firstVolume, NO_COMPRESSION,
+        containerImporter.getDefaultReplicationSpace());
+
+    assertEquals(secondVolume, containerData.getVolume());
+    assertThat(staleContainerDir).exists();
+    assertEquals(secondInitialCommittedBytes, secondVolume.getCommittedBytes());
+    verify(containerSet, atLeastOnce()).scanContainer(containerId,
+        "Imported container");
+  }
+
+  @Test
+  public void testImportWithVolumeRetryFailsWhenAllVolumesHaveContainerDir()
+      throws Exception {
+    List<HddsVolume> volumes = setupTwoVolumeImporterWithMockPolicy();
+    HddsVolume firstVolume = volumes.get(0);
+    HddsVolume secondVolume = volumes.get(1);
+    long spaceToReserve = containerImporter.getDefaultReplicationSpace();
+    long firstInitialCommittedBytes = firstVolume.getCommittedBytes();
+    long secondInitialCommittedBytes = secondVolume.getCommittedBytes();
+
+    firstVolume.incCommittedBytes(spaceToReserve);
+    createStaleContainerDir(firstVolume, containerId);
+    createStaleContainerDir(secondVolume, containerId);
+
+    File tarFile = containerTarFile(containerId, containerData);
+    StorageContainerException exception =
+        assertThrows(StorageContainerException.class,
+            () -> containerImporter.importContainerWithVolumeRetry(containerId,
+                tarFile.toPath(), firstVolume, NO_COMPRESSION,
+                spaceToReserve));
+
+    assertEquals(ContainerProtos.Result.CONTAINER_ALREADY_EXISTS,
+        exception.getResult());
+    assertThat(exception).hasMessageContaining(
+        "already exists on all candidate volumes");
+    assertThat(tarFile).doesNotExist();
+    assertEquals(firstInitialCommittedBytes + spaceToReserve,
+        firstVolume.getCommittedBytes());
+    firstVolume.incCommittedBytes(-spaceToReserve);
+    assertEquals(firstInitialCommittedBytes, firstVolume.getCommittedBytes());
+    assertEquals(secondInitialCommittedBytes, secondVolume.getCommittedBytes());
+    assertEquals(0, containerSet.containerCount());
+    verify(controllerMock, times(0)).importContainer(any(ContainerData.class),
+        any(), any());
+  }
+
+  @Test
+  public void testImportWithVolumeRetryDoesNotRetryNonContainerExistsFailure()
+      throws Exception {
+    List<HddsVolume> volumes = setupTwoVolumeImporterWithMockPolicy();
+    HddsVolume firstVolume = volumes.get(0);
+    long spaceToReserve = containerImporter.getDefaultReplicationSpace();
+    long firstInitialCommittedBytes = firstVolume.getCommittedBytes();
+    firstVolume.incCommittedBytes(spaceToReserve);
+
+    when(controllerMock.importContainer(any(ContainerData.class), any(), any()))
+        .thenThrow(new IOException("non-retriable"));
+
+    File tarFile = containerTarFile(containerId, containerData);
+    IOException exception = assertThrows(IOException.class,
+        () -> containerImporter.importContainerWithVolumeRetry(containerId,
+            tarFile.toPath(), firstVolume, NO_COMPRESSION, spaceToReserve));
+
+    assertThat(exception).hasMessageContaining("non-retriable");
+    assertThat(tarFile).doesNotExist();
+    assertEquals(firstInitialCommittedBytes + spaceToReserve,
+        firstVolume.getCommittedBytes());
+    firstVolume.incCommittedBytes(-spaceToReserve);
+    assertEquals(firstInitialCommittedBytes, firstVolume.getCommittedBytes());
+    verify(controllerMock, times(1)).importContainer(any(ContainerData.class),
+        any(), any());
+    verify(volumeChoosingPolicy, times(0)).chooseVolume(any(), anyLong());
+  }
+
+  private List<HddsVolume> setupTwoVolumeImporterWithMockPolicy()
+      throws IOException {
+    File volume2 = Files.createDirectory(tempDir.toPath().resolve("volume2"))
+        .toFile();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        tempDir.getAbsolutePath() + "," + volume2.getAbsolutePath());
+
+    volumeChoosingPolicy = mock(VolumeChoosingPolicy.class);
+    when(volumeChoosingPolicy.chooseVolume(any(), anyLong())).thenAnswer(
+        invocation -> {
+          List<HddsVolume> volumes = invocation.getArgument(0);
+          long spaceToReserve = invocation.getArgument(1);
+          HddsVolume volume = volumes.get(0);
+          volume.incCommittedBytes(spaceToReserve);
+          return volume;
+        });
+    volumeSet = new MutableVolumeSet("test", "test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    containerImporter = new ContainerImporter(conf,
+        containerSet, controllerMock, volumeSet, volumeChoosingPolicy);
+
+    return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
+  }
+
+  private File createStaleContainerDir(HddsVolume volume, long id)
+      throws IOException {
+    String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
+        volume, volume.getClusterID());
+    File staleContainerDir = new File(KeyValueContainerLocationUtil
+        .getBaseContainerLocation(volume.getHddsRootDir().toString(),
+            idDir, id));
+    assertThat(staleContainerDir.mkdirs()).isTrue();
+    return staleContainerDir;
+  }
+
   private File containerTarFile(long id, ContainerData data) throws IOException {
     File yamlFile = new File(tempDir, "container.yaml");
     ContainerDataYaml.createContainerFile(data, yamlFile);
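
The committed-bytes assertions in the failure tests above exercise both sides of the reservation contract: the callee must have released everything it reserved for retries, while the caller's original reservation on the first volume must still be intact. Each test therefore checks the raised counter, performs the caller-side release by hand, and confirms the counter returns to its baseline:

    // The callee released only its own retry reservations; the caller's is intact.
    assertEquals(firstInitialCommittedBytes + spaceToReserve,
        firstVolume.getCommittedBytes());
    // Emulate the caller-side release required by the contract ...
    firstVolume.incCommittedBytes(-spaceToReserve);
    // ... and verify the volume is back at its baseline.
    assertEquals(firstInitialCommittedBytes, firstVolume.getCommittedBytes());
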
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 8b831fa06466..3be8bdb18184 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -147,7 +147,8 @@ public void init() throws Exception {
     doAnswer(invocation -> {
       pushContainerId.set((long) invocation.getArguments()[0]);
       return null;
-    }).when(importer).importContainer(anyLong(), any(), any(), any());
+    }).when(importer).importContainerWithVolumeRetry(anyLong(), any(), any(),
+        any(), anyLong());
     doReturn(true).when(importer).isAllowedContainerImport(eq(
         CONTAINER_ID));
     when(importer.chooseNextVolume(anyLong())).thenReturn(new HddsVolume.Builder(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 9ceb0a99e9f0..c613088d2182 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -380,8 +380,8 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
         .clock(clock)
         .build();
 
-    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
-        StorageVolume.VolumeType.DATA_VOLUME, null);
+    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(),
+        "test", conf, null, StorageVolume.VolumeType.DATA_VOLUME, null);
 
     long containerId = 1;
     // create container