YARN-1566. Changed Distributed Shell to retain containers across application attempts. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557322 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-11 07:42:16 +00:00
parent 25bc68d15e
commit aa0c489a28
6 changed files with 155 additions and 17 deletions

View File

@ -67,6 +67,9 @@ Release 2.4.0 - UNRELEASED
ability in ResourceManager to optionally not kill containers when the
ApplicationMaster exits. (Jian He via vinodkv)
YARN-1566. Changed Distributed Shell to retain containers across application
attempts. (Jian He via vinodkv)
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -37,7 +37,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@ -89,6 +88,8 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
/**
* An ApplicationMaster for executing shell commands on a set of launched
* containers using the YARN framework.
@ -169,7 +170,8 @@ public class ApplicationMaster {
private NMCallbackHandler containerListener;
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@VisibleForTesting
protected ApplicationAttemptId appAttemptID;
// TODO
// For status update for clients - yet to be implemented
@ -194,13 +196,15 @@ public class ApplicationMaster {
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
// allocated to us
private AtomicInteger numAllocatedContainers = new AtomicInteger();
@VisibleForTesting
protected AtomicInteger numAllocatedContainers = new AtomicInteger();
// Count of failed containers
private AtomicInteger numFailedContainers = new AtomicInteger();
// Count of containers already requested from the RM
// Needed as once requested, we should not request for containers again.
// Only request for more if the original requirement changes.
private AtomicInteger numRequestedContainers = new AtomicInteger();
@VisibleForTesting
protected AtomicInteger numRequestedContainers = new AtomicInteger();
// Shell command to be executed
private String shellCommand = "";
@ -251,6 +255,7 @@ public static void main(String[] args) {
System.exit(0);
}
result = appMaster.run();
appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);
@ -537,26 +542,25 @@ public boolean run() throws YarnException, IOException {
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempt();
LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
int numTotalContainersToRequest =
numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainers; ++i) {
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
while (!done
&& (numCompletedContainers.get() != numTotalContainers)) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {}
}
finish();
numRequestedContainers.set(numTotalContainersToRequest);
return success;
}
@ -565,7 +569,15 @@ NMCallbackHandler createNMCallbackHandler() {
return new NMCallbackHandler(this);
}
private void finish() {
protected void finish() {
// wait for completion.
while (!done
&& (numCompletedContainers.get() != numTotalContainers)) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {}
}
// Join all launched threads
// needed for when we time out
// and we need to release containers

View File

@ -162,6 +162,9 @@ public class Client {
// Timeout threshold for client. Kill app after time interval expires.
private long clientTimeout = 600000;
// flag to indicate whether to keep containers across application attempts.
private boolean keepContainers = false;
// Debug flag
boolean debugFlag = false;
@ -243,6 +246,11 @@ public Client(Configuration conf) throws Exception {
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application attempts." +
" If the flag is true, running containers will not be killed when" +
" application attempt fails and these containers will be retrieved by" +
" the new application attempt ");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@ -294,12 +302,17 @@ public boolean init(String[] args) throws ParseException {
}
if (cliParser.hasOption("keep_containers_across_application_attempts")) {
LOG.info("keep_containers_across_application_attempts");
keepContainers = true;
}
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
if (amMemory < 0) {
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+ " Specified memory=" + amMemory);
@ -442,6 +455,8 @@ public boolean run() throws IOException, YarnException {
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master

View File

@ -67,6 +67,7 @@ public static void main(String[] args) {
System.exit(0);
}
result = appMaster.run();
appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
public class TestDSFailedAppMaster extends ApplicationMaster {
private static final Log LOG = LogFactory.getLog(TestDSFailedAppMaster.class);
@Override
public boolean run() throws YarnException, IOException {
boolean res = super.run();
// for the 2nd attempt.
if (appAttemptID.getAttemptId() == 2) {
// should reuse the earlier running container, so numAllocatedContainers
// should be set to 1. And should ask no more containers, so
// numRequestedContainers should be set to 0.
if (numAllocatedContainers.get() != 1
|| numRequestedContainers.get() != 0) {
LOG.info("Application Master failed. exiting");
System.exit(200);
}
}
return res;
}
public static void main(String[] args) {
boolean result = false;
try {
TestDSFailedAppMaster appMaster = new TestDSFailedAppMaster();
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
}
result = appMaster.run();
if (appMaster.appAttemptID.getAttemptId() == 1) {
try {
// sleep some time, wait for the AM to launch a container.
Thread.sleep(3000);
} catch (InterruptedException e) {}
// fail the first am.
System.exit(100);
}
appMaster.finish();
} catch (Throwable t) {
System.exit(1);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
} else {
LOG.info("Application Master failed. exiting");
System.exit(2);
}
}
}

View File

@ -174,6 +174,35 @@ public void run() {
}
@Test(timeout=90000)
public void testDSRestartWithPreviousRunningContainers() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "timeout 8" : "sleep 8",
"--master_memory",
"512",
"--container_memory",
"128",
"--keep_containers_across_application_attempts"
};
LOG.info("Initializing DS Client");
Client client = new Client(TestDSFailedAppMaster.class.getName(),
new Configuration(yarnCluster.getConfig()));
client.init(args);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
// application should succeed
Assert.assertTrue(result);
}
@Test(timeout=90000)
public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir =