From 3b46aae977e078cf7eb5e6bbbc55aca7cecee4c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A0=E9=94=A1=E5=B9=B3?= <40832063+zhangxiping1@users.noreply.github.com> Date: Fri, 8 Apr 2022 15:19:17 +0800 Subject: [PATCH] YARN-11107. When NodeLabel is enabled for a YARN cluster, AM blacklist program does not work properly. Contributed by zhangxiping1 --- .../resourcemanager/DefaultAMSProcessor.java | 16 +- .../TestApplicationMasterServiceCapacity.java | 163 ++++++++++++++++++ 2 files changed, 178 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 12a3b21682..3797a6ed3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -351,7 +351,21 @@ public void allocate(ApplicationAttemptId appAttemptId, ((AbstractYarnScheduler)getScheduler()) .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); - response.setNumClusterNodes(getScheduler().getNumClusterNodes()); + String label=""; + try { + label = rmContext.getScheduler() + .getQueueInfo(app.getQueue(), false, false) + .getDefaultNodeLabelExpression(); + } catch (Exception e){ + //Queue may not exist since it could be auto-created in case of + // dynamic queues + } + + if (label == null || label.equals("")) { + response.setNumClusterNodes(getScheduler().getNumClusterNodes()); + } else { + response.setNumClusterNodes(rmContext.getNodeLabelManager().getActiveNMCountPerLabel(label)); + } // add collector address for this application if (timelineServiceV2Enabled) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java index 182016a6cf..ccbc375359 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceCapacity.java @@ -16,36 +16,46 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet; import static org.junit.Assert.fail; /** @@ -208,4 +218,157 @@ public void testPriorityInAllocatedResponse() throws Exception { Assert.assertEquals(appPriority2, response2.getApplicationPriority()); rm.stop(); } + + @Test(timeout = 300000) + public void testGetNMNumInAllocatedResponseWithOutNodeLabel() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 node2 node3 node4 + MockNM nm1 = rm.registerNode("host1:1234", 6 * GB); + MockNM nm2 = rm.registerNode("host2:1234", 6 * GB); + MockNM nm3 = rm.registerNode("host3:1234", 6 * GB); + + // Submit an application + MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder + .createWithMemory(2048, rm) + .build(); + RMApp app1 = MockRMAppSubmitter.submit(rm, data); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList(); + List ask = new ArrayList(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + AllocateResponse response1 = am1.allocate(allocateRequest); + Assert.assertEquals(3, response1.getNumClusterNodes()); + + rm.stop(); + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 50); + conf.setMaximumCapacity(A, 100); + conf.setAccessibleNodeLabels(A, toSet("x")); + conf.setDefaultNodeLabelExpression(A, "x"); + conf.setCapacityByLabel(A, "x", 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 50); + conf.setMaximumCapacity(B, 100); + conf.setAccessibleNodeLabels(B, toSet("y")); + conf.setDefaultNodeLabelExpression(B, "y"); + conf.setCapacityByLabel(B, "y", 100); + + return conf; + } + + @Test(timeout = 300000) + public void testGetNMNumInAllocatedResponseWithNodeLabel() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + RMNodeLabelsManager mgr = new RMNodeLabelsManager(); + mgr.init(getConfig()); + return mgr; + } + }; + + // add node label "x","y" and set node to label mapping + Set clusterNodeLabels = new HashSet(); + clusterNodeLabels.add("x"); + clusterNodeLabels.add("y"); + + RMNodeLabelsManager nodeLabelManager = rm.getRMContext().getNodeLabelManager(); + nodeLabelManager. + addToCluserNodeLabelsWithDefaultExclusivity(clusterNodeLabels); + + //has 3 nodes with node label "x",1 node with node label "y" + nodeLabelManager + .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host1", 1234), toSet("x"))); + nodeLabelManager + .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host2", 1234), toSet("x"))); + nodeLabelManager + .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host3", 1234), toSet("x"))); + nodeLabelManager + .addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host4", 1234), toSet("y"))); + rm.start(); + + // Register node1 node2 node3 node4 + MockNM nm1 = rm.registerNode("host1:1234", 6 * GB); + MockNM nm2 = rm.registerNode("host2:1234", 6 * GB); + MockNM nm3 = rm.registerNode("host3:1234", 6 * GB); + MockNM nm4 = rm.registerNode("host4:1234", 6 * GB); + + // submit an application to queue root.a expression as "x" + MockRMAppSubmissionData data1 = MockRMAppSubmissionData.Builder + .createWithMemory(2048, rm) + .withAppName("someApp1") + .withUser("someUser") + .withQueue("root.a") + .build(); + RMApp app1 = MockRMAppSubmitter.submit(rm, data1); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // submit an application to queue root.b expression as "y" + MockRMAppSubmissionData data2 = MockRMAppSubmissionData.Builder + .createWithMemory(2048, rm) + .withAppName("someApp2") + .withUser("someUser") + .withQueue("root.b") + .build(); + RMApp app2 = MockRMAppSubmitter.submit(rm, data2); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm4); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList(); + List ask = new ArrayList(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + AllocateResponse response1 = am1.allocate(allocateRequest); + AllocateResponse response2 = am2.allocate(allocateRequest); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); + RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId()); + + // Do node heartbeats many times + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode3)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode4)); + } + + //has 3 nodes with node label "x" + Assert.assertEquals(3, response1.getNumClusterNodes()); + + //has 1 node with node label "y" + Assert.assertEquals(1, response2.getNumClusterNodes()); + + rm.stop(); + } }