YARN-9191. Add cli option in DS to support enforceExecutionType in resource requests. Contributed by Abhishek Modi.

This commit is contained in:
Giovanni Matteo Fumarola 2019-01-31 11:24:15 -08:00
parent bcc3a79f58
commit f738b397ae
4 changed files with 115 additions and 1 deletions

View File

@ -271,6 +271,8 @@ public enum DSEntity {
private ExecutionType containerType = ExecutionType.GUARANTEED; private ExecutionType containerType = ExecutionType.GUARANTEED;
// Whether to automatically promote opportunistic containers. // Whether to automatically promote opportunistic containers.
private boolean autoPromoteContainers = false; private boolean autoPromoteContainers = false;
// Whether to enforce execution type of the containers.
private boolean enforceExecType = false;
// Resource profile for the container // Resource profile for the container
private String containerResourceProfile = ""; private String containerResourceProfile = "";
@ -466,6 +468,8 @@ public boolean init(String[] args) throws ParseException, IOException {
opts.addOption("promote_opportunistic_after_start", false, opts.addOption("promote_opportunistic_after_start", false,
"Flag to indicate whether to automatically promote opportunistic" "Flag to indicate whether to automatically promote opportunistic"
+ " containers to guaranteed."); + " containers to guaranteed.");
opts.addOption("enforce_execution_type", false,
"Flag to indicate whether to enforce execution type of containers");
opts.addOption("container_memory", true, opts.addOption("container_memory", true,
"Amount of memory in MB to be requested to run the shell command"); "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, opts.addOption("container_vcores", true,
@ -661,6 +665,9 @@ public boolean init(String[] args) throws ParseException, IOException {
if (cliParser.hasOption("promote_opportunistic_after_start")) { if (cliParser.hasOption("promote_opportunistic_after_start")) {
autoPromoteContainers = true; autoPromoteContainers = true;
} }
if (cliParser.hasOption("enforce_execution_type")) {
enforceExecType = true;
}
containerMemory = Integer.parseInt(cliParser.getOptionValue( containerMemory = Integer.parseInt(cliParser.getOptionValue(
"container_memory", "-1")); "container_memory", "-1"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
@ -1560,7 +1567,7 @@ private ContainerRequest setupContainerAskForRM() {
ContainerRequest request = new ContainerRequest( ContainerRequest request = new ContainerRequest(
getTaskResourceCapability(), getTaskResourceCapability(),
null, null, pri, 0, true, null, null, null, pri, 0, true, null,
ExecutionTypeRequest.newInstance(containerType), ExecutionTypeRequest.newInstance(containerType, enforceExecType),
containerResourceProfile); containerResourceProfile);
LOG.info("Requested container ask: " + request.toString()); LOG.info("Requested container ask: " + request.toString());
return request; return request;

View File

@ -192,6 +192,8 @@ public class Client {
private ExecutionType containerType = ExecutionType.GUARANTEED; private ExecutionType containerType = ExecutionType.GUARANTEED;
// Whether to auto promote opportunistic containers // Whether to auto promote opportunistic containers
private boolean autoPromoteContainers = false; private boolean autoPromoteContainers = false;
// Whether to enforce execution type of containers
private boolean enforceExecType = false;
// Placement specification // Placement specification
private String placementSpec = ""; private String placementSpec = "";
@ -337,6 +339,8 @@ public Client(Configuration conf) throws Exception {
opts.addOption("promote_opportunistic_after_start", false, opts.addOption("promote_opportunistic_after_start", false,
"Flag to indicate whether to automatically promote opportunistic" "Flag to indicate whether to automatically promote opportunistic"
+ " containers to guaranteed."); + " containers to guaranteed.");
opts.addOption("enforce_execution_type", false,
"Flag to indicate whether to enforce execution type of containers");
opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("keep_containers_across_application_attempts", false, opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application " "Flag to indicate whether to keep containers across application "
@ -532,6 +536,9 @@ public boolean init(String[] args) throws ParseException {
if (cliParser.hasOption("promote_opportunistic_after_start")) { if (cliParser.hasOption("promote_opportunistic_after_start")) {
autoPromoteContainers = true; autoPromoteContainers = true;
} }
if (cliParser.hasOption("enforce_execution_type")) {
enforceExecType = true;
}
containerMemory = containerMemory =
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
containerVirtualCores = containerVirtualCores =
@ -926,6 +933,9 @@ public boolean run() throws IOException, YarnException {
if (autoPromoteContainers) { if (autoPromoteContainers) {
vargs.add("--promote_opportunistic_after_start"); vargs.add("--promote_opportunistic_after_start");
} }
if (enforceExecType) {
vargs.add("--enforce_execution_type");
}
if (containerMemory > 0) { if (containerMemory > 0) {
vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_memory " + String.valueOf(containerMemory));
} }

View File

@ -39,6 +39,7 @@
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Supplier;
import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -53,6 +54,7 @@
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -63,6 +65,7 @@
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -174,6 +177,8 @@ private void setupInternal(int numNodeManager, float timelineVersion)
true); true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
true); true);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
// ATS version specific settings // ATS version specific settings
if (timelineVersion == 1.0f) { if (timelineVersion == 1.0f) {
@ -1469,6 +1474,97 @@ public void testDSShellWithOpportunisticContainers() throws Exception {
} }
} }
@Test
@TimelineVersion(2.0f)
public void testDSShellWithEnforceExecutionType() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
try {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--shell_command",
"date",
"--container_type",
"OPPORTUNISTIC",
"--enforce_execution_type"
};
client.init(args);
final AtomicBoolean result = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
try {
result.set(client.run());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
t.start();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
waitForContainersLaunch(yarnClient, 2);
List<ApplicationReport> apps = yarnClient.getApplications();
ApplicationReport appReport = apps.get(0);
ApplicationId appId = appReport.getApplicationId();
List<ApplicationAttemptReport> appAttempts =
yarnClient.getApplicationAttempts(appId);
ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
ApplicationAttemptId appAttemptId =
appAttemptReport.getApplicationAttemptId();
List<ContainerReport> containers =
yarnClient.getContainers(appAttemptId);
// we should get two containers.
Assert.assertEquals(2, containers.size());
ContainerId amContainerId = appAttemptReport.getAMContainerId();
for (ContainerReport container : containers) {
if (!container.getContainerId().equals(amContainerId)) {
Assert.assertEquals(container.getExecutionType(),
ExecutionType.OPPORTUNISTIC);
}
}
} catch (Exception e) {
Assert.fail("Job execution with enforce execution type failed.");
}
}
private void waitForContainersLaunch(YarnClient client,
int nContainers) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
try {
List<ApplicationReport> apps = client.getApplications();
if (apps == null || apps.isEmpty()) {
return false;
}
ApplicationId appId = apps.get(0).getApplicationId();
List<ApplicationAttemptReport> appAttempts =
client.getApplicationAttempts(appId);
if (appAttempts == null || appAttempts.isEmpty()) {
return false;
}
ApplicationAttemptId attemptId =
appAttempts.get(0).getApplicationAttemptId();
List<ContainerReport> containers = client.getContainers(attemptId);
return (containers.size() == nContainers);
} catch (Exception e) {
return false;
}
}
}, 10, 60000);
}
@Test @Test
@TimelineVersion(2.0f) @TimelineVersion(2.0f)
public void testDistributedShellWithResources() throws Exception { public void testDistributedShellWithResources() throws Exception {

View File

@ -84,6 +84,7 @@ $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/ha
``` ```
By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started. By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in the above command, we can specify the tasks to be running in opportunistic or guaranteed containers. The default type is `GUARANTEED`. By adding flag `-promote_opportunistic_after_start` to the above command, application master will attempt to promote all opportunistic containers to guaranteed once they are started.
By adding flag '-enforce_execution_type' to the above command, scheduler will honor execution type of the containers.
$H3 Opportunistic Containers in Web UI $H3 Opportunistic Containers in Web UI