Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,40 @@ private int processMissingIndexes(
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes,
List<DatanodeDetails> usedNodes) throws IOException {
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
if (missingIndexes.isEmpty()) {
return 0;
}
return processReconstruction(replicaCount, missingIndexes, sources,
availableSourceNodes, excludedNodes, usedNodes);
}

/**
* Core logic to schedule an EC reconstruction command.
*
* @param replicaCount the current replica count of the container
* @param missingIndexes the indexes that need to be reconstructed
* @param sources available source replicas
* @param availableSourceNodes healthy source nodes
* @param excludedNodes nodes to be excluded from target selection
* @param usedNodes nodes already used for this container
* @return number of commands sent
* @throws IOException if an error occurs
*/
private int processReconstruction(
ECContainerReplicaCount replicaCount,
List<Integer> missingIndexes,
Map<Integer, Pair<ContainerReplica, NodeStatus>> sources,
List<DatanodeDetails> availableSourceNodes,
List<DatanodeDetails> excludedNodes,
List<DatanodeDetails> usedNodes) throws IOException {
ContainerInfo container = replicaCount.getContainer();
ECReplicationConfig repConfig =
(ECReplicationConfig)container.getReplicationConfig();
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
LOG.debug("Processing missing indexes {} for container {}.", missingIndexes,
container.containerID());
(ECReplicationConfig) container.getReplicationConfig();
LOG.debug("Processing reconstruction of indexes {} for container {}.",
missingIndexes, container.containerID());
final int expectedTargetCount = missingIndexes.size();
boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity();
if (expectedTargetCount == 0) {
return 0;
}

int commandsSent = 0;
if (sources.size() >= repConfig.getData()) {
Expand All @@ -297,9 +320,9 @@ private int processMissingIndexes(
final boolean hasOverloaded = !excludedDueToLoad.isEmpty();
final List<DatanodeDetails> excludedOrOverloadedNodes = hasOverloaded
? new ArrayList<>(ImmutableSet.<DatanodeDetails>builder()
.addAll(excludedNodes)
.addAll(excludedDueToLoad)
.build())
.addAll(excludedNodes)
.addAll(excludedDueToLoad)
.build())
: excludedNodes;

// placement with overloaded nodes excluded
Expand Down Expand Up @@ -350,12 +373,43 @@ private int processMissingIndexes(
}
if (0 < targetCount) {
usedNodes.addAll(selectedDatanodes);
// TODO - why are we adding all the selected nodes to available
// sources?
availableSourceNodes.addAll(selectedDatanodes);
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sourceDatanodesWithIndex = new ArrayList<>();
for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {

// If we have more than the required number of data blocks, we can
// prefer IN_SERVICE nodes to avoid overloading decommissioning nodes.
List<Pair<ContainerReplica, NodeStatus>> sortedSources =
sources.values().stream()
.sorted((p1, p2) -> {
boolean p1InService =
p1.getRight().getOperationalState() == IN_SERVICE;
boolean p2InService =
p2.getRight().getOperationalState() == IN_SERVICE;
if (p1InService && !p2InService) {
return -1;
}
if (!p1InService && p2InService) {
return 1;
}
return 0;
})
.collect(Collectors.toList());

int inServiceCount = 0;
for (Pair<ContainerReplica, NodeStatus> src : sortedSources) {
if (src.getRight().getOperationalState() == IN_SERVICE) {
inServiceCount++;
}
}

for (Pair<ContainerReplica, NodeStatus> src : sortedSources) {
// If we have enough in-service nodes to fulfill the reconstruction
// requirements, we skip any out-of-service nodes.
if (inServiceCount >= repConfig.getData() &&
src.getRight().getOperationalState() != IN_SERVICE) {
continue;
}
sourceDatanodesWithIndex.add(
new ReconstructECContainersCommand
.DatanodeDetailsAndReplicaIndex(
Expand All @@ -368,11 +422,6 @@ private int processMissingIndexes(
sourceDatanodesWithIndex, selectedDatanodes,
integers2ByteString(missingIndexes),
repConfig);
// This can throw a CommandTargetOverloadedException, but there is no
// point in retrying here. The sources we picked already have the
// overloaded nodes excluded, so we should not get an overloaded
// exception, but it could happen due to other threads adding work to
// the DNs. If it happens here, we just let the exception bubble up.
replicationManager.sendThrottledReconstructionCommand(
container, reconstructionCommand);
for (int i = 0; i < missingIndexes.size(); i++) {
Expand All @@ -383,7 +432,7 @@ private int processMissingIndexes(
}
if (targetCount != expectedTargetCount) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully reconstruct container {}. Requested {} received {}",
" to fully reconstruct container {}. Requested {} received {}",
container.getContainerID(), expectedTargetCount, targetCount);
if (hasOverloaded && recoveryIsCritical) {
metrics.incrECPartialReconstructionCriticalTotal();
Expand All @@ -400,8 +449,6 @@ private int processMissingIndexes(
+ " {}. Available sources are: {}", container.containerID(),
repConfig.getData(), sources.size(), sources);
}
LOG.trace("Sent {} commands for container {}.", commandsSent,
container.containerID());
return commandsSent;
}

Expand Down Expand Up @@ -429,71 +476,88 @@ private int processDecommissioningIndexes(
throws IOException {
ContainerInfo container = replicaCount.getContainer();
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
int commandsSent = 0;
if (!decomIndexes.isEmpty()) {
LOG.debug("Processing decommissioning indexes {} for container {}.",
decomIndexes, container.containerID());
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
container, decomIndexes.size(), usedNodes, excludedNodes);
if (decomIndexes.isEmpty()) {
return 0;
}

ContainerPlacementStatus placementStatusWithSelectedTargets =
validatePlacement(container, availableSourceNodes, selectedDatanodes);
if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
LOG.debug("Target nodes + existing nodes for EC container {}" +
" will not satisfy placement policy {}. Reason: {}. Selected" +
" nodes: {}. Available source nodes: {}. Resuming recovery " +
"regardless.",
container.containerID(), containerPlacement.getClass().getName(),
placementStatusWithSelectedTargets.misReplicatedReason(),
selectedDatanodes, availableSourceNodes);
if (replicationManager.getConfig().isEcDecommissionReconstructionEnabled()) {
for (Integer index : decomIndexes) {
Pair<ContainerReplica, NodeStatus> source = sources.get(index);
if (source != null && replicationManager.isNodeHighlyLoaded(
source.getLeft().getDatanodeDetails())) {
LOG.info("Source node {} is highly loaded, switching to " +
"reconstruction for decommissioning container {}",
source.getLeft().getDatanodeDetails(), container.containerID());
return processReconstruction(replicaCount,
new ArrayList<>(decomIndexes), sources, availableSourceNodes,
excludedNodes, usedNodes);
}
}
}

usedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
CommandTargetOverloadedException overloadedException = null;
for (Integer decomIndex : decomIndexes) {
Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
if (source == null) {
LOG.warn("Cannot find source replica for decommissioning index " +
"{} in container {}", decomIndex, container.containerID());
continue;
}
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
+ " nodes: {}, the target nodes: {}, excluded nodes: {},"
+ " usedNodes: {}, and the decommission indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
break;
}
try {
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send Replicate command for container {}" +
" index {} because the source node {} is overloaded.",
container.getContainerID(), sourceReplica.getReplicaIndex(),
sourceReplica.getDatanodeDetails());
overloadedException = e;
}
LOG.debug("Processing decommissioning indexes {} for container {}.",
decomIndexes, container.containerID());
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
container, decomIndexes.size(), usedNodes, excludedNodes);

ContainerPlacementStatus placementStatusWithSelectedTargets =
validatePlacement(container, availableSourceNodes, selectedDatanodes);
if (!placementStatusWithSelectedTargets.isPolicySatisfied()) {
LOG.debug("Target nodes + existing nodes for EC container {}" +
" will not satisfy placement policy {}. Reason: {}. Selected" +
" nodes: {}. Available source nodes: {}. Resuming recovery " +
"regardless.",
container.containerID(), containerPlacement.getClass().getName(),
placementStatusWithSelectedTargets.misReplicatedReason(),
selectedDatanodes, availableSourceNodes);
}

usedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
int commandsSent = 0;
CommandTargetOverloadedException overloadedException = null;
for (Integer decomIndex : decomIndexes) {
Pair<ContainerReplica, NodeStatus> source = sources.get(decomIndex);
if (source == null) {
LOG.warn("Cannot find source replica for decommissioning index " +
"{} in container {}", decomIndex, container.containerID());
continue;
}
ContainerReplica sourceReplica = source.getLeft();
if (!iterator.hasNext()) {
LOG.warn("Couldn't find enough targets. Available source"
+ " nodes: {}, the target nodes: {}, excluded nodes: {},"
+ " usedNodes: {}, and the decommission indexes: {}",
sources.values().stream()
.map(Pair::getLeft).collect(Collectors.toSet()),
selectedDatanodes, excludedNodes, usedNodes, decomIndexes);
break;
}
if (overloadedException != null) {
throw overloadedException;
try {
createReplicateCommand(
container, iterator, sourceReplica, replicaCount);
commandsSent++;
} catch (CommandTargetOverloadedException e) {
LOG.debug("Unable to send Replicate command for container {}" +
" index {} because the source node {} is overloaded.",
container.getContainerID(), sourceReplica.getReplicaIndex(),
sourceReplica.getDatanodeDetails());
overloadedException = e;
}
}
if (overloadedException != null) {
throw overloadedException;
}

if (selectedDatanodes.size() != decomIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully replicate the decommission indexes for container {}." +
" Requested {} received {}", container.getContainerID(),
decomIndexes.size(), selectedDatanodes.size());
metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal();
throw new InsufficientDatanodesException(decomIndexes.size(),
selectedDatanodes.size());
}
if (selectedDatanodes.size() != decomIndexes.size()) {
LOG.debug("Insufficient nodes were returned from the placement policy" +
" to fully replicate the decommission indexes for container {}." +
" Requested {} received {}", container.getContainerID(),
decomIndexes.size(), selectedDatanodes.size());
metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal();
throw new InsufficientDatanodesException(decomIndexes.size(),
selectedDatanodes.size());
}
LOG.trace("Sent {} commands for container {}.", commandsSent,
container.containerID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm,
return false;
}

/**
 * Override of the reconstruction-limit check that unconditionally returns
 * {@code false}, so this handler never reports the limit as reached.
 * NOTE(review): the superclass contract is not visible here - confirm that
 * {@code false} means "limit not reached, keep processing".
 *
 * @param rm the replication manager; not consulted by this override
 * @return always {@code false}
 */
@Override
protected boolean reconstructionLimitReached(ReplicationManager rm) {
return false;
}

@Override
protected int sendDatanodeCommands(
ReplicationManager replicationManager,
Expand Down
Loading