Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.hadoop.hdds.scm.ha.SCMService.Event.PRE_CHECK_COMPLETED;
import static org.apache.hadoop.hdds.scm.ha.SCMService.Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.Clock;
Expand Down Expand Up @@ -199,47 +200,25 @@ private boolean skipCreation(ReplicationConfig replicationConfig,
// used.
return ((StandaloneReplicationConfig) replicationConfig)
.getReplicationFactor() != ReplicationFactor.ONE;
} else if (replicationConfig.getReplicationType().equals(EC)) {
return false;
}
return true;
}

private void createPipelines() throws RuntimeException {
// TODO: #CLUTIL Different replication factor may need to be supported
HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
boolean autoCreateFactorOne = conf.getBoolean(
ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
boolean autoCreateFactorOne = conf.getBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);

List<ReplicationConfig> list =
new ArrayList<>();
for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
.values()) {
if (factor == ReplicationFactor.ZERO) {
continue; // Ignore it.
}
final ReplicationConfig replicationConfig;
if (type != EC) {
replicationConfig =
ReplicationConfig.fromProtoTypeAndFactor(type, factor);
} else if (factor == ReplicationFactor.ONE) {
replicationConfig =
ReplicationConfig.fromProtoTypeAndFactor(RATIS, factor);
} else {
continue;
}
if (skipCreation(replicationConfig, autoCreateFactorOne)) {
// Skip this iteration for creating pipeline
continue;
}
list.add(replicationConfig);
List<ReplicationConfig> list = getReplicationConfigs(autoCreateFactorOne);
if (list.isEmpty()) {
LOG.debug("No replication configs selected for background pipeline creation.");
return;
}

LoopingIterator it = new LoopingIterator(list);
while (it.hasNext()) {
ReplicationConfig replicationConfig =
(ReplicationConfig) it.next();
ReplicationConfig replicationConfig = (ReplicationConfig) it.next();

try {
Pipeline pipeline = pipelineManager.createPipeline(replicationConfig);
Expand All @@ -255,6 +234,49 @@ private void createPipelines() throws RuntimeException {
LOG.debug("BackgroundPipelineCreator createPipelines finished.");
}

@VisibleForTesting
List<ReplicationConfig> getReplicationConfigs(boolean autoCreateFactorOne) {
  List<ReplicationConfig> list = new ArrayList<>();
  // TODO: #CLUTIL Different replication factor may need to be supported
  // Read the configured replication type once; the raw string is reused for
  // logging below instead of re-reading the configuration.
  final String configuredType =
      conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
  HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(configuredType);
  if (type == EC) {
    try {
      // The cluster-wide default replication config drives EC pipeline
      // creation; only add it when it really resolves to an EC config.
      ReplicationConfig defaultConfig = ReplicationConfig.getDefault(conf);
      if (defaultConfig.getReplicationType() == EC) {
        list.add(defaultConfig);
      }
    } catch (IllegalArgumentException e) {
      // A malformed default EC replication string must not abort background
      // pipeline creation for the remaining (RATIS) configs.
      LOG.warn(
          "Skipping EC pipeline creation due to invalid default EC replication config. type={}, replication={}",
          configuredType,
          conf.get(OzoneConfigKeys.OZONE_REPLICATION, OzoneConfigKeys.OZONE_REPLICATION_DEFAULT), e);
    }
  }

  for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor.values()) {
    if (factor == ReplicationFactor.ZERO) {
      continue; // Ignore it.
    }
    final ReplicationConfig replicationConfig;
    if (type != EC) {
      replicationConfig = ReplicationConfig.fromProtoTypeAndFactor(type, factor);
    } else if (factor == ReplicationFactor.ONE) {
      // Even on an EC-typed cluster, a RATIS/ONE pipeline may still be needed.
      replicationConfig = ReplicationConfig.fromProtoTypeAndFactor(RATIS, factor);
    } else {
      continue;
    }
    if (skipCreation(replicationConfig, autoCreateFactorOne)) {
      // Skip this iteration for creating pipeline
      continue;
    }
    // Guard against adding the same config twice (e.g. if the default config
    // ever coincides with one produced by the factor loop).
    if (!list.contains(replicationConfig)) {
      list.add(replicationConfig);
    }
  }
  return list;
}

@Override
public void notifyStatusChanged() {
serviceLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
Expand Down Expand Up @@ -66,6 +65,9 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
private final SCMContext scmContext;
private final Set<PipelineID> unProcessedPipelineSet = new HashSet<>();
private final NodeManager nodeManager;
private final ReplicationConfig targetReplicationConfig;
private final int targetRequiredNodes;
private final String targetReplicationLabel;

HealthyPipelineSafeModeRule(EventQueue eventQueue,
PipelineManager pipelineManager, SCMSafeModeManager manager,
Expand All @@ -74,13 +76,15 @@ public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
this.pipelineManager = pipelineManager;
this.scmContext = scmContext;
this.nodeManager = nodeManager;
this.targetReplicationConfig = ReplicationConfig.getDefault(configuration);
this.targetRequiredNodes = targetReplicationConfig.getRequiredNodes();
this.targetReplicationLabel = targetReplicationConfig.configFormat();
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);

// We only care about THREE replica pipeline
minHealthyPipelines = getMinHealthyPipelines(configuration);

Preconditions.checkArgument(
Expand All @@ -97,8 +101,7 @@ private int getMinHealthyPipelines(ConfigurationSource config) {
HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE,
HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT);

// We only care about THREE replica pipeline
return minDatanodes / HddsProtos.ReplicationFactor.THREE_VALUE;
return minDatanodes / targetRequiredNodes;

}

Expand Down Expand Up @@ -141,14 +144,12 @@ protected synchronized void process(Pipeline pipeline) {
// datanode can send pipeline report again, or SCMPipelineManager will
// create new pipelines.

// Only handle RATIS + 3-replica pipelines.
if (pipeline.getType() != HddsProtos.ReplicationType.RATIS ||
((RatisReplicationConfig) pipeline.getReplicationConfig()).getReplicationFactor() !=
HddsProtos.ReplicationFactor.THREE) {
// Only handle pipelines matching the configured default replication.
if (!targetReplicationConfig.equals(pipeline.getReplicationConfig())) {
Logger safeModeManagerLog = SCMSafeModeManager.getLogger();
if (safeModeManagerLog.isDebugEnabled()) {
safeModeManagerLog.debug("Skipping pipeline safemode report processing as Replication type isn't RATIS " +
"or replication factor isn't 3.");
safeModeManagerLog.debug("Skipping pipeline safemode report processing as replication config {} " +
"does not match target {}.", pipeline.getReplicationConfig(), targetReplicationConfig);
}
return;
}
Expand All @@ -161,9 +162,9 @@ protected synchronized void process(Pipeline pipeline) {
}

List<DatanodeDetails> pipelineDns = pipeline.getNodes();
if (pipelineDns.size() != 3) {
LOG.warn("Only {} DNs reported this pipeline: {}, all 3 DNs should report the pipeline", pipelineDns.size(),
pipeline.getId());
if (pipelineDns.size() != targetRequiredNodes) {
LOG.warn("Only {} DNs reported this pipeline: {}, all {} DNs should report the pipeline",
pipelineDns.size(), pipeline.getId(), targetRequiredNodes);
return;
}

Expand Down Expand Up @@ -218,8 +219,7 @@ public synchronized void refresh(boolean forceRefresh) {

private synchronized void initializeRule(boolean refresh) {
unProcessedPipelineSet.addAll(pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.THREE),
targetReplicationConfig,
Pipeline.PipelineState.OPEN).stream().map(Pipeline::getId)
.collect(Collectors.toSet()));

Expand All @@ -245,10 +245,11 @@ private synchronized void initializeRule(boolean refresh) {
private boolean validateHealthyPipelineSafeModeRuleUsingPipelineManager() {
// Query PipelineManager directly for healthy pipeline count
List<Pipeline> openPipelines = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
targetReplicationConfig,
Pipeline.PipelineState.OPEN);

LOG.debug("Found {} open RATIS/THREE pipelines", openPipelines.size());

LOG.debug("Found {} open {} pipelines", openPipelines.size(),
targetReplicationLabel);

int pipelineCount = openPipelines.size();
healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
Expand All @@ -271,11 +272,11 @@ private boolean validateHealthyPipelineSafeModeRuleUsingPipelineManager() {
}

boolean isPipelineHealthy(Pipeline pipeline) {
// Verify pipeline has all 3 nodes
// Verify pipeline has all required nodes for target replication.
List<DatanodeDetails> nodes = pipeline.getNodes();
if (nodes.size() != 3) {
LOG.debug("Pipeline {} is not healthy: has {} nodes instead of 3",
pipeline.getId(), nodes.size());
if (nodes.size() != targetRequiredNodes) {
LOG.debug("Pipeline {} is not healthy: has {} nodes instead of {}",
pipeline.getId(), nodes.size(), targetRequiredNodes);
return false;
}

Expand Down Expand Up @@ -316,8 +317,9 @@ public synchronized int getHealthyPipelineThresholdCount() {
@Override
public String getStatusText() {
String status = String.format(
"healthy Ratis/THREE pipelines (=%d) >= healthyPipelineThresholdCount" +
" (=%d)", getCurrentHealthyPipelineCount(),
"healthy %s pipelines (=%d) >= healthyPipelineThresholdCount" +
" (=%d)", targetReplicationLabel,
getCurrentHealthyPipelineCount(),
getHealthyPipelineThresholdCount());
status = updateStatusTextWithSamplePipelines(status);
return status;
Expand All @@ -327,7 +329,7 @@ private synchronized String updateStatusTextWithSamplePipelines(
String status) {
if (validateBasedOnReportProcessing()) {
List<Pipeline> openPipelines = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
targetReplicationConfig,
Pipeline.PipelineState.OPEN);

Set<PipelineID> unhealthyPipelines = openPipelines.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -58,6 +57,8 @@ public class OneReplicaPipelineSafeModeRule extends
private int currentReportedPipelineCount = 0;
private PipelineManager pipelineManager;
private final double pipelinePercent;
private final ReplicationConfig targetReplicationConfig;
private final String targetReplicationLabel;

public OneReplicaPipelineSafeModeRule(EventQueue eventQueue, PipelineManager pipelineManager,
SCMSafeModeManager safeModeManager, ConfigurationSource configuration) {
Expand All @@ -76,6 +77,8 @@ public OneReplicaPipelineSafeModeRule(EventQueue eventQueue, PipelineManager pip
" value should be >= 0.0 and <= 1.0");

this.pipelineManager = pipelineManager;
this.targetReplicationConfig = ReplicationConfig.getDefault(configuration);
this.targetReplicationLabel = targetReplicationConfig.configFormat();
initializeRule(false);

}
Expand Down Expand Up @@ -108,8 +111,7 @@ protected synchronized void process(PipelineReportFromDatanode report) {
continue;
}

if (RatisReplicationConfig
.hasFactor(pipeline.getReplicationConfig(), ReplicationFactor.THREE)
if (targetReplicationConfig.equals(pipeline.getReplicationConfig())
&& pipeline.isOpen() &&
!reportedPipelineIDSet.contains(pipeline.getId())) {
if (oldPipelineIDSet.contains(pipeline.getId())) {
Expand Down Expand Up @@ -152,8 +154,9 @@ Set<PipelineID> getReportedPipelineIDSet() {
@Override
public String getStatusText() {
String status = String.format(
"reported Ratis/THREE pipelines with at least one datanode (=%d) "
+ ">= threshold (=%d)", getCurrentReportedPipelineCount(),
"reported %s pipelines with at least one datanode (=%d) "
+ ">= threshold (=%d)", targetReplicationLabel,
getCurrentReportedPipelineCount(),
getThresholdCount());
status = updateStatusTextWithSamplePipelines(status);
return status;
Expand Down Expand Up @@ -184,11 +187,11 @@ public synchronized void refresh(boolean forceRefresh) {
}

private void updateReportedPipelineSet() {
List<Pipeline> openRatisPipelines =
pipelineManager.getPipelines(RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
List<Pipeline> openTargetPipelines =
pipelineManager.getPipelines(targetReplicationConfig,
Pipeline.PipelineState.OPEN);

for (Pipeline pipeline : openRatisPipelines) {
for (Pipeline pipeline : openTargetPipelines) {
PipelineID pipelineID = pipeline.getId();
if (!pipeline.getNodeSet().isEmpty()
&& oldPipelineIDSet.contains(pipelineID)
Expand All @@ -202,7 +205,7 @@ private void updateReportedPipelineSet() {
private void initializeRule(boolean refresh) {

oldPipelineIDSet = pipelineManager.getPipelines(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
targetReplicationConfig,
Pipeline.PipelineState.OPEN)
.stream().map(p -> p.getId()).collect(Collectors.toSet());

Expand Down
Loading
Loading