YARN-5735. Make the service REST API use the app timeout feature YARN-4205. Contributed by Jian He
This commit is contained in:
parent
7da243ebe0
commit
4ec1cbe86d
@ -347,7 +347,7 @@ private String createSliderApp(Application application,
|
|||||||
if (queueName != null && queueName.trim().length() > 0) {
|
if (queueName != null && queueName.trim().length() > 0) {
|
||||||
createArgs.queue = queueName.trim();
|
createArgs.queue = queueName.trim();
|
||||||
}
|
}
|
||||||
|
createArgs.lifetime = application.getLifetime();
|
||||||
return invokeSliderClientRunnable(new SliderClientContextRunnable<String>() {
|
return invokeSliderClientRunnable(new SliderClientContextRunnable<String>() {
|
||||||
@Override
|
@Override
|
||||||
public String run(SliderClient sliderClient) throws YarnException,
|
public String run(SliderClient sliderClient) throws YarnException,
|
||||||
@ -1246,13 +1246,17 @@ public Response run(SliderClient sliderClient) throws YarnException,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response startSliderApplication(final String appName)
|
private Response startSliderApplication(final String appName, Application app)
|
||||||
throws IOException, YarnException, InterruptedException {
|
throws IOException, YarnException, InterruptedException {
|
||||||
return invokeSliderClientRunnable(new SliderClientContextRunnable<Response>() {
|
return invokeSliderClientRunnable(new SliderClientContextRunnable<Response>() {
|
||||||
@Override
|
@Override
|
||||||
public Response run(SliderClient sliderClient) throws YarnException,
|
public Response run(SliderClient sliderClient) throws YarnException,
|
||||||
IOException, InterruptedException {
|
IOException, InterruptedException {
|
||||||
ActionThawArgs thawArgs = new ActionThawArgs();
|
ActionThawArgs thawArgs = new ActionThawArgs();
|
||||||
|
if (app.getLifetime() == null) {
|
||||||
|
app.setLifetime(DEFAULT_UNLIMITED_LIFETIME);
|
||||||
|
}
|
||||||
|
thawArgs.lifetime = app.getLifetime();
|
||||||
int returnCode = sliderClient.actionThaw(appName, thawArgs);
|
int returnCode = sliderClient.actionThaw(appName, thawArgs);
|
||||||
if (returnCode == 0) {
|
if (returnCode == 0) {
|
||||||
logger.info("Successfully started application {}", appName);
|
logger.info("Successfully started application {}", appName);
|
||||||
@ -1344,7 +1348,7 @@ public Response updateApplication(@PathParam("app_name") String appName,
|
|||||||
try {
|
try {
|
||||||
int livenessCheck = getSliderList(appName);
|
int livenessCheck = getSliderList(appName);
|
||||||
if (livenessCheck != 0) {
|
if (livenessCheck != 0) {
|
||||||
return startSliderApplication(appName);
|
return startSliderApplication(appName, updateAppData);
|
||||||
} else {
|
} else {
|
||||||
logger.info("Application {} is already running", appName);
|
logger.info("Application {} is already running", appName);
|
||||||
ApplicationStatus applicationStatus = new ApplicationStatus();
|
ApplicationStatus applicationStatus = new ApplicationStatus();
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
@ -734,7 +735,7 @@ public int actionCreate(String clustername, ActionCreateArgs createArgs) throws
|
|||||||
sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
|
sliderFileSystem.getFileSystem().delete(clusterDirectory, true);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return startCluster(clustername, createArgs);
|
return startCluster(clustername, createArgs, createArgs.lifetime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1960,14 +1961,13 @@ public void verifyBindingsDefined() throws BadCommandArgumentsException {
|
|||||||
*
|
*
|
||||||
* @param clustername name of the cluster.
|
* @param clustername name of the cluster.
|
||||||
* @param launchArgs launch arguments
|
* @param launchArgs launch arguments
|
||||||
|
* @param lifetime
|
||||||
* @return the exit code
|
* @return the exit code
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
protected int startCluster(String clustername,
|
protected int startCluster(String clustername, LaunchArgsAccessor launchArgs,
|
||||||
LaunchArgsAccessor launchArgs) throws
|
long lifetime) throws YarnException, IOException {
|
||||||
YarnException,
|
|
||||||
IOException {
|
|
||||||
Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
|
Path clusterDirectory = sliderFileSystem.buildClusterDirPath(clustername);
|
||||||
AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
|
AggregateConf instanceDefinition = loadInstanceDefinitionUnresolved(
|
||||||
clustername,
|
clustername,
|
||||||
@ -1975,7 +1975,7 @@ protected int startCluster(String clustername,
|
|||||||
|
|
||||||
LaunchedApplication launchedApplication =
|
LaunchedApplication launchedApplication =
|
||||||
launchApplication(clustername, clusterDirectory, instanceDefinition,
|
launchApplication(clustername, clusterDirectory, instanceDefinition,
|
||||||
serviceArgs.isDebug());
|
serviceArgs.isDebug(), lifetime);
|
||||||
|
|
||||||
if (launchArgs.getOutputFile() != null) {
|
if (launchArgs.getOutputFile() != null) {
|
||||||
// output file has been requested. Get the app report and serialize it
|
// output file has been requested. Get the app report and serialize it
|
||||||
@ -2044,9 +2044,8 @@ public AggregateConf loadInstanceDefinition(String name,
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected AppMasterLauncher setupAppMasterLauncher(String clustername,
|
protected AppMasterLauncher setupAppMasterLauncher(String clustername,
|
||||||
Path clusterDirectory,
|
Path clusterDirectory, AggregateConf instanceDefinition, boolean debugAM,
|
||||||
AggregateConf instanceDefinition,
|
long lifetime)
|
||||||
boolean debugAM)
|
|
||||||
throws YarnException, IOException{
|
throws YarnException, IOException{
|
||||||
deployedClusterName = clustername;
|
deployedClusterName = clustername;
|
||||||
validateClusterName(clustername);
|
validateClusterName(clustername);
|
||||||
@ -2119,7 +2118,10 @@ protected AppMasterLauncher setupAppMasterLauncher(String clustername,
|
|||||||
ApplicationId appId = amLauncher.getApplicationId();
|
ApplicationId appId = amLauncher.getApplicationId();
|
||||||
// set the application name;
|
// set the application name;
|
||||||
amLauncher.setKeepContainersOverRestarts(true);
|
amLauncher.setKeepContainersOverRestarts(true);
|
||||||
|
// set lifetime in submission context;
|
||||||
|
Map<ApplicationTimeoutType, Long> appTimeout = new HashMap<>();
|
||||||
|
appTimeout.put(ApplicationTimeoutType.LIFETIME, lifetime);
|
||||||
|
amLauncher.submissionContext.setApplicationTimeouts(appTimeout);
|
||||||
int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
|
int maxAppAttempts = config.getInt(KEY_AM_RESTART_LIMIT, 0);
|
||||||
amLauncher.setMaxAppAttempts(maxAppAttempts);
|
amLauncher.setMaxAppAttempts(maxAppAttempts);
|
||||||
|
|
||||||
@ -2383,20 +2385,19 @@ protected AppMasterLauncher setupAppMasterLauncher(String clustername,
|
|||||||
* @param clusterDirectory cluster dir
|
* @param clusterDirectory cluster dir
|
||||||
* @param instanceDefinition the instance definition
|
* @param instanceDefinition the instance definition
|
||||||
* @param debugAM enable debug AM options
|
* @param debugAM enable debug AM options
|
||||||
|
* @param lifetime
|
||||||
* @return the launched application
|
* @return the launched application
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public LaunchedApplication launchApplication(String clustername,
|
public LaunchedApplication launchApplication(String clustername, Path clusterDirectory,
|
||||||
Path clusterDirectory,
|
AggregateConf instanceDefinition, boolean debugAM, long lifetime)
|
||||||
AggregateConf instanceDefinition,
|
|
||||||
boolean debugAM)
|
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
|
||||||
AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername,
|
AppMasterLauncher amLauncher = setupAppMasterLauncher(clustername,
|
||||||
clusterDirectory,
|
clusterDirectory,
|
||||||
instanceDefinition,
|
instanceDefinition,
|
||||||
debugAM);
|
debugAM, lifetime);
|
||||||
|
|
||||||
applicationId = amLauncher.getApplicationId();
|
applicationId = amLauncher.getApplicationId();
|
||||||
log.info("Submitting application {}", applicationId);
|
log.info("Submitting application {}", applicationId);
|
||||||
@ -3254,7 +3255,7 @@ public int actionThaw(String clustername, ActionThawArgs thaw) throws YarnExcept
|
|||||||
verifyNoLiveClusters(clustername, "Start");
|
verifyNoLiveClusters(clustername, "Start");
|
||||||
|
|
||||||
//start the cluster
|
//start the cluster
|
||||||
return startCluster(clustername, thaw);
|
return startCluster(clustername, thaw, thaw.lifetime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,6 +102,11 @@ public abstract class AbstractClusterBuildingActionArgs extends
|
|||||||
description = "Queue to submit the application")
|
description = "Queue to submit the application")
|
||||||
public String queue;
|
public String queue;
|
||||||
|
|
||||||
|
@Parameter(names = {ARG_LIFETIME},
|
||||||
|
description = "Life time of the application since application started at"
|
||||||
|
+ " running state")
|
||||||
|
public long lifetime;
|
||||||
|
|
||||||
@ParametersDelegate
|
@ParametersDelegate
|
||||||
public ComponentArgsDelegate componentDelegate = new ComponentArgsDelegate();
|
public ComponentArgsDelegate componentDelegate = new ComponentArgsDelegate();
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.slider.common.params;
|
package org.apache.slider.common.params;
|
||||||
|
|
||||||
|
import com.beust.jcommander.Parameter;
|
||||||
import com.beust.jcommander.Parameters;
|
import com.beust.jcommander.Parameters;
|
||||||
import com.beust.jcommander.ParametersDelegate;
|
import com.beust.jcommander.ParametersDelegate;
|
||||||
|
|
||||||
@ -43,6 +44,11 @@ public int getWaittime() {
|
|||||||
@ParametersDelegate
|
@ParametersDelegate
|
||||||
LaunchArgsDelegate launchArgs = new LaunchArgsDelegate();
|
LaunchArgsDelegate launchArgs = new LaunchArgsDelegate();
|
||||||
|
|
||||||
|
@Parameter(names = {ARG_LIFETIME},
|
||||||
|
description = "Life time of the application since application started at"
|
||||||
|
+ " running state")
|
||||||
|
public long lifetime;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRmAddress() {
|
public String getRmAddress() {
|
||||||
return launchArgs.getRmAddress();
|
return launchArgs.getRmAddress();
|
||||||
|
@ -103,6 +103,7 @@ public interface Arguments {
|
|||||||
String ARG_PRINCIPAL = "--principal";
|
String ARG_PRINCIPAL = "--principal";
|
||||||
String ARG_PROVIDER = "--provider";
|
String ARG_PROVIDER = "--provider";
|
||||||
String ARG_QUEUE = "--queue";
|
String ARG_QUEUE = "--queue";
|
||||||
|
String ARG_LIFETIME = "--lifetime";
|
||||||
String ARG_REPLACE_PKG = "--replacepkg";
|
String ARG_REPLACE_PKG = "--replacepkg";
|
||||||
String ARG_RESOURCE = "--resource";
|
String ARG_RESOURCE = "--resource";
|
||||||
String ARG_RESOURCES = "--resources";
|
String ARG_RESOURCES = "--resources";
|
||||||
|
Loading…
Reference in New Issue
Block a user