YARN-11360: Add number of decommissioning/shutdown nodes to YARN cluster metrics. (#5060)
(cherry picked from commit bfb84cd7f6
)
This commit is contained in:
parent
6accb7809f
commit
33293d4ba4
@ -53,6 +53,20 @@ public static YarnClusterMetrics newInstance(int numNodeManagers) {
|
||||
@Unstable
|
||||
public abstract void setNumNodeManagers(int numNodeManagers);
|
||||
|
||||
/**
|
||||
* Get the number of <code>DecommissioningNodeManager</code>s in the cluster.
|
||||
*
|
||||
* @return number of <code>DecommissioningNodeManager</code>s in the cluster
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract int getNumDecommissioningNodeManagers();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setNumDecommissioningNodeManagers(
|
||||
int numDecommissioningNodeManagers);
|
||||
|
||||
/**
|
||||
* Get the number of <code>DecommissionedNodeManager</code>s in the cluster.
|
||||
*
|
||||
@ -119,4 +133,16 @@ public abstract void setNumDecommissionedNodeManagers(
|
||||
@Unstable
|
||||
public abstract void setNumRebootedNodeManagers(int numRebootedNodeManagers);
|
||||
|
||||
/**
|
||||
* Get the number of <code>ShutdownNodeManager</code>s in the cluster.
|
||||
*
|
||||
* @return number of <code>ShutdownNodeManager</code>s in the cluster
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract int getNumShutdownNodeManagers();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setNumShutdownNodeManagers(int numShutdownNodeManagers);
|
||||
}
|
||||
|
@ -571,6 +571,8 @@ message YarnClusterMetricsProto {
|
||||
optional int32 num_lost_nms = 4;
|
||||
optional int32 num_unhealthy_nms = 5;
|
||||
optional int32 num_rebooted_nms = 6;
|
||||
optional int32 num_decommissioning_nms = 7;
|
||||
optional int32 num_shutdown_nms = 8;
|
||||
}
|
||||
|
||||
enum QueueStateProto {
|
||||
|
@ -339,9 +339,11 @@ private static class NodesInformation {
|
||||
int totalNodes;
|
||||
int runningNodes;
|
||||
int unhealthyNodes;
|
||||
int decommissioningNodes;
|
||||
int decommissionedNodes;
|
||||
int lostNodes;
|
||||
int rebootedNodes;
|
||||
int shutdownNodes;
|
||||
}
|
||||
|
||||
private static class QueueMetrics {
|
||||
@ -696,6 +698,8 @@ protected NodesInformation getNodesInfo() {
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
nodeInfo.decommissioningNodes =
|
||||
yarnClusterMetrics.getNumDecommissioningNodeManagers();
|
||||
nodeInfo.decommissionedNodes =
|
||||
yarnClusterMetrics.getNumDecommissionedNodeManagers();
|
||||
nodeInfo.totalNodes = yarnClusterMetrics.getNumNodeManagers();
|
||||
@ -703,6 +707,7 @@ protected NodesInformation getNodesInfo() {
|
||||
nodeInfo.lostNodes = yarnClusterMetrics.getNumLostNodeManagers();
|
||||
nodeInfo.unhealthyNodes = yarnClusterMetrics.getNumUnhealthyNodeManagers();
|
||||
nodeInfo.rebootedNodes = yarnClusterMetrics.getNumRebootedNodeManagers();
|
||||
nodeInfo.shutdownNodes = yarnClusterMetrics.getNumShutdownNodeManagers();
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
@ -880,11 +885,11 @@ String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
|
||||
ret.append(CLEAR_LINE)
|
||||
.append(limitLineLength(String.format(
|
||||
"NodeManager(s)"
|
||||
+ ": %d total, %d active, %d unhealthy, %d decommissioned,"
|
||||
+ " %d lost, %d rebooted%n",
|
||||
+ ": %d total, %d active, %d unhealthy, %d decommissioning,"
|
||||
+ " %d decommissioned, %d lost, %d rebooted, %d shutdown%n",
|
||||
nodes.totalNodes, nodes.runningNodes, nodes.unhealthyNodes,
|
||||
nodes.decommissionedNodes, nodes.lostNodes,
|
||||
nodes.rebootedNodes), terminalWidth, true));
|
||||
nodes.decommissioningNodes, nodes.decommissionedNodes, nodes.lostNodes,
|
||||
nodes.rebootedNodes, nodes.shutdownNodes), terminalWidth, true));
|
||||
|
||||
ret.append(CLEAR_LINE)
|
||||
.append(limitLineLength(String.format(
|
||||
@ -1039,7 +1044,8 @@ protected void showFieldsScreen() {
|
||||
}
|
||||
}
|
||||
|
||||
protected void showTopScreen() {
|
||||
@VisibleForTesting
|
||||
void showTopScreen() {
|
||||
List<ApplicationInformation> appsInfo = new ArrayList<>();
|
||||
List<ApplicationReport> apps;
|
||||
try {
|
||||
|
@ -18,7 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
@ -27,9 +32,13 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -47,6 +56,9 @@ public class TestTopCLI {
|
||||
|
||||
private static Map<String, String> savedStaticResolution = new HashMap<>();
|
||||
|
||||
private PrintStream stdout;
|
||||
private PrintStream stderr;
|
||||
|
||||
@BeforeClass
|
||||
public static void initializeDummyHostnameResolution() throws Exception {
|
||||
String previousIpAddress;
|
||||
@ -68,6 +80,18 @@ public static void restoreDummyHostnameResolution() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
this.stdout = System.out;
|
||||
this.stderr = System.err;
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
System.setOut(this.stdout);
|
||||
System.setErr(this.stderr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHAClusterInfoURL() throws IOException, InterruptedException {
|
||||
TopCLI topcli = new TopCLI();
|
||||
@ -103,4 +127,44 @@ public void testHAClusterInfoURL() throws IOException, InterruptedException {
|
||||
Assert.assertEquals("https", clusterUrl.getProtocol());
|
||||
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeaderNodeManagers() throws Exception {
|
||||
YarnClusterMetrics ymetrics = mock(YarnClusterMetrics.class);
|
||||
when(ymetrics.getNumNodeManagers()).thenReturn(0);
|
||||
when(ymetrics.getNumDecommissioningNodeManagers()).thenReturn(1);
|
||||
when(ymetrics.getNumDecommissionedNodeManagers()).thenReturn(2);
|
||||
when(ymetrics.getNumActiveNodeManagers()).thenReturn(3);
|
||||
when(ymetrics.getNumLostNodeManagers()).thenReturn(4);
|
||||
when(ymetrics.getNumUnhealthyNodeManagers()).thenReturn(5);
|
||||
when(ymetrics.getNumRebootedNodeManagers()).thenReturn(6);
|
||||
when(ymetrics.getNumShutdownNodeManagers()).thenReturn(7);
|
||||
|
||||
YarnClient client = mock(YarnClient.class);
|
||||
when(client.getYarnClusterMetrics()).thenReturn(ymetrics);
|
||||
|
||||
TopCLI topcli = new TopCLI() {
|
||||
@Override protected void createAndStartYarnClient() {
|
||||
}
|
||||
};
|
||||
topcli.setClient(client);
|
||||
topcli.terminalWidth = 200;
|
||||
|
||||
String actual;
|
||||
try (ByteArrayOutputStream outStream = new ByteArrayOutputStream();
|
||||
PrintStream out = new PrintStream(outStream)) {
|
||||
System.setOut(out);
|
||||
System.setErr(out);
|
||||
topcli.showTopScreen();
|
||||
out.flush();
|
||||
actual = outStream.toString("UTF-8");
|
||||
}
|
||||
|
||||
String expected = "NodeManager(s)"
|
||||
+ ": 0 total, 3 active, 5 unhealthy, 1 decommissioning,"
|
||||
+ " 2 decommissioned, 4 lost, 6 rebooted, 7 shutdown";
|
||||
Assert.assertTrue(
|
||||
String.format("Expected output to contain [%s], actual output was [%s].", expected, actual),
|
||||
actual.contains(expected));
|
||||
}
|
||||
}
|
@ -89,6 +89,22 @@ public void setNumNodeManagers(int numNodeManagers) {
|
||||
builder.setNumNodeManagers((numNodeManagers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDecommissioningNodeManagers() {
|
||||
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (p.hasNumDecommissioningNms()) {
|
||||
return (p.getNumDecommissioningNms());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void
|
||||
setNumDecommissioningNodeManagers(int numDecommissioningNodeManagers) {
|
||||
maybeInitBuilder();
|
||||
builder.setNumDecommissioningNms(numDecommissioningNodeManagers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDecommissionedNodeManagers() {
|
||||
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
|
||||
@ -165,4 +181,19 @@ public void setNumRebootedNodeManagers(int numRebootedNodeManagers) {
|
||||
maybeInitBuilder();
|
||||
builder.setNumRebootedNms((numRebootedNodeManagers));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumShutdownNodeManagers() {
|
||||
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (p.hasNumShutdownNms()) {
|
||||
return (p.getNumShutdownNms());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNumShutdownNodeManagers(int numShutdownNodeManagers) {
|
||||
maybeInitBuilder();
|
||||
builder.setNumShutdownNms(numShutdownNodeManagers);
|
||||
}
|
||||
}
|
@ -861,12 +861,14 @@ public GetClusterMetricsResponse getClusterMetrics(
|
||||
.newRecordInstance(YarnClusterMetrics.class);
|
||||
ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size());
|
||||
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
||||
ymetrics.setNumDecommissioningNodeManagers(clusterMetrics.getNumDecommissioningNMs());
|
||||
ymetrics.setNumDecommissionedNodeManagers(clusterMetrics
|
||||
.getNumDecommisionedNMs());
|
||||
ymetrics.setNumActiveNodeManagers(clusterMetrics.getNumActiveNMs());
|
||||
ymetrics.setNumLostNodeManagers(clusterMetrics.getNumLostNMs());
|
||||
ymetrics.setNumUnhealthyNodeManagers(clusterMetrics.getUnhealthyNMs());
|
||||
ymetrics.setNumRebootedNodeManagers(clusterMetrics.getNumRebootedNMs());
|
||||
ymetrics.setNumShutdownNodeManagers(clusterMetrics.getNumShutdownNMs());
|
||||
response.setClusterMetrics(ymetrics);
|
||||
return response;
|
||||
}
|
||||
|
@ -60,6 +60,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
@ -76,6 +77,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
@ -140,6 +142,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
@ -2809,10 +2812,63 @@ protected ClientRMService createClientRMService() {
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterMetrics() throws Exception {
|
||||
MockRM rm = new MockRM() {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(this.rmContext, scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
||||
this.getRMContext().getRMDelegationTokenSecretManager());
|
||||
};
|
||||
};
|
||||
rm.start();
|
||||
|
||||
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
||||
clusterMetrics.incrDecommissioningNMs();
|
||||
repeat(2, clusterMetrics::incrDecommisionedNMs);
|
||||
repeat(3, clusterMetrics::incrNumActiveNodes);
|
||||
repeat(4, clusterMetrics::incrNumLostNMs);
|
||||
repeat(5, clusterMetrics::incrNumUnhealthyNMs);
|
||||
repeat(6, clusterMetrics::incrNumRebootedNMs);
|
||||
repeat(7, clusterMetrics::incrNumShutdownNMs);
|
||||
|
||||
// Create a client.
|
||||
Configuration conf = new Configuration();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
ApplicationClientProtocol client =
|
||||
(ApplicationClientProtocol) rpc
|
||||
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
||||
|
||||
YarnClusterMetrics ymetrics = client.getClusterMetrics(
|
||||
GetClusterMetricsRequest.newInstance()).getClusterMetrics();
|
||||
|
||||
Assert.assertEquals(0, ymetrics.getNumNodeManagers());
|
||||
Assert.assertEquals(1, ymetrics.getNumDecommissioningNodeManagers());
|
||||
Assert.assertEquals(2, ymetrics.getNumDecommissionedNodeManagers());
|
||||
Assert.assertEquals(3, ymetrics.getNumActiveNodeManagers());
|
||||
Assert.assertEquals(4, ymetrics.getNumLostNodeManagers());
|
||||
Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
|
||||
Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers());
|
||||
Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers());
|
||||
|
||||
rpc.stopProxy(client, conf);
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown(){
|
||||
if (resourceTypesFile != null && resourceTypesFile.exists()) {
|
||||
resourceTypesFile.delete();
|
||||
}
|
||||
ClusterMetrics.destroy();
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
private static void repeat(int n, Runnable r) {
|
||||
for (int i = 0; i < n; ++i) {
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,9 @@ public static GetClusterMetricsResponse merge(
|
||||
tmp.getNumNodeManagers() + metrics.getNumNodeManagers());
|
||||
tmp.setNumActiveNodeManagers(
|
||||
tmp.getNumActiveNodeManagers() + metrics.getNumActiveNodeManagers());
|
||||
tmp.setNumDecommissioningNodeManagers(
|
||||
tmp.getNumDecommissioningNodeManagers() + metrics
|
||||
.getNumDecommissioningNodeManagers());
|
||||
tmp.setNumDecommissionedNodeManagers(
|
||||
tmp.getNumDecommissionedNodeManagers() + metrics
|
||||
.getNumDecommissionedNodeManagers());
|
||||
@ -49,6 +52,9 @@ public static GetClusterMetricsResponse merge(
|
||||
tmp.setNumUnhealthyNodeManagers(
|
||||
tmp.getNumUnhealthyNodeManagers() + metrics
|
||||
.getNumUnhealthyNodeManagers());
|
||||
tmp.setNumShutdownNodeManagers(
|
||||
tmp.getNumShutdownNodeManagers() + metrics
|
||||
.getNumShutdownNodeManagers());
|
||||
}
|
||||
return GetClusterMetricsResponse.newInstance(tmp);
|
||||
}
|
||||
|
@ -38,10 +38,12 @@ public void testClusterMetricsMerge() {
|
||||
YarnClusterMetrics resultMetrics = result.getClusterMetrics();
|
||||
Assert.assertEquals(3, resultMetrics.getNumNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumActiveNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumDecommissioningNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumDecommissionedNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumLostNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumRebootedNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumUnhealthyNodeManagers());
|
||||
Assert.assertEquals(3, resultMetrics.getNumShutdownNodeManagers());
|
||||
}
|
||||
|
||||
public GetClusterMetricsResponse getClusterMetricsResponse(int value) {
|
||||
@ -49,9 +51,11 @@ public GetClusterMetricsResponse getClusterMetricsResponse(int value) {
|
||||
metrics.setNumUnhealthyNodeManagers(value);
|
||||
metrics.setNumRebootedNodeManagers(value);
|
||||
metrics.setNumLostNodeManagers(value);
|
||||
metrics.setNumDecommissioningNodeManagers(value);
|
||||
metrics.setNumDecommissionedNodeManagers(value);
|
||||
metrics.setNumActiveNodeManagers(value);
|
||||
metrics.setNumNodeManagers(value);
|
||||
metrics.setNumShutdownNodeManagers(value);
|
||||
return GetClusterMetricsResponse.newInstance(metrics);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user