From b7613e0f406fb2b9bd5b1b3c79658e801f63c587 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Wed, 15 Feb 2017 14:48:17 +0530 Subject: [PATCH] YARN-6156. AM blacklisting to consider node label partition (Bibin A Chundatt via Varun Saxena) --- .../server/resourcemanager/RMServerUtils.java | 22 ++++++ .../nodelabels/RMNodeLabelsManager.java | 16 ++++ .../resourcemanager/rmapp/RMAppImpl.java | 12 ++- .../rmapp/attempt/RMAppAttemptImpl.java | 4 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 73 +++++++++++++++++++ 5 files changed, 118 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 224a1da69f..e98141b0b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .RMAppAttemptState; @@ -561,4 +562,25 @@ public static Map validateISO8601AndConvertToLocal } return newApplicationTimeout; } + + /** + * Get applicable Node count for AM. + * + * @param rmContext context + * @param conf configuration + * @param amreq am resource request + * @return applicable node count + */ + public static int getApplicableNodeCountForAM(RMContext rmContext, + Configuration conf, ResourceRequest amreq) { + if (YarnConfiguration.areNodeLabelsEnabled(conf)) { + RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager(); + String amNodeLabelExpression = amreq.getNodeLabelExpression(); + amNodeLabelExpression = (amNodeLabelExpression == null + || amNodeLabelExpression.trim().isEmpty()) + ? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression; + return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression); + } + return rmContext.getScheduler().getNumClusterNodes(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 5dc839278d..effe422bf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -350,6 +350,22 @@ public Resource getQueueResource(String queueName, Set queueLabels, } } + /* + * Get active node count based on label. + */ + public int getActiveNMCountPerLabel(String label) { + if (label == null) { + return 0; + } + try { + readLock.lock(); + RMNodeLabel labelInfo = labelCollections.get(label); + return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs(); + } finally { + readLock.unlock(); + } + } + public Set getLabelsOnNode(NodeId nodeId) { try { readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 12ece3f6f3..516109b0ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,11 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -46,9 +44,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.CallerContext; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -987,9 +983,11 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { // Transfer over the blacklist from the previous app-attempt. currentAMBlacklistManager = currentAttempt.getAMBlacklistManager(); } else { - if (amBlacklistingEnabled) { + if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) { currentAMBlacklistManager = new SimpleBlacklistManager( - scheduler.getNumClusterNodes(), blacklistDisableThreshold); + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, + getAMResourceRequest()), + blacklistDisableThreshold); } else { currentAMBlacklistManager = new DisabledBlacklistManager(); } @@ -1006,7 +1004,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { attempts.put(appAttemptId, attempt); currentAttempt = attempt; } - + private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { createNewAttempt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ab84985edc..1788722265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1057,7 +1057,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt.amReq.setRelaxLocality(true); appAttempt.getAMBlacklistManager().refreshNodeHostCount( - appAttempt.scheduler.getNumClusterNodes()); + RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext, + appAttempt.conf, appAttempt.amReq)); ResourceBlacklistRequest amBlacklist = appAttempt.getAMBlacklistManager().getBlacklistUpdates(); @@ -1246,7 +1247,6 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private void rememberTargetTransitions(RMAppAttemptEvent event, Object transitionToDo, RMAppAttemptState targetFinalState) { transitionTodo = transitionToDo; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 732b5d11a9..b4ebd15ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -721,6 +723,77 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } + @Test(timeout = 30000) + public void testBlacklistAMDisableLabel() throws Exception { + conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, + true); + conf.setFloat( + YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, + 0.5f); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"), + NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0), + toSet("x"))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"), + NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0), + toSet("y"))); + + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + // Nodes in label default h1,h8,h9 + // Nodes in label x h2,h3,h6 + // Nodes in label y h4,h5,h7 + MockNM nm1 = rm.registerNode("h1:1234", 2048); + MockNM nm2 = rm.registerNode("h2:1234", 2048); + rm.registerNode("h3:1234", 2048); + rm.registerNode("h4:1234", 2048); + rm.registerNode("h5:1234", 2048); + rm.registerNode("h6:1234", 2048); + rm.registerNode("h7:1234", 2048); + rm.registerNode("h8:1234", 2048); + rm.registerNode("h9:1234", 2048); + + // Submit app with AM container launched on default partition i.e. h1. + RMApp app = rm.submitApp(GB, "app", "user", null, "a"); + MockRM.launchAndRegisterAM(app, rm, nm1); + RMAppAttempt appAttempt = app.getCurrentAppAttempt(); + // Add default node blacklist from default + appAttempt.getAMBlacklistManager().addNode("h1"); + ResourceBlacklistRequest blacklistUpdates = + appAttempt.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(1, blacklistUpdates.getBlacklistAdditions().size()); + Assert.assertEquals(0, blacklistUpdates.getBlacklistRemovals().size()); + // Adding second node from default parition + appAttempt.getAMBlacklistManager().addNode("h8"); + blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(0, blacklistUpdates.getBlacklistAdditions().size()); + Assert.assertEquals(2, blacklistUpdates.getBlacklistRemovals().size()); + + // Submission in label x + RMApp applabel = rm.submitApp(GB, "app", "user", null, "a", "x"); + MockRM.launchAndRegisterAM(applabel, rm, nm2); + RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt(); + appAttemptlabelx.getAMBlacklistManager().addNode("h2"); + ResourceBlacklistRequest blacklistUpdatesOnx = + appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size()); + Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size()); + // Adding second node from default parition + appAttemptlabelx.getAMBlacklistManager().addNode("h3"); + blacklistUpdatesOnx = + appAttempt.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size()); + Assert.assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size()); + + rm.close(); + } + private void checkAMResourceLimit(MockRM rm, String queuename, int memory, String label) throws InterruptedException { Assert.assertEquals(memory,