YARN-6405. Improve configuring services through REST API. Contributed by Jian He

This commit is contained in:
Billie Rinaldi 2017-04-26 08:44:38 -07:00 committed by Jian He
parent 384ee13eec
commit 845764da14
32 changed files with 851 additions and 526 deletions

View File

@ -347,10 +347,10 @@ definitions:
description: The absolute path that this configuration file should be mounted as, in the application container.
src_file:
type: string
description: Required for type template. This provides the source location of the template which needs to be mounted as dest_file post property substitutions. Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, etc.
description: This provides the source location of the configuration file, the content of which is dumped to dest_file post property substitutions, in the format as specified in type. Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, or hdfs etc. Currently, only hdfs is supported.
props:
type: object
description: A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If the type is template then the attribute src_file is mandatory and the src_file content is dumped to dest_file post property substitutions.
description: A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If src_file is specified, the src_file content is dumped in the dest_file and these properties will overwrite existing properties of the same name from src_file, if any, or be added as new properties.
Container:
description: An instance of a running application container.
properties:

View File

@ -63,7 +63,7 @@ public class TestApplicationApiService {
// no name
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with no name");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_APPLICATION_NAME_INVALID, e.getMessage());
@ -74,7 +74,7 @@ public class TestApplicationApiService {
for (String badName : badNames) {
app.setName(badName);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with bad name " + badName);
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_APPLICATION_NAME_INVALID_FORMAT,
@ -85,7 +85,7 @@ public class TestApplicationApiService {
// no artifact
app.setName("finance_home");
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with no artifact");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_ARTIFACT_INVALID, e.getMessage());
@ -95,7 +95,7 @@ public class TestApplicationApiService {
Artifact artifact = new Artifact();
app.setArtifact(artifact);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with no artifact id");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_ARTIFACT_ID_INVALID, e.getMessage());
@ -106,7 +106,7 @@ public class TestApplicationApiService {
artifact.setId("app.io/hbase:facebook_0.2");
app.setNumberOfContainers(5l);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
} catch (IllegalArgumentException e) {
logger.error("application attributes specified should be valid here", e);
Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
@ -128,7 +128,7 @@ public class TestApplicationApiService {
// resource not specified
artifact.setId("docker.io/centos:centos7");
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with no resource");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_RESOURCE_INVALID, e.getMessage());
@ -138,7 +138,7 @@ public class TestApplicationApiService {
Resource res = new Resource();
app.setResource(res);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX + "application with no memory");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_RESOURCE_MEMORY_INVALID, e.getMessage());
@ -149,7 +149,7 @@ public class TestApplicationApiService {
res.setMemory("100mb");
res.setCpus(-2);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(
EXCEPTION_PREFIX + "application with invalid no of cpups");
} catch (IllegalArgumentException e) {
@ -159,17 +159,17 @@ public class TestApplicationApiService {
// number of containers not specified
res.setCpus(2);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(
EXCEPTION_PREFIX + "application with no container count");
} catch (IllegalArgumentException e) {
Assert.assertEquals(ERROR_CONTAINERS_COUNT_INVALID, e.getMessage());
Assert.assertTrue(e.getMessage().contains(ERROR_CONTAINERS_COUNT_INVALID));
}
// specifying profile along with cpus/memory raises exception
res.setProfile("hbase_finance_large");
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX
+ "application with resource profile along with cpus/memory");
} catch (IllegalArgumentException e) {
@ -182,7 +182,7 @@ public class TestApplicationApiService {
res.setCpus(null);
res.setMemory(null);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
Assert.fail(EXCEPTION_PREFIX
+ "application with resource profile only - NOT SUPPORTED");
} catch (IllegalArgumentException e) {
@ -198,7 +198,7 @@ public class TestApplicationApiService {
// everything valid here
app.setNumberOfContainers(5l);
try {
ServiceApiUtil.validateApplicationPostPayload(app);
ServiceApiUtil.validateApplicationPayload(app, null);
} catch (IllegalArgumentException e) {
logger.error("application attributes specified should be valid here", e);
Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static org.apache.slider.util.ServiceApiUtil.$;
/**
 * Names of the substitution variables that may appear in an input spec.
 * Each constant is the result of passing the bare variable name through
 * {@code ServiceApiUtil.$} (presumably yielding the {@code ${NAME}} token
 * form - confirm against ServiceApiUtil).
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ServiceApiConstants {

  // ---- service-level variables ----
  String SERVICE_NAME = $("SERVICE_NAME");
  String SERVICE_NAME_LC = $("SERVICE_NAME.lc");

  // ---- component-level variables ----
  String COMPONENT_NAME = $("COMPONENT_NAME");
  String COMPONENT_NAME_LC = $("COMPONENT_NAME.lc");

  // ---- component-instance-level variables ----
  String COMPONENT_INSTANCE_NAME = $("COMPONENT_INSTANCE_NAME");
  String COMPONENT_ID = $("COMPONENT_ID");
  String CONTAINER_ID = $("CONTAINER_ID");
}

View File

@ -17,19 +17,18 @@
package org.apache.slider.api.resource;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* A config file that needs to be created and made available as a volume in an
@ -45,7 +44,7 @@ public class ConfigFile implements Serializable {
public enum TypeEnum {
XML("XML"), PROPERTIES("PROPERTIES"), JSON("JSON"), YAML("YAML"), TEMPLATE(
"TEMPLATE"), ENV("ENV"), HADOOP_XML("HADOOP_XML");
"TEMPLATE"), ENV("ENV"), HADOOP_XML("HADOOP_XML"),;
private String value;
@ -63,7 +62,18 @@ public class ConfigFile implements Serializable {
private TypeEnum type = null;
private String destFile = null;
private String srcFile = null;
private Map<String, String> props = null;
private Map<String, String> props = new HashMap<>();
/**
 * Builds a new ConfigFile carrying this file's type, src_file and
 * dest_file. The entries of this file's props, if any, are added to the
 * new instance's props map.
 *
 * @return the newly created copy
 */
public ConfigFile copy() {
  ConfigFile duplicate = new ConfigFile();
  duplicate.setType(getType());
  duplicate.setSrcFile(getSrcFile());
  duplicate.setDestFile(getDestFile());

  Map<String, String> ownProps = getProps();
  if (ownProps == null || ownProps.isEmpty()) {
    // nothing to carry over
    return duplicate;
  }
  duplicate.getProps().putAll(ownProps);
  return duplicate;
}
/**
* Config file in the standard format like xml, properties, json, yaml,
@ -105,19 +115,20 @@ public class ConfigFile implements Serializable {
}
/**
* TODO this probably is not required for non-template configs. It is now used as symlink for localization for non-template configs - we could infer the name from destFile instead
*
* Required for type template. This provides the source location of the
* template which needs to be mounted as dest_file post property
* substitutions. Typically the src_file would point to a source controlled
* network accessible file maintained by tools like puppet, chef, etc.
* This provides the source location of the configuration file, the content
* of which is dumped to dest_file post property substitutions, in the format
* as specified in type. Typically the src_file would point to a source
* controlled network accessible file maintained by tools like puppet, chef,
* or hdfs etc. Currently, only hdfs is supported.
**/
public ConfigFile srcFile(String srcFile) {
this.srcFile = srcFile;
return this;
}
@ApiModelProperty(example = "null", value = "Required for type template. This provides the source location of the template which needs to be mounted as dest_file post property substitutions. Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, etc.")
@ApiModelProperty(example = "null", value = "This provides the source location of the configuration file, "
+ "the content of which is dumped to dest_file post property substitutions, in the format as specified in type. "
+ "Typically the src_file would point to a source controlled network accessible file maintained by tools like puppet, chef, or hdfs etc. Currently, only hdfs is supported.")
@JsonProperty("src_file")
public String getSrcFile() {
return srcFile;
@ -129,17 +140,19 @@ public class ConfigFile implements Serializable {
}
/**
* A blob of key value pairs that will be dumped in the dest_file in the
* format as specified in type. If the type is template then the attribute
* src_file is mandatory and the src_file content is dumped to dest_file post
* property substitutions.
A blob of key value pairs that will be dumped in the dest_file in the format
as specified in type. If src_file is specified, src_file content is dumped
in the dest_file and these properties will overwrite, if any, existing
properties in src_file or be added as new properties in src_file.
**/
public ConfigFile props(Map<String, String> props) {
this.props = props;
return this;
}
@ApiModelProperty(example = "null", value = "A blob of key value pairs that will be dumped in the dest_file in the format as specified in type. If the type is template then the attribute src_file is mandatory and the src_file content is dumped to dest_file post property substitutions.")
@ApiModelProperty(example = "null", value = "A blob of key value pairs that will be dumped in the dest_file in the format as specified in type."
+ " If src_file is specified, src_file content are dumped in the dest_file and these properties will overwrite, if any,"
+ " existing properties in src_file or be added as new properties in src_file.")
@JsonProperty("props")
public Map<String, String> getProps() {
return props;
@ -175,8 +188,7 @@ public class ConfigFile implements Serializable {
ConfigFile configFile = (ConfigFile) o;
return Objects.equals(this.type, configFile.type)
&& Objects.equals(this.destFile, configFile.destFile)
&& Objects.equals(this.srcFile, configFile.srcFile)
&& Objects.equals(this.props, configFile.props);
&& Objects.equals(this.srcFile, configFile.srcFile);
}
@Override

View File

@ -17,8 +17,11 @@
package org.apache.slider.api.resource;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.apache.commons.lang.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
@ -27,10 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang.StringUtils;
/**
* Set of configuration properties that can be injected into the application
* components via envs, files and custom pluggable helper docker containers.
@ -156,6 +155,13 @@ public class Configuration implements Serializable {
return properties.get(name.trim());
}
/**
 * Looks up a single env entry. The name is trimmed before the lookup.
 *
 * @param name env name; may be null
 * @return the value stored in env under the trimmed name, or null when
 *         name is null
 */
public String getEnv(String name) {
  return name == null ? null : env.get(name.trim());
}
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
@ -198,4 +204,29 @@ public class Configuration implements Serializable {
}
return o.toString().replace("\n", "\n ");
}
/**
 * Merge all properties and envs from that configuration into this
 * configuration. Config files are matched by dest_file: when both
 * configurations have a file with the same dest_file, the props of that
 * file are put into the props of this file (overwriting same-named keys);
 * files that exist only in that configuration are added to this one as
 * copies.
 *
 * @param that the configuration to merge from
 */
public synchronized void mergeFrom(Configuration that) {
  this.properties.putAll(that.getProperties());
  this.env.putAll(that.getEnv());

  // Index copies of that configuration's files by dest_file, so this
  // configuration never holds a reference to one of that's ConfigFiles.
  Map<String, ConfigFile> incoming = new HashMap<>();
  for (ConfigFile file : that.getFiles()) {
    incoming.put(file.getDestFile(), file.copy());
  }

  for (ConfigFile thisFile : files) {
    // remove() hands back the matching entry (or null) in one lookup, so
    // a matched file is not added again below.
    ConfigFile thatFile = incoming.remove(thisFile.getDestFile());
    if (thatFile != null) {
      thisFile.getProps().putAll(thatFile.getProps());
    }
  }

  // Whatever is left exists only in that configuration; the values are
  // already copies, so they can be added as-is.
  files.addAll(incoming.values());
}
}

View File

@ -653,7 +653,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
public ApplicationId actionCreate(Application application)
throws IOException, YarnException {
ServiceApiUtil.validateApplicationPostPayload(application);
ServiceApiUtil.validateApplicationPayload(application,
sliderFileSystem.getFileSystem());
String appName = application.getName();
validateClusterName(appName);
verifyNoLiveApp(appName, "Create");
@ -692,7 +693,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
boolean hasSliderAMLog4j =
addAMLog4jResource(appName, conf, localResources);
// copy jars to hdfs and add to localResources
Path tempPath = addJarResource(appName, localResources);
addJarResource(appName, localResources);
// add keytab if in secure env
addKeytabResourceIfSecure(sliderFileSystem, localResources, conf, appName);
printLocalResources(localResources);
@ -700,7 +701,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
//TODO SliderAMClientProvider#copyEnvVars
//TODO localResource putEnv
Map<String, String> env = addAMEnv(conf, tempPath);
Map<String, String> env = addAMEnv(conf);
// create AM CLI
String cmdStr =
@ -805,7 +806,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
return cmdStr;
}
private Map<String, String> addAMEnv(Configuration conf, Path tempPath)
private Map<String, String> addAMEnv(Configuration conf)
throws IOException {
Map<String, String> env = new HashMap<>();
ClasspathConstructor classpath =
@ -819,6 +820,13 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
if (jaas != null) {
env.put(HADOOP_JAAS_DEBUG, jaas);
}
if (!UserGroupInformation.isSecurityEnabled()) {
String userName = UserGroupInformation.getCurrentUser().getUserName();
log.info("Run as user " + userName);
// HADOOP_USER_NAME env is used by UserGroupInformation when log in
// This env makes AM run as this user
env.put("HADOOP_USER_NAME", userName);
}
env.putAll(getAmLaunchEnv(conf));
log.info("AM env: \n{}", stringifyMap(env));
return env;

View File

@ -240,7 +240,7 @@ public interface SliderKeys extends SliderXmlConfKeys {
String STDERR_AM = "slider-err.txt";
String DEFAULT_GC_OPTS = "";
String HADOOP_USER_NAME = ApplicationConstants.Environment.USER.toString();
String HADOOP_USER_NAME = "HADOOP_USER_NAME";
String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";
String SLIDER_PASSPHRASE = "SLIDER_PASSPHRASE";
@ -306,7 +306,7 @@ public interface SliderKeys extends SliderXmlConfKeys {
String CERT_FILE_LOCALIZATION_PATH = INFRA_RUN_SECURITY_DIR + "ca.crt";
String AM_CONFIG_GENERATION = "am.config.generation";
String APP_CONF_DIR = "app/conf";
String APP_CONF_DIR = "conf";
String APP_RESOURCES = "application.resources";
String APP_RESOURCES_DIR = "app/resources";

View File

@ -32,7 +32,6 @@ import java.util.List;
public class CommandLineBuilder {
protected final List<String> argumentList = new ArrayList<>(20);
/**
* Add an entry to the command list
* @param args arguments -these will be converted strings
@ -43,15 +42,6 @@ public class CommandLineBuilder {
}
}
/**
* Get the value at an offset
* @param offset offset
* @return the value at that point
*/
public String elt(int offset) {
return argumentList.get(offset);
}
/**
* Get the number of arguments
* @return an integer >= 0
@ -96,9 +86,4 @@ public class CommandLineBuilder {
public String build() {
return SliderUtils.join(argumentList, " ");
}
public List<String> getArgumentList() {
return argumentList;
}
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.registry.docstore;
public class ConfigurationResolver {
}

View File

@ -20,6 +20,13 @@ package org.apache.slider.providers;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.resource.Component;
import org.apache.slider.server.appmaster.state.AppState;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
* Provider role and key for use in app requests.
@ -36,7 +43,9 @@ public final class ProviderRole {
public final long placementTimeoutSeconds;
public final String labelExpression;
public final Component component;
public AtomicLong componentIdCounter = null;
public AppState appState;
public Queue<String> failedInstanceName = new ConcurrentLinkedQueue<String>();
public ProviderRole(String name, int id) {
this(name,
id,
@ -69,7 +78,7 @@ public final class ProviderRole {
nodeFailureThreshold,
placementTimeoutSeconds,
labelExpression,
new Component().name(name).numberOfContainers(0L));
new Component().name(name).numberOfContainers(0L), null);
}
/**
@ -79,18 +88,13 @@ public final class ProviderRole {
* @param id ID. This becomes the YARN priority
* @param policy placement policy
* @param nodeFailureThreshold threshold for node failures (within a reset interval)
* after which a node failure is considered an app failure
* after which a node failure is considered an app failure
* @param placementTimeoutSeconds for lax placement, timeout in seconds before
* @param labelExpression label expression for requests; may be null
*/
public ProviderRole(String name,
String group,
int id,
int policy,
int nodeFailureThreshold,
long placementTimeoutSeconds,
String labelExpression,
Component component) {
public ProviderRole(String name, String group, int id, int policy,
int nodeFailureThreshold, long placementTimeoutSeconds,
String labelExpression, Component component, AppState state) {
this.name = name;
if (group == null) {
this.group = name;
@ -103,9 +107,13 @@ public final class ProviderRole {
this.placementTimeoutSeconds = placementTimeoutSeconds;
this.labelExpression = labelExpression;
this.component = component;
if(component.getUniqueComponentSupport()) {
componentIdCounter = new AtomicLong(0);
}
this.appState = state;
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -26,6 +26,7 @@ import org.apache.slider.api.resource.Application;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
@ -38,7 +39,8 @@ public interface ProviderService extends Service {
*/
void buildContainerLaunchContext(ContainerLauncher containerLauncher,
Application application, Container container, ProviderRole providerRole,
SliderFileSystem sliderFileSystem) throws IOException, SliderException;
SliderFileSystem sliderFileSystem, RoleInstance roleInstance)
throws IOException, SliderException;
void setAMState(StateAccessForProviders stateAccessForProviders);

View File

@ -20,10 +20,10 @@ package org.apache.slider.providers;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
@ -32,7 +32,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
@ -59,6 +58,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -66,8 +66,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import static org.apache.slider.api.ServiceApiConstants.*;
import static org.apache.slider.util.ServiceApiUtil.$;
/**
* This is a factoring out of methods handy for providers. It's bonded to a log
* at construction time.
@ -89,7 +93,7 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
* Add oneself to the classpath. This does not work
* on minicluster test runs where the JAR is not built up.
* @param providerResources map of provider resources to add these entries to
* @param provider provider to add
* @param providerClass provider to add
* @param jarName name of the jar to use
* @param sliderFileSystem target filesystem
* @param tempPath path in the cluster FS for temp files
@ -157,14 +161,19 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
libDir, libLocalSrcDir);
}
// Build key -> value map
// value will be substituted by corresponding data in tokenMap
public Map<String, String> substituteConfigs(Map<String, String> configs,
/**
 * Replaces every occurrence of each token key in the content with the
 * corresponding token value. Keys and values are both treated as literal
 * text. Tokens are applied one after another in the map's iteration order,
 * so the output of one substitution is visible to the following ones.
 *
 * @param content the text to substitute in; may be null
 * @param tokensForSubstitution token to value map; may be null
 * @return the substituted text; content is returned unchanged when either
 *         argument is null
 */
public static String substituteStrWithTokens(String content,
    Map<String, String> tokensForSubstitution) {
  if (content == null || tokensForSubstitution == null) {
    return content;
  }
  for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) {
    // String.replace is literal on both sides. replaceAll would interpret
    // '$' and '\' in the value as group references / escapes and either
    // throw or corrupt values such as "a$b" or "C:\dir".
    content = content.replace(token.getKey(), token.getValue());
  }
  return content;
}
// configs will be substituted by corresponding env in tokenMap
public void substituteMapWithTokens(Map<String, String> configs,
Map<String, String> tokenMap) {
String format = "${%s}";
Map<String, String> filteredOptions = new HashMap<>();
for (Map.Entry<String, String> entry : configs.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (tokenMap != null) {
for (Map.Entry<String, String> token : tokenMap.entrySet()) {
@ -172,10 +181,8 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
value.replaceAll(Pattern.quote(token.getKey()), token.getValue());
}
}
filteredOptions.put(String.format(format, key), value);
entry.setValue(value);
}
return filteredOptions;
}
/**
@ -249,78 +256,95 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
}
}
/**
 * Registers every env entry as a substitution token: the env name is
 * passed through {@code $(...)} to form the token key and the env value
 * becomes the token value.
 *
 * Nothing is added when env is null or empty, or when the token map is
 * null or empty.
 *
 * @param env env name to value map to read from
 * @param tokensForSubstitution token map that receives the new entries
 */
public static void addEnvForSubstitution(Map<String, String> env,
    Map<String, String> tokensForSubstitution) {
  if (env == null || env.isEmpty()) {
    return;
  }
  if (tokensForSubstitution == null || tokensForSubstitution.isEmpty()) {
    return;
  }
  for (Map.Entry<String, String> envVar : env.entrySet()) {
    String tokenKey = $(envVar.getKey());
    tokensForSubstitution.put(tokenKey, envVar.getValue());
  }
}
// 1. Create all config files for a component on hdfs for localization
// 2. Add the config file to localResource
//TODO handle Template format config file
public void createConfigFileAndAddLocalResource(ContainerLauncher launcher,
SliderFileSystem fs, Component component,
Map<String, String> tokensForSubstitution,
StateAccessForProviders amState) throws IOException {
public synchronized void createConfigFileAndAddLocalResource(
ContainerLauncher launcher, SliderFileSystem fs, Component component,
Map<String, String> tokensForSubstitution, RoleInstance roleInstance)
throws IOException {
Path compDir =
new Path(new Path(fs.getAppDir(), "components"), component.getName());
if (!fs.getFileSystem().exists(compDir)) {
fs.getFileSystem().mkdirs(compDir,
Path compInstanceDir =
new Path(compDir, roleInstance.getCompInstanceName());
if (!fs.getFileSystem().exists(compInstanceDir)) {
fs.getFileSystem().mkdirs(compInstanceDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
log.info("Creating component dir: " + compDir);
roleInstance.compInstanceDir = compInstanceDir;
log.info("Creating component instance dir: " + compInstanceDir);
} else {
log.info("Component conf dir already exists: " + compDir);
return;
log.info("Component instance conf dir already exists: " + compInstanceDir);
}
for (ConfigFile configFile : component.getConfiguration().getFiles()) {
String fileName = configFile.getSrcFile();
// add Configuration#env into tokens substitution
addEnvForSubstitution(component.getConfiguration().getEnv(),
tokensForSubstitution);
log.info("Tokens substitution for component: " + roleInstance
.getCompInstanceName() + System.lineSeparator()
+ tokensForSubstitution);
for (ConfigFile originalFile : component.getConfiguration().getFiles()) {
ConfigFile configFile = originalFile.copy();
String fileName = new Path(configFile.getDestFile()).getName();
// substitute file name
for (Map.Entry<String, String> token : tokensForSubstitution.entrySet()) {
configFile.setDestFile(configFile.getDestFile()
.replaceAll(Pattern.quote(token.getKey()), token.getValue()));
}
// substitute configs
substituteConfigs(configFile.getProps(), tokensForSubstitution);
// write configs onto hdfs
PublishedConfiguration publishedConfiguration =
new PublishedConfiguration(fileName,
configFile.getProps().entrySet());
Path remoteFile = new Path(compDir, fileName);
Path remoteFile = new Path(compInstanceDir, fileName);
if (!fs.getFileSystem().exists(remoteFile)) {
synchronized (this) {
if (!fs.getFileSystem().exists(remoteFile)) {
PublishedConfigurationOutputter configurationOutputter =
PublishedConfigurationOutputter.createOutputter(
ConfigFormat.resolve(configFile.getType().toString()),
publishedConfiguration);
FSDataOutputStream os = null;
try {
os = fs.getFileSystem().create(remoteFile);
configurationOutputter.save(os);
os.flush();
log.info("Created config file on hdfs: " + remoteFile);
} finally {
IOUtils.closeStream(os);
}
log.info("Saving config file on hdfs for component " + roleInstance
.getCompInstanceName() + ": " + configFile);
if (configFile.getSrcFile() != null) {
// Load config file template
switch (configFile.getType()) {
case HADOOP_XML:
// Hadoop_xml_template
resolveHadoopXmlTemplateAndSaveOnHdfs(fs.getFileSystem(),
tokensForSubstitution, configFile, remoteFile, roleInstance);
break;
case TEMPLATE:
// plain-template
resolvePlainTemplateAndSaveOnHdfs(fs.getFileSystem(),
tokensForSubstitution, configFile, remoteFile, roleInstance);
break;
default:
log.info("Not supporting loading src_file for " + configFile);
break;
}
} else {
// non-template
resolveNonTemplateConfigsAndSaveOnHdfs(fs, tokensForSubstitution,
roleInstance, configFile, fileName, remoteFile);
}
}
// Publish configs
amState.getPublishedSliderConfigurations()
.put(configFile.getSrcFile(), publishedConfiguration);
// Add resource for localization
LocalResource configResource =
fs.createAmResource(remoteFile, LocalResourceType.FILE);
File destFile = new File(configFile.getDestFile());
//TODO why do we need to differentiate RESOURCE_DIR vs APP_CONF_DIR
String symlink = APP_CONF_DIR + "/" + fileName;
if (destFile.isAbsolute()) {
String symlink = RESOURCE_DIR + "/" + fileName;
launcher.addLocalResource(symlink, configResource,
configFile.getDestFile());
log.info("Add config file for localization: " + symlink + " -> "
+ configResource.getResource().getFile() + ", dest mount path: "
+ configFile.getDestFile());
} else {
String symlink = APP_CONF_DIR + "/" + fileName;
launcher.addLocalResource(symlink, configResource);
log.info("Add config file for localization: " + symlink + " -> "
+ configResource.getResource().getFile());
@ -328,23 +352,110 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
}
}
/**
 * Handles a config file that has no src_file: applies token substitution
 * to the file's props in place and, unless the remote file already exists,
 * writes them to remoteFile in the format given by the config file's type.
 *
 * @param fs file system wrapper used to check for and create the file
 * @param tokensForSubstitution tokens applied to the prop values
 * @param roleInstance component instance, used for the log message only
 * @param configFile the config file whose props are written
 * @param fileName name given to the published configuration
 * @param remoteFile target path of the config file
 * @throws IOException if checking for or writing the remote file fails
 */
private void resolveNonTemplateConfigsAndSaveOnHdfs(SliderFileSystem fs,
    Map<String, String> tokensForSubstitution, RoleInstance roleInstance,
    ConfigFile configFile, String fileName, Path remoteFile)
    throws IOException {
  // substitution happens first and regardless of whether the file exists
  Map<String, String> props = configFile.getProps();
  substituteMapWithTokens(props, tokensForSubstitution);
  PublishedConfiguration published =
      new PublishedConfiguration(fileName, props.entrySet());

  if (fs.getFileSystem().exists(remoteFile)) {
    log.info("Component instance = " + roleInstance.getCompInstanceName()
        + ", config file already exists: " + remoteFile);
    return;
  }

  // serialize in the format named by the config file's type
  PublishedConfigurationOutputter outputter =
      PublishedConfigurationOutputter.createOutputter(
          ConfigFormat.resolve(configFile.getType().toString()),
          published);
  try (FSDataOutputStream out = fs.getFileSystem().create(remoteFile)) {
    outputter.save(out);
    out.flush();
  }
}
/**
 * Builds a hadoop_xml config file from a src_file template and saves it at
 * {@code remoteFile}.
 *
 * <ol>
 *   <li>Fetch the template's key/value pairs from the application state's
 *       config file cache.</li>
 *   <li>Copy them into a fresh {@code Configuration} and overlay the
 *       properties declared on {@code configFile}, which win over template
 *       entries with the same key.</li>
 *   <li>Replace every token occurrence in every value.</li>
 *   <li>Write the result as XML to {@code remoteFile}.</li>
 * </ol>
 *
 * <p>If the template cannot be loaded the failure is logged and nothing is
 * written.
 *
 * @param fs file system on which the resolved file is created
 * @param tokensForSubstitution token to value map; tokens with a null value
 *        are skipped
 * @param configFile config file definition (cache key and property overrides)
 * @param remoteFile destination path of the resolved file
 * @param roleInstance component instance giving access to the config cache
 * @throws IOException if the resolved file cannot be created or written
 */
@SuppressWarnings("unchecked")
private void resolveHadoopXmlTemplateAndSaveOnHdfs(FileSystem fs,
    Map<String, String> tokensForSubstitution, ConfigFile configFile,
    Path remoteFile, RoleInstance roleInstance) throws IOException {
  Map<String, String> conf;
  try {
    conf = (Map<String, String>) roleInstance.providerRole.
        appState.configFileCache.get(configFile);
  } catch (ExecutionException e) {
    log.info("Failed to load config file: " + configFile, e);
    return;
  }
  // make a copy for substitution, so the cached template is never modified
  org.apache.hadoop.conf.Configuration confCopy =
      new org.apache.hadoop.conf.Configuration(false);
  for (Map.Entry<String, String> entry : conf.entrySet()) {
    confCopy.set(entry.getKey(), entry.getValue());
  }
  // overlay the properties declared on the config file
  for (Map.Entry<String, String> entry : configFile.getProps().entrySet()) {
    confCopy.set(entry.getKey(), entry.getValue());
  }
  // substitute tokens in every value
  for (Map.Entry<String, String> entry : confCopy) {
    String val = entry.getValue();
    if (val == null) {
      continue;
    }
    for (Map.Entry<String, String> token : tokensForSubstitution
        .entrySet()) {
      if (token.getValue() == null) {
        // no value known for this token; leave it unresolved
        continue;
      }
      // Literal replacement: token values such as "$PWD" must not be
      // interpreted as regex group references.
      val = val.replace(token.getKey(), token.getValue());
    }
    confCopy.set(entry.getKey(), val);
  }
  // save on hdfs
  try (OutputStream output = fs.create(remoteFile)) {
    confCopy.writeXml(output);
    log.info("Reading config from: " + configFile.getSrcFile()
        + ", writing to: " + remoteFile);
  }
}
/**
 * Resolves a plain-text template and saves it at {@code remoteFile}.
 *
 * <ol>
 *   <li>Read the template content as a string from the application state's
 *       config file cache.</li>
 *   <li>Substitute tokens in the content.</li>
 *   <li>Write the substituted content to {@code remoteFile}.</li>
 * </ol>
 *
 * <p>Failures to load the template or to write the file are logged and
 * otherwise ignored; this method does not throw for them.
 *
 * @param fs file system on which the resolved file is created
 * @param tokensForSubstitution token to value map applied to the content
 * @param configFile config file definition used as the cache key
 * @param remoteFile destination path of the resolved file
 * @param roleInstance component instance giving access to the config cache
 */
private void resolvePlainTemplateAndSaveOnHdfs(FileSystem fs,
    Map<String, String> tokensForSubstitution, ConfigFile configFile,
    Path remoteFile, RoleInstance roleInstance) {
  String content;
  try {
    content = (String) roleInstance.providerRole.appState.configFileCache
        .get(configFile);
  } catch (ExecutionException e) {
    log.info("Failed to load config file: " + configFile, e);
    return;
  }
  // substitute tokens; strings are immutable, so the returned value must be
  // kept or the template would be written out unsubstituted
  content = substituteStrWithTokens(content, tokensForSubstitution);
  try (OutputStream output = fs.create(remoteFile)) {
    org.apache.commons.io.IOUtils.write(content, output);
  } catch (IOException e) {
    // keep the cause so the failure can be diagnosed from the log
    log.info("Failed to create " + remoteFile, e);
  }
}
/**
* Get initial token map to be substituted into config values.
* @param appConf app configurations
* @param componentName component name
* @param componentGroup component group
* @param containerId container ID
* @param clusterName app name
* @return tokens to replace
*/
public Map<String, String> getStandardTokenMap(
Configuration appConf, Configuration componentConf, String componentName,
String componentGroup, String containerId, String clusterName) {
public Map<String, String> getStandardTokenMap(Configuration appConf,
RoleInstance roleInstance, String clusterName) {
Map<String, String> tokens = new HashMap<>();
if (containerId != null) {
tokens.put("${CONTAINER_ID}", containerId);
}
String nnuri = appConf.getProperty("fs.defaultFS");
if (nnuri != null && !nnuri.isEmpty()) {
tokens.put("${NN_URI}", nnuri);
@ -352,34 +463,13 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
}
tokens.put("${ZK_HOST}", appConf.getProperty(OptionKeys.ZOOKEEPER_HOSTS));
tokens.put("${DEFAULT_ZK_PATH}", appConf.getProperty(OptionKeys.ZOOKEEPER_PATH));
String prefix = componentConf.getProperty(ROLE_PREFIX);
String dataDirSuffix = "";
if (prefix == null) {
prefix = "";
} else {
dataDirSuffix = "_" + SliderUtils.trimPrefix(prefix);
}
tokens.put("${DEFAULT_DATA_DIR}",
appConf.getProperty(InternalKeys.INTERNAL_DATA_DIR_PATH)
+ dataDirSuffix);
tokens.put("${JAVA_HOME}", appConf.getProperty(JAVA_HOME));
tokens.put("${COMPONENT_NAME}", componentName);
tokens.put("${COMPONENT_NAME.lc}", componentName.toLowerCase());
tokens.put("${COMPONENT_PREFIX}", prefix);
tokens.put("${COMPONENT_PREFIX.lc}", prefix.toLowerCase());
if (!componentName.equals(componentGroup) &&
componentName.startsWith(componentGroup)) {
tokens.put("${COMPONENT_ID}",
componentName.substring(componentGroup.length()));
}
if (clusterName != null) {
tokens.put("${CLUSTER_NAME}", clusterName);
tokens.put("${CLUSTER_NAME.lc}", clusterName.toLowerCase());
tokens.put("${APP_NAME}", clusterName);
tokens.put("${APP_NAME.lc}", clusterName.toLowerCase());
}
tokens.put("${APP_COMPONENT_NAME}", componentName);
tokens.put("${APP_COMPONENT_NAME.lc}", componentName.toLowerCase());
tokens.put(SERVICE_NAME_LC, clusterName.toLowerCase());
tokens.put(SERVICE_NAME, clusterName);
tokens.put(COMPONENT_NAME, roleInstance.role);
tokens.put(COMPONENT_NAME_LC, roleInstance.role.toLowerCase());
tokens.put(COMPONENT_INSTANCE_NAME, roleInstance.getCompInstanceName());
tokens.put(CONTAINER_ID, roleInstance.getContainerId().toString());
tokens.put(COMPONENT_ID, String.valueOf(roleInstance.componentId));
return tokens;
}
@ -388,7 +478,7 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
* @param tokens existing tokens
* @param amState access to AM state
*/
public void addRoleHostTokens(Map<String, String> tokens,
public void addComponentHostTokens(Map<String, String> tokens,
StateAccessForProviders amState) {
if (amState == null) {
return;
@ -398,7 +488,7 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
String hosts = StringUtils .join(",",
getHostsList(entry.getValue().values(), true));
tokens.put("${" + tokenName + "}", hosts);
tokens.put($(tokenName), hosts);
}
}
@ -443,7 +533,8 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
// create and publish updated service record (including hostname & ip)
ServiceRecord record = new ServiceRecord();
record.set(YarnRegistryAttributes.YARN_ID, containerId);
record.description = roleName.replaceAll("_", "-");
String componentInstanceName = role.getCompInstanceName();
record.description = componentInstanceName.replaceAll("_", "-");
record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.CONTAINER);
// TODO: use constants from YarnRegistryAttributes

View File

@ -46,6 +46,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import static org.apache.slider.util.ServiceApiUtil.$;
public class DockerProviderService extends AbstractService
implements ProviderService, DockerKeys, SliderKeys {
@ -70,14 +72,11 @@ public class DockerProviderService extends AbstractService
this.yarnRegistry = yarnRegistry;
}
public void buildContainerLaunchContext(ContainerLauncher launcher,
Application application, Container container, ProviderRole providerRole,
SliderFileSystem fileSystem)
SliderFileSystem fileSystem, RoleInstance roleInstance)
throws IOException, SliderException {
String roleName = providerRole.name;
String roleGroup = providerRole.group;
Component component = providerRole.component;
launcher.setYarnDockerMode(true);
launcher.setDockerImage(component.getArtifact().getId());
@ -86,16 +85,12 @@ public class DockerProviderService extends AbstractService
launcher.setRunPrivilegedContainer(component.getRunPrivilegedContainer());
// Generate tokens (key-value pair) for config substitution.
Map<String, String> standardTokens = providerUtils
.getStandardTokenMap(application.getConfiguration(),
component.getConfiguration(), roleName, roleGroup,
container.getId().toString(), application.getName());
Map<String, String> tokensForSubstitution = providerUtils.substituteConfigs(
component.getConfiguration().getProperties(), standardTokens);
// Get pre-defined tokens
Map<String, String> tokensForSubstitution = providerUtils
.getStandardTokenMap(application.getConfiguration(), roleInstance,
application.getName());
tokensForSubstitution.putAll(standardTokens);
// Set the environment variables
// Set the environment variables in launcher
launcher.putEnv(SliderUtils
.buildEnvMap(component.getConfiguration(), tokensForSubstitution));
launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$());
@ -108,33 +103,26 @@ public class DockerProviderService extends AbstractService
launcher.setEnv("LANGUAGE", "en_US.UTF-8");
for (Entry<String, String> entry : launcher.getEnv().entrySet()) {
tokensForSubstitution.put("${" + entry.getKey() + "}", entry.getValue());
}
providerUtils.addRoleHostTokens(tokensForSubstitution, amState);
log.info("Token for substitution: " + tokensForSubstitution);
if (SliderUtils.isHadoopClusterSecure(getConfig())) {
//TODO localize key tabs, WHY is this code needed ? WHY DOES CONTAINER REQUIRE AM KEYTAB??
providerUtils.localizeServiceKeytabs(launcher, fileSystem, application);
tokensForSubstitution.put($(entry.getKey()), entry.getValue());
}
providerUtils.addComponentHostTokens(tokensForSubstitution, amState);
// create config file on hdfs and add local resource
providerUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
component, tokensForSubstitution, amState);
component, tokensForSubstitution, roleInstance);
// substitute launch command
String launchCommand = ProviderUtils
.substituteStrWithTokens(component.getLaunchCommand(),
tokensForSubstitution);
CommandLineBuilder operation = new CommandLineBuilder();
operation.add(component.getLaunchCommand());
operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
+ OUT_FILE + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
+ ERR_FILE);
operation.add(launchCommand);
operation.addOutAndErrFiles(OUT_FILE, ERR_FILE);
launcher.addCommand(operation.build());
// publish exports
// TODO move this to app level, no need to do this for every container launch
providerUtils
.substituteConfigs(application.getQuicklinks(), tokensForSubstitution);
.substituteMapWithTokens(application.getQuicklinks(), tokensForSubstitution);
PublishedConfiguration pubconf = new PublishedConfiguration(QUICK_LINKS,
application.getQuicklinks().entrySet());
amState.getPublishedSliderConfigurations().put(QUICK_LINKS, pubconf);

View File

@ -150,24 +150,26 @@ public class RoleLaunchService
containerLauncher.setupUGI();
containerLauncher.putEnv(envVars);
log.info("Launching container {} into RoleName = {}, RoleGroup = {}",
container.getId(), role.name, role.group);
provider.buildContainerLaunchContext(containerLauncher, application,
container, role, fs);
RoleInstance instance = new RoleInstance(container);
String failedInstance = role.failedInstanceName.poll();
RoleInstance instance;
if (failedInstance != null) {
instance = new RoleInstance(container, role, failedInstance);
} else {
instance = new RoleInstance(container, role);
}
String[] envDescription = containerLauncher.dumpEnvToString();
String commandsAsString = containerLauncher.getCommandsAsString();
log.info("Launching container {} for component instance = {}",
container.getId(), instance.getCompInstanceName());
log.info("Starting container with command: {}", commandsAsString);
instance.providerRole = role;
instance.command = commandsAsString;
instance.role = role.name;
instance.group = role.group;
instance.roleId = role.id;
instance.environment = envDescription;
provider.buildContainerLaunchContext(containerLauncher, application,
container, role, fs, instance);
long delay = role.component.getConfiguration()
.getPropertyLong(AgentKeys.KEY_CONTAINER_LAUNCH_DELAY, 0);
long maxDelay = getConfig()

View File

@ -25,6 +25,7 @@ import com.google.protobuf.BlockingService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -86,6 +87,7 @@ import org.apache.slider.api.RoleKeys;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.proto.SliderClusterAPI;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Component;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.params.AbstractActionArgs;
@ -109,7 +111,6 @@ import org.apache.slider.core.main.ServiceLauncher;
import org.apache.slider.core.persist.JsonSerDeser;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.server.appmaster.actions.ActionHalt;
@ -136,7 +137,6 @@ import org.apache.slider.server.appmaster.operations.RMOperationHandler;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
import org.apache.slider.server.appmaster.rpc.SliderIPCService;
import org.apache.slider.server.appmaster.security.SecurityConfiguration;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
import org.apache.slider.server.appmaster.state.ContainerAssignment;
@ -170,7 +170,6 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -701,10 +700,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
registryOperations = startRegistryOperationsService();
log.info(registryOperations.toString());
//build the role map
List<ProviderRole> providerRoles = Collections.EMPTY_LIST;
// Start up the WebApp and track the URL for it
// Web service endpoints: initialize
WebAppApiImpl webAppApi =
new WebAppApiImpl(
@ -815,7 +811,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//build the instance
AppStateBindingInfo binding = new AppStateBindingInfo();
binding.serviceConfig = serviceConf;
binding.roles = providerRoles;
binding.fs = fs.getFileSystem();
binding.historyPath = historyDir;
binding.liveContainers = liveContainers;
@ -873,6 +868,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
scheduleFailureWindowResets(application.getConfiguration());
scheduleEscalation(application.getConfiguration());
for (Component component : application.getComponents()) {
// Merge app-level configuration into component level configuration
component.getConfiguration().mergeFrom(application.getConfiguration());
}
try {
// schedule YARN Registry registration
queue(new ActionRegisterServiceInstance(appName, appid, application));
@ -1170,22 +1170,22 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* Handler for {@link RegisterComponentInstance action}
* Register/re-register an ephemeral container that is already in the application state
* @param id the component
* @param description component description
* @param type component type
* @return true if the component is registered
*/
public boolean registerComponent(ContainerId id, String description,
String type) throws IOException {
public boolean registerComponent(ContainerId id, RoleInstance roleInstance)
throws IOException {
RoleInstance instance = appState.getOwnedContainer(id);
if (instance == null) {
return false;
}
// this is where component registrations go
log.info("Registering component {}", id);
String cid = RegistryPathUtils.encodeYarnID(id.toString());
ServiceRecord record = new ServiceRecord();
record.set(YarnRegistryAttributes.YARN_ID, cid);
record.description = description;
record.description = roleInstance.getCompInstanceName();
log.info("Registering component " + roleInstance.getCompInstanceName()
+ ", containerId = " + id);
record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.CONTAINER);
setUserProvidedServiceRecordAttributes(
@ -1194,7 +1194,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
yarnRegistryOperations.putComponent(cid, record);
} catch (IOException e) {
log.warn("Failed to register container {}/{}: {}",
id, description, e, e);
id, roleInstance.role, e, e);
return false;
}
org.apache.slider.api.resource.Container container =
@ -1203,6 +1203,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
container.setLaunchTime(new Date());
container.setState(org.apache.slider.api.resource.ContainerState.INIT);
container.setBareHost(instance.host);
// TODO differentiate component name and component instance name ?
container.setComponentName(roleInstance.getCompInstanceName());
instance.providerRole.component.addContainer(container);
if (timelineServiceEnabled) {
@ -1228,20 +1230,38 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*
* unregister a component. At the time this message is received,
* the component may not have been registered
* @param id the component
*/
public void unregisterComponent(ContainerId id) {
log.info("Unregistering component {}", id);
public void unregisterComponent(RoleInstance roleInstance) {
ContainerId containerId = roleInstance.getContainerId();
log.info(
"Unregistering component instance " + roleInstance.getCompInstanceName()
+ ", ContainerId = " + containerId);
if (yarnRegistryOperations == null) {
log.warn("Processing unregister component event before initialization " +
"completed; init flag ={}", initCompleted);
log.warn("Processing unregister component event before initialization "
+ "completed; init flag ={}", initCompleted);
return;
}
String cid = RegistryPathUtils.encodeYarnID(id.toString());
String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
try {
yarnRegistryOperations.deleteComponent(cid);
} catch (IOException e) {
log.warn("Failed to delete container {} : {}", id, e, e);
log.warn("Failed to delete container {} : {}", containerId, e, e);
}
// remove component instance dir
try {
FileSystem fs = getClusterFS().getFileSystem();
if (roleInstance.compInstanceDir != null && fs
.exists(roleInstance.compInstanceDir)) {
boolean deleted = fs.delete(roleInstance.compInstanceDir, true);
if (!deleted) {
log.warn("Failed to delete component instance dir: "
+ roleInstance.compInstanceDir);
}
}
} catch (IOException e) {
log.error("Failed to delete component instance dir: "
+ roleInstance.compInstanceDir, e);
}
}
@ -1395,13 +1415,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
return exitCode;
}
/**
* Get diagnostics info about containers
*/
private String getContainerDiagnosticInfo() {
return appState.getContainerDiagnosticInfo();
}
public Object getProxy(Class protocol, InetSocketAddress addr) {
return yarnRPC.getProxy(protocol, addr, getConfig());
@ -1492,7 +1505,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//for all the operations, exec them
execute(operations);
log.info("Diagnostics: {}", getContainerDiagnosticInfo());
}
@Override //AMRMClientAsync
@ -1519,8 +1531,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// known nodes trigger notifications
if(!result.unknownNode) {
queue(new UnregisterComponentInstance(containerId, 0,
TimeUnit.MILLISECONDS));
queue(new UnregisterComponentInstance(0,
TimeUnit.MILLISECONDS, result.roleInstance));
if (timelineServiceEnabled && result.roleInstance != null) {
serviceTimelinePublisher
.componentInstanceFinished(result.roleInstance);
@ -1936,7 +1949,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
nmClientAsync.getContainerStatusAsync(containerId,
cinfo.container.getNodeId());
// push out a registration
queue(new RegisterComponentInstance(containerId, cinfo.role, cinfo.group,
queue(new RegisterComponentInstance(containerId, cinfo,
0, TimeUnit.MILLISECONDS));
} else {

View File

@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.RoleInstance;
import java.util.concurrent.TimeUnit;
@ -33,18 +34,15 @@ import java.util.concurrent.TimeUnit;
public class RegisterComponentInstance extends AsyncAction {
public final ContainerId containerId;
public final String description;
public final String type;
public final RoleInstance roleInstance;
public RegisterComponentInstance(ContainerId containerId,
String description,
String type,
RoleInstance roleInstance,
long delay,
TimeUnit timeUnit) {
super("RegisterComponentInstance :" + containerId,
delay, timeUnit);
this.description = description;
this.type = type;
this.roleInstance = roleInstance;
Preconditions.checkArgument(containerId != null);
this.containerId = containerId;
}
@ -54,6 +52,6 @@ public class RegisterComponentInstance extends AsyncAction {
QueueAccess queueService,
AppState appState) throws Exception {
appMaster.registerComponent(containerId, description, type);
appMaster.registerComponent(containerId, roleInstance);
}
}

View File

@ -21,31 +21,31 @@ package org.apache.slider.server.appmaster.actions;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.RoleInstance;
import java.util.concurrent.TimeUnit;
/**
* Tell AM to unregister this component instance
* {@link SliderAppMaster#unregisterComponent(RoleInstance)}
*/
public class UnregisterComponentInstance extends AsyncAction {
public final ContainerId containerId;
public final RoleInstance roleInstance;
public UnregisterComponentInstance(ContainerId containerId,
long delay,
TimeUnit timeUnit) {
super("UnregisterComponentInstance :" + containerId.toString(),
public UnregisterComponentInstance(long delay, TimeUnit timeUnit,
RoleInstance roleInstance) {
super("UnregisterComponentInstance :" + roleInstance.getCompInstanceName()
+ ", ContainerId = " + roleInstance.getContainerId(),
delay, timeUnit);
this.containerId = containerId;
this.roleInstance = roleInstance;
}
@Override
public void execute(SliderAppMaster appMaster,
QueueAccess queueService,
AppState appState) throws Exception {
appMaster.unregisterComponent(containerId);
appMaster.unregisterComponent(roleInstance);
}
}

View File

@ -39,24 +39,34 @@ public class SliderMetrics implements MetricsSource {
@Metric("containers requested")
public MutableGaugeInt containersRequested;
@Metric("anti-affinity containers pending")
public MutableGaugeInt pendingAAContainers;
@Metric("containers running")
public MutableGaugeInt containersRunning;
@Metric("containers desired")
public MutableGaugeInt containersDesired;
@Metric("containers completed")
public MutableGaugeInt containersCompleted;
@Metric("containers failed")
public MutableGaugeInt containersFailed;
@Metric("containers failed since last threshold")
public MutableGaugeInt failedSinceLastThreshold;
@Metric("containers preempted")
public MutableGaugeInt containersPreempted;
@Metric("containers exceeded limits")
public MutableGaugeInt containersLimitsExceeded;
@Metric("containers surplus")
public MutableGaugeInt surplusContainers;
@Metric("containers failed due to disk failure")
public MutableGaugeInt containersDiskFailure;
@ -80,5 +90,18 @@ public class SliderMetrics implements MetricsSource {
public void tag(String name, String description, String value) {
registry.tag(name, description, value);
}
/**
 * Renders a one-line snapshot of the container gauges for logging.
 * Only the gauges appended below are included; for example
 * {@code containersLimitsExceeded} and {@code containersDiskFailure} are not
 * part of the output.
 *
 * @return text of the form {@code SliderMetrics{name=value, ...}}
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("SliderMetrics{");
  text.append("containersRequested=").append(containersRequested.value());
  text.append(", pendingAAContainers=").append(pendingAAContainers.value());
  text.append(", containersRunning=").append(containersRunning.value());
  text.append(", containersDesired=").append(containersDesired.value());
  text.append(", containersCompleted=").append(containersCompleted.value());
  text.append(", containersFailed=").append(containersFailed.value());
  text.append(", failedSinceLastThreshold=")
      .append(failedSinceLastThreshold.value());
  text.append(", containersPreempted=").append(containersPreempted.value());
  text.append(", surplusContainers=").append(surplusContainers.value());
  return text.append('}').toString();
}
}

View File

@ -77,7 +77,7 @@ public class ChaosKillContainer implements ChaosTarget {
RoleInstance roleInstance = liveContainers.get(target);
log.info("Killing {}", roleInstance);
queues.schedule(new ActionKillContainer(roleInstance.getId(),
queues.schedule(new ActionKillContainer(roleInstance.getContainerId(),
DELAY, TimeUnit.MILLISECONDS, operationHandler));
}
}

View File

@ -283,7 +283,7 @@ public class SliderIPCService extends AbstractService
//throws NoSuchNodeException if it is missing
RoleInstance instance =
state.getLiveInstanceByContainerID(containerID);
queue(new ActionKillContainer(instance.getId(), 0, TimeUnit.MILLISECONDS,
queue(new ActionKillContainer(instance.getContainerId(), 0, TimeUnit.MILLISECONDS,
amOperations));
Messages.KillContainerResponseProto.Builder builder =
Messages.KillContainerResponseProto.newBuilder();

View File

@ -20,7 +20,13 @@ package org.apache.slider.server.appmaster.state;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.Container;
@ -42,6 +48,7 @@ import org.apache.slider.api.proto.Messages.ComponentCountProto;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.ApplicationState;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.ConfigFile;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.RoleStatistics;
@ -79,6 +86,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.slider.api.ResourceKeys.*;
@ -99,7 +107,6 @@ public class AppState {
private final AbstractClusterServices recordFactory;
private final MetricsAndMonitoring metricsAndMonitoring;
/**
* Flag set to indicate the application is live -this only happens
* after the buildInstance operation
@ -108,9 +115,11 @@ public class AppState {
private Application app;
// priority_id -> RoleStatus
private final Map<Integer, RoleStatus> roleStatusMap =
new ConcurrentSkipListMap<>();
// component_name -> ProviderRole
private final Map<String, ProviderRole> roles =
new ConcurrentHashMap<>();
@ -202,6 +211,10 @@ public class AppState {
private SliderMetrics appMetrics;
private ServiceTimelinePublisher serviceTimelinePublisher;
// A cache for loading config files from remote such as hdfs
public LoadingCache<ConfigFile, Object> configFileCache = null;
/**
* Create an instance
* @param recordFactory factory for YARN records
@ -304,8 +317,6 @@ public class AppState {
public synchronized void buildInstance(AppStateBindingInfo binding)
throws BadClusterStateException, BadConfigException, IOException {
binding.validate();
log.debug("Building application state");
containerReleaseSelector = binding.releaseSelector;
// set the cluster specification (once its dependency the client properties
@ -313,10 +324,8 @@ public class AppState {
this.app = binding.application;
appMetrics = SliderMetrics.register(app.getName(),
"Metrics for service");
appMetrics
.tag("type", "Metrics type [component or service]", "service");
appMetrics
.tag("appId", "Application id for service", app.getId());
appMetrics.tag("type", "Metrics type [component or service]", "service");
appMetrics.tag("appId", "Application id for service", app.getId());
org.apache.slider.api.resource.Configuration conf = app.getConfiguration();
startTimeThreshold =
@ -327,12 +336,7 @@ public class AppState {
nodeFailureThreshold = conf.getPropertyInt(NODE_FAILURE_THRESHOLD,
DEFAULT_NODE_FAILURE_THRESHOLD);
//build the initial role list
List<ProviderRole> roleList = new ArrayList<>(binding.roles);
for (ProviderRole providerRole : roleList) {
buildRole(providerRole);
}
//build the initial component list
int priority = 1;
for (Component component : app.getComponents()) {
priority = getNewPriority(priority);
@ -340,25 +344,18 @@ public class AppState {
if (roles.containsKey(name)) {
continue;
}
if (component.getUniqueComponentSupport()) {
log.info("Skipping group " + name + ", as it's unique component");
continue;
}
log.info("Adding component: " + name);
ProviderRole dynamicRole =
createComponent(name, name, component, priority);
buildRole(dynamicRole);
roleList.add(dynamicRole);
createComponent(name, name, component, priority++);
}
//then pick up the requirements
buildRoleRequirementsFromResources();
// buildRoleRequirementsFromResources();
// set up the role history
roleHistory = new RoleHistory(roleStatusMap.values(), recordFactory);
roleHistory.onStart(binding.fs, binding.historyPath);
// trigger first node update
roleHistory.onNodesUpdated(binding.nodeReports);
//rebuild any live containers
rebuildModelFromRestart(binding.liveContainers);
@ -367,9 +364,39 @@ public class AppState {
//mark as live
applicationLive = true;
app.setState(STARTED);
createConfigFileCache(binding.fs);
}
/**
 * Creates the cache used to load config file templates (src_file) from the
 * given file system. Entries expire 10 minutes after they were last accessed.
 *
 * <p>The cached value depends on the config file type:
 * <ul>
 *   <li>{@code HADOOP_XML}: a {@code Map<String, String>} holding the
 *       properties read from the file;</li>
 *   <li>{@code TEMPLATE}: the file content as a {@code String}.</li>
 * </ul>
 * Any other type makes the load fail with an {@link IOException}, which
 * {@code LoadingCache.get} reports as an {@code ExecutionException}. A loader
 * must never return null.
 *
 * @param fileSystem file system the src_file paths are opened on
 */
private void createConfigFileCache(final FileSystem fileSystem) {
  this.configFileCache =
      CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
          .build(new CacheLoader<ConfigFile, Object>() {
            @Override
            public Object load(ConfigFile key) throws Exception {
              switch (key.getType()) {
              case HADOOP_XML:
                try (FSDataInputStream input = fileSystem
                    .open(new Path(key.getSrcFile()))) {
                  org.apache.hadoop.conf.Configuration confRead =
                      new org.apache.hadoop.conf.Configuration(false);
                  confRead.addResource(input);
                  // Copy into a plain map while the stream is still open.
                  Map<String, String> map = new HashMap<>(confRead.size());
                  for (Map.Entry<String, String> entry : confRead) {
                    map.put(entry.getKey(), entry.getValue());
                  }
                  return map;
                }
              case TEMPLATE:
                try (FSDataInputStream fileInput = fileSystem
                    .open(new Path(key.getSrcFile()))) {
                  return IOUtils.toString(fileInput);
                }
              default:
                // Returning null is illegal for a CacheLoader; fail with a
                // checked exception so callers see an ExecutionException.
                throw new IOException(
                    "Not supporting loading src_file for " + key);
              }
            }
          });
}
//TODO WHY do we need to create the component for AM ?
public ProviderRole createComponent(String name, String group,
Component component, int priority) throws BadConfigException {
org.apache.slider.api.resource.Configuration conf =
@ -384,26 +411,28 @@ public class AppState {
DEF_YARN_LABEL_EXPRESSION);
ProviderRole newRole =
new ProviderRole(name, group, priority, (int)placementPolicy, threshold,
placementTimeout, label, component);
placementTimeout, label, component, this);
buildRole(newRole, component);
log.info("Created a new role " + newRole);
return newRole;
}
@VisibleForTesting
public synchronized List<ProviderRole> updateComponents(Map<String, Long>
public synchronized void updateComponents(Map<String, Long>
componentCounts) throws BadConfigException {
for (Component component : app.getComponents()) {
if (componentCounts.containsKey(component.getName())) {
component.setNumberOfContainers(componentCounts.get(component
.getName()));
long count = componentCounts.get(component.getName());
component.setNumberOfContainers(count);
ProviderRole role = roles.get(component.getName());
if (role != null && roleStatusMap.get(role.id) != null) {
setDesiredContainers(roleStatusMap.get(role.id), (int) count);
}
}
}
//TODO update cluster description
return buildRoleRequirementsFromResources();
}
public synchronized List<ProviderRole> updateComponents(
public synchronized void updateComponents(
Messages.FlexComponentsRequestProto requestProto)
throws BadConfigException {
Map<String, Long> componentCounts = new HashMap<>();
@ -412,116 +441,119 @@ public class AppState {
componentCounts.put(componentCount.getName(), componentCount
.getNumberOfContainers());
}
return updateComponents(componentCounts);
updateComponents(componentCounts);
}
/**
* build the role requirements from the cluster specification
* @return a list of any dynamically added provider roles
*/
private List<ProviderRole> buildRoleRequirementsFromResources()
throws BadConfigException {
List<ProviderRole> newRoles = new ArrayList<>(0);
// now update every role's desired count.
// if there are no instance values, that role count goes to zero
// Add all the existing roles
// component name -> number of containers
Map<String, Integer> groupCounts = new HashMap<>();
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
if (roleStatus.isExcludeFromFlexing()) {
// skip inflexible roles, e.g AM itself
continue;
}
long currentDesired = roleStatus.getDesired();
String role = roleStatus.getName();
String roleGroup = roleStatus.getGroup();
Component component = roleStatus.getProviderRole().component;
int desiredInstanceCount = component.getNumberOfContainers().intValue();
int newDesired = desiredInstanceCount;
if (component.getUniqueComponentSupport()) {
Integer groupCount = 0;
if (groupCounts.containsKey(roleGroup)) {
groupCount = groupCounts.get(roleGroup);
}
newDesired = desiredInstanceCount - groupCount;
if (newDesired > 0) {
newDesired = 1;
groupCounts.put(roleGroup, groupCount + newDesired);
} else {
newDesired = 0;
}
}
if (newDesired == 0) {
log.info("Role {} has 0 instances specified", role);
}
if (currentDesired != newDesired) {
log.info("Role {} flexed from {} to {}", role, currentDesired,
newDesired);
setDesiredContainers(roleStatus, newDesired);
}
}
// now the dynamic ones. Iterate through the the cluster spec and
// add any role status entries not in the role status
for (Component component : app.getComponents()) {
String name = component.getName();
if (roles.containsKey(name)) {
continue;
}
if (component.getUniqueComponentSupport()) {
// THIS NAME IS A GROUP
int desiredInstanceCount = component.getNumberOfContainers().intValue();
Integer groupCount = 0;
if (groupCounts.containsKey(name)) {
groupCount = groupCounts.get(name);
}
for (int i = groupCount + 1; i <= desiredInstanceCount; i++) {
// this is a new instance of an existing group
String newName = String.format("%s%d", name, i);
if (roles.containsKey(newName)) {
continue;
}
int newPriority = getNewPriority(i);
log.info("Adding new role {}", newName);
ProviderRole dynamicRole =
createComponent(newName, name, component, newPriority);
RoleStatus newRole = buildRole(dynamicRole);
incDesiredContainers(newRole);
log.info("New role {}", newRole);
if (roleHistory != null) {
roleHistory.addNewRole(newRole);
}
newRoles.add(dynamicRole);
}
} else {
// this is a new value
log.info("Adding new role {}, num containers {}", name,
component.getNumberOfContainers());
ProviderRole dynamicRole =
createComponent(name, name, component, getNewPriority(1));
RoleStatus newRole = buildRole(dynamicRole);
incDesiredContainers(newRole,
component.getNumberOfContainers().intValue());
log.info("New role {}", newRole);
if (roleHistory != null) {
roleHistory.addNewRole(newRole);
}
newRoles.add(dynamicRole);
}
}
// and fill in all those roles with their requirements
buildRoleResourceRequirements();
return newRoles;
}
// private List<ProviderRole> buildRoleRequirementsFromResources()
// throws BadConfigException {
//
// List<ProviderRole> newRoles = new ArrayList<>(0);
//
// // now update every role's desired count.
// // if there are no instance values, that role count goes to zero
// // Add all the existing roles
// // component name -> number of containers
// Map<String, Integer> groupCounts = new HashMap<>();
//
// for (RoleStatus roleStatus : getRoleStatusMap().values()) {
// if (roleStatus.isExcludeFromFlexing()) {
// // skip inflexible roles, e.g AM itself
// continue;
// }
// long currentDesired = roleStatus.getDesired();
// String role = roleStatus.getName();
// String roleGroup = roleStatus.getGroup();
// Component component = roleStatus.getProviderRole().component;
// int desiredInstanceCount = component.getNumberOfContainers().intValue();
//
// int newDesired = desiredInstanceCount;
// if (component.getUniqueComponentSupport()) {
// Integer groupCount = 0;
// if (groupCounts.containsKey(roleGroup)) {
// groupCount = groupCounts.get(roleGroup);
// }
//
// newDesired = desiredInstanceCount - groupCount;
//
// if (newDesired > 0) {
// newDesired = 1;
// groupCounts.put(roleGroup, groupCount + newDesired);
// } else {
// newDesired = 0;
// }
// }
//
// if (newDesired == 0) {
// log.info("Role {} has 0 instances specified", role);
// }
// if (currentDesired != newDesired) {
// log.info("Role {} flexed from {} to {}", role, currentDesired,
// newDesired);
// setDesiredContainers(roleStatus, newDesired);
// }
// }
//
// log.info("Counts per component: " + groupCounts);
// // now the dynamic ones. Iterate through the the cluster spec and
// // add any role status entries not in the role status
//
// List<RoleStatus> list = new ArrayList<>(getRoleStatusMap().values());
// for (RoleStatus roleStatus : list) {
// String name = roleStatus.getName();
// Component component = roleStatus.getProviderRole().component;
// if (roles.containsKey(name)) {
// continue;
// }
// if (component.getUniqueComponentSupport()) {
// // THIS NAME IS A GROUP
// int desiredInstanceCount = component.getNumberOfContainers().intValue();
// Integer groupCount = 0;
// if (groupCounts.containsKey(name)) {
// groupCount = groupCounts.get(name);
// }
// log.info("Component " + component.getName() + ", current count = "
// + groupCount + ", desired count = " + desiredInstanceCount);
// for (int i = groupCount + 1; i <= desiredInstanceCount; i++) {
// int priority = roleStatus.getPriority();
// // this is a new instance of an existing group
// String newName = String.format("%s%d", name, i);
// int newPriority = getNewPriority(priority + i - 1);
// log.info("Adding new role {}", newName);
// ProviderRole dynamicRole =
// createComponent(newName, name, component, newPriority);
// RoleStatus newRole = buildRole(dynamicRole);
// incDesiredContainers(newRole);
// log.info("New role {}", newRole);
// if (roleHistory != null) {
// roleHistory.addNewRole(newRole);
// }
// newRoles.add(dynamicRole);
// }
// } else {
// // this is a new value
// log.info("Adding new role {}", name);
// ProviderRole dynamicRole =
// createComponent(name, name, component, roleStatus.getPriority());
// RoleStatus newRole = buildRole(dynamicRole);
// incDesiredContainers(roleStatus,
// component.getNumberOfContainers().intValue());
// log.info("New role {}", newRole);
// if (roleHistory != null) {
// roleHistory.addNewRole(newRole);
// }
// newRoles.add(dynamicRole);
// }
// }
// // and fill in all those roles with their requirements
// buildRoleResourceRequirements();
//
// return newRoles;
// }
private int getNewPriority(int start) {
if (!rolePriorityMap.containsKey(start)) {
@ -539,16 +571,20 @@ public class AppState {
* @return the role status built up
* @throws BadConfigException if a role of that priority already exists
*/
public RoleStatus buildRole(ProviderRole providerRole) throws BadConfigException {
public RoleStatus buildRole(ProviderRole providerRole, Component component)
throws BadConfigException {
// build role status map
int priority = providerRole.id;
if (roleStatusMap.containsKey(priority)) {
throw new BadConfigException("Duplicate Provider Key: %s and %s",
providerRole,
roleStatusMap.get(priority)
.getProviderRole());
throw new BadConfigException("Duplicate component priority Key: %s and %s",
providerRole, roleStatusMap.get(priority));
}
RoleStatus roleStatus = new RoleStatus(providerRole);
roleStatus.setResourceRequirements(buildResourceRequirements(roleStatus));
long prev = roleStatus.getDesired();
setDesiredContainers(roleStatus, component.getNumberOfContainers().intValue());
log.info("Set desired containers for component " + component.getName() +
" from " + prev + " to " + roleStatus.getDesired());
roleStatusMap.put(priority, roleStatus);
String name = providerRole.name;
roles.put(name, providerRole);
@ -558,16 +594,6 @@ public class AppState {
return roleStatus;
}
/**
* Build up the requirements of every resource
*/
private void buildRoleResourceRequirements() {
for (RoleStatus role : roleStatusMap.values()) {
role.setResourceRequirements(buildResourceRequirements(role));
log.info("Setting resource requirements for {} to {}", role.getName(),
role.getResourceRequirements());
}
}
/**
* Look up the status entry of a role or raise an exception
* @param key role ID
@ -731,7 +757,7 @@ public class AppState {
}
/**
* Enum all nodes by role.
* Enum all nodes by role.
* @param role role, or "" for all roles
* @return a list of nodes, may be empty
*/
@ -785,7 +811,7 @@ public class AppState {
}
/**
* Build a map of role->nodename->node-info
* Build a map of Component_name -> ContainerId -> ClusterNode
*
* @return the map of Role name to list of Cluster Nodes
*/
@ -850,7 +876,7 @@ public class AppState {
/**
* Create a container request.
* Update internal state, such as the role request count.
* Update internal state, such as the role request count.
* Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
* This is where role history information will be used for placement decisions.
* @param role role
@ -942,18 +968,9 @@ public class AppState {
}
private void setDesiredContainers(RoleStatus role, int n) {
int delta = n - role.getComponentMetrics().containersDesired.value();
role.getComponentMetrics().containersDesired.set(n);
appMetrics.containersDesired.set(n);
}
private void incDesiredContainers(RoleStatus role) {
role.getComponentMetrics().containersDesired.incr();
appMetrics.containersDesired.incr();
}
private void incDesiredContainers(RoleStatus role, int n) {
role.getComponentMetrics().containersDesired.incr(n);
appMetrics.containersDesired.incr(n);
appMetrics.containersDesired.incr(delta);
}
private void incCompletedContainers(RoleStatus role) {
@ -1001,7 +1018,8 @@ public class AppState {
* Build up the resource requirements for this role from the cluster
* specification, including substituting max allowed values if the
* specification asked for it (except when
* {@link ResourceKeys#YARN_RESOURCE_NORMALIZATION_ENABLED} is set to false).
* {@link org.apache.slider.api.ResourceKeys#YARN_RESOURCE_NORMALIZATION_ENABLED}
* is set to false).
* @param role role
* during normalization
*/
@ -1009,11 +1027,6 @@ public class AppState {
// Set up resource requirements from role values
String name = role.getName();
Component component = role.getProviderRole().component;
if (component == null) {
// this is for AM container
// TODO why do we need to create the component for AM ?
return Resource.newInstance(1, 512);
}
int cores = DEF_YARN_CORES;
if (component.getResource() != null && component.getResource().getCpus()
!= null) {
@ -1282,10 +1295,13 @@ public class AppState {
if (roleInstance != null) {
int roleId = roleInstance.roleId;
String rolename = roleInstance.role;
log.info("Failed container in role[{}] : {}", roleId, rolename);
log.info("Failed container in role[{}] : {}", roleId,
roleInstance.getCompInstanceName());
try {
RoleStatus roleStatus = lookupRoleStatus(roleInstance.roleId);
decRunningContainers(roleStatus);
roleStatus.getProviderRole().failedInstanceName
.offer(roleInstance.compInstanceName);
boolean shortLived = isShortLived(roleInstance);
String message;
Container failedContainer = roleInstance.container;
@ -1571,7 +1587,7 @@ public class AppState {
/**
* Look at the allocation status of one role, and trigger add/release
* actions if the number of desired role instances doesn't equal
* actions if the number of desired role instances doesn't equal
* (actual + pending).
* <p>
* MUST be executed from within a synchronized method
@ -1584,7 +1600,6 @@ public class AppState {
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
throws SliderInternalStateException, TriggerClusterTeardownException {
log.info("review one role " + role.getName());
List<AbstractRMOperation> operations = new ArrayList<>();
long delta;
long expected;
@ -1594,9 +1609,7 @@ public class AppState {
expected = role.getDesired();
}
log.info("Reviewing {} : ", role);
log.debug("Expected {}, Requested/Running {}, Delta: {}", expected,
role.getActualAndRequested(), delta);
log.info("Reviewing " + role.getName() + ": " + role.getComponentMetrics());
checkFailureThreshold(role);
if (expected < 0 ) {
@ -1729,7 +1742,9 @@ public class AppState {
for (RoleInstance possible : finalCandidates) {
log.info("Targeting for release: {}", possible);
containerReleaseSubmitted(possible.container);
operations.add(new ContainerReleaseOperation(possible.getId()));
role.getProviderRole().failedInstanceName
.offer(possible.compInstanceName);
operations.add(new ContainerReleaseOperation(possible.getContainerId()));
}
}
@ -1783,7 +1798,7 @@ public class AppState {
for (RoleInstance role : activeRoleInstances) {
if (role.container.getId().equals(containerId)) {
containerReleaseSubmitted(role.container);
operations.add(new ContainerReleaseOperation(role.getId()));
operations.add(new ContainerReleaseOperation(role.getContainerId()));
}
}
@ -1906,17 +1921,6 @@ public class AppState {
}
}
/**
* Get diagnostics info about containers
*/
public String getContainerDiagnosticInfo() {
StringBuilder builder = new StringBuilder();
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
builder.append(roleStatus).append('\n');
}
return builder.toString();
}
/**
* Event handler for the list of active containers on restart.
* Sets the info key {@link StatusKeys#INFO_CONTAINERS_AM_RESTART}
@ -1965,10 +1969,10 @@ public class AppState {
//update app state internal structures and maps
//TODO recover the component instance name from zk registry ?
RoleInstance instance = new RoleInstance(container);
instance.command = roleName;
instance.role = roleName;
instance.group = role.getGroup();
instance.roleId = roleId;
instance.environment = new String[0];
instance.container = container;

View File

@ -19,6 +19,7 @@
package org.apache.slider.server.appmaster.state;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
@ -27,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.resource.ConfigFile;
import org.apache.slider.api.types.ContainerInformation;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.providers.ProviderRole;
@ -42,6 +44,8 @@ public final class RoleInstance implements Cloneable {
public Container container;
public ProviderRole providerRole;
public long componentId = -1;
public String compInstanceName = null;
/**
* Container ID
*/
@ -58,7 +62,6 @@ public final class RoleInstance implements Cloneable {
* Name of the role
*/
public String role;
public String group;
/**
* Version of the app
@ -106,7 +109,7 @@ public final class RoleInstance implements Cloneable {
public String host;
public String hostURL;
public ContainerAllocationOutcome placement;
public Path compInstanceDir;
/**
* A list of registered endpoints.
@ -114,10 +117,24 @@ public final class RoleInstance implements Cloneable {
private List<Endpoint> endpoints =
new ArrayList<>(2);
public RoleInstance(ContainerAssignment assignment) {
this(assignment.container);
placement = assignment.placement;
/**
 * Create an instance for a container that belongs to the given role.
 * <p>
 * When the role carries a {@code componentIdCounter}, the next counter value
 * becomes {@code componentId} and is appended to the role name to form
 * {@code compInstanceName}; otherwise the instance name is the role name.
 *
 * @param container container to track
 * @param role provider role the container was allocated for
 */
public RoleInstance(Container container, ProviderRole role) {
  this(container);
  if (role.componentIdCounter == null) {
    // no counter on this role: the instance shares the role's name
    compInstanceName = role.name;
  } else {
    componentId = role.componentIdCounter.getAndIncrement();
    compInstanceName = role.name + componentId;
  }
  this.providerRole = role;
}
/**
 * Create an instance for a container of the given role, using a
 * caller-supplied component instance name instead of generating one.
 *
 * @param container container to track
 * @param role provider role the container was allocated for
 * @param compInstanceName instance name to assign as-is
 */
public RoleInstance(Container container, ProviderRole role,
    String compInstanceName) {
  this(container);
  this.providerRole = role;
  this.compInstanceName = compInstanceName;
}
/**
* Create an instance to track an allocated container
* @param container a container which must be non null, and have a non-null Id field.
@ -136,10 +153,6 @@ public final class RoleInstance implements Cloneable {
hostURL = "http://" + container.getNodeHttpAddress();
}
}
public ContainerId getId() {
return container.getId();
}
public NodeId getHost() {
return container.getNodeId();
@ -151,6 +164,7 @@ public final class RoleInstance implements Cloneable {
new StringBuilder("RoleInstance{");
sb.append("role='").append(role).append('\'');
sb.append(", id='").append(id).append('\'');
sb.append(", instanceName='").append(compInstanceName).append('\'');
sb.append(", container=").append(SliderUtils.containerToString(container));
sb.append(", createTime=").append(createTime);
sb.append(", startTime=").append(startTime);
@ -170,7 +184,7 @@ public final class RoleInstance implements Cloneable {
}
public ContainerId getContainerId() {
return container != null ? container.getId() : null;
return container.getId();
}
/**
@ -322,4 +336,8 @@ public final class RoleInstance implements Cloneable {
}
return info;
}
/**
 * @return the component instance name held in {@code compInstanceName}
 */
public String getCompInstanceName() {
  return this.compInstanceName;
}
}

View File

@ -272,6 +272,7 @@ public final class RoleStatus implements MetricSet {
// containers -- maybe we need releasing
//if we are releasing, remove the number that are already released.
//but never switch to a positive
// TODO, WHY is this min operation even needed ??? if delta is negative, it's always < 0 ???
delta = Math.min(delta, 0);
}
return delta;

View File

@ -214,6 +214,7 @@ public interface StateAccessForProviders {
/**
* Find out about the nodes for specific roles, as a nested mapping of
* Component_name -> ContainerId -> ClusterNode.
* @return a map keyed by component name whose values map a container id
* to the ClusterNode for that container
*/
Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping();

View File

@ -59,7 +59,7 @@ public interface RestApiErrorMessages {
ERROR_RESOURCE_CPUS_INVALID_RANGE
+ " for component %s (or at the global level)";
String ERROR_CONTAINERS_COUNT_INVALID =
"Required no of containers not specified";
"Invalid no of containers specified";
String ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID =
ERROR_CONTAINERS_COUNT_INVALID + ERROR_SUFFIX_FOR_COMPONENT;

View File

@ -20,17 +20,30 @@ package org.apache.slider.util;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.slider.api.resource.Application;
import org.apache.slider.api.resource.Artifact;
import org.apache.slider.api.resource.Component;
import org.apache.slider.api.resource.ConfigFile;
import org.apache.slider.api.resource.Configuration;
import org.apache.slider.api.resource.Resource;
import org.apache.slider.common.tools.SliderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ServiceApiUtil {
private static final Logger log =
LoggerFactory.getLogger(ServiceApiUtil.class);
@VisibleForTesting
public static void validateApplicationPostPayload(Application application) {
public static void validateApplicationPayload(Application application,
FileSystem fs) throws IOException {
if (StringUtils.isEmpty(application.getName())) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_APPLICATION_NAME_INVALID);
@ -64,11 +77,13 @@ public class ServiceApiUtil {
application.getArtifact().getType());
// container size
if (application.getNumberOfContainers() == null) {
if (application.getNumberOfContainers() == null
|| application.getNumberOfContainers() < 0) {
throw new IllegalArgumentException(
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_INVALID);
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_INVALID + ": "
+ application.getNumberOfContainers());
}
validateConfigFile(application.getConfiguration().getFiles(), fs);
// Since it is a simple app with no components, create a default component
application.getComponents().add(createDefaultComponent(application));
} else {
@ -114,11 +129,13 @@ public class ServiceApiUtil {
if (comp.getNumberOfContainers() == null) {
comp.setNumberOfContainers(globalNumberOfContainers);
}
if (comp.getNumberOfContainers() == null) {
if (comp.getNumberOfContainers() == null
|| comp.getNumberOfContainers() < 0) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID,
comp.getName()));
RestApiErrorMessages.ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID
+ ": " + comp.getNumberOfContainers(), comp.getName()));
}
validateConfigFile(comp.getConfiguration().getFiles(), fs);
}
}
@ -128,6 +145,46 @@ public class ServiceApiUtil {
}
}
/**
 * Validates the config files declared for an application or a component.
 * <p>
 * For every entry in {@code list}:
 * <ol>
 *   <li>the file type must be set;</li>
 *   <li>a TEMPLATE file must name a src_file;</li>
 *   <li>a src_file, when given, must exist on {@code fs};</li>
 *   <li>dest_file must be non-empty and an absolute path;</li>
 *   <li>dest_file must not repeat within the list.</li>
 * </ol>
 *
 * @param list config files to check
 * @param fs file system used to look up each src_file
 * @throws IllegalArgumentException if any entry breaks one of the rules above
 * @throws IOException if the existence check against {@code fs} fails
 */
private static void validateConfigFile(List<ConfigFile> list, FileSystem fs)
    throws IOException {
  Set<String> destFileSet = new HashSet<>();

  for (ConfigFile file : list) {
    // Reject a missing type up front so a malformed payload is reported as a
    // validation error instead of a NullPointerException.
    if (file.getType() == null) {
      throw new IllegalArgumentException("File type is empty");
    }

    if (file.getType().equals(ConfigFile.TypeEnum.TEMPLATE) && StringUtils
        .isEmpty(file.getSrcFile())) {
      throw new IllegalArgumentException(
          "Src_file is empty for " + ConfigFile.TypeEnum.TEMPLATE);
    }

    if (!StringUtils.isEmpty(file.getSrcFile())) {
      Path p = new Path(file.getSrcFile());
      if (!fs.exists(p)) {
        throw new IllegalArgumentException(
            "Src_file does not exist for config file: " + file
                .getSrcFile());
      }
    }

    if (StringUtils.isEmpty(file.getDestFile())) {
      throw new IllegalArgumentException("Dest_file is empty.");
    }

    // validate dest_file is absolute; Paths.get applies the path rules of the
    // JVM's default file system
    if (!Paths.get(file.getDestFile()).isAbsolute()) {
      throw new IllegalArgumentException(
          "Dest_file must be absolute path: " + file.getDestFile());
    }

    // Set.add returns false when the destination was already registered
    if (!destFileSet.add(file.getDestFile())) {
      throw new IllegalArgumentException(
          "Duplicated ConfigFile exists: " + file.getDestFile());
    }
  }
}
private static void validateApplicationResource(Resource resource,
Component comp, Artifact.TypeEnum artifactType) {
// Only apps/components of type APPLICATION can skip resource requirement
@ -200,4 +257,8 @@ public class ServiceApiUtil {
comp.setLaunchCommand(app.getLaunchCommand());
return comp;
}
/**
 * Wraps a name in the <code>${...}</code> placeholder syntax.
 *
 * @param s name to wrap
 * @return {@code s} preceded by "${" and followed by "}"
 */
public static String $(String s) {
  StringBuilder token = new StringBuilder("${");
  token.append(s);
  token.append('}');
  return token.toString();
}
}

View File

@ -67,7 +67,7 @@ public class TestMockAppStateDynamicHistory extends BaseMockAppStateTest
return new MockYarnEngine(8, 1);
}
@Test
// TODO does not support adding new components dynamically
public void testDynamicRoleHistory() throws Throwable {
String dynamic = "dynamicRole";
@ -81,12 +81,8 @@ public class TestMockAppStateDynamicHistory extends BaseMockAppStateTest
.COMPONENT_PLACEMENT_POLICY, "" + placementPolicy);
application.getComponents().add(component);
// write the definitions
List<ProviderRole> updates = appState.updateComponents(
appState.updateComponents(
Collections.singletonMap(dynamic, desired));
assertEquals(1, updates.size());
ProviderRole updatedRole = updates.get(0);
assertEquals(updatedRole.placementPolicy, placementPolicy);
// now look at the role map
assertNotNull(appState.getRoleMap().get(dynamic));

View File

@ -87,7 +87,7 @@ public class TestMockAppStateFlexDynamicRoles extends BaseMockAppStateTest
createAndStartNodes();
}
@Test
// TODO does not support adding new components dynamically
public void testDynamicFlexAddRole() throws Throwable {
Application application = appState.getClusterStatus();
Component component = new Component().name("dynamicAdd7")
@ -96,16 +96,12 @@ public class TestMockAppStateFlexDynamicRoles extends BaseMockAppStateTest
appState.updateComponents(Collections.singletonMap(component.getName(),
component.getNumberOfContainers()));
createAndStartNodes();
dumpClusterDescription("updated CD", appState.getClusterStatus());
appState.lookupRoleStatus("dynamicAdd7");
}
@Test
public void testDynamicFlexDropRole() throws Throwable {
appState.updateComponents(Collections.singletonMap("dynamic-6", 0L));
Application getCD = appState.getClusterStatus();
dumpClusterDescription("updated CD", getCD);
//status is retained for future
appState.lookupRoleStatus("dynamic-6");
}

View File

@ -26,10 +26,15 @@ import org.apache.slider.server.appmaster.model.mock.MockRoles;
import org.apache.slider.server.appmaster.model.mock.MockYarnEngine;
import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.RoleStatus;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
/**
* Test that if you have more than one role, the right roles are chosen for
@ -72,40 +77,76 @@ public class TestMockAppStateUniqueNames extends BaseMockAppStateTest
return application;
}
/**
 * Index role instances by their component instance name, in sorted key
 * order, asserting that no instance name occurs more than once.
 *
 * @param instances instances to index
 * @return sorted map of component instance name to instance
 */
public static Map<String, RoleInstance> organize(List<RoleInstance>
    instances) {
  Map<String, RoleInstance> byInstanceName = new TreeMap<>();
  for (RoleInstance current : instances) {
    String instanceName = current.compInstanceName;
    boolean alreadySeen = byInstanceName.containsKey(instanceName);
    assertFalse("Multiple role instances for unique name " + instanceName,
        alreadySeen);
    System.out.println("Adding to map " + instanceName + " for" +
        current.role);
    byInstanceName.put(instanceName, current);
  }
  return byInstanceName;
}
/**
 * Assert that the instances carry exactly the expected component instance
 * names and that each one belongs to the expected group.
 *
 * @param instances instances to check
 * @param group expected role name, provider role name and provider role group
 * @param roles expected component instance names, in sorted order
 */
public static void verifyInstances(List<RoleInstance> instances, String
    group, String... roles) {
  assertEquals(roles.length, instances.size());
  int index = 0;
  for (Entry<String, RoleInstance> named : organize(instances).entrySet()) {
    String expectedName = roles[index];
    assertEquals(expectedName, named.getKey());
    RoleInstance actual = named.getValue();
    assertEquals(expectedName, actual.compInstanceName);
    assertEquals(group, actual.role);
    assertEquals(group, actual.providerRole.name);
    assertEquals(group, actual.providerRole.group);
    // TODO remove group from provider role if it continues to be unused
    index++;
  }
}
@Test
public void testDynamicFlexDown() throws Throwable {
createAndStartNodes();
List<RoleInstance> instances = appState.cloneOwnedContainerList();
verifyInstances(instances, "group1", "group10", "group11");
appState.updateComponents(Collections.singletonMap("group1", 0L));
createAndStartNodes();
RoleStatus roleStatus = appState.lookupRoleStatus("group11");
instances = appState.cloneOwnedContainerList();
assertEquals(0, instances.size());
RoleStatus roleStatus = appState.lookupRoleStatus("group1");
assertEquals(0, roleStatus.getDesired());
assertEquals(1024L, roleStatus.getResourceRequirements().getMemorySize());
assertEquals(2, roleStatus.getResourceRequirements().getVirtualCores());
assertEquals("group1", roleStatus.getGroup());
// now flex back up
appState.updateComponents(Collections.singletonMap("group1", 3L));
createAndStartNodes();
instances = appState.cloneOwnedContainerList();
verifyInstances(instances, "group1", "group10", "group11", "group12");
// fails because the names continue at N+1, with group12, group13, group14
}
@Test
public void testDynamicFlexUp() throws Throwable {
createAndStartNodes();
List<RoleInstance> instances = appState.cloneOwnedContainerList();
verifyInstances(instances, "group1", "group10", "group11");
appState.updateComponents(Collections.singletonMap("group1", 3L));
createAndStartNodes();
RoleStatus group11 = appState.lookupRoleStatus("group11");
RoleStatus group12 = appState.lookupRoleStatus("group12");
RoleStatus group13 = appState.lookupRoleStatus("group13");
assertEquals(1, group11.getDesired());
assertEquals(1, group12.getDesired());
assertEquals(1, group13.getDesired());
assertEquals(1024L, group11.getResourceRequirements().getMemorySize());
assertEquals(1024L, group12.getResourceRequirements().getMemorySize());
assertEquals(1024L, group13.getResourceRequirements().getMemorySize());
assertEquals(2, group11.getResourceRequirements().getVirtualCores());
assertEquals(2, group12.getResourceRequirements().getVirtualCores());
assertEquals(2, group13.getResourceRequirements().getVirtualCores());
assertEquals("group1", group11.getGroup());
assertEquals("group1", group12.getGroup());
assertEquals("group1", group13.getGroup());
instances = appState.cloneOwnedContainerList();
verifyInstances(instances, "group1", "group10", "group11", "group12");
appState.refreshClusterStatus();
RoleStatus group1 = appState.lookupRoleStatus("group1");
assertEquals(3, group1.getDesired());
assertEquals(1024L, group1.getResourceRequirements().getMemorySize());
assertEquals("group1", group1.getGroup());
}
}

View File

@ -27,6 +27,7 @@ import org.apache.slider.server.appmaster.model.mock.MockAppState;
import org.apache.slider.server.appmaster.model.mock.MockRoles;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.apache.slider.server.appmaster.state.RoleStatus;
import org.junit.Test;
import java.util.Collections;
@ -47,6 +48,11 @@ public class TestMockContainerResourceAllocations extends BaseMockAppStateTest {
Component role0 = appState.getClusterStatus().getComponent(MockRoles.ROLE0);
role0.resource(new org.apache.slider.api.resource.Resource().memory("512")
.cpus(2));
// hack - because role0 is created before the test run
RoleStatus role0Status =
appState.getRoleStatusMap().get(appState.getRoleMap().get(ROLE0).id);
role0Status.setResourceRequirements(
appState.buildResourceRequirements(role0Status));
appState.updateComponents(Collections.singletonMap(role0.getName(),
role0.getNumberOfContainers()));
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes();
@ -58,12 +64,17 @@ public class TestMockContainerResourceAllocations extends BaseMockAppStateTest {
assertEquals(2, requirements.getVirtualCores());
}
//TODO replace with resource profile feature in yarn
@Test
public void testMaxMemAllocations() throws Throwable {
// max core allocations no longer supported
Component role0 = appState.getClusterStatus().getComponent(MockRoles.ROLE0);
role0.resource(new org.apache.slider.api.resource.Resource()
.memory(ResourceKeys.YARN_RESOURCE_MAX).cpus(2));
RoleStatus role0Status =
appState.getRoleStatusMap().get(appState.getRoleMap().get(ROLE0).id);
role0Status.setResourceRequirements(
appState.buildResourceRequirements(role0Status));
appState.updateComponents(Collections.singletonMap(role0.getName(),
role0.getNumberOfContainers()));
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes();

View File

@ -176,7 +176,14 @@ public abstract class BaseMockAppStateTest extends SliderTestBase implements
*/
public RoleInstance roleInstance(ContainerAssignment assigned) {
Container target = assigned.container;
RoleInstance ri = new RoleInstance(target);
String failedInstance =
assigned.role.getProviderRole().failedInstanceName.poll();
RoleInstance ri;
if (failedInstance != null) {
ri = new RoleInstance(target, assigned.role.getProviderRole(), failedInstance);
} else {
ri = new RoleInstance(target, assigned.role.getProviderRole());
}
ri.roleId = assigned.role.getPriority();
ri.role = assigned.role.getName();
return ri;

View File

@ -30,6 +30,7 @@ import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
@ -118,7 +119,8 @@ public class MockProviderService implements ProviderService {
@Override
public void buildContainerLaunchContext(ContainerLauncher containerLauncher,
Application application, Container container, ProviderRole providerRole,
SliderFileSystem sliderFileSystem) throws IOException, SliderException {
SliderFileSystem sliderFileSystem, RoleInstance roleInstance)
throws IOException, SliderException {
}