YARN-1101. Active nodes can be decremented below 0 (Robert Parker via tgraves_

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1518384 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-08-28 21:09:08 +00:00
parent 82fc0f1855
commit 56c1b9de0c
3 changed files with 64 additions and 5 deletions

View File

@ -117,6 +117,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-602. Fixed NodeManager to not let users override some mandatory
environmental variables. (Kenji Kikushima via vinodkv)
YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
@ -1235,6 +1238,9 @@ Release 0.23.10 - UNRELEASED
YARN-337. RM handles killed application tracking URL poorly (jlowe)
YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -393,9 +393,18 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
}
}
private void updateMetricsForDeactivatedNode(NodeState finalState) {
private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.decrNumActiveNodes();
switch (initialState) {
case RUNNING:
metrics.decrNumActiveNodes();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
}
switch (finalState) {
case DECOMMISSIONED:
@ -505,7 +514,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// If the current state is NodeState.UNHEALTHY
// Then node is already been removed from the
// Scheduler
if (!rmNode.getState().equals(NodeState.UNHEALTHY)) {
NodeState initialState = rmNode.getState();
if (!initialState.equals(NodeState.UNHEALTHY)) {
rmNode.context.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
}
@ -520,7 +530,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(finalState);
rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
}
}
@ -550,7 +560,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Update metrics
rmNode.updateMetricsForDeactivatedNode(NodeState.UNHEALTHY);
rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
NodeState.UNHEALTHY);
return NodeState.UNHEALTHY;
}

View File

@ -267,7 +267,21 @@ public void testRunningExpire() {
@Test
public void testUnhealthyExpire() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.LOST, node.getState());
}
@ -291,8 +305,22 @@ public void testRunningDecommission() {
@Test
public void testUnhealthyDecommission() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@ -307,8 +335,22 @@ public void testRunningRebooting() {
@Test
public void testUnhealthyRebooting() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.REBOOTING));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted + 1, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}