diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b32132b93a..b1d25d0757 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -404,6 +404,9 @@ Release 2.6.0 - UNRELEASED sake of localization and log-aggregation for long-running services. (Jian He via vinodkv) + YARN-2502. Changed DistributedShell to support node labels. (Wangda Tan via + jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 2067aca481..0e9a4e4a49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -115,7 +115,7 @@ public class Client { private static final Log LOG = LogFactory.getLog(Client.class); - + // Configuration private Configuration conf; private YarnClient yarnClient; @@ -152,6 +152,7 @@ public class Client { private int containerVirtualCores = 1; // No. of containers in which the shell script needs to be executed private int numContainers = 1; + private String nodeLabelExpression = null; // log4j.properties file // if available, add to local resources and set into classpath @@ -280,7 +281,12 @@ public Client(Configuration conf) throws Exception { opts.addOption("create", false, "Flag to indicate whether to create the " + "domain specified with -domain."); opts.addOption("help", false, "Print usage"); - + opts.addOption("node_label_expression", true, + "Node label expression to determine the nodes" + + " where all the containers of this application" + + " will be allocated, \"\" means containers" + + " can be allocated anywhere, if you don't specify the option," + + " default node_label_expression of queue will be used."); } /** @@ -391,6 +397,7 @@ public boolean init(String[] args) throws ParseException { containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); + if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," @@ -399,6 +406,8 @@ public boolean init(String[] args) throws ParseException { + ", containerVirtualCores=" + containerVirtualCores + ", numContainer=" + numContainers); } + + nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); @@ -617,6 +626,9 @@ public boolean run() throws IOException, YarnException { vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); + if (null != nodeLabelExpression) { + appContext.setNodeLabelExpression(nodeLabelExpression); + } vargs.add("--priority " + String.valueOf(shellCmdPriority)); for (Map.Entry entry : shellEnv.entrySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 2414d4d3ea..0ded5bd60c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -30,7 +30,9 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -42,6 +44,7 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -49,40 +52,81 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableMap; + public class TestDistributedShell { private static final Log LOG = LogFactory.getLog(TestDistributedShell.class); protected MiniYARNCluster yarnCluster = null; - protected Configuration conf = new YarnConfiguration(); + private int numNodeManager = 1; + + private YarnConfiguration conf = null; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); + + private void initializeNodeLabels() throws IOException { + RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext(); + + // Setup node labels + RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); + Set labels = new HashSet(); + labels.add("x"); + labelsMgr.addToCluserNodeLabels(labels); + + // Setup queue access to node labels + conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x"); + conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x"); + conf.set( + "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity", + "100"); + + rmContext.getScheduler().reinitialize(conf, rmContext); + + // Fetch node-ids from yarn cluster + NodeId[] nodeIds = new NodeId[numNodeManager]; + for (int i = 0; i < numNodeManager; i++) { + NodeManager mgr = this.yarnCluster.getNodeManager(i); + nodeIds[i] = mgr.getNMContext().getNodeId(); + } + + // Set label x to NM[1] + labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); + } @Before public void setup() throws Exception { LOG.info("Starting up YARN cluster"); + + conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); - conf.setClass(YarnConfiguration.RM_SCHEDULER, - FifoScheduler.class, ResourceScheduler.class); conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); + numNodeManager = 2; + if (yarnCluster == null) { - yarnCluster = new MiniYARNCluster( - TestDistributedShell.class.getSimpleName(), 1, 1, 1, 1, true); + yarnCluster = + new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, + numNodeManager, 1, 1, true); yarnCluster.init(conf); + yarnCluster.start(); - NodeManager nm = yarnCluster.getNodeManager(0); - waitForNMToRegister(nm); + + waitForNMsToRegister(); + + // currently only capacity scheduler support node labels, + initializeNodeLabels(); URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); if (url == null) { @@ -757,13 +801,15 @@ public void testDSShellWithInvalidArgs() throws Exception { } } - protected static void waitForNMToRegister(NodeManager nm) - throws Exception { - int attempt = 60; - ContainerManagerImpl cm = - ((ContainerManagerImpl) nm.getNMContext().getContainerManager()); - while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) { - Thread.sleep(2000); + protected void waitForNMsToRegister() throws Exception { + int sec = 60; + while (sec >= 0) { + if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() + >= numNodeManager) { + break; + } + Thread.sleep(1000); + sec--; } } @@ -892,5 +938,88 @@ private int verifyContainerLog(int containerNum, } return numOfWords; } + + @Test(timeout=90000) + public void testDSShellWithNodeLabelExpression() throws Exception { + // Start NMContainerMonitor + NMContainerMonitor mon = new NMContainerMonitor(); + Thread t = new Thread(mon); + t.start(); + + // Submit a job which will sleep for 60 sec + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "4", + "--shell_command", + "sleep", + "--shell_args", + "15", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--node_label_expression", + "x" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + + t.interrupt(); + + // Check maximum number of containers on each NMs + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + // Check no container allocated on NM[0] + Assert.assertEquals(0, maxRunningContainersOnNMs[0]); + // Check there're some containers allocated on NM[1] + Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + } + + /** + * Monitor containers running on NMs + */ + private class NMContainerMonitor implements Runnable { + // The interval of milliseconds of sampling (500ms) + final static int SAMPLING_INTERVAL_MS = 500; + + // The maximum number of containers running on each NMs + int[] maxRunningContainersOnNMs = new int[numNodeManager]; + + @Override + public void run() { + while (true) { + for (int i = 0; i < numNodeManager; i++) { + int nContainers = + yarnCluster.getNodeManager(i).getNMContext().getContainers() + .size(); + if (nContainers > maxRunningContainersOnNMs[i]) { + maxRunningContainersOnNMs[i] = nContainers; + } + } + try { + Thread.sleep(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + + public int[] getMaxRunningContainersReport() { + return maxRunningContainersOnNMs; + } + } }