Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
}
nextHB = new AtomicLong(Time.monotonicNow());

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);

ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
Expand All @@ -205,15 +208,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
importer,
new SimpleContainerDownloader(conf, certClient));
ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new OnDemandContainerReplicationSource(container.getController(),
replicationConfig),
new GrpcContainerUploader(conf, certClient, container.getController())
);

pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -107,6 +108,9 @@ public class HddsVolume extends StorageVolume {
private final AtomicBoolean dbLoaded = new AtomicBoolean(false);
private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false);

private final AtomicInteger activeOutboundReplications =
new AtomicInteger(0);

/**
* Builder for HddsVolume.
*/
Expand Down Expand Up @@ -629,4 +633,16 @@ public void compactDb() {
LOG.warn("compact rocksdb error in {}", dbFilePath, e);
}
}

public int incActiveOutboundReplications() {
return activeOutboundReplications.incrementAndGet();
}

public int decActiveOutboundReplications() {
return activeOutboundReplications.decrementAndGet();
}

public int getActiveOutboundReplications() {
return activeOutboundReplications.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.hadoop.ozone.container.replication;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;

Expand All @@ -34,10 +36,13 @@ public class OnDemandContainerReplicationSource
implements ContainerReplicationSource {

private final ContainerController controller;
private final ReplicationServer.ReplicationConfig config;

public OnDemandContainerReplicationSource(
ContainerController controller) {
ContainerController controller,
ReplicationServer.ReplicationConfig config) {
this.controller = controller;
this.config = config;
}

@Override
Expand All @@ -57,8 +62,24 @@ public void copyData(long containerId, OutputStream destination,
" is not found.", CONTAINER_NOT_FOUND);
}

controller.exportContainer(
container.getContainerType(), containerId, destination,
new TarContainerPacker(compression));
HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
if (volume != null) {
if (volume.getActiveOutboundReplications() >=
config.getVolumeOutboundLimit()) {
throw new StorageContainerException("Volume " + volume.getStorageID() +
" has reached the maximum number of concurrent replication reads ("
+ config.getVolumeOutboundLimit() + ")", CONTAINER_INTERNAL_ERROR);
}
volume.incActiveOutboundReplications();
}
try {
controller.exportContainer(
container.getContainerType(), containerId, destination,
new TarContainerPacker(compression));
} finally {
if (volume != null) {
volume.decActiveOutboundReplications();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class ReplicationServer {

private ContainerController controller;

private ReplicationConfig replicationConfig;

private int port;
private final ContainerImporter importer;

Expand All @@ -75,6 +77,7 @@ public ReplicationServer(ContainerController controller,
this.secConf = secConf;
this.caClient = caClient;
this.controller = controller;
this.replicationConfig = replicationConfig;
this.importer = importer;
this.port = replicationConfig.getPort();

Expand Down Expand Up @@ -103,7 +106,8 @@ public ReplicationServer(ContainerController controller,

public void init() {
GrpcReplicationService grpcReplicationService = new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller), importer);
new OnDemandContainerReplicationSource(controller, replicationConfig),
importer);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(
Expand Down Expand Up @@ -225,10 +229,27 @@ public static final class ReplicationConfig {
)
private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;

@Config(key = "hdds.datanode.replication.volume.outbound.limit",
type = ConfigType.INT,
defaultValue = "2",
tags = {DATANODE},
description = "The maximum number of concurrent replication reads " +
"allowed per physical disk volume."
)
private int volumeOutboundLimit = 2;

public double getOutOfServiceFactor() {
return outOfServiceFactor;
}

public int getVolumeOutboundLimit() {
return volumeOutboundLimit;
}

public void setVolumeOutboundLimit(int limit) {
this.volumeOutboundLimit = limit;
}

public int scaleOutOfServiceLimit(int original) {
return (int) Math.ceil(original * outOfServiceFactor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public void testDownload() throws IOException {
@Test
public void testUpload() {
ContainerReplicationSource source =
new OnDemandContainerReplicationSource(containerController);
new OnDemandContainerReplicationSource(containerController,
new ReplicationServer.ReplicationConfig());

GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController);

Expand Down
Loading