YARN-11360: Add number of decommissioning/shutdown nodes to YARN cluster metrics. (#5060)

This commit is contained in:
Chris Nauroth 2022-10-28 11:07:01 -07:00 committed by GitHub
parent 88f7f5bc01
commit bfb84cd7f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 204 additions and 8 deletions

View File

@ -53,6 +53,20 @@ public static YarnClusterMetrics newInstance(int numNodeManagers) {
@Unstable
public abstract void setNumNodeManagers(int numNodeManagers);
/**
* Get the number of <code>DecommissioningNodeManager</code>s in the cluster.
*
* @return number of <code>DecommissioningNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumDecommissioningNodeManagers();
@Private
@Unstable
public abstract void setNumDecommissioningNodeManagers(
int numDecommissioningNodeManagers);
/**
* Get the number of <code>DecommissionedNodeManager</code>s in the cluster.
*
@ -119,4 +133,16 @@ public abstract void setNumDecommissionedNodeManagers(
@Unstable
public abstract void setNumRebootedNodeManagers(int numRebootedNodeManagers);
/**
* Get the number of <code>ShutdownNodeManager</code>s in the cluster.
*
* @return number of <code>ShutdownNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumShutdownNodeManagers();
@Private
@Unstable
public abstract void setNumShutdownNodeManagers(int numShutdownNodeManagers);
}

View File

@ -572,6 +572,8 @@ message YarnClusterMetricsProto {
optional int32 num_lost_nms = 4;
optional int32 num_unhealthy_nms = 5;
optional int32 num_rebooted_nms = 6;
optional int32 num_decommissioning_nms = 7;
optional int32 num_shutdown_nms = 8;
}
enum QueueStateProto {

View File

@ -339,9 +339,11 @@ private static class NodesInformation {
int totalNodes;
int runningNodes;
int unhealthyNodes;
int decommissioningNodes;
int decommissionedNodes;
int lostNodes;
int rebootedNodes;
int shutdownNodes;
}
private static class QueueMetrics {
@ -696,6 +698,8 @@ protected NodesInformation getNodesInfo() {
return nodeInfo;
}
nodeInfo.decommissioningNodes =
yarnClusterMetrics.getNumDecommissioningNodeManagers();
nodeInfo.decommissionedNodes =
yarnClusterMetrics.getNumDecommissionedNodeManagers();
nodeInfo.totalNodes = yarnClusterMetrics.getNumNodeManagers();
@ -703,6 +707,7 @@ protected NodesInformation getNodesInfo() {
nodeInfo.lostNodes = yarnClusterMetrics.getNumLostNodeManagers();
nodeInfo.unhealthyNodes = yarnClusterMetrics.getNumUnhealthyNodeManagers();
nodeInfo.rebootedNodes = yarnClusterMetrics.getNumRebootedNodeManagers();
nodeInfo.shutdownNodes = yarnClusterMetrics.getNumShutdownNodeManagers();
return nodeInfo;
}
@ -880,11 +885,11 @@ String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
ret.append(CLEAR_LINE)
.append(limitLineLength(String.format(
"NodeManager(s)"
+ ": %d total, %d active, %d unhealthy, %d decommissioned,"
+ " %d lost, %d rebooted%n",
+ ": %d total, %d active, %d unhealthy, %d decommissioning,"
+ " %d decommissioned, %d lost, %d rebooted, %d shutdown%n",
nodes.totalNodes, nodes.runningNodes, nodes.unhealthyNodes,
nodes.decommissionedNodes, nodes.lostNodes,
nodes.rebootedNodes), terminalWidth, true));
nodes.decommissioningNodes, nodes.decommissionedNodes, nodes.lostNodes,
nodes.rebootedNodes, nodes.shutdownNodes), terminalWidth, true));
ret.append(CLEAR_LINE)
.append(limitLineLength(String.format(
@ -1039,7 +1044,8 @@ protected void showFieldsScreen() {
}
}
protected void showTopScreen() {
@VisibleForTesting
void showTopScreen() {
List<ApplicationInformation> appsInfo = new ArrayList<>();
List<ApplicationReport> apps;
try {

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.yarn.client.cli;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
@ -27,9 +32,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -47,6 +56,9 @@ public class TestTopCLI {
private static Map<String, String> savedStaticResolution = new HashMap<>();
private PrintStream stdout;
private PrintStream stderr;
@BeforeClass
public static void initializeDummyHostnameResolution() throws Exception {
String previousIpAddress;
@ -68,6 +80,18 @@ public static void restoreDummyHostnameResolution() throws Exception {
}
}
@Before
public void before() {
this.stdout = System.out;
this.stderr = System.err;
}
@After
public void after() {
System.setOut(this.stdout);
System.setErr(this.stderr);
}
@Test
public void testHAClusterInfoURL() throws IOException, InterruptedException {
TopCLI topcli = new TopCLI();
@ -103,4 +127,44 @@ public void testHAClusterInfoURL() throws IOException, InterruptedException {
Assert.assertEquals("https", clusterUrl.getProtocol());
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
}
}
@Test
public void testHeaderNodeManagers() throws Exception {
YarnClusterMetrics ymetrics = mock(YarnClusterMetrics.class);
when(ymetrics.getNumNodeManagers()).thenReturn(0);
when(ymetrics.getNumDecommissioningNodeManagers()).thenReturn(1);
when(ymetrics.getNumDecommissionedNodeManagers()).thenReturn(2);
when(ymetrics.getNumActiveNodeManagers()).thenReturn(3);
when(ymetrics.getNumLostNodeManagers()).thenReturn(4);
when(ymetrics.getNumUnhealthyNodeManagers()).thenReturn(5);
when(ymetrics.getNumRebootedNodeManagers()).thenReturn(6);
when(ymetrics.getNumShutdownNodeManagers()).thenReturn(7);
YarnClient client = mock(YarnClient.class);
when(client.getYarnClusterMetrics()).thenReturn(ymetrics);
TopCLI topcli = new TopCLI() {
@Override protected void createAndStartYarnClient() {
}
};
topcli.setClient(client);
topcli.terminalWidth = 200;
String actual;
try (ByteArrayOutputStream outStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(outStream)) {
System.setOut(out);
System.setErr(out);
topcli.showTopScreen();
out.flush();
actual = outStream.toString("UTF-8");
}
String expected = "NodeManager(s)"
+ ": 0 total, 3 active, 5 unhealthy, 1 decommissioning,"
+ " 2 decommissioned, 4 lost, 6 rebooted, 7 shutdown";
Assert.assertTrue(
String.format("Expected output to contain [%s], actual output was [%s].", expected, actual),
actual.contains(expected));
}
}

View File

@ -89,6 +89,22 @@ public void setNumNodeManagers(int numNodeManagers) {
builder.setNumNodeManagers((numNodeManagers));
}
@Override
public int getNumDecommissioningNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumDecommissioningNms()) {
return (p.getNumDecommissioningNms());
}
return 0;
}
@Override
public void
setNumDecommissioningNodeManagers(int numDecommissioningNodeManagers) {
maybeInitBuilder();
builder.setNumDecommissioningNms(numDecommissioningNodeManagers);
}
@Override
public int getNumDecommissionedNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
@ -165,4 +181,19 @@ public void setNumRebootedNodeManagers(int numRebootedNodeManagers) {
maybeInitBuilder();
builder.setNumRebootedNms((numRebootedNodeManagers));
}
}
@Override
public int getNumShutdownNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumShutdownNms()) {
return (p.getNumShutdownNms());
}
return 0;
}
@Override
public void setNumShutdownNodeManagers(int numShutdownNodeManagers) {
maybeInitBuilder();
builder.setNumShutdownNms(numShutdownNodeManagers);
}
}

View File

@ -860,12 +860,14 @@ public GetClusterMetricsResponse getClusterMetrics(
.newRecordInstance(YarnClusterMetrics.class);
ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size());
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ymetrics.setNumDecommissioningNodeManagers(clusterMetrics.getNumDecommissioningNMs());
ymetrics.setNumDecommissionedNodeManagers(clusterMetrics
.getNumDecommisionedNMs());
ymetrics.setNumActiveNodeManagers(clusterMetrics.getNumActiveNMs());
ymetrics.setNumLostNodeManagers(clusterMetrics.getNumLostNMs());
ymetrics.setNumUnhealthyNodeManagers(clusterMetrics.getUnhealthyNMs());
ymetrics.setNumRebootedNodeManagers(clusterMetrics.getNumRebootedNMs());
ymetrics.setNumShutdownNodeManagers(clusterMetrics.getNumShutdownNMs());
response.setClusterMetrics(ymetrics);
return response;
}

View File

@ -58,6 +58,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.MockApps;
@ -75,6 +76,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
@ -139,6 +141,7 @@
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
@ -2809,10 +2812,63 @@ protected ClientRMService createClientRMService() {
rm.close();
}
@Test
public void testGetClusterMetrics() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
rm.start();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
clusterMetrics.incrDecommissioningNMs();
repeat(2, clusterMetrics::incrDecommisionedNMs);
repeat(3, clusterMetrics::incrNumActiveNodes);
repeat(4, clusterMetrics::incrNumLostNMs);
repeat(5, clusterMetrics::incrNumUnhealthyNMs);
repeat(6, clusterMetrics::incrNumRebootedNMs);
repeat(7, clusterMetrics::incrNumShutdownNMs);
// Create a client.
Configuration conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client =
(ApplicationClientProtocol) rpc
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
YarnClusterMetrics ymetrics = client.getClusterMetrics(
GetClusterMetricsRequest.newInstance()).getClusterMetrics();
Assert.assertEquals(0, ymetrics.getNumNodeManagers());
Assert.assertEquals(1, ymetrics.getNumDecommissioningNodeManagers());
Assert.assertEquals(2, ymetrics.getNumDecommissionedNodeManagers());
Assert.assertEquals(3, ymetrics.getNumActiveNodeManagers());
Assert.assertEquals(4, ymetrics.getNumLostNodeManagers());
Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers());
Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers());
rpc.stopProxy(client, conf);
rm.close();
}
@After
public void tearDown(){
if (resourceTypesFile != null && resourceTypesFile.exists()) {
resourceTypesFile.delete();
}
ClusterMetrics.destroy();
DefaultMetricsSystem.shutdown();
}
private static void repeat(int n, Runnable r) {
for (int i = 0; i < n; ++i) {
r.run();
}
}
}

View File

@ -80,6 +80,9 @@ public static GetClusterMetricsResponse merge(
tmp.getNumNodeManagers() + metrics.getNumNodeManagers());
tmp.setNumActiveNodeManagers(
tmp.getNumActiveNodeManagers() + metrics.getNumActiveNodeManagers());
tmp.setNumDecommissioningNodeManagers(
tmp.getNumDecommissioningNodeManagers() + metrics
.getNumDecommissioningNodeManagers());
tmp.setNumDecommissionedNodeManagers(
tmp.getNumDecommissionedNodeManagers() + metrics
.getNumDecommissionedNodeManagers());
@ -90,6 +93,9 @@ public static GetClusterMetricsResponse merge(
tmp.setNumUnhealthyNodeManagers(
tmp.getNumUnhealthyNodeManagers() + metrics
.getNumUnhealthyNodeManagers());
tmp.setNumShutdownNodeManagers(
tmp.getNumShutdownNodeManagers() + metrics
.getNumShutdownNodeManagers());
}
return GetClusterMetricsResponse.newInstance(tmp);
}
@ -526,4 +532,3 @@ public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
return GetNodesToAttributesResponse.newInstance(attributesMap);
}
}

View File

@ -82,10 +82,12 @@ public void testClusterMetricsMerge() {
YarnClusterMetrics resultMetrics = result.getClusterMetrics();
Assert.assertEquals(3, resultMetrics.getNumNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumActiveNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumDecommissioningNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumDecommissionedNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumLostNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumRebootedNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumUnhealthyNodeManagers());
Assert.assertEquals(3, resultMetrics.getNumShutdownNodeManagers());
}
public GetClusterMetricsResponse getClusterMetricsResponse(int value) {
@ -93,9 +95,11 @@ public GetClusterMetricsResponse getClusterMetricsResponse(int value) {
metrics.setNumUnhealthyNodeManagers(value);
metrics.setNumRebootedNodeManagers(value);
metrics.setNumLostNodeManagers(value);
metrics.setNumDecommissioningNodeManagers(value);
metrics.setNumDecommissionedNodeManagers(value);
metrics.setNumActiveNodeManagers(value);
metrics.setNumNodeManagers(value);
metrics.setNumShutdownNodeManagers(value);
return GetClusterMetricsResponse.newInstance(metrics);
}