HDDS-401. Update storage statistics on dead node. Contributed by LiXin Ge.
This commit is contained in:
parent
f9c0221623
commit
184544eff8
@ -42,19 +42,24 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
|
||||
|
||||
private final ContainerStateManager containerStateManager;
|
||||
|
||||
private final NodeManager nodeManager;
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DeadNodeHandler.class);
|
||||
|
||||
public DeadNodeHandler(
|
||||
Node2ContainerMap node2ContainerMap,
|
||||
ContainerStateManager containerStateManager) {
|
||||
ContainerStateManager containerStateManager, NodeManager nodeManager) {
|
||||
this.node2ContainerMap = node2ContainerMap;
|
||||
this.containerStateManager = containerStateManager;
|
||||
this.nodeManager = nodeManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(DatanodeDetails datanodeDetails,
|
||||
EventPublisher publisher) {
|
||||
nodeManager.processDeadNode(datanodeDetails.getUuid());
|
||||
|
||||
Set<ContainerID> containers =
|
||||
node2ContainerMap.getContainers(datanodeDetails.getUuid());
|
||||
if (containers == null) {
|
||||
|
@ -147,4 +147,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
||||
* @param nodeReport
|
||||
*/
|
||||
void processNodeReport(UUID dnUuid, NodeReportProto nodeReport);
|
||||
|
||||
/**
|
||||
* Process a dead node event in this Node Manager.
|
||||
*
|
||||
* @param dnUuid datanode uuid.
|
||||
*/
|
||||
void processDeadNode(UUID dnUuid);
|
||||
}
|
||||
|
@ -490,4 +490,20 @@ public void onMessage(CommandForDatanode commandForDatanode,
|
||||
addDatanodeCommand(commandForDatanode.getDatanodeId(),
|
||||
commandForDatanode.getCommand());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the node stats and update the storage stats
|
||||
* in this SCM Node Manager.
|
||||
*
|
||||
* @param dnUuid datanode uuid.
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
SCMNodeStat stat = nodeStats.get(dnUuid);
|
||||
LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
|
||||
if (stat != null) {
|
||||
scmStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -229,7 +229,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
|
||||
new StaleNodeHandler(node2ContainerMap,
|
||||
scmContainerManager.getPipelineSelector());
|
||||
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
|
||||
getScmContainerManager().getStateManager());
|
||||
getScmContainerManager().getStateManager(), scmNodeManager);
|
||||
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
|
||||
PendingDeleteHandler pendingDeleteHandler =
|
||||
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
|
||||
|
@ -421,6 +421,21 @@ public void onMessage(CommandForDatanode commandForDatanode,
|
||||
commandForDatanode.getCommand());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the node stats and update the storage stats
|
||||
* in this Node Manager.
|
||||
*
|
||||
* @param dnUuid UUID of the datanode.
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(dnUuid);
|
||||
if (stat != null) {
|
||||
aggregateStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A class to declare some values for the nodes so that our tests
|
||||
* won't fail.
|
||||
|
@ -27,19 +27,27 @@
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
|
||||
import org.apache.hadoop.hdds.scm.container.Mapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
|
||||
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import static org.mockito.Matchers.eq;
|
||||
@ -51,6 +59,29 @@
|
||||
public class TestDeadNodeHandler {
|
||||
|
||||
private List<ReplicationRequest> sentEvents = new ArrayList<>();
|
||||
private SCMNodeManager nodeManager;
|
||||
private Node2ContainerMap node2ContainerMap;
|
||||
private ContainerStateManager containerStateManager;
|
||||
private NodeReportHandler nodeReportHandler;
|
||||
private DeadNodeHandler deadNodeHandler;
|
||||
private EventPublisher publisher;
|
||||
private EventQueue eventQueue;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
node2ContainerMap = new Node2ContainerMap();
|
||||
containerStateManager = new ContainerStateManager(conf,
|
||||
Mockito.mock(Mapping.class),
|
||||
Mockito.mock(PipelineSelector.class));
|
||||
eventQueue = new EventQueue();
|
||||
nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue);
|
||||
deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
|
||||
containerStateManager, nodeManager);
|
||||
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
|
||||
publisher = Mockito.mock(EventPublisher.class);
|
||||
nodeReportHandler = new NodeReportHandler(nodeManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnMessage() throws IOException {
|
||||
@ -58,13 +89,6 @@ public void testOnMessage() throws IOException {
|
||||
DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
|
||||
DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
|
||||
|
||||
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
|
||||
ContainerStateManager containerStateManager = new ContainerStateManager(
|
||||
new OzoneConfiguration(),
|
||||
Mockito.mock(Mapping.class),
|
||||
Mockito.mock(PipelineSelector.class)
|
||||
);
|
||||
|
||||
ContainerInfo container1 =
|
||||
TestUtils.allocateContainer(containerStateManager);
|
||||
ContainerInfo container2 =
|
||||
@ -72,9 +96,6 @@ public void testOnMessage() throws IOException {
|
||||
ContainerInfo container3 =
|
||||
TestUtils.allocateContainer(containerStateManager);
|
||||
|
||||
DeadNodeHandler handler =
|
||||
new DeadNodeHandler(node2ContainerMap, containerStateManager);
|
||||
|
||||
registerReplicas(node2ContainerMap, datanode1, container1, container2);
|
||||
registerReplicas(node2ContainerMap, datanode2, container1, container3);
|
||||
|
||||
@ -84,10 +105,8 @@ public void testOnMessage() throws IOException {
|
||||
|
||||
TestUtils.closeContainer(containerStateManager, container1);
|
||||
|
||||
EventPublisher publisher = Mockito.mock(EventPublisher.class);
|
||||
|
||||
//WHEN datanode1 is dead
|
||||
handler.onMessage(datanode1, publisher);
|
||||
deadNodeHandler.onMessage(datanode1, publisher);
|
||||
|
||||
//THEN
|
||||
//node2ContainerMap has not been changed
|
||||
@ -128,22 +147,76 @@ public void testOnMessage() throws IOException {
|
||||
replicationRequestParameter.getValue().getExpecReplicationCount());
|
||||
}
|
||||
|
||||
private void registerReplicas(ContainerStateManager containerStateManager,
|
||||
@Test
|
||||
public void testStatisticsUpdate() throws Exception {
|
||||
//GIVEN
|
||||
DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
|
||||
DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
|
||||
String storagePath1 = GenericTestUtils.getRandomizedTempPath()
|
||||
.concat("/" + datanode1.getUuidString());
|
||||
String storagePath2 = GenericTestUtils.getRandomizedTempPath()
|
||||
.concat("/" + datanode2.getUuidString());
|
||||
|
||||
StorageReportProto storageOne = TestUtils.createStorageReport(
|
||||
datanode1.getUuid(), storagePath1, 100, 10, 90, null);
|
||||
StorageReportProto storageTwo = TestUtils.createStorageReport(
|
||||
datanode2.getUuid(), storagePath2, 200, 20, 180, null);
|
||||
nodeReportHandler.onMessage(getNodeReport(datanode1, storageOne),
|
||||
Mockito.mock(EventPublisher.class));
|
||||
nodeReportHandler.onMessage(getNodeReport(datanode2, storageTwo),
|
||||
Mockito.mock(EventPublisher.class));
|
||||
|
||||
ContainerInfo container1 =
|
||||
TestUtils.allocateContainer(containerStateManager);
|
||||
registerReplicas(node2ContainerMap, datanode1, container1);
|
||||
|
||||
SCMNodeStat stat = nodeManager.getStats();
|
||||
Assert.assertTrue(stat.getCapacity().get() == 300);
|
||||
Assert.assertTrue(stat.getRemaining().get() == 270);
|
||||
Assert.assertTrue(stat.getScmUsed().get() == 30);
|
||||
|
||||
SCMNodeMetric nodeStat = nodeManager.getNodeStat(datanode1);
|
||||
Assert.assertTrue(nodeStat.get().getCapacity().get() == 100);
|
||||
Assert.assertTrue(nodeStat.get().getRemaining().get() == 90);
|
||||
Assert.assertTrue(nodeStat.get().getScmUsed().get() == 10);
|
||||
|
||||
//WHEN datanode1 is dead.
|
||||
eventQueue.fireEvent(SCMEvents.DEAD_NODE, datanode1);
|
||||
Thread.sleep(100);
|
||||
|
||||
//THEN statistics in SCM should changed.
|
||||
stat = nodeManager.getStats();
|
||||
Assert.assertTrue(stat.getCapacity().get() == 200);
|
||||
Assert.assertTrue(stat.getRemaining().get() == 180);
|
||||
Assert.assertTrue(stat.getScmUsed().get() == 20);
|
||||
|
||||
nodeStat = nodeManager.getNodeStat(datanode1);
|
||||
Assert.assertTrue(nodeStat.get().getCapacity().get() == 0);
|
||||
Assert.assertTrue(nodeStat.get().getRemaining().get() == 0);
|
||||
Assert.assertTrue(nodeStat.get().getScmUsed().get() == 0);
|
||||
}
|
||||
|
||||
private void registerReplicas(ContainerStateManager csm,
|
||||
ContainerInfo container, DatanodeDetails... datanodes) {
|
||||
containerStateManager.getContainerStateMap()
|
||||
csm.getContainerStateMap()
|
||||
.addContainerReplica(new ContainerID(container.getContainerID()),
|
||||
datanodes);
|
||||
}
|
||||
|
||||
private void registerReplicas(Node2ContainerMap node2ContainerMap,
|
||||
private void registerReplicas(Node2ContainerMap node2ConMap,
|
||||
DatanodeDetails datanode,
|
||||
ContainerInfo... containers)
|
||||
throws SCMException {
|
||||
node2ContainerMap
|
||||
node2ConMap
|
||||
.insertNewDatanode(datanode.getUuid(),
|
||||
Arrays.stream(containers)
|
||||
.map(container -> new ContainerID(container.getContainerID()))
|
||||
.collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
private NodeReportFromDatanode getNodeReport(DatanodeDetails dn,
|
||||
StorageReportProto... reports) {
|
||||
NodeReportProto nodeReportProto = TestUtils.createNodeReport(reports);
|
||||
return new NodeReportFromDatanode(dn, nodeReportProto);
|
||||
}
|
||||
}
|
@ -307,4 +307,13 @@ public void onMessage(CommandForDatanode commandForDatanode,
|
||||
EventPublisher publisher) {
|
||||
// do nothing.
|
||||
}
|
||||
|
||||
/**
|
||||
* Empty implementation for processDeadNode.
|
||||
* @param dnUuid
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
// do nothing.
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user