From 735d8b27f78ea8be839008650a3e88db37dc507d Mon Sep 17 00:00:00 2001 From: Luke Lu Date: Fri, 11 Oct 2013 08:18:57 +0000 Subject: [PATCH] YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531222 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 ++ .../distributedshell/ApplicationMaster.java | 23 ++++++++++-- .../applications/distributedshell/Client.java | 35 ++++++++++++++++--- .../TestDistributedShell.java | 33 +++++++++++++++-- 4 files changed, 85 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9d279f3a27..e50461948d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -34,6 +34,8 @@ Release 2.3.0 - UNRELEASED IMPROVEMENTS + YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) + YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza) YARN-1098. Separate out RM services into Always On and Active (Karthik diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index fa6eb9040d..364476171f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -180,6 +180,8 @@ public class ApplicationMaster { private int numTotalContainers = 1; // Memory to request for the container on which the shell command will run private int containerMemory = 10; + // VirtualCores to request for the container on which the shell command will run + private int containerVirtualCores = 1; // Priority of the request private int requestPriority; @@ -309,6 +311,8 @@ public boolean init(String[] args) throws ParseException, IOException { "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("container_vcores", true, + "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); @@ -421,6 +425,8 @@ public boolean init(String[] args) throws ParseException, IOException { containerMemory = Integer.parseInt(cliParser.getOptionValue( "container_memory", "10")); + containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( + "container_vcores", "1")); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( "num_containers", "1")); if (numTotalContainers == 0) { @@ -492,6 +498,9 @@ public boolean run() throws YarnException, IOException { // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); + LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); // A resource ask cannot exceed the max. if (containerMemory > maxMem) { @@ -501,6 +510,13 @@ public boolean run() throws YarnException, IOException { containerMemory = maxMem; } + if (containerVirtualCores > maxVCores) { + LOG.info("Container virtual cores specified above max threshold of cluster." + + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + + maxVCores); + containerVirtualCores = maxVCores; + } + // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for @@ -645,7 +661,9 @@ public void onContainersAllocated(List allocatedContainers) { + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" - + allocatedContainer.getResource().getMemory()); + + allocatedContainer.getResource().getMemory() + + ", containerResourceVirtualCores" + + allocatedContainer.getResource().getVirtualCores()); // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); @@ -872,9 +890,10 @@ private ContainerRequest setupContainerAskForRM() { pri.setPriority(requestPriority); // Set up resource type requirements - // For now, only memory is supported so we set memory requirements + // For now, memory and CPU are supported so we set memory and cpu requirements Resource capability = Records.newRecord(Resource.class); capability.setMemory(containerMemory); + capability.setVirtualCores(containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 01e030a677..199a16d56f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -121,6 +121,8 @@ public class Client { private String amQueue = ""; // Amt. of memory resource to request for to run the App Master private int amMemory = 10; + // Amt. of virtual core resource to request for to run the App Master + private int amVCores = 1; // Application master jar file private String appMasterJar = ""; @@ -140,6 +142,8 @@ public class Client { // Amt of memory to request for container in which shell script will be executed private int containerMemory = 10; + // Amt. of virtual cores to request for container in which shell script will be executed + private int containerVirtualCores = 1; // No. of containers in which the shell script needs to be executed private int numContainers = 1; @@ -208,6 +212,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); + opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("shell_command", true, "Shell command to be executed by the Application Master"); opts.addOption("shell_script", true, "Location of the shell script to be executed"); @@ -215,6 +220,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("debug", false, "Dump out debug information"); @@ -263,11 +269,16 @@ public boolean init(String[] args) throws ParseException { amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); - + amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); + if (amMemory < 0) { throw new IllegalArgumentException("Invalid memory specified for application master, exiting." + " Specified memory=" + amMemory); } + if (amVCores < 0) { + throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." + + " Specified virtual cores=" + amVCores); + } if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); @@ -306,11 +317,14 @@ public boolean init(String[] args) throws ParseException { shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); + containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - if (containerMemory < 0 || numContainers < 1) { - throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting." + if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { + throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," + + " exiting." + " Specified containerMemory=" + containerMemory + + ", containerVirtualCores=" + containerVirtualCores + ", numContainer=" + numContainers); } @@ -383,6 +397,16 @@ public boolean run() throws IOException, YarnException { amMemory = maxMem; } + int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); + LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores); + + if (amVCores > maxVCores) { + LOG.info("AM virtual cores specified above max threshold of cluster. " + + "Using max value." + ", specified=" + amVCores + + ", max=" + maxVCores); + amVCores = maxVCores; + } + // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); @@ -514,6 +538,7 @@ public boolean run() throws IOException, YarnException { vargs.add(appMasterMainClass); // Set params for Application Master vargs.add("--container_memory " + String.valueOf(containerMemory)); + vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); if (!shellCommand.isEmpty()) { @@ -544,9 +569,11 @@ public boolean run() throws IOException, YarnException { amContainer.setCommands(commands); // Set up resource type requirements - // For now, only memory is supported so we set memory requirements + // For now, both memory and vcores are supported, so we set memory and + // vcores requirements Resource capability = Records.newRecord(Resource.class); capability.setMemory(amMemory); + capability.setVirtualCores(amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index f8a41b7395..2f311b5114 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -105,7 +105,7 @@ public static void tearDown() throws IOException { } } } - + @Test(timeout=90000) public void testDSShell() throws Exception { @@ -118,8 +118,12 @@ public void testDSShell() throws Exception { Shell.WINDOWS ? "dir" : "ls", "--master_memory", "512", + "--master_vcores", + "2", "--container_memory", - "128" + "128", + "--container_vcores", + "1" }; LOG.info("Initializing DS Client"); @@ -237,6 +241,31 @@ public void testDSShellWithInvalidArgs() throws Exception { Assert.assertTrue("The throw exception is not expected", e.getMessage().contains("Invalid no. of containers")); } + + LOG.info("Initializing DS Client with invalid no. of vcores"); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--master_vcores", + "-2", + "--container_memory", + "128", + "--container_vcores", + "1" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("Invalid virtual cores specified")); + } } protected static void waitForNMToRegister(NodeManager nm)