diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index dc61971428..bfd854e9c6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -324,6 +324,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1398. Fixed a deadlock in ResourceManager between users requesting
     queue-acls and completing containers. (vinodkv)
 
+    YARN-1071. Enabled ResourceManager to recover cluster metrics
+    numDecommissionedNMs after restarting. (Jian He via zjshen)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
index 5c94ef4190..942ec81128 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
@@ -91,7 +91,11 @@ public int getNumDecommisionedNMs() {
   public void incrDecommisionedNMs() {
     numDecommissionedNMs.incr();
   }
-
+
+  public void setDecommisionedNMs(int num) {
+    numDecommissionedNMs.set(num);
+  }
+
   public void decrDecommisionedNMs() {
     numDecommissionedNMs.decr();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 06f35b811d..d7797cc5ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -75,6 +75,7 @@ protected void serviceInit(Configuration conf) throws Exception {
           YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
+      setDecomissionedNMsMetrics();
       printConfiguredHosts();
     } catch (YarnException ex) {
       disableHostsFileReader(ex);
@@ -120,10 +121,16 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
               this.conf, includesFile), excludesFile.isEmpty() ?
               null : this.rmContext.getConfigurationProvider()
                   .getConfigurationInputStream(this.conf, excludesFile));
+      setDecomissionedNMsMetrics();
       printConfiguredHosts();
     }
   }
 
+  private void setDecomissionedNMsMetrics() {
+    Set<String> excludeList = hostsReader.getExcludedHosts();
+    ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
+  }
+
   public boolean isValidNode(String hostName) {
     synchronized (hostsReader) {
       Set<String> hostsList = hostsReader.getHosts();
@@ -190,6 +197,7 @@ private void disableHostsFileReader(Exception ex) {
           conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
+      setDecomissionedNMsMetrics();
     } catch (IOException ioe2) {
       // Should *never* happen
       this.hostsReader = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 52bc285b3a..dc53a5d0d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -34,6 +34,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -424,9 +425,22 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
       break;
     }
 
+    // Decommissioned NMs equals the nodes missing from the include list (if
+    // the include list is not empty) or the nodes listed in the exclude list.
+    // DecommissionedNMs as per the exclude list is set upfront when the
+    // exclude list is read, so that an RM restart also reflects those
+    // decommissioned NMs. Note that the RM still cannot know decommissioned
+    // NMs as per the include list after it restarts, since those are only
+    // known when the nodes come in for registration.
+    // DecommissionedNMs as per the include list is incremented in this transition.
     switch (finalState) {
     case DECOMMISSIONED:
-      metrics.incrDecommisionedNMs();
+      Set<String> excludedHosts =
+          context.getNodesListManager().getHostsReader().getExcludedHosts();
+      if (!excludedHosts.contains(hostName)
+          && !excludedHosts.contains(NetUtils.normalizeHostName(hostName))) {
+        metrics.incrDecommisionedNMs();
+      }
       break;
     case LOST:
       metrics.incrNumLostNMs();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index a966efdc18..4ff38f0dec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -30,6 +30,7 @@
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -102,6 +103,10 @@ public void setUp() throws Exception {
     rmContext = new RMContextImpl(rmDispatcher, null, null, null,
         mock(DelegationTokenRenewer.class), null, null, null, null, null);
+    NodesListManager nodesListManager = mock(NodesListManager.class);
+    HostsFileReader reader = mock(HostsFileReader.class);
+    when(nodesListManager.getHostsReader()).thenReturn(reader);
+    ((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
     scheduler = mock(YarnScheduler.class);
     doAnswer(
         new Answer() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index d50f0d7c5f..dff9019589 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -23,6 +23,8 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -38,7 +40,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
@@ -90,12 +94,16 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestRMRestart {
+
+  private final static File TEMP_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp"), "decommision");
+  private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
 
   private YarnConfiguration conf;
 
   // Fake rmAddr for token-renewal
@@ -113,6 +121,11 @@ public void setup() throws UnknownHostException {
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }
 
+  @After
+  public void tearDown() {
+    TEMP_DIR.delete();
+  }
+
   @SuppressWarnings("rawtypes")
   @Test (timeout=180000)
   public void testRMRestart() throws Exception {
@@ -1666,6 +1679,56 @@ private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
         appsCompleted + appsCompletedCarryOn);
   }
 
+  @Test
+  public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+      hostFile.getAbsolutePath());
+    writeToHostsFile("");
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    rm1.registerNode("localhost:1234", 8000);
+    rm1.registerNode("host2:1234", 8000);
+    Assert
+      .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    String ip = NetUtils.normalizeHostName("localhost");
+    // Add 2 hosts to exclude list.
+    writeToHostsFile("host2", ip);
+
+    // refresh nodes
+    rm1.getNodesListManager().refreshNodes(conf);
+    Assert
+      .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+
+    // restart RM.
+    MockRM rm2 = new MockRM(conf);
+    rm2.start();
+    Assert
+      .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm1.stop();
+    rm2.stop();
+  }
+
+  private void writeToHostsFile(String... hosts) throws IOException {
+    if (!hostFile.exists()) {
+      TEMP_DIR.mkdirs();
+      hostFile.createNewFile();
+    }
+    FileOutputStream fStream = null;
+    try {
+      fStream = new FileOutputStream(hostFile);
+      for (int i = 0; i < hosts.length; i++) {
+        fStream.write(hosts[i].getBytes());
+        fStream.write(System.getProperty("line.separator").getBytes());
+      }
+    } finally {
+      if (fStream != null) {
+        IOUtils.closeStream(fStream);
+        fStream = null;
+      }
+    }
+  }
+
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index abb21edfc5..803e95f0e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -150,7 +150,6 @@ public void testDecommissionWithExcludeHosts() throws Exception {
     MockNM nm3 = rm.registerNode("localhost:4433", 1024);
 
     int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
-
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
@@ -161,18 +160,17 @@ public void testDecommissionWithExcludeHosts() throws Exception {
     writeToHostsFile("host2", ip);
 
     rm.getNodesListManager().refreshNodes(conf);
+    checkDecommissionedNMCount(rm, metricCount + 2);
 
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue("The decommisioned metrics are not updated",
         NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
-    checkDecommissionedNMCount(rm, ++metricCount);
 
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue("The decommisioned metrics are not updated",
         NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
-    checkDecommissionedNMCount(rm, ++metricCount);
   }
 
   /**
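A minimal standalone sketch of the accounting rule introduced by this patch, assuming nothing beyond the JDK. The class and method names below are hypothetical; they only mirror NodesListManager#setDecomissionedNMsMetrics() and the DECOMMISSIONED branch of RMNodeImpl#updateMetricsForDeactivatedNode() and are not part of the patch itself.

import java.util.HashSet;
import java.util.Set;

// Simplified model of the decommissioned-NM accounting: the exclude-list size
// seeds the metric on every (re)read of the hosts file, so a restarted RM
// recovers that part of the count immediately; nodes dropped from the include
// list are counted only when they later show up and get decommissioned.
public class DecommissionedNMsModel {

  private final Set<String> excludeList = new HashSet<String>();
  private int numDecommissionedNMs;

  // Analogous to setting the metric from the exclude list whenever the hosts
  // file is read or refreshed.
  void refreshExcludeList(Set<String> hostsFromExcludeFile) {
    excludeList.clear();
    excludeList.addAll(hostsFromExcludeFile);
    numDecommissionedNMs = excludeList.size();
  }

  // Analogous to the DECOMMISSIONED transition: only hosts that are not on
  // the exclude list (i.e. hosts removed from the include list) increment the
  // metric here, so exclude-list hosts are never double counted.
  void onNodeDecommissioned(String hostName) {
    if (!excludeList.contains(hostName)) {
      numDecommissionedNMs++;
    }
  }

  int getNumDecommissionedNMs() {
    return numDecommissionedNMs;
  }

  public static void main(String[] args) {
    DecommissionedNMsModel model = new DecommissionedNMsModel();
    Set<String> excluded = new HashSet<String>();
    excluded.add("host2");
    model.refreshExcludeList(excluded);   // metric becomes 1, survives restart
    model.onNodeDecommissioned("host2");  // already counted via exclude list
    model.onNodeDecommissioned("host3");  // dropped from include list
    System.out.println(model.getNumDecommissionedNMs()); // prints 2
  }
}

The same guard explains why the include-list part of the count cannot be recovered immediately after an RM restart: it is rebuilt only as those nodes come in for registration again.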