YARN-3226. UI changes for decommissioning node. Contributed by Sunil G.
This commit is contained in:
parent
5cb1e0118b
commit
1de56b0448
@ -280,6 +280,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3623. Add a new config to indicate the Timeline Service version.
|
||||
(Xuan Gong via junping_du)
|
||||
|
||||
YARN-3226. UI changes for decommissioning node. (Sunil G via
|
||||
junping_du)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
@ -40,6 +40,7 @@ public class ClusterMetrics {
|
||||
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
@Metric("# of active NMs") MutableGaugeInt numActiveNMs;
|
||||
@Metric("# of decommissioning NMs") MutableGaugeInt numDecommissioningNMs;
|
||||
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
|
||||
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
|
||||
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
|
||||
@ -87,6 +88,23 @@ public int getNumActiveNMs() {
|
||||
return numActiveNMs.value();
|
||||
}
|
||||
|
||||
// Decommissioning NMs
|
||||
public int getNumDecommissioningNMs() {
|
||||
return numDecommissioningNMs.value();
|
||||
}
|
||||
|
||||
public void incrDecommissioningNMs() {
|
||||
numDecommissioningNMs.incr();
|
||||
}
|
||||
|
||||
public void setDecommissioningNMs(int num) {
|
||||
numDecommissioningNMs.set(num);
|
||||
}
|
||||
|
||||
public void decrDecommissioningNMs() {
|
||||
numDecommissioningNMs.decr();
|
||||
}
|
||||
|
||||
// Decommissioned NMs
|
||||
public int getNumDecommisionedNMs() {
|
||||
return numDecommissionedNMs.value();
|
||||
|
@ -647,13 +647,34 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
|
||||
}
|
||||
}
|
||||
|
||||
// Treats nodes in decommissioning as active nodes
|
||||
// TODO we may want to differentiate active nodes and decommissioning node in
|
||||
// metrics later.
|
||||
private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
|
||||
// Update metrics when moving to Decommissioning state
|
||||
private void updateMetricsForGracefulDecommission(NodeState initialState,
|
||||
NodeState finalState) {
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
metrics.incrNumActiveNodes();
|
||||
switch (initialState) {
|
||||
case UNHEALTHY :
|
||||
metrics.decrNumUnhealthyNMs();
|
||||
break;
|
||||
case RUNNING :
|
||||
metrics.decrNumActiveNodes();
|
||||
break;
|
||||
case DECOMMISSIONING :
|
||||
metrics.decrDecommissioningNMs();
|
||||
break;
|
||||
default :
|
||||
LOG.warn("Unexpcted initial state");
|
||||
}
|
||||
|
||||
switch (finalState) {
|
||||
case DECOMMISSIONING :
|
||||
metrics.incrDecommissioningNMs();
|
||||
break;
|
||||
case RUNNING :
|
||||
metrics.incrNumActiveNodes();
|
||||
break;
|
||||
default :
|
||||
LOG.warn("Unexpected final state");
|
||||
}
|
||||
}
|
||||
|
||||
private void updateMetricsForDeactivatedNode(NodeState initialState,
|
||||
@ -665,13 +686,13 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
|
||||
metrics.decrNumActiveNodes();
|
||||
break;
|
||||
case DECOMMISSIONING:
|
||||
metrics.decrNumActiveNodes();
|
||||
metrics.decrDecommissioningNMs();
|
||||
break;
|
||||
case UNHEALTHY:
|
||||
metrics.decrNumUnhealthyNMs();
|
||||
break;
|
||||
default:
|
||||
LOG.debug("Unexpected inital state");
|
||||
LOG.warn("Unexpected initial state");
|
||||
}
|
||||
|
||||
switch (finalState) {
|
||||
@ -691,7 +712,7 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
|
||||
metrics.incrNumShutdownNMs();
|
||||
break;
|
||||
default:
|
||||
LOG.debug("Unexpected final state");
|
||||
LOG.warn("Unexpected final state");
|
||||
}
|
||||
}
|
||||
|
||||
@ -1014,9 +1035,8 @@ public DecommissioningNodeTransition(NodeState initState,
|
||||
@Override
|
||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
|
||||
if (initState.equals(NodeState.UNHEALTHY)) {
|
||||
rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
|
||||
}
|
||||
// Update NM metrics during graceful decommissioning.
|
||||
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
|
||||
// TODO (in YARN-3223) Keep NM's available resource to be 0
|
||||
}
|
||||
}
|
||||
@ -1033,6 +1053,8 @@ public RecommissionNodeTransition(NodeState finalState) {
|
||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
|
||||
"recommissioned back to RUNNING.");
|
||||
rmNode
|
||||
.updateMetricsForGracefulDecommission(rmNode.getState(), finalState);
|
||||
// TODO handle NM resource resume in YARN-3223.
|
||||
}
|
||||
}
|
||||
|
@ -53,8 +53,7 @@ protected void render(Block html) {
|
||||
//CSS in the correct spot
|
||||
html.style(".metrics {margin-bottom:5px}");
|
||||
|
||||
ClusterMetricsInfo clusterMetrics =
|
||||
new ClusterMetricsInfo(this.rm);
|
||||
ClusterMetricsInfo clusterMetrics = new ClusterMetricsInfo(this.rm);
|
||||
|
||||
DIV<Hamlet> div = html.div().$class("metrics");
|
||||
|
||||
@ -73,12 +72,6 @@ protected void render(Block html) {
|
||||
th().$class("ui-state-default")._("VCores Used")._().
|
||||
th().$class("ui-state-default")._("VCores Total")._().
|
||||
th().$class("ui-state-default")._("VCores Reserved")._().
|
||||
th().$class("ui-state-default")._("Active Nodes")._().
|
||||
th().$class("ui-state-default")._("Decommissioned Nodes")._().
|
||||
th().$class("ui-state-default")._("Lost Nodes")._().
|
||||
th().$class("ui-state-default")._("Unhealthy Nodes")._().
|
||||
th().$class("ui-state-default")._("Rebooted Nodes")._().
|
||||
th().$class("ui-state-default")._("Shutdown Nodes")._().
|
||||
_().
|
||||
_().
|
||||
tbody().$class("ui-widget-content").
|
||||
@ -99,7 +92,26 @@ protected void render(Block html) {
|
||||
td(String.valueOf(clusterMetrics.getAllocatedVirtualCores())).
|
||||
td(String.valueOf(clusterMetrics.getTotalVirtualCores())).
|
||||
td(String.valueOf(clusterMetrics.getReservedVirtualCores())).
|
||||
_().
|
||||
_()._();
|
||||
|
||||
div.h3("Cluster Nodes Metrics").
|
||||
table("#nodemetricsoverview").
|
||||
thead().$class("ui-widget-header").
|
||||
tr().
|
||||
th().$class("ui-state-default")._("Active Nodes")._().
|
||||
th().$class("ui-state-default")._("Decommissioning Nodes")._().
|
||||
th().$class("ui-state-default")._("Decommissioned Nodes")._().
|
||||
th().$class("ui-state-default")._("Lost Nodes")._().
|
||||
th().$class("ui-state-default")._("Unhealthy Nodes")._().
|
||||
th().$class("ui-state-default")._("Rebooted Nodes")._().
|
||||
th().$class("ui-state-default")._("Shutdown Nodes")._().
|
||||
_().
|
||||
_().
|
||||
tbody().$class("ui-widget-content").
|
||||
tr().
|
||||
td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
|
||||
td().a(url("nodes/decommissioning"), String.valueOf(clusterMetrics.getDecommissioningNodes()))._().
|
||||
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
|
||||
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
|
||||
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().
|
||||
|
@ -94,6 +94,9 @@ protected void render(Block html) {
|
||||
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
|
||||
isInactive = true;
|
||||
break;
|
||||
case DECOMMISSIONING:
|
||||
// Do nothing
|
||||
break;
|
||||
default:
|
||||
LOG.debug("Unexpected state filter for inactive RM node");
|
||||
}
|
||||
|
@ -54,6 +54,7 @@ public class ClusterMetricsInfo {
|
||||
protected int totalNodes;
|
||||
protected int lostNodes;
|
||||
protected int unhealthyNodes;
|
||||
protected int decommissioningNodes;
|
||||
protected int decommissionedNodes;
|
||||
protected int rebootedNodes;
|
||||
protected int activeNodes;
|
||||
@ -91,6 +92,7 @@ public ClusterMetricsInfo(final ResourceManager rm) {
|
||||
this.activeNodes = clusterMetrics.getNumActiveNMs();
|
||||
this.lostNodes = clusterMetrics.getNumLostNMs();
|
||||
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
|
||||
this.decommissioningNodes = clusterMetrics.getNumDecommissioningNMs();
|
||||
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
|
||||
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
|
||||
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
|
||||
@ -186,6 +188,10 @@ public int getUnhealthyNodes() {
|
||||
return this.unhealthyNodes;
|
||||
}
|
||||
|
||||
public int getDecommissioningNodes() {
|
||||
return this.decommissioningNodes;
|
||||
}
|
||||
|
||||
public int getDecommissionedNodes() {
|
||||
return this.decommissionedNodes;
|
||||
}
|
||||
|
@ -236,29 +236,49 @@ public void testExpiredContainer() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStatusUpdateOnDecommissioningNode(){
|
||||
public void testStatusUpdateOnDecommissioningNode() {
|
||||
RMNodeImpl node = getDecommissioningNode();
|
||||
ClusterMetrics cm = ClusterMetrics.getMetrics();
|
||||
int initialActive = cm.getNumActiveNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
// Verify node in DECOMMISSIONING won't be changed by status update
|
||||
// with running apps
|
||||
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
|
||||
node.handle(statusEvent);
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
|
||||
cm.getNumDecommisionedNMs());
|
||||
|
||||
// Verify node in DECOMMISSIONING will be changed by status update
|
||||
// without running apps
|
||||
statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
|
||||
node.handle(statusEvent);
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
|
||||
cm.getNumDecommisionedNMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecommissionNode(){
|
||||
public void testRecommissionNode() {
|
||||
RMNodeImpl node = getDecommissioningNode();
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
node.handle(new RMNodeEvent(node.getNodeID(),
|
||||
RMNodeEventType.RECOMMISSION));
|
||||
ClusterMetrics cm = ClusterMetrics.getMetrics();
|
||||
int initialActive = cm.getNumActiveNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.RECOMMISSION));
|
||||
Assert.assertEquals(NodeState.RUNNING, node.getState());
|
||||
Assert
|
||||
.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
@ -481,16 +501,18 @@ public void testDecommissionOnDecommissioningNode() {
|
||||
int initialUnhealthy = cm.getUnhealthyNMs();
|
||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||
int initialRebooted = cm.getNumRebootedNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(),
|
||||
RMNodeEventType.DECOMMISSION));
|
||||
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.DECOMMISSION));
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
||||
Assert.assertEquals("Unhealthy Nodes",
|
||||
initialUnhealthy, cm.getUnhealthyNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes",
|
||||
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
|
||||
Assert.assertEquals("Rebooted Nodes",
|
||||
initialRebooted, cm.getNumRebootedNMs());
|
||||
Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
|
||||
cm.getUnhealthyNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
|
||||
cm.getNumDecommisionedNMs());
|
||||
Assert.assertEquals("Rebooted Nodes", initialRebooted,
|
||||
cm.getNumRebootedNMs());
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
|
||||
}
|
||||
|
||||
@ -525,16 +547,19 @@ public void testUnhealthyDecommissioning() {
|
||||
int initialLost = cm.getNumLostNMs();
|
||||
int initialUnhealthy = cm.getUnhealthyNMs();
|
||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
int initialRebooted = cm.getNumRebootedNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(),
|
||||
RMNodeEventType.GRACEFUL_DECOMMISSION));
|
||||
Assert.assertEquals("Active Nodes", initialActive + 1,
|
||||
Assert.assertEquals("Active Nodes", initialActive,
|
||||
cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
||||
Assert.assertEquals("Unhealthy Nodes",
|
||||
initialUnhealthy - 1, cm.getUnhealthyNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
|
||||
cm.getNumDecommisionedNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Rebooted Nodes",
|
||||
initialRebooted, cm.getNumRebootedNMs());
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
@ -681,9 +706,16 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) {
|
||||
|
||||
private RMNodeImpl getDecommissioningNode() {
|
||||
RMNodeImpl node = getRunningNode();
|
||||
ClusterMetrics cm = ClusterMetrics.getMetrics();
|
||||
int initialActive = cm.getNumActiveNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
node.handle(new RMNodeEvent(node.getNodeID(),
|
||||
RMNodeEventType.GRACEFUL_DECOMMISSION));
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
Assert
|
||||
.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
return node;
|
||||
}
|
||||
|
||||
@ -774,16 +806,30 @@ public void testReconnect() {
|
||||
@Test
|
||||
public void testReconnectOnDecommissioningNode() {
|
||||
RMNodeImpl node = getDecommissioningNode();
|
||||
ClusterMetrics cm = ClusterMetrics.getMetrics();
|
||||
int initialActive = cm.getNumActiveNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||
|
||||
// Reconnect event with running app
|
||||
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
|
||||
getAppIdList(), null));
|
||||
// still decommissioning
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
|
||||
cm.getNumDecommisionedNMs());
|
||||
|
||||
// Reconnect event without any running app
|
||||
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
|
||||
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
|
||||
cm.getNumDecommissioningNMs());
|
||||
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
|
||||
cm.getNumDecommisionedNMs());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -846,17 +892,26 @@ public void testResourceUpdateOnNewNode() {
|
||||
@Test
|
||||
public void testResourceUpdateOnRebootedNode() {
|
||||
RMNodeImpl node = getRebootedNode();
|
||||
ClusterMetrics cm = ClusterMetrics.getMetrics();
|
||||
int initialActive = cm.getNumActiveNMs();
|
||||
int initialUnHealthy = cm.getUnhealthyNMs();
|
||||
int initialDecommissioning = cm.getNumDecommissioningNMs();
|
||||
Resource oldCapacity = node.getTotalCapability();
|
||||
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
|
||||
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
|
||||
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
|
||||
ResourceOption.newInstance(Resource.newInstance(2048, 2),
|
||||
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), ResourceOption
|
||||
.newInstance(Resource.newInstance(2048, 2),
|
||||
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
|
||||
Resource newCapacity = node.getTotalCapability();
|
||||
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
|
||||
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
|
||||
|
||||
Assert.assertEquals(NodeState.REBOOTED, node.getState());
|
||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||
Assert.assertEquals("Unhelathy Nodes", initialUnHealthy,
|
||||
cm.getUnhealthyNMs());
|
||||
Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
|
||||
cm.getNumDecommissioningNMs());
|
||||
}
|
||||
|
||||
// Test unhealthy report on a decommissioning node will make it
|
||||
|
@ -47,7 +47,7 @@ public class TestNodesPage {
|
||||
|
||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
final int numberOfThInMetricsTable = 22;
|
||||
final int numberOfThInMetricsTable = 23;
|
||||
final int numberOfActualTableHeaders = 13;
|
||||
|
||||
private Injector injector;
|
||||
|
@ -429,7 +429,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
|
||||
Exception {
|
||||
assertEquals("incorrect number of elements", 1, json.length());
|
||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||
assertEquals("incorrect number of elements", 24, clusterinfo.length());
|
||||
assertEquals("incorrect number of elements", 25, clusterinfo.length());
|
||||
verifyClusterMetrics(
|
||||
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||
|
Loading…
Reference in New Issue
Block a user