Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.hadoop.ozone.container.checksum;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
Expand Down Expand Up @@ -79,6 +82,19 @@ public String getMetricDescriptionSegment() {
return METRIC_DESCRIPTION_SEGMENT;
}

/**
 * Returns the volume hosting this task's container, if the container is
 * present on this datanode; otherwise an empty collection.
 */
@Override
public Collection<HddsVolume> getVolumes() {
  org.apache.hadoop.ozone.container.common.interfaces.Container<?> container =
      controller.getContainer(getContainerId());
  // The container may already have been removed from this node.
  if (container == null) {
    return Collections.emptyList();
  }
  HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
  return volume == null
      ? Collections.emptyList()
      : Collections.singletonList(volume);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
}
nextHB = new AtomicLong(Time.monotonicNow());

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);

ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
Expand All @@ -205,15 +208,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
importer,
new SimpleContainerDownloader(conf, certClient));
ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new OnDemandContainerReplicationSource(container.getController(),
replicationConfig),
new GrpcContainerUploader(conf, certClient, container.getController())
);

pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void handle(SCMCommand<?> command, OzoneContainer container,
ECReconstructionCommandInfo reconstructionCommandInfo =
new ECReconstructionCommandInfo(ecContainersCommand);
ECReconstructionCoordinatorTask task = new ECReconstructionCoordinatorTask(
coordinator, reconstructionCommandInfo);
coordinator, reconstructionCommandInfo, container.getController(),
context.getParent().getDatanodeDetails());
this.supervisor.addTask(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void handle(SCMCommand<?> command, OzoneContainer container,
replicateCommand.getTargetDatanode() == null ?
downloadReplicator : pushReplicator;

ReplicationTask task = new ReplicationTask(replicateCommand, replicator);
ReplicationTask task = new ReplicationTask(replicateCommand, replicator,
container.getController());
supervisor.addTask(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -107,6 +108,9 @@ public class HddsVolume extends StorageVolume {
private final AtomicBoolean dbLoaded = new AtomicBoolean(false);
private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false);

private final AtomicInteger activeOutboundReplications =
new AtomicInteger(0);

/**
* Builder for HddsVolume.
*/
Expand Down Expand Up @@ -629,4 +633,16 @@ public void compactDb() {
LOG.warn("compact rocksdb error in {}", dbFilePath, e);
}
}

/**
 * Atomically increments the count of in-flight outbound replication reads
 * served from this volume.
 *
 * @return the count after the increment
 */
public int incActiveOutboundReplications() {
return activeOutboundReplications.incrementAndGet();
}

/**
 * Atomically decrements the count of in-flight outbound replication reads.
 * Callers are expected to pair this with a prior increment (e.g. in a
 * try/finally); no lower-bound check is performed here.
 *
 * @return the count after the decrement
 */
public int decActiveOutboundReplications() {
return activeOutboundReplications.decrementAndGet();
}

/**
 * @return the current number of in-flight outbound replication reads
 *     from this volume (a point-in-time snapshot; may change immediately)
 */
public int getActiveOutboundReplications() {
return activeOutboundReplications.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@

package org.apache.hadoop.ozone.container.ec.reconstruction;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
Expand All @@ -32,18 +40,24 @@ public class ECReconstructionCoordinatorTask
LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
private final ECReconstructionCoordinator reconstructionCoordinator;
private final ECReconstructionCommandInfo reconstructionCommandInfo;
private final ContainerController containerController;
private final DatanodeDetails self;
private final String debugString;
public static final String METRIC_NAME = "ECReconstructions";
public static final String METRIC_DESCRIPTION_SEGMENT = "EC reconstructions";

/**
 * Creates an EC reconstruction task driven by the given coordinator.
 *
 * @param coordinator executes the actual reconstruction work
 * @param reconstructionCommandInfo command details (container id, deadline,
 *     term, source node map)
 * @param containerController access to local containers; used by
 *     {@code getVolumes()} to resolve the local replica's volume. May be
 *     null, in which case no volumes are reported.
 * @param self this datanode's identity, compared against the command's
 *     source nodes. May be null, in which case no volumes are reported.
 */
public ECReconstructionCoordinatorTask(
ECReconstructionCoordinator coordinator,
ECReconstructionCommandInfo reconstructionCommandInfo,
ContainerController containerController,
DatanodeDetails self) {
super(reconstructionCommandInfo.getContainerID(),
reconstructionCommandInfo.getDeadline(),
reconstructionCommandInfo.getTerm());
this.reconstructionCoordinator = coordinator;
this.reconstructionCommandInfo = reconstructionCommandInfo;
this.containerController = containerController;
this.self = self;
// Snapshot the command description up front for cheap logging later.
debugString = reconstructionCommandInfo.toString();
}

Expand Down Expand Up @@ -117,4 +131,32 @@ public boolean equals(Object o) {
/** Hash is derived solely from the container id. */
public int hashCode() {
return Objects.hash(getContainerId());
}

/**
 * Returns the volume hosting the local container replica when this
 * datanode is one of the reconstruction sources; otherwise an empty
 * collection. An empty collection is also returned when the task was
 * built without controller/self context or the container cannot be found.
 */
@Override
public Collection<HddsVolume> getVolumes() {
  if (containerController == null || self == null) {
    return Collections.emptyList();
  }
  SortedMap<Integer, DatanodeDetails> sources =
      reconstructionCommandInfo.getSourceNodeMap();
  // containsValue matches via equals, replacing the manual scan over
  // sources.values(); only a local source replica contributes volume load.
  if (!sources.containsValue(self)) {
    return Collections.emptyList();
  }
  org.apache.hadoop.ozone.container.common.interfaces.Container<?> container =
      containerController.getContainer(getContainerId());
  if (container == null) {
    return Collections.emptyList();
  }
  HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
  return volume == null
      ? Collections.emptyList()
      : Collections.singletonList(volume);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;

/**
* Abstract class to capture common variables and methods for different types
Expand Down Expand Up @@ -90,6 +92,11 @@ public long getDeadline() {
return deadlineMsSinceEpoch;
}

/**
 * Returns the local {@link HddsVolume}s associated with this task.
 * <p>
 * Implementations return an empty collection when no volume can be
 * determined for the task; the result is never {@code null}.
 */
public abstract Collection<HddsVolume> getVolumes();

/**
* Abstract method which needs to be overridden by the sub classes to execute
* the task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.hadoop.ozone.container.replication;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;

Expand All @@ -34,10 +36,13 @@ public class OnDemandContainerReplicationSource
implements ContainerReplicationSource {

private final ContainerController controller;
private final ReplicationServer.ReplicationConfig config;

/**
 * Creates a replication source that exports local containers on demand.
 *
 * @param controller access to local containers for export
 * @param config replication server settings, including the per-volume
 *     outbound read limit enforced during {@code copyData}
 */
public OnDemandContainerReplicationSource(
    ContainerController controller,
    ReplicationServer.ReplicationConfig config) {
  this.config = config;
  this.controller = controller;
}

@Override
Expand All @@ -57,8 +62,24 @@ public void copyData(long containerId, OutputStream destination,
" is not found.", CONTAINER_NOT_FOUND);
}

controller.exportContainer(
container.getContainerType(), containerId, destination,
new TarContainerPacker(compression));
HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
if (volume != null) {
if (volume.getActiveOutboundReplications() >=
config.getVolumeOutboundLimit()) {
throw new StorageContainerException("Volume " + volume.getStorageID() +
" has reached the maximum number of concurrent replication reads ("
+ config.getVolumeOutboundLimit() + ")", CONTAINER_INTERNAL_ERROR);
}
volume.incActiveOutboundReplications();
}
try {
controller.exportContainer(
container.getContainerType(), containerId, destination,
new TarContainerPacker(compression));
} finally {
if (volume != null) {
volume.decActiveOutboundReplications();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class ReplicationServer {

private ContainerController controller;

private ReplicationConfig replicationConfig;

private int port;
private final ContainerImporter importer;

Expand All @@ -75,6 +77,7 @@ public ReplicationServer(ContainerController controller,
this.secConf = secConf;
this.caClient = caClient;
this.controller = controller;
this.replicationConfig = replicationConfig;
this.importer = importer;
this.port = replicationConfig.getPort();

Expand Down Expand Up @@ -103,7 +106,8 @@ public ReplicationServer(ContainerController controller,

public void init() {
GrpcReplicationService grpcReplicationService = new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller), importer);
new OnDemandContainerReplicationSource(controller, replicationConfig),
importer);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(
Expand Down Expand Up @@ -225,10 +229,27 @@ public static final class ReplicationConfig {
)
private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;

@Config(key = "hdds.datanode.replication.volume.outbound.limit",
type = ConfigType.INT,
defaultValue = "2",
tags = {DATANODE},
description = "The maximum number of concurrent replication reads " +
"allowed per physical disk volume."
)
private int volumeOutboundLimit = 2;

/** @return the configured out-of-service scaling factor. */
public double getOutOfServiceFactor() {
return outOfServiceFactor;
}

/**
 * @return the maximum number of concurrent outbound replication reads
 *     allowed per volume (config key
 *     {@code hdds.datanode.replication.volume.outbound.limit}, default 2)
 */
public int getVolumeOutboundLimit() {
return volumeOutboundLimit;
}

/**
 * Overrides the per-volume outbound replication read limit.
 * No validation is performed on {@code limit} here.
 */
public void setVolumeOutboundLimit(int limit) {
this.volumeOutboundLimit = limit;
}

/**
 * Scales a limit by the configured out-of-service factor, rounding up so
 * that any fractional result still grants a whole unit.
 *
 * @param original the base limit to scale
 * @return {@code ceil(original * outOfServiceFactor)} as an int
 */
public int scaleOutOfServiceLimit(int original) {
  double scaled = original * outOfServiceFactor;
  return (int) Math.ceil(scaled);
}
Expand Down
Loading