YARN-8372. Distributed shell app master should not release containers when shutdown if keep-container is true. (Suma Shivaprasad via wangda)
Change-Id: Ief04d1ca865621f348fba4ac85fa78bc47465904
This commit is contained in:
parent
931f78718f
commit
8956e5b8db
@ -269,6 +269,8 @@ public enum DSEntity {
|
|||||||
private String containerResourceProfile = "";
|
private String containerResourceProfile = "";
|
||||||
Map<String, Resource> resourceProfiles;
|
Map<String, Resource> resourceProfiles;
|
||||||
|
|
||||||
|
private boolean keepContainersAcrossAttempts = false;
|
||||||
|
|
||||||
// Counter for completed containers ( complete denotes successful or failed )
|
// Counter for completed containers ( complete denotes successful or failed )
|
||||||
private AtomicInteger numCompletedContainers = new AtomicInteger();
|
private AtomicInteger numCompletedContainers = new AtomicInteger();
|
||||||
// Allocated container count so that we know how many containers has the RM
|
// Allocated container count so that we know how many containers has the RM
|
||||||
@ -483,6 +485,13 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
+ " the number of container retry attempts");
|
+ " the number of container retry attempts");
|
||||||
opts.addOption("placement_spec", true, "Placement specification");
|
opts.addOption("placement_spec", true, "Placement specification");
|
||||||
opts.addOption("debug", false, "Dump out debug information");
|
opts.addOption("debug", false, "Dump out debug information");
|
||||||
|
opts.addOption("keep_containers_across_application_attempts", false,
|
||||||
|
"Flag to indicate whether to keep containers across application "
|
||||||
|
+ "attempts."
|
||||||
|
+ " If the flag is true, running containers will not be killed when"
|
||||||
|
+ " application attempt fails and these containers will be "
|
||||||
|
+ "retrieved by"
|
||||||
|
+ " the new application attempt ");
|
||||||
|
|
||||||
opts.addOption("help", false, "Print usage");
|
opts.addOption("help", false, "Print usage");
|
||||||
CommandLine cliParser = new GnuParser().parse(opts, args);
|
CommandLine cliParser = new GnuParser().parse(opts, args);
|
||||||
@ -646,6 +655,9 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
containerResourceProfile =
|
containerResourceProfile =
|
||||||
cliParser.getOptionValue("container_resource_profile", "");
|
cliParser.getOptionValue("container_resource_profile", "");
|
||||||
|
|
||||||
|
keepContainersAcrossAttempts = cliParser.hasOption(
|
||||||
|
"keep_containers_across_application_attempts");
|
||||||
|
|
||||||
if (this.placementSpecs == null) {
|
if (this.placementSpecs == null) {
|
||||||
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
|
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
|
||||||
"num_containers", "1"));
|
"num_containers", "1"));
|
||||||
@ -1152,9 +1164,15 @@ public void onRequestsRejected(List<RejectedSchedulingRequest> rejReqs) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public void onShutdownRequest() {
|
||||||
public void onShutdownRequest() {
|
if (keepContainersAcrossAttempts) {
|
||||||
done = true;
|
LOG.info("Shutdown request received. Ignoring since "
|
||||||
|
+ "keep_containers_across_application_attempts is enabled");
|
||||||
|
} else{
|
||||||
|
LOG.info("Shutdown request received. Processing since "
|
||||||
|
+ "keep_containers_across_application_attempts is disabled");
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -332,10 +332,12 @@ public Client(Configuration conf) throws Exception {
|
|||||||
+ " containers to guaranteed.");
|
+ " containers to guaranteed.");
|
||||||
opts.addOption("log_properties", true, "log4j.properties file");
|
opts.addOption("log_properties", true, "log4j.properties file");
|
||||||
opts.addOption("keep_containers_across_application_attempts", false,
|
opts.addOption("keep_containers_across_application_attempts", false,
|
||||||
"Flag to indicate whether to keep containers across application attempts." +
|
"Flag to indicate whether to keep containers across application "
|
||||||
" If the flag is true, running containers will not be killed when" +
|
+ "attempts."
|
||||||
" application attempt fails and these containers will be retrieved by" +
|
+ " If the flag is true, running containers will not be killed when"
|
||||||
" the new application attempt ");
|
+ " application attempt fails and these containers will be "
|
||||||
|
+ "retrieved by"
|
||||||
|
+ " the new application attempt ");
|
||||||
opts.addOption("attempt_failures_validity_interval", true,
|
opts.addOption("attempt_failures_validity_interval", true,
|
||||||
"when attempt_failures_validity_interval in milliseconds is set to > 0," +
|
"when attempt_failures_validity_interval in milliseconds is set to > 0," +
|
||||||
"the failure number will not take failures which happen out of " +
|
"the failure number will not take failures which happen out of " +
|
||||||
@ -891,6 +893,10 @@ public boolean run() throws IOException, YarnException {
|
|||||||
}
|
}
|
||||||
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
||||||
|
|
||||||
|
if (keepContainers) {
|
||||||
|
vargs.add("--keep_containers_across_application_attempts");
|
||||||
|
}
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
|
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
|
||||||
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
|
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user