YARN-6545. Followup fix for YARN-6405. Contributed by Jian He
parent 399525c2e0
commit ce05c6e981
@@ -36,6 +36,10 @@ public interface ServiceApiConstants {
 
   String SERVICE_NAME_LC = $("SERVICE_NAME.lc");
 
+  String USER = $("USER");
+
+  String DOMAIN = $("DOMAIN");
+
   // Constants for component
   String COMPONENT_NAME = $("COMPONENT_NAME");
@@ -47,4 +51,19 @@ public interface ServiceApiConstants {
 
   String COMPONENT_ID = $("COMPONENT_ID");
 
   String CONTAINER_ID = $("CONTAINER_ID");
+
+  // Constants for default cluster ZK
+  String CLUSTER_ZK_QUORUM = $("CLUSTER_ZK_QUORUM");
+
+  // URI for the default cluster fs
+  String CLUSTER_FS_URI = $("CLUSTER_FS_URI");
+
+  // the host component of the cluster fs URI
+  String CLUSTER_FS_HOST = $("CLUSTER_FS_HOST");
+
+  // Path in zookeeper for a specific service
+  String SERVICE_ZK_PATH = $("SERVICE_ZK_PATH");
+
+  // Constants for service specific hdfs dir
+  String SERVICE_HDFS_DIR = $("SERVICE_HDFS_DIR");
 }
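Note on the $(...) wrapper used by these constants: judging by how the tokens are consumed later in this change, it appears to wrap a name in the ${...} placeholder form. A minimal sketch of that assumption; the interface and helper below are illustrative, not the actual Slider source:

    // Hypothetical stand-in for the $() helper behind ServiceApiConstants.
    // Assumption: it yields "${NAME}" placeholders, matching the "${...}" keys
    // used by the token-substitution code elsewhere in this diff.
    public interface TokenConstantsSketch {
      static String $(String name) {
        return "${" + name + "}";
      }

      String SERVICE_NAME = $("SERVICE_NAME"); // "${SERVICE_NAME}"
      String USER = $("USER");                 // "${USER}"
    }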
@@ -105,10 +105,7 @@ public void setFiles(List<ConfigFile> files) {
   }
 
   public long getPropertyLong(String name, long defaultValue) {
-    if (name == null) {
-      return defaultValue;
-    }
-    String value = properties.get(name.trim());
+    String value = getProperty(name);
     if (StringUtils.isEmpty(value)) {
       return defaultValue;
     }
@@ -116,10 +113,7 @@ public long getPropertyLong(String name, long defaultValue) {
   }
 
   public int getPropertyInt(String name, int defaultValue) {
-    if (name == null) {
-      return defaultValue;
-    }
-    String value = properties.get(name.trim());
+    String value = getProperty(name);
     if (StringUtils.isEmpty(value)) {
       return defaultValue;
     }
@@ -127,10 +121,7 @@ public int getPropertyInt(String name, int defaultValue) {
   }
 
   public boolean getPropertyBool(String name, boolean defaultValue) {
-    if (name == null) {
-      return defaultValue;
-    }
-    String value = properties.get(name.trim());
+    String value = getProperty(name);
     if (StringUtils.isEmpty(value)) {
       return defaultValue;
     }
@@ -138,10 +129,11 @@ public boolean getPropertyBool(String name, boolean defaultValue) {
   }
 
   public String getProperty(String name, String defaultValue) {
-    if (name == null) {
+    String value = getProperty(name);
+    if (StringUtils.isEmpty(value)) {
       return defaultValue;
     }
-    return properties.get(name.trim());
+    return value;
   }
 
   public void setProperty(String name, String value) {
@@ -149,16 +141,10 @@ public void setProperty(String name, String value) {
   }
 
   public String getProperty(String name) {
-    if (name == null) {
-      return null;
-    }
     return properties.get(name.trim());
   }
 
   public String getEnv(String name) {
-    if (name == null) {
-      return null;
-    }
     return env.get(name.trim());
   }
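The hunks above converge the typed getters onto a single getProperty(name) lookup that owns the key trimming and empty-value handling. A self-contained sketch of that pattern, with illustrative names rather than the real Slider Configuration class:

    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of the "single lookup helper" pattern the diff converges on:
    // every typed getter delegates to getProperty(name), so trimming and
    // default handling live in one place. Names here are illustrative only.
    public class PropertyBag {
      private final Map<String, String> properties = new HashMap<>();

      public void setProperty(String name, String value) {
        properties.put(name.trim(), value);
      }

      // Single point of lookup: trims the key before consulting the map.
      // As in the patched code, callers are expected to pass non-null names.
      public String getProperty(String name) {
        return properties.get(name.trim());
      }

      public String getProperty(String name, String defaultValue) {
        String value = getProperty(name);
        return (value == null || value.isEmpty()) ? defaultValue : value;
      }

      public long getPropertyLong(String name, long defaultValue) {
        String value = getProperty(name);
        return (value == null || value.isEmpty())
            ? defaultValue : Long.parseLong(value);
      }

      public boolean getPropertyBool(String name, boolean defaultValue) {
        String value = getProperty(name);
        return (value == null || value.isEmpty())
            ? defaultValue : Boolean.parseBoolean(value);
      }
    }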
@@ -1112,18 +1112,20 @@ private int actionInstallResource(ActionResourceArgs resourceInfo)
           "not a directory", folder);
     }
 
-    for (File f : files) {
-      srcFile = new Path(f.toURI());
+    if (files != null) {
+      for (File f : files) {
+        srcFile = new Path(f.toURI());
 
-      Path fileInFs = new Path(pkgPath, srcFile.getName());
-      log.info("Installing file {} at {} and overwrite is {}.",
-          srcFile, fileInFs, resourceInfo.overwrite);
-      require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
-          "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
+        Path fileInFs = new Path(pkgPath, srcFile.getName());
+        log.info("Installing file {} at {} and overwrite is {}.",
+            srcFile, fileInFs, resourceInfo.overwrite);
+        require(!(sfs.exists(fileInFs) && !resourceInfo.overwrite),
+            "File exists at %s. Use --overwrite to overwrite.", fileInFs.toUri());
 
-      sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
-      sfs.setPermission(fileInFs,
-          new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+        sfs.copyFromLocalFile(false, resourceInfo.overwrite, srcFile, fileInFs);
+        sfs.setPermission(fileInFs,
+            new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE));
+      }
     }
 
     return EXIT_SUCCESS;
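The new null check matters because File.listFiles() returns null, not an empty array, when the path is not a directory or cannot be read. A standalone illustration of the same guard; the paths and messages are made up:

    import java.io.File;

    // Demonstrates the listFiles() null guard the hunk above adds:
    // iterate only when the directory listing actually succeeded.
    public class ListFilesGuard {
      public static void main(String[] args) {
        File folder = new File(args.length > 0 ? args[0] : ".");
        File[] files = folder.listFiles();
        if (files != null) {          // guard against null, not just empty
          for (File f : files) {
            System.out.println("would install: " + f.getName());
          }
        } else {
          System.err.println(folder + " is not a readable directory");
        }
      }
    }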
@@ -597,53 +597,6 @@ public void submitTarGzipAndUpdate(
     providerResources.put(SliderKeys.SLIDER_DEPENDENCY_LOCALIZED_DIR_LINK, lc);
   }
 
-  /**
-   * Copy local file(s) to destination HDFS directory. If {@code localPath} is a
-   * local directory then all files matching the {@code filenameFilter}
-   * (optional) are copied, otherwise {@code filenameFilter} is ignored.
-   *
-   * @param localPath
-   *          a local file or directory path
-   * @param filenameFilter
-   *          if {@code localPath} is a directory then filenameFilter is used as
-   *          a filter (if specified)
-   * @param destDir
-   *          the destination HDFS directory where the file(s) should be copied
-   * @param fp
-   *          file permissions of all the directories and files that will be
-   *          created in this api
-   * @throws IOException
-   */
-  public void copyLocalFilesToHdfs(File localPath,
-      FilenameFilter filenameFilter, Path destDir, FsPermission fp)
-      throws IOException {
-    if (localPath == null || destDir == null) {
-      throw new IOException("Either localPath or destDir is null");
-    }
-    fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
-        "000");
-    fileSystem.mkdirs(destDir, fp);
-    if (localPath.isDirectory()) {
-      // copy all local files under localPath to destDir (honoring filename
-      // filter if provided
-      File[] localFiles = localPath.listFiles(filenameFilter);
-      Path[] localFilePaths = new Path[localFiles.length];
-      int i = 0;
-      for (File localFile : localFiles) {
-        localFilePaths[i++] = new Path(localFile.getPath());
-      }
-      log.info("Copying {} files from {} to {}", i, localPath.toURI(),
-          destDir.toUri());
-      fileSystem.copyFromLocalFile(false, true, localFilePaths, destDir);
-    } else {
-      log.info("Copying file {} to {}", localPath.toURI(), destDir.toUri());
-      fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
-          destDir);
-    }
-    // set permissions for all the files created in the destDir
-    fileSystem.setPermission(destDir, fp);
-  }
-
   public void copyLocalFileToHdfs(File localPath,
       Path destPath, FsPermission fp)
       throws IOException {
@@ -25,7 +25,7 @@ public enum ConfigFormat {
   JSON("json"),
   PROPERTIES("properties"),
   XML("xml"),
-  HADOOP_XML("hadoop-xml"),
+  HADOOP_XML("hadoop_xml"),
   ENV("env"),
   TEMPLATE("template"),
   YAML("yaml"),
@@ -52,7 +52,7 @@ public class ZKIntegration implements Watcher, Closeable {
   public static final String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
   public static final String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
 
-  public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
+  private static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
   static {
     ZK_USERS_PATH_LIST.add(ZK_SERVICES);
     ZK_USERS_PATH_LIST.add(ZK_SLIDER);
@@ -20,10 +20,8 @@
 
 import org.apache.slider.api.ResourceKeys;
 import org.apache.slider.api.resource.Component;
-import org.apache.slider.server.appmaster.state.AppState;
+import org.apache.slider.server.appmaster.state.RoleInstance;
 
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -44,8 +42,7 @@ public final class ProviderRole {
   public final String labelExpression;
   public final Component component;
   public AtomicLong componentIdCounter = null;
-  public AppState appState;
-  public Queue<String> failedInstanceName = new ConcurrentLinkedQueue<String>();
+  public Queue<RoleInstance> failedInstances = new ConcurrentLinkedQueue<>();
   public ProviderRole(String name, int id) {
     this(name,
         id,
@@ -78,7 +75,7 @@ public ProviderRole(String name,
         nodeFailureThreshold,
         placementTimeoutSeconds,
         labelExpression,
-        new Component().name(name).numberOfContainers(0L), null);
+        new Component().name(name).numberOfContainers(0L));
   }
 
   /**
@@ -88,13 +85,13 @@ public ProviderRole(String name,
    * @param id ID. This becomes the YARN priority
    * @param policy placement policy
    * @param nodeFailureThreshold threshold for node failures (within a reset interval)
    * after which a node failure is considered an app failure
    * @param placementTimeoutSeconds for lax placement, timeout in seconds before
    * @param labelExpression label expression for requests; may be null
    */
   public ProviderRole(String name, String group, int id, int policy,
       int nodeFailureThreshold, long placementTimeoutSeconds,
-      String labelExpression, Component component, AppState state) {
+      String labelExpression, Component component) {
     this.name = name;
     if (group == null) {
       this.group = name;
@@ -110,7 +107,6 @@ public ProviderRole(String name, String group, int id, int policy,
     if(component.getUniqueComponentSupport()) {
       componentIdCounter = new AtomicLong(0);
     }
-    this.appState = state;
   }
@@ -32,7 +32,6 @@
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.slider.api.ClusterNode;
-import org.apache.slider.api.OptionKeys;
 import org.apache.slider.api.ResourceKeys;
 import org.apache.slider.api.RoleKeys;
 import org.apache.slider.api.resource.Application;
@@ -59,7 +58,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -271,8 +269,8 @@ public static void addEnvForSubstitution(Map<String, String> env,
   // 2. Add the config file to localResource
   public synchronized void createConfigFileAndAddLocalResource(
       ContainerLauncher launcher, SliderFileSystem fs, Component component,
-      Map<String, String> tokensForSubstitution, RoleInstance roleInstance)
-      throws IOException {
+      Map<String, String> tokensForSubstitution, RoleInstance roleInstance,
+      StateAccessForProviders appState) throws IOException {
     Path compDir =
         new Path(new Path(fs.getAppDir(), "components"), component.getName());
     Path compInstanceDir =
@@ -315,12 +313,12 @@ public synchronized void createConfigFileAndAddLocalResource(
     case HADOOP_XML:
       // Hadoop_xml_template
       resolveHadoopXmlTemplateAndSaveOnHdfs(fs.getFileSystem(),
-          tokensForSubstitution, configFile, remoteFile, roleInstance);
+          tokensForSubstitution, configFile, remoteFile, appState);
       break;
     case TEMPLATE:
       // plain-template
      resolvePlainTemplateAndSaveOnHdfs(fs.getFileSystem(),
-          tokensForSubstitution, configFile, remoteFile, roleInstance);
+          tokensForSubstitution, configFile, remoteFile, appState);
       break;
     default:
       log.info("Not supporting loading src_file for " + configFile);
@@ -383,11 +381,11 @@ private void resolveNonTemplateConfigsAndSaveOnHdfs(SliderFileSystem fs,
   @SuppressWarnings("unchecked")
   private void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs,
       Map<String, String> tokensForSubstitution, ConfigFile configFile,
-      Path remoteFile, RoleInstance roleInstance) throws IOException {
+      Path remoteFile, StateAccessForProviders appState) throws IOException {
     Map<String, String> conf;
     try {
-      conf = (Map<String, String>) roleInstance.providerRole.
-          appState.configFileCache.get(configFile);
+      conf = (Map<String, String>) appState.getConfigFileCache()
+          .get(configFile);
     } catch (ExecutionException e) {
       log.info("Failed to load config file: " + configFile, e);
       return;
@@ -426,17 +424,16 @@ private void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs,
   // 3) save on hdfs
   private void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
       Map<String, String> tokensForSubstitution, ConfigFile configFile,
-      Path remoteFile, RoleInstance roleInstance) {
+      Path remoteFile, StateAccessForProviders appState) {
     String content;
     try {
-      content = (String) roleInstance.providerRole.appState.configFileCache
-          .get(configFile);
+      content = (String) appState.getConfigFileCache().get(configFile);
     } catch (ExecutionException e) {
       log.info("Failed to load config file: " + configFile, e);
       return;
     }
     // substitute tokens
-    substituteStrWithTokens(content, tokensForSubstitution);
+    content = substituteStrWithTokens(content, tokensForSubstitution);
 
     try (OutputStream output = fs.create(remoteFile)) {
       org.apache.commons.io.IOUtils.write(content, output);
@@ -446,25 +443,13 @@ private void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
   }
 
   /**
-   * Get initial token map to be substituted into config values.
-   * @param appConf app configurations
-   * @param clusterName app name
+   * Get initial component token map to be substituted into config values.
   * @param roleInstance role instance
   * @return tokens to replace
   */
-  public Map<String, String> getStandardTokenMap(Configuration appConf,
-      RoleInstance roleInstance, String clusterName) {
+  public Map<String, String> initCompTokensForSubstitute(
+      RoleInstance roleInstance) {
     Map<String, String> tokens = new HashMap<>();
-    String nnuri = appConf.getProperty("fs.defaultFS");
-    if (nnuri != null && !nnuri.isEmpty()) {
-      tokens.put("${NN_URI}", nnuri);
-      tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
-    }
-    tokens.put("${ZK_HOST}", appConf.getProperty(OptionKeys.ZOOKEEPER_HOSTS));
-    tokens.put("${DEFAULT_ZK_PATH}", appConf.getProperty(OptionKeys.ZOOKEEPER_PATH));
-    tokens.put(SERVICE_NAME_LC, clusterName.toLowerCase());
-    tokens.put(SERVICE_NAME, clusterName);
     tokens.put(COMPONENT_NAME, roleInstance.role);
     tokens.put(COMPONENT_NAME_LC, roleInstance.role.toLowerCase());
     tokens.put(COMPONENT_INSTANCE_NAME, roleInstance.getCompInstanceName());
@@ -88,10 +88,10 @@ public void buildContainerLaunchContext(ContainerLauncher launcher,
 
     // Generate tokens (key-value pair) for config substitution.
     // Get pre-defined tokens
+    Map<String, String> globalTokens = amState.getGlobalSubstitutionTokens();
     Map<String, String> tokensForSubstitution = providerUtils
-        .getStandardTokenMap(application.getConfiguration(), roleInstance,
-            application.getName());
-
+        .initCompTokensForSubstitute(roleInstance);
+    tokensForSubstitution.putAll(globalTokens);
     // Set the environment variables in launcher
     launcher.putEnv(SliderUtils
         .buildEnvMap(component.getConfiguration(), tokensForSubstitution));
@@ -111,7 +111,7 @@ public void buildContainerLaunchContext(ContainerLauncher launcher,
 
     // create config file on hdfs and add local resource
     providerUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
-        component, tokensForSubstitution, roleInstance);
+        component, tokensForSubstitution, roleInstance, amState);
 
     // substitute launch command
     String launchCommand = ProviderUtils
@@ -150,10 +150,10 @@ public void run() {
       containerLauncher.setupUGI();
       containerLauncher.putEnv(envVars);
 
-      String failedInstance = role.failedInstanceName.poll();
+      RoleInstance failedInstance = role.failedInstances.poll();
       RoleInstance instance;
       if (failedInstance != null) {
-        instance = new RoleInstance(container, role, failedInstance);
+        instance = new RoleInstance(container, failedInstance);
       } else {
         instance = new RoleInstance(container, role);
       }
@@ -819,6 +819,7 @@ private int createAndRunCluster(String appName) throws Throwable {
     binding.releaseSelector = new MostRecentContainerReleaseSelector();
     binding.nodeReports = nodeReports;
     binding.application = application;
+    binding.serviceHdfsDir = fs.buildClusterDirPath(appName).toString();
     appState.buildInstance(binding);
 
     // build up environment variables that the AM wants set in every container
@@ -24,11 +24,11 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -42,6 +42,7 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.ServiceApiConstants;
 import org.apache.slider.api.StatusKeys;
 import org.apache.slider.api.proto.Messages;
 import org.apache.slider.api.proto.Messages.ComponentCountProto;
@@ -61,6 +62,7 @@
 import org.apache.slider.core.exceptions.NoSuchNodeException;
 import org.apache.slider.core.exceptions.SliderInternalStateException;
 import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
+import org.apache.slider.core.zk.ZKIntegration;
 import org.apache.slider.providers.PlacementPolicy;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
@@ -75,6 +77,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -89,7 +92,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.DEFAULT_REGISTRY_ZK_QUORUM;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_DNS_DOMAIN;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
 import static org.apache.slider.api.ResourceKeys.*;
+import static org.apache.slider.api.ServiceApiConstants.*;
 import static org.apache.slider.api.StateValues.*;
 import static org.apache.slider.api.resource.ApplicationState.STARTED;
 
@@ -193,14 +201,13 @@ public class AppState {
   private int containerMinMemory;
 
   private RoleHistory roleHistory;
-  private Configuration publishedProviderConf;
   private long startTimeThreshold;
 
   private int failureThreshold = 10;
   private int nodeFailureThreshold = 3;
 
   private String logServerURL = "";
 
+  public Map<String, String> globalTokens = new HashMap<>();
   /**
    * Selector of containers to release; application wide.
   */
@@ -335,6 +342,7 @@ public synchronized void buildInstance(AppStateBindingInfo binding)
         DEFAULT_CONTAINER_FAILURE_THRESHOLD);
     nodeFailureThreshold = conf.getPropertyInt(NODE_FAILURE_THRESHOLD,
         DEFAULT_NODE_FAILURE_THRESHOLD);
+    initGlobalTokensForSubstitute(binding);
 
     //build the initial component list
     int priority = 1;
@@ -367,6 +375,34 @@ public synchronized void buildInstance(AppStateBindingInfo binding)
     createConfigFileCache(binding.fs);
   }
 
+  private void initGlobalTokensForSubstitute(AppStateBindingInfo binding)
+      throws IOException {
+    // ZK
+    globalTokens.put(ServiceApiConstants.CLUSTER_ZK_QUORUM,
+        binding.serviceConfig
+            .getTrimmed(KEY_REGISTRY_ZK_QUORUM, DEFAULT_REGISTRY_ZK_QUORUM));
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    globalTokens
+        .put(SERVICE_ZK_PATH, ZKIntegration.mkClusterPath(user, app.getName()));
+
+    globalTokens.put(ServiceApiConstants.USER, user);
+    String dnsDomain = binding.serviceConfig.getTrimmed(KEY_DNS_DOMAIN);
+    if (dnsDomain != null && !dnsDomain.isEmpty()) {
+      globalTokens.put(ServiceApiConstants.DOMAIN, dnsDomain);
+    }
+    // HDFS
+    String clusterFs = binding.serviceConfig.getTrimmed(FS_DEFAULT_NAME_KEY);
+    if (clusterFs != null && !clusterFs.isEmpty()) {
+      globalTokens.put(ServiceApiConstants.CLUSTER_FS_URI, clusterFs);
+      globalTokens.put(ServiceApiConstants.CLUSTER_FS_HOST,
+          URI.create(clusterFs).getHost());
+    }
+    globalTokens.put(SERVICE_HDFS_DIR, binding.serviceHdfsDir);
+    // service name
+    globalTokens.put(SERVICE_NAME_LC, app.getName().toLowerCase());
+    globalTokens.put(SERVICE_NAME, app.getName());
+  }
+
   private void createConfigFileCache(final FileSystem fileSystem) {
     this.configFileCache =
         CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
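Taken together with initCompTokensForSubstitute in ProviderUtils, this sets up a two-level token map: cluster-wide tokens built once here, per-component tokens built at launch time, merged before substitution. A rough standalone sketch of that flow; the substitution helper and sample values are assumptions, not Slider's own code:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the global + per-component token merge the diff introduces.
    // Token names follow the ${...} placeholder convention used above; the
    // substitute() helper is a simplified stand-in.
    public class TokenSubstitution {
      static String substitute(String content, Map<String, String> tokens) {
        for (Map.Entry<String, String> e : tokens.entrySet()) {
          content = content.replace(e.getKey(), e.getValue());
        }
        return content;
      }

      public static void main(String[] args) {
        Map<String, String> globalTokens = new HashMap<>();   // built once, cluster-wide
        globalTokens.put("${SERVICE_NAME}", "sleeper-service");
        globalTokens.put("${CLUSTER_FS_URI}", "hdfs://nn:8020");

        Map<String, String> tokensForSubstitution = new HashMap<>(); // per component launch
        tokensForSubstitution.put("${COMPONENT_NAME}", "sleeper");
        tokensForSubstitution.putAll(globalTokens);            // merge, as in the diff

        String template = "app=${SERVICE_NAME} comp=${COMPONENT_NAME} fs=${CLUSTER_FS_URI}";
        System.out.println(substitute(template, tokensForSubstitution));
      }
    }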
@@ -411,7 +447,7 @@ public ProviderRole createComponent(String name, String group,
         DEF_YARN_LABEL_EXPRESSION);
     ProviderRole newRole =
         new ProviderRole(name, group, priority, (int)placementPolicy, threshold,
-            placementTimeout, label, component, this);
+            placementTimeout, label, component);
     buildRole(newRole, component);
     log.info("Created a new role " + newRole);
     return newRole;
@@ -1300,8 +1336,7 @@ public synchronized NodeCompletionResult onCompletedContainer(
       try {
         RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId);
         decRunningContainers(roleStatus);
-        roleStatus.getProviderRole().failedInstanceName
-            .offer(roleInstance.compInstanceName);
+        roleStatus.getProviderRole().failedInstances.offer(roleInstance);
         boolean shortLived = isShortLived(roleInstance);
         String message;
         Container failedContainer = roleInstance.container;
@@ -1742,8 +1777,7 @@ private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
       for (RoleInstance possible : finalCandidates) {
         log.info("Targeting for release: {}", possible);
         containerReleaseSubmitted(possible.container);
-        role.getProviderRole().failedInstanceName
-            .offer(possible.compInstanceName);
+        role.getProviderRole().failedInstances.offer(possible);
         operations.add(new ContainerReleaseOperation(possible.getContainerId()));
       }
     }
@@ -1862,7 +1896,6 @@ public synchronized void onContainersAllocated(
       //get the role
       final ContainerId cid = container.getId();
       final RoleStatus role = lookupRoleStatus(container);
-      decRequestedContainers(role);
 
       //inc allocated count -this may need to be dropped in a moment,
       // but is needed to update the logic below
@@ -1888,6 +1921,7 @@ public synchronized void onContainersAllocated(
         role.getComponentMetrics().surplusContainers.incr();
         containersRunning.decr();
       } else {
+        decRequestedContainers(role);
         log.info("Assigning role {} to container" + " {}," + " on {}:{},",
             roleName, cid, nodeId.getHost(), nodeId.getPort());
 
@@ -25,6 +25,7 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.slider.api.resource.Application;
+import org.apache.slider.common.tools.CoreFileSystem;
 import org.apache.slider.providers.ProviderRole;
 
 import java.util.ArrayList;
@@ -45,6 +46,7 @@ public class AppStateBindingInfo {
   public Path historyPath;
   public List<Container> liveContainers = new ArrayList<>(0);
   public ContainerReleaseSelector releaseSelector = new SimpleReleaseSelector();
+  public String serviceHdfsDir = "";
   /** node reports off the RM. */
   public List<NodeReport> nodeReports = new ArrayList<>(0);
@@ -18,11 +18,13 @@
 
 package org.apache.slider.server.appmaster.state;
 
+import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.ConfigFile;
 import org.apache.slider.api.types.ApplicationLivenessInformation;
 import org.apache.slider.api.types.ComponentInformation;
 import org.apache.slider.api.types.NodeInformation;
@@ -262,4 +264,14 @@ public NodeInformation getNodeInformation(String hostname) {
   public RoleStatistics getRoleStatistics() {
     return appState.getRoleStatistics();
   }
+
+  @Override
+  public Map<String, String> getGlobalSubstitutionTokens() {
+    return appState.globalTokens;
+  }
+
+  @Override
+  public LoadingCache<ConfigFile, Object> getConfigFileCache() {
+    return appState.configFileCache;
+  }
 }
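getConfigFileCache() exposes the Guava LoadingCache that AppState builds with CacheBuilder (see createConfigFileCache above). A minimal, self-contained sketch of how such a cache behaves; the key and value types and the loader body are placeholders, not Slider's real ones:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.TimeUnit;

    // Entries are loaded on first access and expire after a period of inactivity,
    // so repeated config-file lookups do not re-read the remote file every time.
    public class ConfigCacheSketch {
      public static void main(String[] args) throws Exception {
        LoadingCache<String, String> configFileCache = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(new CacheLoader<String, String>() {
              @Override
              public String load(String key) {
                // in Slider this would read and parse the remote config file
                return "contents of " + key;
              }
            });

        System.out.println(configFileCache.get("core-site.xml")); // loads once
        System.out.println(configFileCache.get("core-site.xml")); // served from cache
      }
    }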
@@ -128,11 +128,11 @@ public RoleInstance(Container container, ProviderRole role) {
     this.providerRole = role;
   }
 
-  public RoleInstance(Container container, ProviderRole role,
-      String compInstanceName) {
+  public RoleInstance(Container container, RoleInstance failedInstance) {
     this(container);
-    this.compInstanceName = compInstanceName;
-    this.providerRole = role;
+    this.componentId = failedInstance.componentId;
+    this.compInstanceName = failedInstance.compInstanceName;
+    this.providerRole = failedInstance.providerRole;
   }
 
   /**
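The new constructor copies the failed instance's identity (component id, instance name, role) into its replacement, which is why ProviderRole now queues whole RoleInstance objects instead of just names. A simplified sketch of that recycle-the-identity pattern with stand-in types rather than the actual Slider classes:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // When a container fails, its descriptor is queued; the next allocation for
    // the role polls the queue and reuses that identity instead of minting a new one.
    public class FailedInstanceRecycling {
      static class Instance {
        final long componentId;
        final String compInstanceName;
        Instance(long componentId, String compInstanceName) {
          this.componentId = componentId;
          this.compInstanceName = compInstanceName;
        }
        // copy constructor, mirroring RoleInstance(Container, RoleInstance)
        Instance(Instance failed) {
          this(failed.componentId, failed.compInstanceName);
        }
      }

      static final Queue<Instance> failedInstances = new ConcurrentLinkedQueue<>();

      static Instance allocate(long nextId, String roleName) {
        Instance failed = failedInstances.poll();
        // reuse the failed instance's identity if one is pending, else create a new one
        return failed != null ? new Instance(failed)
                              : new Instance(nextId, roleName + nextId);
      }

      public static void main(String[] args) {
        Instance first = allocate(0, "worker");
        failedInstances.offer(first);                      // container failed, queue it
        Instance replacement = allocate(1, "worker");
        System.out.println(replacement.compInstanceName);  // worker0, identity reused
      }
    }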
@@ -18,12 +18,14 @@
 
 package org.apache.slider.server.appmaster.state;
 
+import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.slider.api.ClusterNode;
 import org.apache.slider.api.StatusKeys;
 import org.apache.slider.api.resource.Application;
+import org.apache.slider.api.resource.ConfigFile;
 import org.apache.slider.api.types.ApplicationLivenessInformation;
 import org.apache.slider.api.types.ComponentInformation;
 import org.apache.slider.api.types.NodeInformation;
@@ -260,4 +262,14 @@ List<RoleInstance> getLiveInstancesByContainerIDs(
   * @return role statistics
   */
  RoleStatistics getRoleStatistics();
+
+  /**
+   * Get global substitution tokens.
+   */
+  Map<String, String> getGlobalSubstitutionTokens();
+
+  /**
+   * Get config file cache.
+   */
+  LoadingCache<ConfigFile, Object> getConfigFileCache();
 }
@@ -379,4 +379,52 @@ public void testDoubleNodeManagerStartEvent() throws Throwable {
     assertNull(ri3);
   }
 
+  @Test
+  public void testDoubleAllocate() throws Throwable {
+    getRole0Status().setDesired(1);
+
+    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes();
+    ContainerRequestOperation operation = (ContainerRequestOperation)ops.get(0);
+    AMRMClient.ContainerRequest request = operation.getRequest();
+    Container cont = engine.allocateContainer(request);
+    List<Container> allocated = new ArrayList<>();
+    allocated.add(cont);
+    List<ContainerAssignment> assignments = new ArrayList<>();
+    List<AbstractRMOperation> operations = new ArrayList<>();
+    assertEquals(0L, getRole0Status().getRunning());
+    assertEquals(1L, getRole0Status().getRequested());
+    appState.onContainersAllocated(allocated, assignments, operations);
+
+    assertListLength(ops, 1);
+    assertListLength(assignments, 1);
+    ContainerAssignment assigned = assignments.get(0);
+    Container target = assigned.container;
+    assertEquals(target.getId(), cont.getId());
+    int roleId = assigned.role.getPriority();
+    assertEquals(roleId, extractRole(request.getPriority()));
+    assertEquals(assigned.role.getName(), ROLE0);
+    RoleInstance ri = roleInstance(assigned);
+    //tell the app it arrived
+    appState.containerStartSubmitted(target, ri);
+    appState.innerOnNodeManagerContainerStarted(target.getId());
+    assertEquals(1L, getRole0Status().getRunning());
+    assertEquals(0L, getRole0Status().getRequested());
+
+    // now get an extra allocation that should be released
+    cont = engine.allocateContainer(request);
+    allocated = new ArrayList<>();
+    allocated.add(cont);
+    assignments = new ArrayList<>();
+    operations = new ArrayList<>();
+    appState.onContainersAllocated(allocated, assignments, operations);
+
+    assertListLength(operations, 1);
+    assertTrue(operations.get(0) instanceof ContainerReleaseOperation);
+    ContainerReleaseOperation release = (ContainerReleaseOperation)
+        operations.get(0);
+    assertEquals(release.getContainerId(), cont.getId());
+
+    assertEquals(1L, getRole0Status().getRunning());
+    assertEquals(0L, getRole0Status().getRequested());
+  }
 }
@@ -99,6 +99,7 @@ public static void verifyInstances(List<RoleInstance> instances, String
       assertEquals(roles[i], entry.getKey());
       RoleInstance instance = entry.getValue();
       assertEquals(roles[i], instance.compInstanceName);
+      assertEquals(i, instance.componentId);
       assertEquals(group, instance.role);
       assertEquals(group, instance.providerRole.name);
       assertEquals(group, instance.providerRole.group);
@@ -129,7 +130,6 @@ public void testDynamicFlexDown() throws Throwable {
     createAndStartNodes();
     instances = appState.cloneOwnedContainerList();
     verifyInstances(instances, "group1", "group10", "group11", "group12");
-    // fails because the names continue at N+1, with group12, group13, group14
   }
 
   @Test
@@ -176,11 +176,11 @@ public RoleStatus getRole2Status() {
   */
  public RoleInstance roleInstance(ContainerAssignment assigned) {
    Container target = assigned.container;
-    String failedInstance =
-        assigned.role.getProviderRole().failedInstanceName.poll();
+    RoleInstance failedInstance =
+        assigned.role.getProviderRole().failedInstances.poll();
    RoleInstance ri;
    if (failedInstance != null) {
-      ri = new RoleInstance(target, assigned.role.getProviderRole(), failedInstance);
+      ri = new RoleInstance(target, failedInstance);
    } else {
      ri = new RoleInstance(target, assigned.role.getProviderRole());
    }
@@ -811,6 +811,8 @@ public String[] getIpAndHost(Container container) {
       LOG.error("Incorrect format for ip and host");
       return null;
     }
+    // strip off quotes if any
+    output = output.replaceAll("['\"]", "");
     String ips = output.substring(0, index).trim();
     String host = output.substring(index+1).trim();
     String[] ipAndHost = new String[2];
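The added replaceAll strips stray quote characters from the command output before it is split into ip and host. A standalone illustration of the same call; the sample string is invented:

    // Drops any ' or " characters, then splits "ip, host" the way the patched
    // getIpAndHost() does afterwards.
    public class QuoteStrip {
      public static void main(String[] args) {
        String output = "'172.17.0.2, ctr-1.example.com'";
        output = output.replaceAll("['\"]", "");   // remove quotes if any
        int index = output.indexOf(',');
        String ips = output.substring(0, index).trim();
        String host = output.substring(index + 1).trim();
        System.out.println(ips + " / " + host);    // 172.17.0.2 / ctr-1.example.com
      }
    }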