diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 2f53178e9bf9..c068c928ca96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -195,6 +195,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, } nextHB = new AtomicLong(Time.monotonicNow()); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); + ContainerImporter importer = new ContainerImporter(conf, container.getContainerSet(), container.getController(), @@ -205,15 +208,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, importer, new SimpleContainerDownloader(conf, certClient)); ContainerReplicator pushReplicator = new PushReplicator(conf, - new OnDemandContainerReplicationSource(container.getController()), + new OnDemandContainerReplicationSource(container.getController(), + replicationConfig), new GrpcContainerUploader(conf, certClient, container.getController()) ); pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull"); pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push"); - ReplicationConfig replicationConfig = - conf.getObject(ReplicationConfig.class); supervisor = ReplicationSupervisor.newBuilder() .stateContext(context) .datanodeConfig(dnConf) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 310c46de5294..e71cd4e2f1ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import org.apache.commons.io.FileUtils; @@ -107,6 +108,9 @@ public class HddsVolume extends StorageVolume { private final AtomicBoolean dbLoaded = new AtomicBoolean(false); private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false); + private final AtomicInteger activeOutboundReplications = + new AtomicInteger(0); + /** * Builder for HddsVolume. */ @@ -629,4 +633,16 @@ public void compactDb() { LOG.warn("compact rocksdb error in {}", dbFilePath, e); } } + + public int incActiveOutboundReplications() { + return activeOutboundReplications.incrementAndGet(); + } + + public int decActiveOutboundReplications() { + return activeOutboundReplications.decrementAndGet(); + } + + public int getActiveOutboundReplications() { + return activeOutboundReplications.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java index 422dd370d1fd..2ff991907f6a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java @@ -17,12 +17,14 @@ package org.apache.hadoop.ozone.container.replication; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -34,10 +36,13 @@ public class OnDemandContainerReplicationSource implements ContainerReplicationSource { private final ContainerController controller; + private final ReplicationServer.ReplicationConfig config; public OnDemandContainerReplicationSource( - ContainerController controller) { + ContainerController controller, + ReplicationServer.ReplicationConfig config) { this.controller = controller; + this.config = config; } @Override @@ -57,8 +62,24 @@ public void copyData(long containerId, OutputStream destination, " is not found.", CONTAINER_NOT_FOUND); } - controller.exportContainer( - container.getContainerType(), containerId, destination, - new TarContainerPacker(compression)); + HddsVolume volume = (HddsVolume) container.getContainerData().getVolume(); + if (volume != null) { + if (volume.getActiveOutboundReplications() >= + config.getVolumeOutboundLimit()) { + throw new StorageContainerException("Volume " + volume.getStorageID() + + " has reached the maximum number of concurrent replication reads (" + + config.getVolumeOutboundLimit() + ")", CONTAINER_INTERNAL_ERROR); + } + volume.incActiveOutboundReplications(); + } + try { + controller.exportContainer( + container.getContainerType(), containerId, destination, + new TarContainerPacker(compression)); + } finally { + if (volume != null) { + volume.decActiveOutboundReplications(); + } + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index 34b5a799548d..d3bb14d8c505 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -63,6 +63,8 @@ public class ReplicationServer { private ContainerController controller; + private ReplicationConfig replicationConfig; + private int port; private final ContainerImporter importer; @@ -75,6 +77,7 @@ public ReplicationServer(ContainerController controller, this.secConf = secConf; this.caClient = caClient; this.controller = controller; + this.replicationConfig = replicationConfig; this.importer = importer; this.port = replicationConfig.getPort(); @@ -103,7 +106,8 @@ public ReplicationServer(ContainerController controller, public void init() { GrpcReplicationService grpcReplicationService = new GrpcReplicationService( - new OnDemandContainerReplicationSource(controller), importer); + new OnDemandContainerReplicationSource(controller, replicationConfig), + importer); NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .addService(ServerInterceptors.intercept( @@ -225,10 +229,27 @@ public static final class ReplicationConfig { ) private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT; + @Config(key = "hdds.datanode.replication.volume.outbound.limit", + type = ConfigType.INT, + defaultValue = "2", + tags = {DATANODE}, + description = "The maximum number of concurrent replication reads " + + "allowed per physical disk volume." + ) + private int volumeOutboundLimit = 2; + public double getOutOfServiceFactor() { return outOfServiceFactor; } + public int getVolumeOutboundLimit() { + return volumeOutboundLimit; + } + + public void setVolumeOutboundLimit(int limit) { + this.volumeOutboundLimit = limit; + } + public int scaleOutOfServiceLimit(int original) { return (int) Math.ceil(original * outOfServiceFactor); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java index 8b831fa06466..5ab7ccc63577 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java @@ -191,7 +191,8 @@ public void testDownload() throws IOException { @Test public void testUpload() { ContainerReplicationSource source = - new OnDemandContainerReplicationSource(containerController); + new OnDemandContainerReplicationSource(containerController, + new ReplicationServer.ReplicationConfig()); GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java index 0bf886569c9d..6473ac20b9b8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java @@ -278,17 +278,40 @@ private int processMissingIndexes( List availableSourceNodes, List excludedNodes, List usedNodes) throws IOException { + List missingIndexes = replicaCount.unavailableIndexes(true); + if (missingIndexes.isEmpty()) { + return 0; + } + return processReconstruction(replicaCount, missingIndexes, sources, + availableSourceNodes, excludedNodes, usedNodes); + } + + /** + * Core logic to schedule an EC reconstruction command. + * + * @param replicaCount the current replica count of the container + * @param missingIndexes the indexes that need to be reconstructed + * @param sources available source replicas + * @param availableSourceNodes healthy source nodes + * @param excludedNodes nodes to be excluded from target selection + * @param usedNodes nodes already used for this container + * @return number of commands sent + * @throws IOException if an error occurs + */ + private int processReconstruction( + ECContainerReplicaCount replicaCount, + List missingIndexes, + Map> sources, + List availableSourceNodes, + List excludedNodes, + List usedNodes) throws IOException { ContainerInfo container = replicaCount.getContainer(); ECReplicationConfig repConfig = - (ECReplicationConfig)container.getReplicationConfig(); - List missingIndexes = replicaCount.unavailableIndexes(true); - LOG.debug("Processing missing indexes {} for container {}.", missingIndexes, - container.containerID()); + (ECReplicationConfig) container.getReplicationConfig(); + LOG.debug("Processing reconstruction of indexes {} for container {}.", + missingIndexes, container.containerID()); final int expectedTargetCount = missingIndexes.size(); boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity(); - if (expectedTargetCount == 0) { - return 0; - } int commandsSent = 0; if (sources.size() >= repConfig.getData()) { @@ -297,9 +320,9 @@ private int processMissingIndexes( final boolean hasOverloaded = !excludedDueToLoad.isEmpty(); final List excludedOrOverloadedNodes = hasOverloaded ? new ArrayList<>(ImmutableSet.builder() - .addAll(excludedNodes) - .addAll(excludedDueToLoad) - .build()) + .addAll(excludedNodes) + .addAll(excludedDueToLoad) + .build()) : excludedNodes; // placement with overloaded nodes excluded @@ -350,12 +373,43 @@ private int processMissingIndexes( } if (0 < targetCount) { usedNodes.addAll(selectedDatanodes); - // TODO - what are we adding all the selected nodes to available - // sources? availableSourceNodes.addAll(selectedDatanodes); List sourceDatanodesWithIndex = new ArrayList<>(); - for (Pair src : sources.values()) { + + // If we have more than the required number of data blocks, we can + // prefer IN_SERVICE nodes to avoid overloading decommissioning nodes. + List> sortedSources = + sources.values().stream() + .sorted((p1, p2) -> { + boolean p1InService = + p1.getRight().getOperationalState() == IN_SERVICE; + boolean p2InService = + p2.getRight().getOperationalState() == IN_SERVICE; + if (p1InService && !p2InService) { + return -1; + } + if (!p1InService && p2InService) { + return 1; + } + return 0; + }) + .collect(Collectors.toList()); + + int inServiceCount = 0; + for (Pair src : sortedSources) { + if (src.getRight().getOperationalState() == IN_SERVICE) { + inServiceCount++; + } + } + + for (Pair src : sortedSources) { + // If we have enough in-service nodes to fulfill the reconstruction + // requirements, we skip any out-of-service nodes. + if (inServiceCount >= repConfig.getData() && + src.getRight().getOperationalState() != IN_SERVICE) { + continue; + } sourceDatanodesWithIndex.add( new ReconstructECContainersCommand .DatanodeDetailsAndReplicaIndex( @@ -368,11 +422,6 @@ private int processMissingIndexes( sourceDatanodesWithIndex, selectedDatanodes, integers2ByteString(missingIndexes), repConfig); - // This can throw a CommandTargetOverloadedException, but there is no - // point in retrying here. The sources we picked already have the - // overloaded nodes excluded, so we should not get an overloaded - // exception, but it could happen due to other threads adding work to - // the DNs. If it happens here, we just let the exception bubble up. replicationManager.sendThrottledReconstructionCommand( container, reconstructionCommand); for (int i = 0; i < missingIndexes.size(); i++) { @@ -383,7 +432,7 @@ private int processMissingIndexes( } if (targetCount != expectedTargetCount) { LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully reconstruct container {}. Requested {} received {}", + " to fully reconstruct container {}. Requested {} received {}", container.getContainerID(), expectedTargetCount, targetCount); if (hasOverloaded && recoveryIsCritical) { metrics.incrECPartialReconstructionCriticalTotal(); @@ -400,8 +449,6 @@ private int processMissingIndexes( + " {}. Available sources are: {}", container.containerID(), repConfig.getData(), sources.size(), sources); } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); return commandsSent; } @@ -429,71 +476,88 @@ private int processDecommissioningIndexes( throws IOException { ContainerInfo container = replicaCount.getContainer(); Set decomIndexes = replicaCount.decommissioningOnlyIndexes(true); - int commandsSent = 0; - if (!decomIndexes.isEmpty()) { - LOG.debug("Processing decommissioning indexes {} for container {}.", - decomIndexes, container.containerID()); - final List selectedDatanodes = getTargetDatanodes( - container, decomIndexes.size(), usedNodes, excludedNodes); + if (decomIndexes.isEmpty()) { + return 0; + } - ContainerPlacementStatus placementStatusWithSelectedTargets = - validatePlacement(container, availableSourceNodes, selectedDatanodes); - if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { - LOG.debug("Target nodes + existing nodes for EC container {}" + - " will not satisfy placement policy {}. Reason: {}. Selected" + - " nodes: {}. Available source nodes: {}. Resuming recovery " + - "regardless.", - container.containerID(), containerPlacement.getClass().getName(), - placementStatusWithSelectedTargets.misReplicatedReason(), - selectedDatanodes, availableSourceNodes); + if (replicationManager.getConfig().isEcDecommissionReconstructionEnabled()) { + for (Integer index : decomIndexes) { + Pair source = sources.get(index); + if (source != null && replicationManager.isNodeHighlyLoaded( + source.getLeft().getDatanodeDetails())) { + LOG.info("Source node {} is highly loaded, switching to " + + "reconstruction for decommissioning container {}", + source.getLeft().getDatanodeDetails(), container.containerID()); + return processReconstruction(replicaCount, + new ArrayList<>(decomIndexes), sources, availableSourceNodes, + excludedNodes, usedNodes); + } } + } - usedNodes.addAll(selectedDatanodes); - Iterator iterator = selectedDatanodes.iterator(); - // In this case we need to do one to one copy. - CommandTargetOverloadedException overloadedException = null; - for (Integer decomIndex : decomIndexes) { - Pair source = sources.get(decomIndex); - if (source == null) { - LOG.warn("Cannot find source replica for decommissioning index " + - "{} in container {}", decomIndex, container.containerID()); - continue; - } - ContainerReplica sourceReplica = source.getLeft(); - if (!iterator.hasNext()) { - LOG.warn("Couldn't find enough targets. Available source" - + " nodes: {}, the target nodes: {}, excluded nodes: {}," - + " usedNodes: {}, and the decommission indexes: {}", - sources.values().stream() - .map(Pair::getLeft).collect(Collectors.toSet()), - selectedDatanodes, excludedNodes, usedNodes, decomIndexes); - break; - } - try { - createReplicateCommand( - container, iterator, sourceReplica, replicaCount); - commandsSent++; - } catch (CommandTargetOverloadedException e) { - LOG.debug("Unable to send Replicate command for container {}" + - " index {} because the source node {} is overloaded.", - container.getContainerID(), sourceReplica.getReplicaIndex(), - sourceReplica.getDatanodeDetails()); - overloadedException = e; - } + LOG.debug("Processing decommissioning indexes {} for container {}.", + decomIndexes, container.containerID()); + final List selectedDatanodes = getTargetDatanodes( + container, decomIndexes.size(), usedNodes, excludedNodes); + + ContainerPlacementStatus placementStatusWithSelectedTargets = + validatePlacement(container, availableSourceNodes, selectedDatanodes); + if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { + LOG.debug("Target nodes + existing nodes for EC container {}" + + " will not satisfy placement policy {}. Reason: {}. Selected" + + " nodes: {}. Available source nodes: {}. Resuming recovery " + + "regardless.", + container.containerID(), containerPlacement.getClass().getName(), + placementStatusWithSelectedTargets.misReplicatedReason(), + selectedDatanodes, availableSourceNodes); + } + + usedNodes.addAll(selectedDatanodes); + Iterator iterator = selectedDatanodes.iterator(); + // In this case we need to do one to one copy. + int commandsSent = 0; + CommandTargetOverloadedException overloadedException = null; + for (Integer decomIndex : decomIndexes) { + Pair source = sources.get(decomIndex); + if (source == null) { + LOG.warn("Cannot find source replica for decommissioning index " + + "{} in container {}", decomIndex, container.containerID()); + continue; + } + ContainerReplica sourceReplica = source.getLeft(); + if (!iterator.hasNext()) { + LOG.warn("Couldn't find enough targets. Available source" + + " nodes: {}, the target nodes: {}, excluded nodes: {}," + + " usedNodes: {}, and the decommission indexes: {}", + sources.values().stream() + .map(Pair::getLeft).collect(Collectors.toSet()), + selectedDatanodes, excludedNodes, usedNodes, decomIndexes); + break; } - if (overloadedException != null) { - throw overloadedException; + try { + createReplicateCommand( + container, iterator, sourceReplica, replicaCount); + commandsSent++; + } catch (CommandTargetOverloadedException e) { + LOG.debug("Unable to send Replicate command for container {}" + + " index {} because the source node {} is overloaded.", + container.getContainerID(), sourceReplica.getReplicaIndex(), + sourceReplica.getDatanodeDetails()); + overloadedException = e; } + } + if (overloadedException != null) { + throw overloadedException; + } - if (selectedDatanodes.size() != decomIndexes.size()) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully replicate the decommission indexes for container {}." + - " Requested {} received {}", container.getContainerID(), - decomIndexes.size(), selectedDatanodes.size()); - metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); - throw new InsufficientDatanodesException(decomIndexes.size(), - selectedDatanodes.size()); - } + if (selectedDatanodes.size() != decomIndexes.size()) { + LOG.debug("Insufficient nodes were returned from the placement policy" + + " to fully replicate the decommission indexes for container {}." + + " Requested {} received {}", container.getContainerID(), + decomIndexes.size(), selectedDatanodes.size()); + metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); + throw new InsufficientDatanodesException(decomIndexes.size(), + selectedDatanodes.size()); } LOG.trace("Sent {} commands for container {}.", commandsSent, container.containerID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java index fff263fee341..6de6cc771883 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java @@ -54,6 +54,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return false; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return false; + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 8cd8444d1d2f..48a7981365f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,18 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp private final Map excludedNodes = new ConcurrentHashMap<>(); + /** + * Track the number of active EC reconstruction commands across the cluster. + */ + private final AtomicInteger inflightReconstructionCount = new AtomicInteger(0); + + /** + * Mapping from reconstruction command ID to the number of pending fragments + * for that command. Used to know when the whole command is finished. + */ + private final Map reconstructionCommandIdToPendingFragmentCount = + new ConcurrentHashMap<>(); + /** * SCMService related variables. * After leaving safe mode, replicationMonitor needs to wait for a while @@ -422,6 +435,54 @@ public long getInflightReplicationCount() { .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD); } + /** + * Returns the number of active EC reconstruction commands currently in + * progress across the cluster. + */ + public int getInflightReconstructionCount() { + return inflightReconstructionCount.get(); + } + + /** + * Returns the maximum number of inflight reconstruction commands allowed + * across the cluster at any given time. + * @return the maximum number of inflight reconstruction commands allowed + */ + public int getReconstructionInFlightLimit() { + return rmConf.getReconstructionGlobalLimit(); + } + + /** + * Returns true if the number of inflight reconstruction commands has reached + * the global limit. + * @return true if the limit is reached, false otherwise + */ + public boolean isReconstructionLimitReached() { + int limit = getReconstructionInFlightLimit(); + return limit > 0 && getInflightReconstructionCount() >= limit; + } + + /** + * Returns true if the given datanode's replication load (queued replication + * and reconstruction commands) exceeds the configured load factor threshold. + * + * @param datanode the datanode to check + * @return true if the node is highly loaded, false otherwise + */ + public boolean isNodeHighlyLoaded(DatanodeDetails datanode) { + try { + int limit = getReplicationLimit(datanode); + if (limit <= 0) { + return true; + } + double loadFactor = (double) getQueuedReplicationCount(datanode) / limit; + return loadFactor >= rmConf.getEcDecommissionReconstructionLoadFactor(); + } catch (NodeNotFoundException e) { + LOG.warn("Node {} not found when checking load factor", datanode, e); + return true; + } + } + /** * Sends delete container command for the given container to the given * datanode. @@ -697,6 +758,8 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs, requiredSize, clock.millis()); } + inflightReconstructionCount.incrementAndGet(); + reconstructionCommandIdToPendingFragmentCount.put(cmd.getId(), targetIndexes.size()); getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd; @@ -1074,6 +1137,19 @@ ReplicationQueue getQueue() { @Override public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD + && op.getCommand() != null + && op.getCommand().getType() == Type.reconstructECContainersCommand) { + long cmdId = op.getCommand().getId(); + reconstructionCommandIdToPendingFragmentCount.compute(cmdId, (k, v) -> { + if (v == null || v <= 1) { + inflightReconstructionCount.decrementAndGet(); + return null; + } + return v - 1; + }); + } + if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) { // We only care about expired delete ops. All others should be ignored. return; @@ -1291,6 +1367,38 @@ public static class ReplicationManagerConfiguration ) private int containerSampleLimit = 100; + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "false", + reconfigurable = true, + tags = { SCM }, + description = "If true, SCM will switch from 1-1 replication to " + + "multi-source reconstruction for EC containers on decommissioning " + + "nodes when the node's load exceeds the threshold." + ) + private boolean ecDecommissionReconstructionEnabled = false; + + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.load.factor", + type = ConfigType.DOUBLE, + defaultValue = "0.9", + reconfigurable = true, + tags = { SCM }, + description = "The threshold factor (between 0 and 1) of a node's " + + "replication limit at which SCM switches to reconstruction for " + + "EC decommission. Default is 0.9." + ) + private double ecDecommissionReconstructionLoadFactor = 0.9; + + @Config(key = "hdds.scm.replication.reconstruction.global.limit", + type = ConfigType.INT, + defaultValue = "50", + reconfigurable = true, + tags = { SCM }, + description = "A cluster-wide limit to restrict the total number of " + + "active EC reconstruction commands across the cluster." + ) + private int reconstructionGlobalLimit = 50; + @Config(key = "hdds.scm.replication.quasi.closed.stuck.best.origin.copies", type = ConfigType.INT, defaultValue = "3", @@ -1347,6 +1455,30 @@ public void setDatanodeReplicationLimit(int limit) { this.datanodeReplicationLimit = limit; } + public boolean isEcDecommissionReconstructionEnabled() { + return ecDecommissionReconstructionEnabled; + } + + public void setEcDecommissionReconstructionEnabled(boolean enabled) { + this.ecDecommissionReconstructionEnabled = enabled; + } + + public double getEcDecommissionReconstructionLoadFactor() { + return ecDecommissionReconstructionLoadFactor; + } + + public void setEcDecommissionReconstructionLoadFactor(double factor) { + this.ecDecommissionReconstructionLoadFactor = factor; + } + + public int getReconstructionGlobalLimit() { + return reconstructionGlobalLimit; + } + + public void setReconstructionGlobalLimit(int limit) { + this.reconstructionGlobalLimit = limit; + } + public void setMaintenanceRemainingRedundancy(int redundancy) { this.maintenanceRemainingRedundancy = redundancy; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java index 8f291158902a..20b8d05946bb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java @@ -52,6 +52,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return rm.getInflightReplicationCount() >= pendingOpLimit; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return rm.isReconstructionLimitReached(); + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java index 6508b73c10e6..c37aa7dd7058 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java @@ -76,6 +76,13 @@ protected abstract void requeueHealthResult( protected abstract boolean inflightOperationLimitReached( ReplicationManager rm, long inflightLimit); + /** + * Check if the reconstruction operation limit is reached. + * @param rm The ReplicationManager instance + * @return True if the limit is reached, false otherwise. + */ + protected abstract boolean reconstructionLimitReached(ReplicationManager rm); + /** * Read messages from the ReplicationManager under replicated queue and, * form commands to correct replication. The commands are added @@ -105,6 +112,12 @@ public void processAll(ReplicationQueue queue) { .getMetrics().incrPendingReplicationLimitReachedTotal(); break; } + if (reconstructionLimitReached(replicationManager)) { + LOG.info("The maximum number of pending reconstruction commands ({}) " + + "are scheduled. Ending the iteration.", + replicationManager.getReconstructionInFlightLimit()); + break; + } HealthResult healthResult = dequeueHealthResultFromQueue(queue); if (healthResult == null) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index 5d2af561196b..8a00e486fc47 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -435,6 +435,56 @@ public void testUnderReplicationWithDecomNodesOverloaded() Lists.emptyList(), availableReplicas, 1, 0, policy)); } + @Test + public void testUnderReplicationWithDecomNodesSwitchToReconstruction() + throws IOException { + replicationManager.getConfig().setEcDecommissionReconstructionEnabled(true); + Set availableReplicas = ReplicationTestUtil + .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + + // Mock node 1 as highly loaded + DatanodeDetails decomNode = availableReplicas.stream() + .filter(r -> r.getReplicaIndex() == 1) + .findFirst().get().getDatanodeDetails(); + when(replicationManager.isNodeHighlyLoaded(decomNode)).thenReturn(true); + + ECUnderReplicationHandler ecURH = + new ECUnderReplicationHandler(policy, conf, replicationManager); + UnderReplicatedHealthResult result = + mock(UnderReplicatedHealthResult.class); + when(result.isUnrecoverable()).thenReturn(false); + when(result.getContainerInfo()).thenReturn(container); + + ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(), + result, remainingMaintenanceRedundancy); + + // We expect 1 reconstruction command for index 1, and 0 replicate commands + int replicateCommand = 0; + int reconstructCommand = 0; + for (Pair> dnCommand : commandsSent) { + if (dnCommand.getValue() instanceof ReplicateContainerCommand) { + replicateCommand++; + } else if (dnCommand.getValue() instanceof ReconstructECContainersCommand) { + reconstructCommand++; + ReconstructECContainersCommand reconCmd = + (ReconstructECContainersCommand) dnCommand.getValue(); + assertEquals(ECUnderReplicationHandler.integers2ByteString( + ImmutableList.of(1)), reconCmd.getMissingContainerIndexes()); + + // verify source offloading: decomNode should NOT be in the source list + // because we have 4 other IN_SERVICE nodes (DATA=3) + for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex src : + reconCmd.getSources()) { + assertNotEquals(decomNode, src.getDnDetails()); + } + } + } + assertEquals(0, replicateCommand); + assertEquals(1, reconstructCommand); + } + @Test public void testUnderReplicationWithDecomIndex12() throws IOException { Set availableReplicas = ReplicationTestUtil diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 1d7d619efb0f..393d8b66f6e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -48,7 +48,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import java.io.IOException; import java.time.Instant; @@ -1801,4 +1803,70 @@ private void mockReplicationCommandCounts( }); } + @Test + public void testInflightReconstructionLimit() throws IOException, NodeNotFoundException { + rmConf.setReconstructionGlobalLimit(2); + ReplicationManager rm = createReplicationManager(); + assertEquals(2, rm.getReconstructionInFlightLimit()); + assertEquals(0, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + mockReplicationCommandCounts(dn -> 0, dn -> 0); + + ContainerInfo container = ReplicationTestUtil.createContainerInfo( + repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); + + // Send one reconstruction command with 2 fragments + ReconstructECContainersCommand cmd1 = new ReconstructECContainersCommand( + 1L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(1, 2)), (ECReplicationConfig) repConfig); + + rm.sendThrottledReconstructionCommand(container, cmd1); + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Send another reconstruction command with 1 fragment + ReconstructECContainersCommand cmd2 = new ReconstructECContainersCommand( + 2L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(3)), (ECReplicationConfig) repConfig); + rm.sendThrottledReconstructionCommand(container, cmd2); + assertEquals(2, rm.getInflightReconstructionCount()); + assertTrue(rm.isReconstructionLimitReached()); + + // Complete one fragment of cmd1 + ContainerReplicaOp op1 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(0), 1, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op1, container.containerID(), false); + // Still 2 because cmd1 is not fully finished + assertEquals(2, rm.getInflightReconstructionCount()); + + // Complete second fragment of cmd1 + ContainerReplicaOp op2 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(1), 2, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op2, container.containerID(), false); + // Now 1 + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Complete cmd2 + ContainerReplicaOp op3 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd2.getTargetDatanodes().get(0), 3, cmd2, Long.MAX_VALUE, 0); + rm.opCompleted(op3, container.containerID(), false); + assertEquals(0, rm.getInflightReconstructionCount()); + } + + private static ByteString integers2ByteString(List src) { + byte[] dst = new byte[src.size()]; + for (int i = 0; i < src.size(); i++) { + dst[i] = src.get(i).byteValue(); + } + return dst.length > 0 ? UnsafeByteOperations.unsafeWrap(dst) + : ByteString.EMPTY; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java index de93f7b9194d..958da96239a8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java @@ -131,4 +131,25 @@ public void testMessageNotProcessedIfGlobalLimitReached() throws IOException { assertEquals(1, rmMetrics.getPendingReplicationLimitReachedTotal()); } + @Test + public void testMessageNotProcessedIfReconstructionLimitReached() + throws IOException { + when(replicationManager.isReconstructionLimitReached()).thenReturn(true); + when(replicationManager.getReconstructionInFlightLimit()).thenReturn(10); + when(replicationManager.processUnderReplicatedContainer(any())).thenReturn(1); + + ContainerInfo container = ReplicationTestUtil + .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + UnderReplicatedHealthResult result = new UnderReplicatedHealthResult( + container, 3, false, false, false); + queue.enqueue(result); + + underReplicatedProcessor.processAll(queue); + + // The message should not be processed and still be on the queue (re-queued) + assertEquals(1, queue.underReplicatedQueueSize()); + // We should not have processed anything in RM + verify(replicationManager, times(0)).processUnderReplicatedContainer(any()); + } + } diff --git a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java index d06633f14408..78f449c7a2a5 100644 --- a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java +++ b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; +import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -66,7 +67,8 @@ public Void call() throws Exception { parent.loadContainersFromVolumes(); final ContainerReplicationSource replicationSource = - new OnDemandContainerReplicationSource(parent.getController()); + new OnDemandContainerReplicationSource(parent.getController(), + new ReplicationServer.ReplicationConfig()); for (int i = 0; i < containerCount; i++) { replicationSource.prepare(containerId);