Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ public Builder setDatanodeDetails(DatanodeDetails details) {
this.id = details.id;
this.ipAddress = details.getIpAddressAsByteString();
this.hostName = details.getHostNameAsByteString();
this.networkName = details.getHostNameAsByteString();
this.networkName = details.getNetworkNameAsByteString();
this.networkLocation = details.getNetworkLocationAsByteString();
this.level = details.getLevel();
this.ports = details.getPorts();
Expand Down
10 changes: 10 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4422,6 +4422,16 @@
</description>
</property>

<property>
<name>ozone.om.container.location.datanode.cache.size</name>
<value>10000</value>
<tag>OZONE, OM</tag>
<description>
The maximum number of entries in the datanode details cache, which is used to
de-duplicate DatanodeDetails objects retained by the container location cache in Ozone Manager.
</description>
</property>

<property>
<name>ozone.om.container.location.cache.ttl</name>
<value>360m</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ public final class OMConfigKeys {
public static final int OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT
= 100_000;

public static final String OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE
= "ozone.om.container.location.datanode.cache.size";
public static final int
OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE_DEFAULT = 10_000;

public static final String OZONE_OM_CONTAINER_LOCATION_CACHE_TTL
= "ozone.om.container.location.cache.ttl";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_TTL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE_DEFAULT;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,7 +41,8 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
Expand Down Expand Up @@ -73,6 +78,14 @@ static LoadingCache<Long, Pipeline> createContainerLocationCache(
long ttl = configuration.getTimeDuration(
OZONE_OM_CONTAINER_LOCATION_CACHE_TTL,
OZONE_OM_CONTAINER_LOCATION_CACHE_TTL_DEFAULT.getDuration(), unit);
int datanodeCacheMaxSize = configuration.getInt(
OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE,
OZONE_OM_CONTAINER_LOCATION_DATANODE_CACHE_SIZE_DEFAULT);

final Cache<DatanodeID, DatanodeDetails> datanodeDetailsCache =
CacheBuilder.newBuilder()
.maximumSize(datanodeCacheMaxSize)
.build();
Comment thread
ivandika3 marked this conversation as resolved.
Outdated
return CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(ttl, unit)
Expand All @@ -81,7 +94,9 @@ static LoadingCache<Long, Pipeline> createContainerLocationCache(
@Nonnull
@Override
public Pipeline load(@Nonnull Long key) throws Exception {
return containerClient.getContainerWithPipeline(key).getPipeline();
Pipeline pipeline =
containerClient.getContainerWithPipeline(key).getPipeline();
return newPipelineWithDNCache(pipeline, datanodeDetailsCache);
}

@Nonnull
Expand All @@ -92,12 +107,32 @@ public Map<Long, Pipeline> loadAll(
.stream()
.collect(Collectors.toMap(
x -> x.getContainerInfo().getContainerID(),
ContainerWithPipeline::getPipeline
x -> newPipelineWithDNCache(x.getPipeline(),
datanodeDetailsCache)
));
}
});
}

/**
 * Rebuilds the given pipeline so that its datanode list reuses shared
 * {@link DatanodeDetails} instances from the supplied cache, reducing
 * duplicate datanode objects held in memory.
 *
 * <p>For each node in the pipeline: if the cache already holds an entry for
 * the node's ID whose values still match (checked via
 * {@code compareNodeValues}, which is assumed to detect IP / hostname
 * changes), the cached instance is reused; otherwise the cache entry is
 * replaced with the incoming node and the incoming node is used.
 *
 * @param pipeline the pipeline whose nodes should be de-duplicated
 * @param datanodeDetailsCache shared cache of datanode details keyed by ID
 * @return a new pipeline equivalent to {@code pipeline} but with its node
 *     list resolved against the cache
 */
static Pipeline newPipelineWithDNCache(Pipeline pipeline,
    Cache<DatanodeID, DatanodeDetails> datanodeDetailsCache) {
  final List<DatanodeDetails> resolvedNodes = new ArrayList<>();
  for (DatanodeDetails current : pipeline.getNodes()) {
    final DatanodeDetails cached =
        datanodeDetailsCache.getIfPresent(current.getID());
    // Reuse the cached instance only when its values (IP / hostname) still
    // match the incoming node; otherwise refresh the cache entry.
    final boolean cacheHitStillValid =
        cached != null && current.compareNodeValues(cached);
    if (cacheHitStillValid) {
      resolvedNodes.add(cached);
    } else {
      datanodeDetailsCache.put(current.getID(), current);
      resolvedNodes.add(current);
    }
  }
  return pipeline.toBuilder()
      .setNodes(resolvedNodes)
      .build();
}

/**
 * Returns the SCM block location protocol client held by this instance.
 *
 * @return the {@link ScmBlockLocationProtocol} client
 */
public ScmBlockLocationProtocol getBlockClient() {
  return blockClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdds.client.ReplicationConfig.fromTypeAndFactor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -41,6 +44,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -106,9 +110,12 @@ public void testGetContainerLocations(String testCaseName,
throws IOException {

Map<Long, ContainerWithPipeline> actualLocations = new HashMap<>();

List<DatanodeDetails> dnList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
dnList.add(randomDatanode());
}
for (long containerId : prepopulatedIds) {
ContainerWithPipeline pipeline = createPipeline(containerId);
ContainerWithPipeline pipeline = createPipeline(containerId, dnList);
actualLocations.put(containerId, pipeline);
}

Expand All @@ -128,7 +135,7 @@ public void testGetContainerLocations(String testCaseName,
if (!expectedScmCallIds.isEmpty()) {
List<ContainerWithPipeline> scmLocations = new ArrayList<>();
for (long containerId : expectedScmCallIds) {
ContainerWithPipeline pipeline = createPipeline(containerId);
ContainerWithPipeline pipeline = createPipeline(containerId, dnList);
scmLocations.add(pipeline);
actualLocations.put(containerId, pipeline);
}
Expand Down Expand Up @@ -166,13 +173,37 @@ public void testGetContainerLocationsWithScmFailures() throws IOException {
assertEquals(runtimeException, actualRt.getCause());
}

ContainerWithPipeline createPipeline(long containerId) {
@Test
public void testDatanodeDetailsCacheUpdatesIpAddressChange() {
Cache<DatanodeID, DatanodeDetails> datanodeDetailsCache =
CacheBuilder.newBuilder().build();
DatanodeDetails original = randomDatanode();
DatanodeDetails updated = DatanodeDetails.newBuilder()
.setDatanodeDetails(original)
.setIpAddress("updated-ip")
.build();

ScmClient.newPipelineWithDNCache(
createPipeline(1L, asList(original)).getPipeline(),
datanodeDetailsCache);
Pipeline refreshed = ScmClient.newPipelineWithDNCache(
createPipeline(2L, asList(updated)).getPipeline(),
datanodeDetailsCache);

DatanodeDetails refreshedNode = refreshed.getNodes().get(0);
assertSame(updated, refreshedNode);
assertEquals("updated-ip", refreshedNode.getIpAddress());
assertSame(updated, datanodeDetailsCache.getIfPresent(original.getID()));
Comment thread
ivandika3 marked this conversation as resolved.
}

ContainerWithPipeline createPipeline(long containerId,
List<DatanodeDetails> dnList) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerID(containerId)
.build();
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setNodes(asList(randomDatanode(), randomDatanode()))
.setNodes(dnList)
.setReplicationConfig(fromTypeAndFactor(
ReplicationType.RATIS, ReplicationFactor.THREE))
.setState(Pipeline.PipelineState.OPEN)
Expand Down