HDDS-1025. Handle replication of closed containers in DeadNodeHanlder. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
0ab7fc9200
commit
16195eaee1
@ -21,6 +21,7 @@
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerException;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
@ -81,8 +82,6 @@ public void onMessage(DatanodeDetails datanodeDetails,
|
||||
try {
|
||||
final ContainerInfo container = containerManager.getContainer(id);
|
||||
// TODO: For open containers, trigger close on other nodes
|
||||
// TODO: Check replica count and call replication manager
|
||||
// on these containers.
|
||||
if (!container.isOpen()) {
|
||||
Set<ContainerReplica> replicas = containerManager
|
||||
.getContainerReplicas(id);
|
||||
@ -92,6 +91,9 @@ public void onMessage(DatanodeDetails datanodeDetails,
|
||||
.ifPresent(replica -> {
|
||||
try {
|
||||
containerManager.removeContainerReplica(id, replica);
|
||||
ContainerInfo containerInfo =
|
||||
containerManager.getContainer(id);
|
||||
replicateIfNeeded(containerInfo, publisher);
|
||||
} catch (ContainerException ex) {
|
||||
LOG.warn("Exception while removing container replica #{} " +
|
||||
"for container #{}.", replica, container, ex);
|
||||
@ -109,13 +111,21 @@ public void onMessage(DatanodeDetails datanodeDetails,
|
||||
*/
|
||||
private void replicateIfNeeded(ContainerInfo container,
|
||||
EventPublisher publisher) throws ContainerNotFoundException {
|
||||
final int existingReplicas = containerManager
|
||||
.getContainerReplicas(container.containerID()).size();
|
||||
final int expectedReplicas = container.getReplicationFactor().getNumber();
|
||||
if (existingReplicas != expectedReplicas) {
|
||||
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
||||
new ReplicationRequest(
|
||||
container.getContainerID(), existingReplicas, expectedReplicas));
|
||||
// Replicate only closed and Quasi closed containers
|
||||
if (container.getState() == HddsProtos.LifeCycleState.CLOSED ||
|
||||
container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
|
||||
final int existingReplicas = containerManager
|
||||
.getContainerReplicas(container.containerID()).size();
|
||||
final int expectedReplicas = container.getReplicationFactor().getNumber();
|
||||
if (existingReplicas != expectedReplicas) {
|
||||
LOG.debug("Replicate Request fired for container {}, exisiting " +
|
||||
"replica count {}, expected replica count {}",
|
||||
container.getContainerID(), existingReplicas, expectedReplicas);
|
||||
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
||||
new ReplicationRequest(
|
||||
container.getContainerID(), existingReplicas,
|
||||
expectedReplicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -446,4 +446,19 @@ public static void closeContainer(ContainerManager containerManager,
|
||||
id, HddsProtos.LifeCycleEvent.CLOSE);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Move the container to Quaise close state.
|
||||
* @param containerManager
|
||||
* @param id
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void quasiCloseContainer(ContainerManager containerManager,
|
||||
ContainerID id) throws IOException {
|
||||
containerManager.updateContainerState(
|
||||
id, HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
containerManager.updateContainerState(
|
||||
id, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +59,7 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* Test DeadNodeHandler.
|
||||
@ -140,17 +141,25 @@ public void testOnMessage() throws IOException, NodeNotFoundException {
|
||||
TestUtils.allocateContainer(containerManager);
|
||||
ContainerInfo container3 =
|
||||
TestUtils.allocateContainer(containerManager);
|
||||
ContainerInfo container4 =
|
||||
TestUtils.allocateContainer(containerManager);
|
||||
|
||||
registerReplicas(datanode1, container1, container2);
|
||||
registerReplicas(datanode2, container1, container3);
|
||||
registerContainers(datanode1, container1, container2, container4);
|
||||
registerContainers(datanode2, container1, container2);
|
||||
registerContainers(datanode3, container3);
|
||||
|
||||
registerReplicas(containerManager, container1, datanode1, datanode2);
|
||||
registerReplicas(containerManager, container2, datanode1);
|
||||
registerReplicas(containerManager, container3, datanode2);
|
||||
registerReplicas(containerManager, container2, datanode1, datanode2);
|
||||
registerReplicas(containerManager, container3, datanode3);
|
||||
registerReplicas(containerManager, container4, datanode1);
|
||||
|
||||
TestUtils.closeContainer(containerManager, container1.containerID());
|
||||
TestUtils.closeContainer(containerManager, container2.containerID());
|
||||
TestUtils.closeContainer(containerManager, container3.containerID());
|
||||
TestUtils.quasiCloseContainer(containerManager, container3.containerID());
|
||||
|
||||
GenericTestUtils.setLogLevel(DeadNodeHandler.getLogger(), Level.DEBUG);
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(DeadNodeHandler.getLogger());
|
||||
|
||||
deadNodeHandler.onMessage(datanode1, publisher);
|
||||
|
||||
@ -162,13 +171,33 @@ public void testOnMessage() throws IOException, NodeNotFoundException {
|
||||
|
||||
Set<ContainerReplica> container2Replicas = containerManager
|
||||
.getContainerReplicas(new ContainerID(container2.getContainerID()));
|
||||
Assert.assertEquals(0, container2Replicas.size());
|
||||
Assert.assertEquals(1, container2Replicas.size());
|
||||
Assert.assertEquals(datanode2,
|
||||
container2Replicas.iterator().next().getDatanodeDetails());
|
||||
|
||||
Set<ContainerReplica> container3Replicas = containerManager
|
||||
.getContainerReplicas(new ContainerID(container3.getContainerID()));
|
||||
Assert.assertEquals(1, container3Replicas.size());
|
||||
Assert.assertEquals(datanode2,
|
||||
Assert.assertEquals(datanode3,
|
||||
container3Replicas.iterator().next().getDatanodeDetails());
|
||||
|
||||
// Replicate should be fired for container 1 and container 2 as now
|
||||
// datanode 1 is dead, these 2 will not match with expected replica count
|
||||
// and their state is one of CLOSED/QUASI_CLOSE.
|
||||
Assert.assertTrue(logCapturer.getOutput().contains(
|
||||
"Replicate Request fired for container " +
|
||||
container1.getContainerID()));
|
||||
Assert.assertTrue(logCapturer.getOutput().contains(
|
||||
"Replicate Request fired for container " +
|
||||
container2.getContainerID()));
|
||||
|
||||
// as container4 is still in open state, replicate event should not have
|
||||
// fired for this.
|
||||
Assert.assertFalse(logCapturer.getOutput().contains(
|
||||
"Replicate Request fired for container " +
|
||||
container4.getContainerID()));
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -272,7 +301,13 @@ private void registerReplicas(ContainerManager contManager,
|
||||
}
|
||||
}
|
||||
|
||||
private void registerReplicas(DatanodeDetails datanode,
|
||||
/**
|
||||
* Update containers available on the datanode.
|
||||
* @param datanode
|
||||
* @param containers
|
||||
* @throws NodeNotFoundException
|
||||
*/
|
||||
private void registerContainers(DatanodeDetails datanode,
|
||||
ContainerInfo... containers)
|
||||
throws NodeNotFoundException {
|
||||
nodeManager
|
||||
|
Loading…
Reference in New Issue
Block a user