YARN-5701. Fix issues in yarn native services apps-of-apps. Contributed by Billie Rinaldi

This commit is contained in:
Jian He 2016-10-16 17:01:09 -07:00
parent 4ec1cbe86d
commit f206a43954
5 changed files with 164 additions and 65 deletions

View File

@ -178,6 +178,7 @@
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.Console;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@ -918,57 +919,56 @@ protected static void checkForCredentials(Configuration conf,
return;
}
BufferedReader br = null;
try {
for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
String provider = cred.getKey()
.replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName)
.replaceAll(Pattern.quote("${CLUSTER}"), clusterName);
List<String> aliases = cred.getValue();
if (aliases == null || aliases.isEmpty()) {
continue;
}
Configuration c = new Configuration(conf);
c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0);
Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases());
for (String alias : aliases) {
if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
log.info("Credentials for " + alias + " found in " + provider);
} else {
if (br == null) {
br = new BufferedReader(new InputStreamReader(System.in));
}
char[] pass = readPassword(alias, br);
credentialProvider.createCredentialEntry(alias, pass);
credentialProvider.flush();
Arrays.fill(pass, ' ');
Console console = System.console();
for (Entry<String, List<String>> cred : tree.credentials.entrySet()) {
String provider = cred.getKey()
.replaceAll(Pattern.quote("${CLUSTER_NAME}"), clusterName)
.replaceAll(Pattern.quote("${CLUSTER}"), clusterName);
List<String> aliases = cred.getValue();
if (aliases == null || aliases.isEmpty()) {
continue;
}
Configuration c = new Configuration(conf);
c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
CredentialProvider credentialProvider = CredentialProviderFactory.getProviders(c).get(0);
Set<String> existingAliases = new HashSet<>(credentialProvider.getAliases());
for (String alias : aliases) {
if (existingAliases.contains(alias.toLowerCase(Locale.ENGLISH))) {
log.info("Credentials for " + alias + " found in " + provider);
} else {
if (console == null) {
throw new IOException("Unable to input password for " + alias +
" because System.console() is null; provider " + provider +
" must be populated manually");
}
char[] pass = readPassword(alias, console);
credentialProvider.createCredentialEntry(alias, pass);
credentialProvider.flush();
Arrays.fill(pass, ' ');
}
}
} finally {
org.apache.hadoop.io.IOUtils.closeStream(br);
}
}
private static char[] readOnePassword(String alias) throws IOException {
try(BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
return readPassword(alias, br);
Console console = System.console();
if (console == null) {
throw new IOException("Unable to input password for " + alias +
" because System.console() is null");
}
return readPassword(alias, console);
}
// using a normal reader instead of a secure one,
// because stdin is not hooked up to the command line
private static char[] readPassword(String alias, BufferedReader br)
private static char[] readPassword(String alias, Console console)
throws IOException {
char[] cred = null;
boolean noMatch;
do {
log.info(String.format("%s %s: ", PASSWORD_PROMPT, alias));
char[] newPassword1 = br.readLine().toCharArray();
log.info(String.format("%s %s again: ", PASSWORD_PROMPT, alias));
char[] newPassword2 = br.readLine().toCharArray();
console.printf("%s %s: \n", PASSWORD_PROMPT, alias);
char[] newPassword1 = console.readPassword();
console.printf("%s %s again: \n", PASSWORD_PROMPT, alias);
char[] newPassword2 = console.readPassword();
noMatch = !Arrays.equals(newPassword1, newPassword2);
if (noMatch) {
if (newPassword1 != null) Arrays.fill(newPassword1, ' ');

View File

@ -439,6 +439,10 @@ public void resolve()
}
log.info("External appdefs after {}: {}", component, externalAppDefs);
SliderUtils.mergeMapsIgnoreDuplicateKeys(
appConf.getConfTree().credentials,
componentAppConf.getConfTree().credentials);
mergeExternalComponent(appConf, componentAppConf, component, null);
mergeExternalComponent(resources, componentConf.getResourceOperations(),
component, getNextPriority());

View File

@ -308,6 +308,7 @@ public void propagateOptions(Map<String, String> options,
public Map<String, String> filterSiteOptions(Map<String, String> options,
Map<String, String> tokenMap) {
String prefix = OptionKeys.SITE_XML_PREFIX;
String format = "${%s}";
Map<String, String> filteredOptions = new HashMap<>();
for (Map.Entry<String, String> entry : options.entrySet()) {
String key = entry.getKey();
@ -319,7 +320,7 @@ public Map<String, String> filterSiteOptions(Map<String, String> options,
token.getValue());
}
}
filteredOptions.put(key, value);
filteredOptions.put(String.format(format, key), value);
}
}
return filteredOptions;
@ -545,9 +546,14 @@ private synchronized Path uploadResource(File resource,
* @param clusterName app name
* @throws IOException file cannot be created
*/
private void createConfigFile(SliderFileSystem fileSystem, File file,
ConfigFormat configFormat, String configFileDN,
private synchronized void createConfigFile(SliderFileSystem fileSystem,
File file, ConfigFormat configFormat, String configFileDN,
Map<String, String> config, String clusterName) throws IOException {
if (file.exists()) {
log.info("Skipping writing {} file {} because it already exists",
configFormat, file);
return;
}
log.info("Writing {} file {}", configFormat, file);
ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
@ -643,11 +649,10 @@ public void localizeConfigFile(ContainerLauncher launcher,
String fileName = ConfigUtils.replaceProps(config, configFileName);
File localFile = new File(RESOURCE_DIR);
if (!localFile.exists()) {
if (!localFile.mkdir()) {
if (!localFile.mkdir() && !localFile.exists()) {
throw new IOException(RESOURCE_DIR + " could not be created!");
}
}
localFile = new File(localFile, new File(fileName).getName());
String folder = null;
if ("true".equals(config.get(PER_COMPONENT))) {
@ -655,12 +660,25 @@ public void localizeConfigFile(ContainerLauncher launcher,
} else if ("true".equals(config.get(PER_GROUP))) {
folder = roleGroup;
}
if (folder != null) {
localFile = new File(localFile, folder);
if (!localFile.exists()) {
if (!localFile.mkdir() && !localFile.exists()) {
throw new IOException(localFile + " could not be created!");
}
}
}
localFile = new File(localFile, new File(fileName).getName());
log.info("Localizing {} configs to config file {} (destination {}) " +
"based on {} configs", config.size(), localFile, fileName,
configFileDN);
createConfigFile(fileSystem, localFile, configFormat, configFileDN, config,
clusterName);
if (!localFile.exists()) {
createConfigFile(fileSystem, localFile, configFormat, configFileDN,
config, clusterName);
} else {
log.info("Local {} file {} already exists", configFormat, localFile);
}
Path destPath = uploadResource(localFile, fileSystem, folder, clusterName);
LocalResource configResource = fileSystem.createAmResource(destPath,
LocalResourceType.FILE);
@ -807,12 +825,12 @@ public void localizePackages(ContainerLauncher launcher,
*/
public Map<String, Map<String, String>> buildConfigurations(
ConfTreeOperations appConf, ConfTreeOperations internalsConf,
String containerId, String roleName, String roleGroup,
String containerId, String clusterName, String roleName, String roleGroup,
StateAccessForProviders amState) {
Map<String, Map<String, String>> configurations = new TreeMap<>();
Map<String, String> tokens = getStandardTokenMap(appConf,
internalsConf, roleName, roleGroup, containerId);
internalsConf, roleName, roleGroup, containerId, clusterName);
Set<String> configs = new HashSet<>();
configs.addAll(getApplicationConfigurationTypes(roleGroup, appConf));
@ -1163,6 +1181,32 @@ public Iterable<String> getHostsList(Collection<ClusterNode> values,
return hosts;
}
/**
* Return a list of hostnames based on current ClusterNodes.
* @param values cluster nodes
* @return list of hosts
*/
public Iterable<String> getHostNamesList(Collection<ClusterNode> values) {
List<String> hosts = new ArrayList<>();
for (ClusterNode cn : values) {
hosts.add(cn.hostname);
}
return hosts;
}
/**
* Return a list of IPs based on current ClusterNodes.
* @param values cluster nodes
* @return list of hosts
*/
public Iterable<String> getIPsList(Collection<ClusterNode> values) {
List<String> hosts = new ArrayList<>();
for (ClusterNode cn : values) {
hosts.add(cn.ip);
}
return hosts;
}
/**
* Update ServiceRecord in Registry with IP and hostname.
* @param amState access to AM state

View File

@ -82,8 +82,8 @@ public void validateInstanceDefinition(AggregateConf instanceDefinition,
if (appConf.getComponentOptBool(roleGroup, AM_CONFIG_GENERATION, false)) {
// build and localize configuration files
Map<String, Map<String, String>> configurations =
providerUtils.buildConfigurations(appConf, appConf, null, roleGroup,
roleGroup, null);
providerUtils.buildConfigurations(appConf, appConf, null,
null, roleGroup, roleGroup, null);
try {
providerUtils.localizeConfigFiles(null, roleGroup, roleGroup, appConf,
configurations, null, fs, null);

View File

@ -41,6 +41,7 @@
import org.apache.slider.core.registry.docstore.ConfigUtils;
import org.apache.slider.core.registry.docstore.ExportEntry;
import org.apache.slider.providers.AbstractProviderService;
import org.apache.slider.providers.MonitorDetail;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
@ -62,6 +63,9 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.regex.Pattern;
import static org.apache.slider.api.RoleKeys.ROLE_PREFIX;
public class DockerProviderService extends AbstractProviderService implements
ProviderCore,
@ -130,10 +134,13 @@ public void buildContainerLaunchContext(ContainerLauncher launcher,
DOCKER_USE_PRIVILEGED, false));
// Set the environment
launcher.putEnv(SliderUtils.buildEnvMap(appComponent,
providerUtils.getStandardTokenMap(getAmState().getAppConfSnapshot(),
getAmState().getInternalsSnapshot(), roleName, roleGroup,
getClusterName())));
Map<String, String> standardTokens = providerUtils.getStandardTokenMap(
getAmState().getAppConfSnapshot(), getAmState().getInternalsSnapshot(),
roleName, roleGroup, container.getId().toString(), getClusterName());
Map<String, String> replaceTokens = providerUtils.filterSiteOptions(
appConf.getComponent(roleGroup).options, standardTokens);
replaceTokens.putAll(standardTokens);
launcher.putEnv(SliderUtils.buildEnvMap(appComponent, replaceTokens));
String workDir = ApplicationConstants.Environment.PWD.$();
launcher.setEnv("WORK_DIR", workDir);
@ -169,8 +176,8 @@ public void buildContainerLaunchContext(ContainerLauncher launcher,
providerUtils.buildConfigurations(
instanceDefinition.getAppConfOperations(),
instanceDefinition.getInternalOperations(),
container.getId().toString(), roleName, roleGroup,
getAmState());
container.getId().toString(), getClusterName(),
roleName, roleGroup, getAmState());
providerUtils.localizeConfigFiles(launcher, roleName, roleGroup,
appConf, configurations, launcher.getEnv(), fileSystem,
getClusterName());
@ -251,8 +258,8 @@ public void applyInitialRegistryDefinitions(URL amWebURI,
// build and localize configuration files
Map<String, Map<String, String>> configurations =
providerUtils.buildConfigurations(appConf, getAmState()
.getInternalsSnapshot(), null, clientName, clientName,
getAmState());
.getInternalsSnapshot(), null, getClusterName(), clientName,
clientName, getAmState());
for (String configFileDN : configurations.keySet()) {
String configFileName = appConf.getComponentOpt(clientName,
@ -316,19 +323,45 @@ protected void publishExportGroups(String containerId,
getAmState().getAppConfSnapshot(), roleGroup);
String hostKeyFormat = "${%s_HOST}";
String hostNameKeyFormat = "${%s_HOSTNAME}";
String ipKeyFormat = "${%s_IP}";
// publish export groups if any
Map<String, String> replaceTokens =
providerUtils.filterSiteOptions(
appConf.getComponent(roleGroup).options,
providerUtils.getStandardTokenMap(appConf, internalsConf, roleName,
roleGroup, containerId, getClusterName()));
Map<String, String> standardTokens = providerUtils.getStandardTokenMap(
appConf, internalsConf, roleName, roleGroup, containerId,
getClusterName());
Map<String, String> replaceTokens = providerUtils.filterSiteOptions(
appConf.getComponent(roleGroup).options, standardTokens);
replaceTokens.putAll(standardTokens);
String rolePrefix = appConf.getComponentOpt(roleGroup, ROLE_PREFIX, "");
for (Map.Entry<String, Map<String, ClusterNode>> entry :
getAmState().getRoleClusterNodeMapping().entrySet()) {
String hostName = providerUtils.getHostsList(
String otherRolePrefix = appConf.getComponentOpt(entry.getKey(),
ROLE_PREFIX, "");
if (!otherRolePrefix.equals(rolePrefix)) {
// hostname replacements are only made within role prefix groups
continue;
}
String key = entry.getKey();
if (!rolePrefix.isEmpty()) {
if (!key.startsWith(rolePrefix)) {
log.warn("Something went wrong, {} doesn't start with {}", key,
rolePrefix);
continue;
}
key = key.substring(rolePrefix.length());
}
key = key.toUpperCase(Locale.ENGLISH);
String host = providerUtils.getHostsList(
entry.getValue().values(), true).iterator().next();
replaceTokens.put(String.format(hostKeyFormat,
entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
replaceTokens.put(String.format(hostKeyFormat, key), host);
String hostName = providerUtils.getHostNamesList(
entry.getValue().values()).iterator().next();
replaceTokens.put(String.format(hostNameKeyFormat, key), hostName);
String ip = providerUtils.getIPsList(
entry.getValue().values()).iterator().next();
replaceTokens.put(String.format(ipKeyFormat, key), ip);
}
replaceTokens.put("${THIS_HOST}", thisHost);
@ -338,7 +371,7 @@ protected void publishExportGroups(String containerId,
// replace host names and site properties
for (String token : replaceTokens.keySet()) {
if (value.contains(token)) {
value = value.replace(token, replaceTokens.get(token));
value = value.replaceAll(Pattern.quote(token), replaceTokens.get(token));
}
}
ExportEntry entry = new ExportEntry();
@ -350,6 +383,24 @@ protected void publishExportGroups(String containerId,
log.info("Preparing to publish. Key {} and Value {}",
export.getKey(), value);
}
providerUtils.publishExportGroup(entries, getAmState(), EXPORT_GROUP);
if (!entries.isEmpty()) {
providerUtils.publishExportGroup(entries, getAmState(), EXPORT_GROUP);
}
}
@Override
public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc);
buildRoleHostDetails(details);
return details;
}
private void buildRoleHostDetails(Map<String, MonitorDetail> details) {
for (Map.Entry<String, Map<String, ClusterNode>> entry :
getAmState().getRoleClusterNodeMapping().entrySet()) {
details.put(entry.getKey() + " Host(s)/Container(s)",
new MonitorDetail(providerUtils.getHostsList(
entry.getValue().values(), false).toString(), false));
}
}
}