HDDS-228. Add the ReplicaMaps to ContainerStateManager.
Contributed by Ajay Kumar.
This commit is contained in:
parent
a08812a1b1
commit
5ee90efed3
@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
@ -488,4 +489,37 @@ public class ContainerStateManager implements Closeable {
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the latest list of DataNodes where replica for given containerId
|
||||
* exist. Throws an SCMException if no entry is found for given containerId.
|
||||
*
|
||||
* @param containerID
|
||||
* @return Set<DatanodeDetails>
|
||||
*/
|
||||
public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
|
||||
throws SCMException {
|
||||
return containers.getContainerReplicas(containerID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a container Replica for given DataNode.
|
||||
*
|
||||
* @param containerID
|
||||
* @param dn
|
||||
*/
|
||||
public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) {
|
||||
containers.addContainerReplica(containerID, dn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a container Replica for given DataNode.
|
||||
*
|
||||
* @param containerID
|
||||
* @param dn
|
||||
* @return True of dataNode is removed successfully else false.
|
||||
*/
|
||||
public boolean removeContainerReplica(ContainerID containerID,
|
||||
DatanodeDetails dn) throws SCMException {
|
||||
return containers.removeContainerReplica(containerID, dn);
|
||||
}
|
||||
}
|
||||
|
@ -18,13 +18,18 @@
|
||||
|
||||
package org.apache.hadoop.hdds.scm.container.states;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -83,6 +88,8 @@ public class ContainerStateMap {
|
||||
private final ContainerAttribute<ReplicationType> typeMap;
|
||||
|
||||
private final Map<ContainerID, ContainerInfo> containerMap;
|
||||
// Map to hold replicas of given container.
|
||||
private final Map<ContainerID, Set<DatanodeDetails>> contReplicaMap;
|
||||
private final static NavigableSet<ContainerID> EMPTY_SET =
|
||||
Collections.unmodifiableNavigableSet(new TreeSet<>());
|
||||
|
||||
@ -101,6 +108,7 @@ public class ContainerStateMap {
|
||||
typeMap = new ContainerAttribute<>();
|
||||
containerMap = new HashMap<>();
|
||||
autoLock = new AutoCloseableLock();
|
||||
contReplicaMap = new HashMap<>();
|
||||
// new InstrumentedLock(getClass().getName(), LOG,
|
||||
// new ReentrantLock(),
|
||||
// 1000,
|
||||
@ -157,6 +165,84 @@ public class ContainerStateMap {
|
||||
return containerMap.get(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the latest list of DataNodes where replica for given containerId
|
||||
* exist. Throws an SCMException if no entry is found for given containerId.
|
||||
*
|
||||
* @param containerID
|
||||
* @return Set<DatanodeDetails>
|
||||
*/
|
||||
public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
|
||||
throws SCMException {
|
||||
Preconditions.checkNotNull(containerID);
|
||||
try (AutoCloseableLock lock = autoLock.acquire()) {
|
||||
if (contReplicaMap.containsKey(containerID)) {
|
||||
return Collections
|
||||
.unmodifiableSet(contReplicaMap.get(containerID));
|
||||
}
|
||||
}
|
||||
throw new SCMException(
|
||||
"No entry exist for containerId: " + containerID + " in replica map.",
|
||||
ResultCodes.FAILED_TO_FIND_CONTAINER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds given datanodes as nodes where replica for given containerId exist.
|
||||
* Logs a debug entry if a datanode is already added as replica for given
|
||||
* ContainerId.
|
||||
*
|
||||
* @param containerID
|
||||
* @param dnList
|
||||
*/
|
||||
public void addContainerReplica(ContainerID containerID,
|
||||
DatanodeDetails... dnList) {
|
||||
Preconditions.checkNotNull(containerID);
|
||||
// Take lock to avoid race condition around insertion.
|
||||
try (AutoCloseableLock lock = autoLock.acquire()) {
|
||||
for (DatanodeDetails dn : dnList) {
|
||||
Preconditions.checkNotNull(dn);
|
||||
if (contReplicaMap.containsKey(containerID)) {
|
||||
if(!contReplicaMap.get(containerID).add(dn)) {
|
||||
LOG.debug("ReplicaMap already contains entry for container Id: "
|
||||
+ "{},DataNode: {}", containerID, dn);
|
||||
}
|
||||
} else {
|
||||
Set<DatanodeDetails> dnSet = new HashSet<>();
|
||||
dnSet.add(dn);
|
||||
contReplicaMap.put(containerID, dnSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a container Replica for given DataNode.
|
||||
*
|
||||
* @param containerID
|
||||
* @param dn
|
||||
* @return True of dataNode is removed successfully else false.
|
||||
*/
|
||||
public boolean removeContainerReplica(ContainerID containerID,
|
||||
DatanodeDetails dn) throws SCMException {
|
||||
Preconditions.checkNotNull(containerID);
|
||||
Preconditions.checkNotNull(dn);
|
||||
|
||||
// Take lock to avoid race condition.
|
||||
try (AutoCloseableLock lock = autoLock.acquire()) {
|
||||
if (contReplicaMap.containsKey(containerID)) {
|
||||
return contReplicaMap.get(containerID).remove(dn);
|
||||
}
|
||||
}
|
||||
throw new SCMException(
|
||||
"No entry exist for containerId: " + containerID + " in replica map.",
|
||||
ResultCodes.FAILED_TO_FIND_CONTAINER);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static Logger getLOG() {
|
||||
return LOG;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the full container Map.
|
||||
*
|
||||
|
@ -17,14 +17,22 @@
|
||||
package org.apache.hadoop.hdds.scm.container;
|
||||
|
||||
import com.google.common.primitives.Longs;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -35,6 +43,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* Tests for ContainerStateManager.
|
||||
@ -333,4 +342,74 @@ public class TestContainerStateManager {
|
||||
Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicaMap() throws Exception {
|
||||
GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(ContainerStateMap.getLOG());
|
||||
DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
|
||||
.setIpAddress("1.1.1.1")
|
||||
.setUuid(UUID.randomUUID().toString()).build();
|
||||
DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
|
||||
.setIpAddress("2.2.2.2")
|
||||
.setUuid(UUID.randomUUID().toString()).build();
|
||||
|
||||
// Test 1: no replica's exist
|
||||
ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
|
||||
Set<DatanodeDetails> replicaSet;
|
||||
LambdaTestUtils.intercept(SCMException.class, "", () -> {
|
||||
containerStateManager.getContainerReplicas(containerID);
|
||||
});
|
||||
|
||||
// Test 2: Add replica nodes and then test
|
||||
containerStateManager.addContainerReplica(containerID, dn1);
|
||||
containerStateManager.addContainerReplica(containerID, dn2);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(2, replicaSet.size());
|
||||
Assert.assertTrue(replicaSet.contains(dn1));
|
||||
Assert.assertTrue(replicaSet.contains(dn2));
|
||||
|
||||
// Test 3: Remove one replica node and then test
|
||||
containerStateManager.removeContainerReplica(containerID, dn1);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(1, replicaSet.size());
|
||||
Assert.assertFalse(replicaSet.contains(dn1));
|
||||
Assert.assertTrue(replicaSet.contains(dn2));
|
||||
|
||||
// Test 3: Remove second replica node and then test
|
||||
containerStateManager.removeContainerReplica(containerID, dn2);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(0, replicaSet.size());
|
||||
Assert.assertFalse(replicaSet.contains(dn1));
|
||||
Assert.assertFalse(replicaSet.contains(dn2));
|
||||
|
||||
// Test 4: Re-insert dn1
|
||||
containerStateManager.addContainerReplica(containerID, dn1);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(1, replicaSet.size());
|
||||
Assert.assertTrue(replicaSet.contains(dn1));
|
||||
Assert.assertFalse(replicaSet.contains(dn2));
|
||||
|
||||
// Re-insert dn2
|
||||
containerStateManager.addContainerReplica(containerID, dn2);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(2, replicaSet.size());
|
||||
Assert.assertTrue(replicaSet.contains(dn1));
|
||||
Assert.assertTrue(replicaSet.contains(dn2));
|
||||
|
||||
Assert.assertFalse(logCapturer.getOutput().contains(
|
||||
"ReplicaMap already contains entry for container Id: " + containerID
|
||||
.toString() + ",DataNode: " + dn1.toString()));
|
||||
// Re-insert dn1
|
||||
containerStateManager.addContainerReplica(containerID, dn1);
|
||||
replicaSet = containerStateManager.getContainerReplicas(containerID);
|
||||
Assert.assertEquals(2, replicaSet.size());
|
||||
Assert.assertTrue(replicaSet.contains(dn1));
|
||||
Assert.assertTrue(replicaSet.contains(dn2));
|
||||
Assert.assertTrue(logCapturer.getOutput().contains(
|
||||
"ReplicaMap already contains entry for container Id: " + containerID
|
||||
.toString() + ",DataNode: " + dn1.toString()));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user