YARN-8094. Support configuration based Node Attribute provider. Contributed by Weiwei Yang.

This commit is contained in:
Sunil G 2018-03-31 19:53:06 +05:30
parent 6f4bc49c6d
commit 440ff7f563
3 changed files with 150 additions and 2 deletions

View File

@ -18,13 +18,19 @@
package org.apache.hadoop.yarn.server.nodemanager.nodelabels; package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.Set; import java.util.Set;
@ -38,6 +44,9 @@ public class ConfigurationNodeAttributesProvider
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class); LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class);
private static final String NODE_ATTRIBUTES_DELIMITER = ":";
private static final String NODE_ATTRIBUTE_DELIMITER = ",";
public ConfigurationNodeAttributesProvider() { public ConfigurationNodeAttributesProvider() {
super("Configuration Based Node Attributes Provider"); super("Configuration Based Node Attributes Provider");
} }
@ -59,11 +68,68 @@ private void updateNodeAttributesFromConfig(Configuration conf)
setDescriptors(parseAttributes(configuredNodeAttributes)); setDescriptors(parseAttributes(configuredNodeAttributes));
} }
// TODO parse attributes from configuration
@VisibleForTesting @VisibleForTesting
public Set<NodeAttribute> parseAttributes(String config) public Set<NodeAttribute> parseAttributes(String config)
throws IOException { throws IOException {
return new HashSet<>(); if (Strings.isNullOrEmpty(config)) {
return ImmutableSet.of();
}
Set<NodeAttribute> attributeSet = new HashSet<>();
// Configuration value should be in one line, format:
// "ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE",
// multiple node-attributes are delimited by ":".
// Each attribute str should not container any space.
String[] attributeStrs = config.split(NODE_ATTRIBUTES_DELIMITER);
for (String attributeStr : attributeStrs) {
String[] fields = attributeStr.split(NODE_ATTRIBUTE_DELIMITER);
if (fields.length != 3) {
throw new IOException("Invalid value for "
+ YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES
+ "=" + config);
}
// We don't allow user config to overwrite our dist prefix,
// so disallow any prefix set in the configuration.
if (fields[0].contains("/")) {
throw new IOException("Node attribute set in "
+ YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES
+ " should not contain any prefix.");
}
// Make sure attribute type is valid.
if (!EnumUtils.isValidEnum(NodeAttributeType.class, fields[1])) {
throw new IOException("Invalid node attribute type: "
+ fields[1] + ", valid values are "
+ Arrays.asList(NodeAttributeType.values()));
}
// Automatically setup prefix for collected attributes
NodeAttribute na = NodeAttribute.newInstance(
NodeAttribute.PREFIX_DISTRIBUTED,
fields[0],
NodeAttributeType.valueOf(fields[1]),
fields[2]);
// Since a NodeAttribute is identical with another one as long as
// their prefix and name are same, to avoid attributes getting
// overwritten by ambiguous attribute, make sure it fails in such
// case.
if (!attributeSet.add(na)) {
throw new IOException("Ambiguous node attribute is found: "
+ na.toString() + ", a same attribute already exists");
}
}
// Before updating the attributes to the provider,
// verify if they are valid
try {
NodeLabelUtil.validateNodeAttributes(attributeSet);
} catch (IOException e) {
throw new IOException("Node attributes set by configuration property: "
+ YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES
+ " is not valid. Detail message: " + e.getMessage());
}
return attributeSet;
} }
private class ConfigurationMonitorTimerTask extends TimerTask { private class ConfigurationMonitorTimerTask extends TimerTask {

View File

@ -117,6 +117,14 @@ Set<NodeAttribute> parseOutput(String scriptOutput) throws IOException {
+ NODE_ATTRIBUTE_DELIMITER + "ATTRIBUTE_VALUE; but get " + NODE_ATTRIBUTE_DELIMITER + "ATTRIBUTE_VALUE; but get "
+ nodeAttribute); + nodeAttribute);
} }
// We don't allow script to overwrite our dist prefix,
// so disallow any prefix set in the script.
if (attributeStrs[0].contains("/")) {
throw new IOException("Node attributes reported by script"
+ " should not contain any prefix.");
}
// Automatically setup prefix for collected attributes // Automatically setup prefix for collected attributes
NodeAttribute na = NodeAttribute NodeAttribute na = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, .newInstance(NodeAttribute.PREFIX_DISTRIBUTED,

View File

@ -36,6 +36,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -182,4 +183,77 @@ public void testDisableFetchNodeAttributes() throws IOException,
Assert.fail("Expecting a failure in previous check!"); Assert.fail("Expecting a failure in previous check!");
} }
@Test
public void testFetchAttributesFromConfiguration() {
Configuration conf = new Configuration();
// Set fetch interval to -1 to disable refresh.
conf.setLong(
YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
conf.setStrings(
YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, "");
}
@Test
public void testParseConfiguration() throws IOException {
// ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE
String attributesStr = "hostname,STRING,host1234:uptime,STRING,321543";
Set<NodeAttribute> attributes = nodeAttributesProvider
.parseAttributes(attributesStr);
Assert.assertEquals(2, attributes.size());
Iterator<NodeAttribute> ait = attributes.iterator();
while(ait.hasNext()) {
NodeAttribute at = ait.next();
if (at.getAttributeName().equals("hostname")) {
Assert.assertEquals("hostname", at.getAttributeName());
Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
at.getAttributePrefix());
Assert.assertEquals(NodeAttributeType.STRING,
at.getAttributeType());
Assert.assertEquals("host1234", at.getAttributeValue());
} else if (at.getAttributeName().equals("uptime")) {
Assert.assertEquals("uptime", at.getAttributeName());
Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
at.getAttributePrefix());
Assert.assertEquals(NodeAttributeType.STRING,
at.getAttributeType());
Assert.assertEquals("321543", at.getAttributeValue());
} else {
Assert.fail("Unexpected attribute");
}
}
// Missing type
attributesStr = "hostname,host1234";
try {
nodeAttributesProvider.parseAttributes(attributesStr);
Assert.fail("Expecting a parsing failure");
} catch (IOException e) {
Assert.assertNotNull(e);
Assert.assertTrue(e.getMessage().contains("Invalid value"));
}
// Extra prefix
attributesStr = "prefix/hostname,STRING,host1234";
try {
nodeAttributesProvider.parseAttributes(attributesStr);
Assert.fail("Expecting a parsing failure");
} catch (IOException e) {
Assert.assertNotNull(e);
Assert.assertTrue(e.getMessage()
.contains("should not contain any prefix."));
}
// Invalid type
attributesStr = "hostname,T,host1234";
try {
nodeAttributesProvider.parseAttributes(attributesStr);
Assert.fail("Expecting a parsing failure");
} catch (IOException e) {
e.printStackTrace();
Assert.assertNotNull(e);
Assert.assertTrue(e.getMessage()
.contains("Invalid node attribute type"));
}
}
} }