diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3b5896177e..333e00ccdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -271,6 +271,8 @@ public enum DSEntity { private ExecutionType containerType = ExecutionType.GUARANTEED; // Whether to automatically promote opportunistic containers. private boolean autoPromoteContainers = false; + // Whether to enforce execution type of the containers. + private boolean enforceExecType = false; // Resource profile for the container private String containerResourceProfile = ""; @@ -466,6 +468,8 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("promote_opportunistic_after_start", false, "Flag to indicate whether to automatically promote opportunistic" + " containers to guaranteed."); + opts.addOption("enforce_execution_type", false, + "Flag to indicate whether to enforce execution type of containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, @@ -661,6 +665,9 @@ public boolean init(String[] args) throws ParseException, IOException { if (cliParser.hasOption("promote_opportunistic_after_start")) { autoPromoteContainers = true; } + if (cliParser.hasOption("enforce_execution_type")) { + enforceExecType = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue( "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( @@ -1560,7 +1567,7 @@ private ContainerRequest setupContainerAskForRM() { ContainerRequest request = new ContainerRequest( getTaskResourceCapability(), null, null, pri, 0, true, null, - ExecutionTypeRequest.newInstance(containerType), + ExecutionTypeRequest.newInstance(containerType, enforceExecType), containerResourceProfile); LOG.info("Requested container ask: " + request.toString()); return request; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 369d94b806..1666325b99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -192,6 +192,8 @@ public class Client { private ExecutionType containerType = ExecutionType.GUARANTEED; // Whether to auto promote opportunistic containers private boolean autoPromoteContainers = false; + // Whether to enforce execution type of containers + private boolean enforceExecType = false; // Placement specification private String placementSpec = ""; @@ -337,6 +339,8 @@ public Client(Configuration conf) throws Exception { opts.addOption("promote_opportunistic_after_start", false, "Flag to indicate whether to automatically promote opportunistic" + " containers to guaranteed."); + opts.addOption("enforce_execution_type", false, + "Flag to indicate whether to enforce execution type of containers"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application " @@ -532,6 +536,9 @@ public boolean init(String[] args) throws ParseException { if (cliParser.hasOption("promote_opportunistic_after_start")) { autoPromoteContainers = true; } + if (cliParser.hasOption("enforce_execution_type")) { + enforceExecType = true; + } containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = @@ -926,6 +933,9 @@ public boolean run() throws IOException, YarnException { if (autoPromoteContainers) { vargs.add("--promote_opportunistic_after_start"); } + if (enforceExecType) { + vargs.add("--enforce_execution_type"); + } if (containerMemory > 0) { vargs.add("--container_memory " + String.valueOf(containerMemory)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 80c1e208c6..e67e541629 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Supplier; import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -53,6 +54,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -174,6 +177,8 @@ private void setupInternal(int numNodeManager, float timelineVersion) true); conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); // ATS version specific settings if (timelineVersion == 1.0f) { @@ -1469,6 +1474,97 @@ public void testDSShellWithOpportunisticContainers() throws Exception { } } + @Test + @TimelineVersion(2.0f) + public void testDSShellWithEnforceExecutionType() throws Exception { + Client client = new Client(new Configuration(yarnCluster.getConfig())); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "OPPORTUNISTIC", + "--enforce_execution_type" + }; + client.init(args); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + waitForContainersLaunch(yarnClient, 2); + List apps = yarnClient.getApplications(); + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ApplicationAttemptId appAttemptId = + appAttemptReport.getApplicationAttemptId(); + List containers = + yarnClient.getContainers(appAttemptId); + // we should get two containers. + Assert.assertEquals(2, containers.size()); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + for (ContainerReport container : containers) { + if (!container.getContainerId().equals(amContainerId)) { + Assert.assertEquals(container.getExecutionType(), + ExecutionType.OPPORTUNISTIC); + } + } + } catch (Exception e) { + Assert.fail("Job execution with enforce execution type failed."); + } + } + + private void waitForContainersLaunch(YarnClient client, + int nContainers) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + try { + List apps = client.getApplications(); + if (apps == null || apps.isEmpty()) { + return false; + } + ApplicationId appId = apps.get(0).getApplicationId(); + List appAttempts = + client.getApplicationAttempts(appId); + if (appAttempts == null || appAttempts.isEmpty()) { + return false; + } + ApplicationAttemptId attemptId = + appAttempts.get(0).getApplicationAttemptId(); + List containers = client.getContainers(attemptId); + return (containers.size() == nContainers); + } catch (Exception e) { + return false; + } + } + }, 10, 60000); + } + @Test @TimelineVersion(2.0f) public void testDistributedShellWithResources() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm index 272c932873..b1eea9ed27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm @@ -84,6 +84,7 @@ $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/ha ``` By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started. +By adding flag '-enforce_execution_type' to the above command, scheduler will honor execution type of the containers. $H3 Opportunistic Containers in Web UI