Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,17 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
}
throw ex;
} catch (FileAlreadyExistsException ex) {
throw new StorageContainerException("Container creation failed " +
"because ContainerFile already exists", ex,
CONTAINER_ALREADY_EXISTS);
containerData.releaseCommitSpace();
volumes.remove(containerVolume);
LOG.warn("Container {} already exists on selected volume {}, " +
"trying another volume. Remaining volumes: {}",
containerData.getContainerID(), containerVolume.getHddsRootDir(),
volumes.size(), ex);
if (volumes.isEmpty()) {
throw new StorageContainerException("Container creation failed " +
"because ContainerFile already exists on all candidate volumes",
ex, CONTAINER_ALREADY_EXISTS);
}
} catch (IOException ex) {
// This is a general catch all - no space left of device, which should
// not happen as the volume Choosing policy should filter out full
Expand All @@ -233,6 +241,7 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
containerMetaDataPath.getParentFile().exists()) {
FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
}
containerData.releaseCommitSpace();
volumes.remove(containerVolume);
LOG.error("Failed to create {} on volume {}, remaining volumes: {}]",
containerData, containerVolume.getHddsRootDir(), volumes.size(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand All @@ -42,7 +45,9 @@
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,62 +95,178 @@ public boolean isAllowedContainerImport(long containerID) {
public void importContainer(long containerID, Path tarFilePath,
HddsVolume targetVolume, CopyContainerCompression compression)
throws IOException {
markContainerImportInProgress(containerID, tarFilePath);

try {
checkContainerCanBeImported(containerID);
doImportContainer(containerID, tarFilePath, targetVolume, compression);
} finally {
importContainerProgress.remove(containerID);
FileUtils.deleteQuietly(tarFilePath.toFile());
}
}

/**
* Imports a container and retries on alternate volumes only when the selected
* volume already has the container directory.
*
* The caller is responsible for releasing committed bytes on
* initialTargetVolume. This method releases committed bytes only for
* retry-selected volumes.
*/
public void importContainerWithVolumeRetry(long containerID, Path tarFilePath,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add JavaDoc to clarify committed-space ownership.

/**
 * Imports a container and retries on alternate volumes only when the selected
 * volume already has the container directory.
 *
 * The caller is responsible for releasing committed bytes on
 * initialTargetVolume. This method releases committed bytes only for
 * retry-selected volumes.
 */

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

HddsVolume initialTargetVolume, CopyContainerCompression compression,
long spaceToReserve) throws IOException {
markContainerImportInProgress(containerID, tarFilePath);

try {
checkContainerCanBeImported(containerID);
List<HddsVolume> remainingVolumes =
getCandidateVolumesExcluding(initialTargetVolume);

IOException lastException = null;
if (initialTargetVolume != null) {
lastException = tryImportContainerToVolume(containerID, tarFilePath,
initialTargetVolume, compression);
if (lastException == null) {
return;
}
}

while (!remainingVolumes.isEmpty()) {
HddsVolume targetVolume = chooseNextVolume(remainingVolumes,
spaceToReserve);
try {
lastException = tryImportContainerToVolume(containerID, tarFilePath,
targetVolume, compression);
if (lastException == null) {
return;
}
} finally {
targetVolume.incCommittedBytes(-spaceToReserve);
}
remainingVolumes.remove(targetVolume);
}

throw new StorageContainerException(
"Container import failed because container " + containerID +
" already exists on all candidate volumes",
lastException, ContainerProtos.Result.CONTAINER_ALREADY_EXISTS);
} finally {
importContainerProgress.remove(containerID);
FileUtils.deleteQuietly(tarFilePath.toFile());
}
}

private void markContainerImportInProgress(long containerID, Path tarFilePath)
throws StorageContainerException {
if (!importContainerProgress.add(containerID)) {
deleteFileQuietely(tarFilePath);
FileUtils.deleteQuietly(tarFilePath.toFile());
String log = "Container import in progress with container Id " + containerID;
LOG.warn(log);
throw new StorageContainerException(log,
ContainerProtos.Result.CONTAINER_EXISTS);
}
}

private List<HddsVolume> getCandidateVolumesExcluding(
HddsVolume excludedVolume) {
volumeSet.readLock();
try {
List<HddsVolume> volumes =
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
volumes.remove(excludedVolume);
return volumes;
} finally {
volumeSet.readUnlock();
}
}

private IOException tryImportContainerToVolume(long containerID,
Path tarFilePath, HddsVolume targetVolume,
CopyContainerCompression compression) throws IOException {
try {
if (containerSet.getContainer(containerID) != null) {
String log = "Container already exists with container Id " + containerID;
LOG.warn(log);
throw new StorageContainerException(log,
ContainerProtos.Result.CONTAINER_EXISTS);
importContainerToSelectedVolume(containerID, tarFilePath, targetVolume,
compression);
return null;
} catch (IOException ex) {
if (!isContainerAlreadyExistsException(ex)) {
throw ex;
}
return ex;
}
}

KeyValueContainerData containerData;
TarContainerPacker packer = getPacker(compression);
private void importContainerToSelectedVolume(long containerID,
Path tarFilePath, HddsVolume targetVolume,
CopyContainerCompression compression) throws IOException {
if (containerDirExists(targetVolume, containerID)) {
throw new StorageContainerException(
"Container " + containerID + " already exists on selected volume " +
targetVolume.getHddsRootDir(),
ContainerProtos.Result.CONTAINER_ALREADY_EXISTS);
}
doImportContainer(containerID, tarFilePath, targetVolume, compression);
}

try (InputStream input = Files.newInputStream(tarFilePath)) {
byte[] containerDescriptorYaml =
packer.unpackContainerDescriptor(input);
containerData = getKeyValueContainerData(containerDescriptorYaml);
}
ContainerUtils.verifyContainerFileChecksum(containerData, conf);
containerData.setVolume(targetVolume);
// lastDataScanTime should be cleared for an imported container
containerData.setDataScanTimestamp(null);

try (InputStream input = Files.newInputStream(tarFilePath)) {
Container container = controller.importContainer(
containerData, input, packer);
// After container import is successful, increase used space for the volume and schedule an OnDemand scan for it
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
containerSet.scanContainer(containerID, "Imported container");
} catch (Exception e) {
private void doImportContainer(long containerID, Path tarFilePath,
HddsVolume targetVolume, CopyContainerCompression compression)
throws IOException {
KeyValueContainerData containerData;
TarContainerPacker packer = getPacker(compression);

try (InputStream input = Files.newInputStream(tarFilePath)) {
byte[] containerDescriptorYaml =
packer.unpackContainerDescriptor(input);
containerData = getKeyValueContainerData(containerDescriptorYaml);
}
ContainerUtils.verifyContainerFileChecksum(containerData, conf);
containerData.setVolume(targetVolume);
// lastDataScanTime should be cleared for an imported container
containerData.setDataScanTimestamp(null);

try (InputStream input = Files.newInputStream(tarFilePath)) {
Container container = controller.importContainer(
containerData, input, packer);
// After container import is successful, increase used space for the volume and schedule an OnDemand scan for it
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
containerSet.scanContainer(containerID, "Imported container");
} catch (Exception e) {
if (!isContainerAlreadyExistsException(e)) {
// Trigger a volume scan if the import failed.
StorageVolumeUtil.onFailure(containerData.getVolume());
throw e;
}
} finally {
importContainerProgress.remove(containerID);
deleteFileQuietely(tarFilePath);
throw e;
}
}

private static void deleteFileQuietely(Path tarFilePath) {
try {
Files.delete(tarFilePath);
} catch (Exception ex) {
LOG.error("Got exception while deleting temporary container file: "
+ tarFilePath.toAbsolutePath(), ex);
private void checkContainerCanBeImported(long containerID)
throws StorageContainerException {
if (containerSet.getContainer(containerID) != null) {
String log = "Container already exists with container Id " + containerID;
LOG.warn(log);
throw new StorageContainerException(log,
ContainerProtos.Result.CONTAINER_EXISTS);
}
}

private boolean containerDirExists(HddsVolume targetVolume, long containerID)
throws IOException {
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
targetVolume, targetVolume.getClusterID());
Path containerPath = Paths.get(KeyValueContainerLocationUtil
.getBaseContainerLocation(targetVolume.getHddsRootDir().toString(),
idDir, containerID));
return Files.exists(containerPath);
}

private boolean isContainerAlreadyExistsException(Throwable ex) {
return ex instanceof StorageContainerException &&
((StorageContainerException) ex).getResult() ==
ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
}

HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
// Choose volume that can hold both container in tmp and dest directory
LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
Expand All @@ -154,6 +275,14 @@ HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
spaceToReserve);
}

HddsVolume chooseNextVolume(List<HddsVolume> volumes, long spaceToReserve)
throws IOException {
// Choose volume that can hold both container in tmp and dest directory
LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
return volumeChoosingPolicy.chooseVolume(new ArrayList<>(volumes),
spaceToReserve);
}

public static Path getUntarDirectory(HddsVolume hddsVolume)
throws IOException {
return Paths.get(hddsVolume.getVolumeRootDir())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ public void replicate(ReplicationTask task) {
containerID, bytes);
task.setTransferredBytes(bytes);

containerImporter.importContainer(containerID, tarFilePath, targetVolume,
compression);
containerImporter.importContainerWithVolumeRetry(containerID, tarFilePath,
targetVolume, compression,
containerImporter.getDefaultReplicationSpace());

LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void onCompleted() {
closeOutput();

try {
importer.importContainer(containerId, path, volume, compression);
importer.importContainerWithVolumeRetry(containerId, path, volume,
compression, spaceToReserve);
LOG.info("Container {} is replicated successfully", containerId);
responseObserver.onNext(SendContainerResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.util.DiskChecker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -260,6 +262,43 @@ protected void createContainerMetaData(File containerMetaDataPath,
assertEquals(2, callCount.get());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion - DO you think the test coverage should verify non-retriable failures are not retried

The PR explicitly says retry is scoped only to CONTAINER_ALREADY_EXISTS. That behavior should have a test.

For example, mock controller.importContainer(...) to throw a normal IOException or a StorageContainerException with a different result code. Then verify only the initially selected volume was attempted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ArafatKhan2198 this comment doesn't hold right in this test class. We do mock controller.importContainer in ContainerImporter#importContainerWithVolumeRetry which belongs to TestContainerImporter class.

@ContainerTestVersionInfo.ContainerTest
public void testNextVolumeTriedWhenSelectedVolumeHasContainerDir(
ContainerTestVersionInfo versionInfo) throws Exception {
init(versionInfo);
String volumeDirPath =
Files.createDirectory(folder.toPath().resolve("volumeDir"))
.toFile().getAbsolutePath();
HddsVolume newVolume = new HddsVolume.Builder(volumeDirPath)
.conf(CONF).datanodeUuid(datanodeId.toString()).build();
StorageVolumeUtil.checkVolume(newVolume, scmId, scmId, CONF, null, null);
hddsVolumes.add(newVolume);
reset(volumeChoosingPolicy);
when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenAnswer(invocation -> {
List<HddsVolume> volumes = invocation.getArgument(0);
long spaceToReserve = invocation.getArgument(1);
HddsVolume volume = volumes.get(0);
volume.incCommittedBytes(spaceToReserve);
return volume;
});

HddsVolume firstVolume = hddsVolumes.get(0);
long initialCommittedBytes = firstVolume.getCommittedBytes();
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
firstVolume, scmId);
File staleContainerDir = new File(KeyValueContainerLocationUtil
.getBaseContainerLocation(firstVolume.getHddsRootDir().toString(),
idDir, keyValueContainerData.getContainerID()));
assertTrue(staleContainerDir.mkdirs());

keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);

assertEquals(initialCommittedBytes, firstVolume.getCommittedBytes());
assertEquals(newVolume, keyValueContainer.getContainerData().getVolume());
assertTrue(staleContainerDir.exists());
}

@ContainerTestVersionInfo.ContainerTest
public void testEmptyContainerImportExport(
ContainerTestVersionInfo versionInfo) throws Exception {
Expand Down Expand Up @@ -610,7 +649,8 @@ public void testDuplicateContainer(ContainerTestVersionInfo versionInfo) throws
StorageContainerException exception = assertThrows(StorageContainerException.class, () ->
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId));
assertEquals(ContainerProtos.Result.CONTAINER_ALREADY_EXISTS, exception.getResult());
assertThat(exception).hasMessage("Container creation failed because ContainerFile already exists");
assertThat(exception).hasMessage(
"Container creation failed because ContainerFile already exists on all candidate volumes");
}

@ContainerTestVersionInfo.ContainerTest
Expand Down
Loading