YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label Configuration Setup. (Naganarasimha G R via rohithsharmaks)
This commit is contained in:
parent
092883b34a
commit
5acdde4744
@ -241,6 +241,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3739. Add reservation system recovery to RM recovery process.
|
||||
(Subru Krishnan via adhoot)
|
||||
|
||||
YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label
|
||||
Configuration Setup. (Naganarasimha G R via rohithsharmaks)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
@ -2112,6 +2112,12 @@ public static boolean isDelegatedCentralizedNodeLabelConfiguration(
|
||||
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
|
||||
}
|
||||
|
||||
@Private
|
||||
public static boolean areNodeLabelsEnabled(
|
||||
Configuration conf) {
|
||||
return conf.getBoolean(NODE_LABELS_ENABLED, DEFAULT_NODE_LABELS_ENABLED);
|
||||
}
|
||||
|
||||
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
|
||||
+ "node-labels.";
|
||||
|
||||
@ -2120,6 +2126,7 @@ public static boolean isDelegatedCentralizedNodeLabelConfiguration(
|
||||
|
||||
// whitelist names for the yarn.nodemanager.node-labels.provider
|
||||
public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
|
||||
public static final String SCRIPT_NODE_LABELS_PROVIDER = "script";
|
||||
|
||||
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
|
||||
NM_NODE_LABELS_PREFIX + "provider.";
|
||||
@ -2145,8 +2152,8 @@ public static boolean isDelegatedCentralizedNodeLabelConfiguration(
|
||||
public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
|
||||
DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS * 2;
|
||||
|
||||
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
|
||||
public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition";
|
||||
|
||||
private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
|
||||
+ "node-labels.";
|
||||
@ -2174,6 +2181,15 @@ public static boolean isDelegatedCentralizedNodeLabelConfiguration(
|
||||
public static final float DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD = 0.8f;
|
||||
|
||||
|
||||
private static final String NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PREFIX =
|
||||
NM_NODE_LABELS_PROVIDER_PREFIX + "script.";
|
||||
|
||||
public static final String NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PATH =
|
||||
NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PREFIX + "path";
|
||||
|
||||
public static final String NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_SCRIPT_OPTS =
|
||||
NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PREFIX + "opts";
|
||||
|
||||
public YarnConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
@ -216,9 +216,7 @@ protected void initDispatcher(Configuration conf) {
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
// set if node labels enabled
|
||||
nodeLabelsEnabled =
|
||||
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
|
||||
nodeLabelsEnabled = YarnConfiguration.areNodeLabelsEnabled(conf);
|
||||
|
||||
isCentralizedNodeLabelConfiguration =
|
||||
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
|
||||
|
@ -2169,14 +2169,14 @@
|
||||
<!-- Distributed Node Labels Configuration -->
|
||||
<property>
|
||||
<description>
|
||||
When node labels "yarn.node-labels.configuration-type" is
|
||||
of type "distributed" Administrators can configure the source of the
|
||||
node labels provider by configuring this parameter. Administrators can
|
||||
specify either "config" or the class name of the provider. Configured
|
||||
When "yarn.node-labels.configuration-type" parameter in RM is configured as
|
||||
"distributed", Administrators can configure in NM, the provider for the
|
||||
node labels by configuring this parameter. Administrators can
|
||||
specify "config", "script" or the class name of the provider. Configured
|
||||
class needs to extend
|
||||
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider.
|
||||
If "config" is specified then, "ConfigurationNodeLabelsProvider" will
|
||||
be used.
|
||||
If "config" is specified then "ConfigurationNodeLabelsProvider" and
|
||||
"script" then "ScriptNodeLabelsProvider" will be used.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider</name>
|
||||
</property>
|
||||
@ -2207,9 +2207,9 @@
|
||||
<description>
|
||||
When node labels "yarn.nodemanager.node-labels.provider"
|
||||
is of type "config" then ConfigurationNodeLabelsProvider fetches the
|
||||
labels this parameter.
|
||||
partition from this parameter.
|
||||
</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.configured-node-labels</name>
|
||||
<name>yarn.nodemanager.node-labels.provider.configured-node-partition</name>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
@ -2248,6 +2248,18 @@
|
||||
<name>yarn.resourcemanager.node-labels.provider.fetch-interval-ms</name>
|
||||
<value>1800000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The Node Label script to run. Script output Lines starting with
|
||||
"NODE_PARTITION:" will be considered for Node Labels. In case of multiple
|
||||
lines having the pattern, last one will be considered</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.script.path</name>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The arguments to pass to the Node label script.</description>
|
||||
<name>yarn.nodemanager.node-labels.provider.script.opts</name>
|
||||
</property>
|
||||
<!-- Other Configuration -->
|
||||
|
||||
<property>
|
||||
|
@ -65,6 +65,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
@ -137,18 +138,21 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
|
||||
case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
|
||||
provider = new ConfigurationNodeLabelsProvider();
|
||||
break;
|
||||
case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER:
|
||||
provider = new ScriptBasedNodeLabelsProvider();
|
||||
break;
|
||||
default:
|
||||
try {
|
||||
Class<? extends NodeLabelsProvider> labelsProviderClass =
|
||||
conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null,
|
||||
NodeLabelsProvider.class);
|
||||
conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
|
||||
null, NodeLabelsProvider.class);
|
||||
provider = labelsProviderClass.newInstance();
|
||||
} catch (InstantiationException | IllegalAccessException
|
||||
| RuntimeException e) {
|
||||
LOG.error("Failed to create NodeLabelsProvider based on Configuration",
|
||||
e);
|
||||
throw new IOException("Failed to create NodeLabelsProvider : "
|
||||
+ e.getMessage(), e);
|
||||
throw new IOException(
|
||||
"Failed to create NodeLabelsProvider : " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -315,7 +319,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
nodeStatusUpdater =
|
||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
||||
} else {
|
||||
addService(nodeLabelsProvider);
|
||||
addIfService(nodeLabelsProvider);
|
||||
nodeStatusUpdater =
|
||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
|
||||
nodeLabelsProvider);
|
||||
|
@ -976,7 +976,12 @@ public String verifyRMRegistrationResponseForNodeLabels(
|
||||
.append(StringUtils.join(",", previousNodeLabels)).append("}");
|
||||
} else {
|
||||
// case where provider is set but RM did not accept the Node Labels
|
||||
LOG.error(regNMResponse.getDiagnosticsMessage());
|
||||
String errorMsgFromRM = regNMResponse.getDiagnosticsMessage();
|
||||
LOG.error(
|
||||
"NodeLabels sent from NM while registration were rejected by RM. "
|
||||
+ ((errorMsgFromRM == null)
|
||||
? "Seems like RM is configured with Centralized Labels."
|
||||
: "And with message " + regNMResponse.getDiagnosticsMessage()));
|
||||
}
|
||||
return successfulNodeLabelsRegistrationMsg.toString();
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
@ -34,7 +35,8 @@
|
||||
* Provides base implementation of NodeLabelsProvider with Timer and expects
|
||||
* subclass to provide TimerTask which can fetch NodeLabels
|
||||
*/
|
||||
public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
|
||||
public abstract class AbstractNodeLabelsProvider extends AbstractService
|
||||
implements NodeLabelsProvider {
|
||||
public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1;
|
||||
|
||||
// Delay after which timer task are triggered to fetch NodeLabels
|
||||
@ -94,9 +96,15 @@ protected void serviceStop() throws Exception {
|
||||
if (nodeLabelsScheduler != null) {
|
||||
nodeLabelsScheduler.cancel();
|
||||
}
|
||||
cleanUp();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* method for subclasses to cleanup.
|
||||
*/
|
||||
protected abstract void cleanUp() throws Exception ;
|
||||
|
||||
/**
|
||||
* @return Returns output from provider.
|
||||
*/
|
||||
@ -119,6 +127,15 @@ protected void setNodeLabels(Set<NodeLabel> nodeLabelsSet) {
|
||||
}
|
||||
}
|
||||
|
||||
static Set<NodeLabel> convertToNodeLabelSet(String partitionNodeLabel) {
|
||||
if (null == partitionNodeLabel) {
|
||||
return null;
|
||||
}
|
||||
Set<NodeLabel> labels = new HashSet<NodeLabel>();
|
||||
labels.add(NodeLabel.newInstance(partitionNodeLabel));
|
||||
return labels;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used only by tests to access the timer task directly
|
||||
*
|
||||
@ -128,16 +145,5 @@ TimerTask getTimerTask() {
|
||||
return timerTask;
|
||||
}
|
||||
|
||||
static Set<NodeLabel> convertToNodeLabelSet(Set<String> nodeLabels) {
|
||||
if (null == nodeLabels) {
|
||||
return null;
|
||||
}
|
||||
Set<NodeLabel> labels = new HashSet<NodeLabel>();
|
||||
for (String label : nodeLabels) {
|
||||
labels.add(NodeLabel.newInstance(label));
|
||||
}
|
||||
return labels;
|
||||
}
|
||||
|
||||
public abstract TimerTask createTimerTask();
|
||||
}
|
||||
|
@ -19,14 +19,11 @@
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
@ -43,13 +40,9 @@ public ConfigurationNodeLabelsProvider() {
|
||||
|
||||
private void updateNodeLabelsFromConfig(Configuration conf)
|
||||
throws IOException {
|
||||
String confLabelString =
|
||||
conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, null);
|
||||
String[] nodeLabelsFromConfiguration =
|
||||
(confLabelString == null || confLabelString.isEmpty()) ? new String[] {}
|
||||
: StringUtils.getStrings(confLabelString);
|
||||
setNodeLabels(convertToNodeLabelSet(new HashSet<String>(
|
||||
Arrays.asList(nodeLabelsFromConfiguration))));
|
||||
String configuredNodePartition =
|
||||
conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_PARTITION, null);
|
||||
setNodeLabels(convertToNodeLabelSet(configuredNodePartition));
|
||||
}
|
||||
|
||||
private class ConfigurationMonitorTimerTask extends TimerTask {
|
||||
@ -67,4 +60,9 @@ public void run() {
|
||||
public TimerTask createTimerTask() {
|
||||
return new ConfigurationMonitorTimerTask();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanUp() throws Exception {
|
||||
//No cleanup Req!
|
||||
}
|
||||
}
|
||||
|
@ -20,18 +20,13 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
|
||||
/**
|
||||
* Interface which will be responsible for fetching the labels
|
||||
*
|
||||
*/
|
||||
public abstract class NodeLabelsProvider extends AbstractService {
|
||||
|
||||
public NodeLabelsProvider(String name) {
|
||||
super(name);
|
||||
}
|
||||
public interface NodeLabelsProvider {
|
||||
|
||||
/**
|
||||
* Provides the labels. LabelProvider is expected to give same Labels
|
||||
|
@ -0,0 +1,190 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* The class which provides functionality of getting the labels of the node
|
||||
* using the configured node labels provider script. "NODE_PARTITION:" is the
|
||||
* pattern which will be used to search node label partition from the out put of
|
||||
* the NodeLabels provider script
|
||||
*/
|
||||
public class ScriptBasedNodeLabelsProvider extends AbstractNodeLabelsProvider {
|
||||
/** Absolute path to the node labels script. */
|
||||
private String nodeLabelsScriptPath;
|
||||
|
||||
/** Time after which the script should be timed out */
|
||||
private long scriptTimeout;
|
||||
|
||||
/** ShellCommandExecutor used to execute monitoring script */
|
||||
ShellCommandExecutor shexec = null;
|
||||
|
||||
/** Pattern used for searching in the output of the node labels script */
|
||||
public static final String NODE_LABEL_PARTITION_PATTERN = "NODE_PARTITION:";
|
||||
|
||||
private String[] scriptArgs;
|
||||
|
||||
public ScriptBasedNodeLabelsProvider() {
|
||||
super(ScriptBasedNodeLabelsProvider.class.getName());
|
||||
}
|
||||
|
||||
/*
|
||||
* Method which initializes the values for the script path and interval time.
|
||||
*/
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
this.nodeLabelsScriptPath =
|
||||
conf.get(YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PATH);
|
||||
this.scriptTimeout =
|
||||
conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS);
|
||||
scriptArgs = conf.getStrings(
|
||||
YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_SCRIPT_OPTS,
|
||||
new String[] {});
|
||||
|
||||
verifyConfiguredScript();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to determine if or not node labels fetching script is
|
||||
* configured and whether it is fit to run. Returns true if following
|
||||
* conditions are met:
|
||||
*
|
||||
* <ol>
|
||||
* <li>Path to Node Labels fetch script is not empty</li>
|
||||
* <li>Node Labels fetch script file exists</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param conf
|
||||
* @return true if node labels script can be run.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void verifyConfiguredScript()
|
||||
throws IOException {
|
||||
boolean invalidConfiguration = false;
|
||||
if (nodeLabelsScriptPath == null
|
||||
|| nodeLabelsScriptPath.trim().isEmpty()) {
|
||||
invalidConfiguration = true;
|
||||
} else {
|
||||
File f = new File(nodeLabelsScriptPath);
|
||||
invalidConfiguration = !f.exists() || !FileUtil.canExecute(f);
|
||||
}
|
||||
if (invalidConfiguration) {
|
||||
throw new IOException(
|
||||
"Distributed Node labels provider script \"" + nodeLabelsScriptPath
|
||||
+ "\" is not configured properly. Please check whether the script "
|
||||
+ "path exists, owner and the access rights are suitable for NM "
|
||||
+ "process to execute it");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to terminate the Node Labels Fetch script.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public void cleanUp() {
|
||||
if (shexec != null) {
|
||||
Process p = shexec.getProcess();
|
||||
if (p != null) {
|
||||
p.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimerTask createTimerTask() {
|
||||
return new NodeLabelsScriptRunner();
|
||||
}
|
||||
|
||||
/**
|
||||
* Class which is used by the {@link Timer} class to periodically execute the
|
||||
* node labels script.
|
||||
*/
|
||||
private class NodeLabelsScriptRunner extends TimerTask {
|
||||
|
||||
private final Log LOG = LogFactory.getLog(NodeLabelsScriptRunner.class);
|
||||
|
||||
public NodeLabelsScriptRunner() {
|
||||
ArrayList<String> execScript = new ArrayList<String>();
|
||||
execScript.add(nodeLabelsScriptPath);
|
||||
if (scriptArgs != null) {
|
||||
execScript.addAll(Arrays.asList(scriptArgs));
|
||||
}
|
||||
shexec = new ShellCommandExecutor(
|
||||
execScript.toArray(new String[execScript.size()]), null, null,
|
||||
scriptTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
shexec.execute();
|
||||
setNodeLabels(fetchLabelsFromScriptOutput(shexec.getOutput()));
|
||||
} catch (Exception e) {
|
||||
if (shexec.isTimedOut()) {
|
||||
LOG.warn("Node Labels script timed out, Caught exception : "
|
||||
+ e.getMessage(), e);
|
||||
} else {
|
||||
LOG.warn("Execution of Node Labels script failed, Caught exception : "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method which collect lines from the output string which begins with
|
||||
* Patterns provided.
|
||||
*
|
||||
* @param scriptOutput string
|
||||
* @return true if output string has error pattern in it.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Set<NodeLabel> fetchLabelsFromScriptOutput(String scriptOutput)
|
||||
throws IOException {
|
||||
String nodePartitionLabel = null;
|
||||
String[] splits = scriptOutput.split("\n");
|
||||
for (String line : splits) {
|
||||
String trimmedLine = line.trim();
|
||||
if (trimmedLine.startsWith(NODE_LABEL_PARTITION_PATTERN)) {
|
||||
nodePartitionLabel =
|
||||
trimmedLine.substring(NODE_LABEL_PARTITION_PATTERN.length());
|
||||
}
|
||||
}
|
||||
return convertToNodeLabelSet(nodePartitionLabel);
|
||||
}
|
||||
}
|
||||
}
|
@ -192,14 +192,10 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager(
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
|
||||
public static class DummyNodeLabelsProvider implements NodeLabelsProvider {
|
||||
|
||||
private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
|
||||
|
||||
public DummyNodeLabelsProvider() {
|
||||
super(DummyNodeLabelsProvider.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Set<NodeLabel> getNodeLabels() {
|
||||
return nodeLabels;
|
||||
|
@ -51,8 +51,11 @@ public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
|
||||
|
||||
private ConfigurationNodeLabelsProvider nodeLabelsProvider;
|
||||
|
||||
private static ClassLoader classContextClassLoader;
|
||||
|
||||
@BeforeClass
|
||||
public static void create() {
|
||||
classContextClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
loader =
|
||||
new XMLPathClassLoader(
|
||||
TestConfigurationNodeLabelsProvider.class.getClassLoader());
|
||||
@ -75,9 +78,14 @@ public void tearDown() throws Exception {
|
||||
|
||||
@AfterClass
|
||||
public static void remove() throws Exception {
|
||||
if (classContextClassLoader != null) {
|
||||
// testcases will fail after testcases present in this class, as
|
||||
// yarn-site.xml will be deleted
|
||||
Thread.currentThread().setContextClassLoader(classContextClassLoader);
|
||||
}
|
||||
if (testRootDir.exists()) {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(testRootDir.getAbsolutePath()), true);
|
||||
FileContext.getLocalFSFileContext()
|
||||
.delete(new Path(testRootDir.getAbsolutePath()), true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,25 +93,25 @@ public static void remove() throws Exception {
|
||||
public void testNodeLabelsFromConfig() throws IOException,
|
||||
InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
modifyConf("A");
|
||||
nodeLabelsProvider.init(conf);
|
||||
// test for ensuring labels are set during initialization of the class
|
||||
nodeLabelsProvider.start();
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
assertNLCollectionEquals(toNodeLabelSet("A"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
// test for valid Modification
|
||||
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
|
||||
modifyConf("X,y,Z");
|
||||
modifyConf("X");
|
||||
timerTask.run();
|
||||
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
|
||||
assertNLCollectionEquals(toNodeLabelSet("X"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigForNoTimer() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
modifyConf("A");
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
|
||||
nodeLabelsProvider.init(conf);
|
||||
@ -114,14 +122,14 @@ public void testConfigForNoTimer() throws Exception {
|
||||
nodeLabelsProvider.nodeLabelsScheduler);
|
||||
// Ensure that even though timer is not run, node labels are fetched at least once so
|
||||
// that NM registers/updates Labels with RM
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
assertNLCollectionEquals(toNodeLabelSet("A"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigTimer() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
modifyConf("A,B,CX");
|
||||
modifyConf("A");
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
1000);
|
||||
nodeLabelsProvider.init(conf);
|
||||
@ -129,11 +137,11 @@ public void testConfigTimer() throws Exception {
|
||||
// Ensure that even though timer is not run, node labels are fetched at
|
||||
// least once so
|
||||
// that NM registers/updates Labels with RM
|
||||
assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
|
||||
assertNLCollectionEquals(toNodeLabelSet("A"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
modifyConf("X,y,Z");
|
||||
modifyConf("X");
|
||||
Thread.sleep(1500);
|
||||
assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
|
||||
assertNLCollectionEquals(toNodeLabelSet("X"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
}
|
||||
@ -141,7 +149,7 @@ public void testConfigTimer() throws Exception {
|
||||
private static void modifyConf(String nodeLabels)
|
||||
throws FileNotFoundException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
|
||||
conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_PARTITION, nodeLabels);
|
||||
FileOutputStream confStream = new FileOutputStream(nodeLabelsConfigFile);
|
||||
conf.writeXml(confStream);
|
||||
IOUtils.closeQuietly(confStream);
|
||||
|
@ -0,0 +1,209 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.service.ServiceStateException;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestScriptBasedNodeLabelsProvider extends NodeLabelTestBase {
|
||||
|
||||
protected static File testRootDir = new File("target",
|
||||
TestScriptBasedNodeLabelsProvider.class.getName() + "-localDir")
|
||||
.getAbsoluteFile();
|
||||
|
||||
private final File nodeLabelsScriptFile =
|
||||
new File(testRootDir, Shell.appendScriptExtension("failingscript"));
|
||||
|
||||
private ScriptBasedNodeLabelsProvider nodeLabelsProvider;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
testRootDir.mkdirs();
|
||||
nodeLabelsProvider = new ScriptBasedNodeLabelsProvider();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (testRootDir.exists()) {
|
||||
FileContext.getLocalFSFileContext()
|
||||
.delete(new Path(testRootDir.getAbsolutePath()), true);
|
||||
}
|
||||
if (nodeLabelsProvider != null) {
|
||||
nodeLabelsProvider.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfForNodeLabelScript() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PATH,
|
||||
nodeLabelsScriptFile.getAbsolutePath());
|
||||
// set bigger interval so that test cases can be run
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
1 * 60 * 60 * 1000l);
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS,
|
||||
1000);
|
||||
return conf;
|
||||
}
|
||||
|
||||
private void writeNodeLabelsScriptFile(String scriptStr,
|
||||
boolean setExecutable) throws IOException {
|
||||
PrintWriter pw = null;
|
||||
try {
|
||||
FileUtil.setWritable(nodeLabelsScriptFile, true);
|
||||
FileUtil.setReadable(nodeLabelsScriptFile, true);
|
||||
pw = new PrintWriter(new FileOutputStream(nodeLabelsScriptFile));
|
||||
pw.println(scriptStr);
|
||||
pw.flush();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail();
|
||||
} finally {
|
||||
if (null != pw) {
|
||||
pw.close();
|
||||
}
|
||||
}
|
||||
FileUtil.setExecutable(nodeLabelsScriptFile, setExecutable);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLabelsScriptRunnerCreation() throws IOException {
|
||||
// If no script configured then initialization of service should fail
|
||||
ScriptBasedNodeLabelsProvider nodeLabelsProvider =
|
||||
new ScriptBasedNodeLabelsProvider();
|
||||
initilizeServiceFailTest(
|
||||
"Expected to fail fast when no script is configured and "
|
||||
+ "ScriptBasedNodeLabelsProvider service is inited",
|
||||
nodeLabelsProvider);
|
||||
|
||||
// If script configured is blank then initialization of service should fail
|
||||
nodeLabelsProvider = new ScriptBasedNodeLabelsProvider();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_SCRIPT_BASED_NODE_LABELS_PROVIDER_PATH, "");
|
||||
initilizeServiceFailTest(
|
||||
"Expected to fail fast when script path configuration is blank"
|
||||
+ "and ScriptBasedNodeLabelsProvider service is inited.",
|
||||
nodeLabelsProvider);
|
||||
|
||||
// If script configured is not executable then no timertask /
|
||||
// NodeLabelsScriptRunner initialized
|
||||
nodeLabelsProvider = new ScriptBasedNodeLabelsProvider();
|
||||
writeNodeLabelsScriptFile("", false);
|
||||
initilizeServiceFailTest(
|
||||
"Expected to fail fast when script is not executable"
|
||||
+ "and ScriptBasedNodeLabelsProvider service is inited.",
|
||||
nodeLabelsProvider);
|
||||
|
||||
// If configured script is executable then timertask /
|
||||
// NodeLabelsScriptRunner should be initialized
|
||||
nodeLabelsProvider = new ScriptBasedNodeLabelsProvider();
|
||||
writeNodeLabelsScriptFile("", true);
|
||||
nodeLabelsProvider.init(getConfForNodeLabelScript());
|
||||
nodeLabelsProvider.start();
|
||||
Assert
|
||||
.assertNotNull("Node Label Script runner should be started when script"
|
||||
+ " is executable", nodeLabelsProvider.getTimerTask());
|
||||
nodeLabelsProvider.stop();
|
||||
}
|
||||
|
||||
private void initilizeServiceFailTest(String message,
|
||||
ScriptBasedNodeLabelsProvider nodeLabelsProvider) {
|
||||
try {
|
||||
nodeLabelsProvider.init(new Configuration());
|
||||
Assert.fail(message);
|
||||
} catch (ServiceStateException ex) {
|
||||
Assert.assertEquals("IOException was expected", IOException.class,
|
||||
ex.getCause().getClass());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigForNoTimer() throws Exception {
|
||||
Configuration conf = getConfForNodeLabelScript();
|
||||
conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
|
||||
AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
|
||||
String normalScript = "echo NODE_PARTITION:X86";
|
||||
writeNodeLabelsScriptFile(normalScript, true);
|
||||
nodeLabelsProvider.init(conf);
|
||||
nodeLabelsProvider.start();
|
||||
Assert.assertNull(
|
||||
"Timer is not expected to be created when interval is configured as -1",
|
||||
nodeLabelsProvider.nodeLabelsScheduler);
|
||||
// Ensure that even though timer is not run script is run at least once so
|
||||
// that NM registers/updates Labels with RM
|
||||
assertNLCollectionEquals(toNodeLabelSet("X86"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLabelsScript() throws Exception {
|
||||
String scriptWithoutLabels = "";
|
||||
String normalScript = "echo NODE_PARTITION:Windows";
|
||||
String scrptWithMultipleLinesHavingNodeLabels =
|
||||
"echo NODE_PARTITION:RAM\n echo NODE_PARTITION:JDK1_6";
|
||||
String timeOutScript = Shell.WINDOWS
|
||||
? "@echo off\nping -n 4 127.0.0.1 >nul\n" + "echo NODE_PARTITION:ALL"
|
||||
: "sleep 4\necho NODE_PARTITION:ALL";
|
||||
|
||||
writeNodeLabelsScriptFile(scriptWithoutLabels, true);
|
||||
nodeLabelsProvider.init(getConfForNodeLabelScript());
|
||||
nodeLabelsProvider.start();
|
||||
Thread.sleep(500l);
|
||||
TimerTask timerTask = nodeLabelsProvider.getTimerTask();
|
||||
timerTask.run();
|
||||
Assert.assertNull(
|
||||
"Node Label Script runner should return null when script doesnt "
|
||||
+ "give any Labels output",
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
writeNodeLabelsScriptFile(normalScript, true);
|
||||
timerTask.run();
|
||||
assertNLCollectionEquals(toNodeLabelSet("Windows"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
// multiple lines with partition tag then the last line's partition info
|
||||
// needs to be taken.
|
||||
writeNodeLabelsScriptFile(scrptWithMultipleLinesHavingNodeLabels, true);
|
||||
timerTask.run();
|
||||
assertNLCollectionEquals(toNodeLabelSet("JDK1_6"),
|
||||
nodeLabelsProvider.getNodeLabels());
|
||||
|
||||
// timeout script.
|
||||
writeNodeLabelsScriptFile(timeOutScript, true);
|
||||
timerTask.run();
|
||||
|
||||
Assert.assertNotEquals("Node Labels should not be set after timeout ",
|
||||
toNodeLabelSet("ALL"), nodeLabelsProvider.getNodeLabels());
|
||||
}
|
||||
}
|
@ -149,10 +149,12 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
||||
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
||||
|
||||
if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
|
||||
isDistributedNodeLabelsConf =
|
||||
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
|
||||
isDelegatedCentralizedNodeLabelsConf = YarnConfiguration
|
||||
.isDelegatedCentralizedNodeLabelConfiguration(conf);
|
||||
isDelegatedCentralizedNodeLabelsConf =
|
||||
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user