YARN-7871. Node attributes reporting from NM to RM. Contributed by Weiwei Yang.

This commit is contained in:
Naganarasimha 2018-03-12 08:05:53 +08:00 committed by Sunil G
parent 86d024ef2a
commit 3b3b6efe21
15 changed files with 718 additions and 52 deletions

View File

@ -3548,9 +3548,12 @@ public static boolean areNodeLabelsEnabled(
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
NM_NODE_LABELS_PREFIX + "provider";
public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG =
NM_NODE_ATTRIBUTES_PREFIX + "provider";
// whitelist names for the yarn.nodemanager.node-labels.provider
public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
public static final String SCRIPT_NODE_LABELS_PROVIDER = "script";
public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config";
public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script";
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
@ -3582,6 +3585,9 @@ public static boolean areNodeLabelsEnabled(
public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition";
public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES =
NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes";
private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ "node-labels.";

View File

@ -35,15 +35,18 @@ public NodeAttributesManager(String name) {
/**
* To completely replace the mappings for a given node with the new Set of
* Attributes. If the mapping contains an attribute whose type does not match
* a previously existing Attribute under the same prefix (name space) then
* exception is thrown. Key would be name of the node and value would be set
* of Attributes to be mapped.
* Attributes which are under a given prefix. If the mapping contains an
* attribute whose type does not match a previously existing Attribute
* under the same prefix (name space) then exception is thrown.
* Key would be name of the node and value would be set of Attributes to
* be mapped. If the prefix is null, then all node attributes will be
* replaced regardless of what prefix they have.
*
* @param nodeAttributeMapping
* @throws IOException
* @param prefix node attribute prefix
* @param nodeAttributeMapping host name to a set of node attributes mapping
* @throws IOException if failed to replace attributes
*/
public abstract void replaceNodeAttributes(
public abstract void replaceNodeAttributes(String prefix,
Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException;
/**

View File

@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Utility class for all NodeLabel and NodeAttribute operations.
@ -125,4 +126,22 @@ public static void validateNodeAttributes(Set<NodeAttribute> attributeSet)
}
}
}
/**
 * Selects the node attributes that belong to a given prefix.
 *
 * <p>When {@code prefix} is null or empty no filtering takes place and the
 * very same set instance that was passed in is handed back. Otherwise a
 * newly collected set is returned which holds only those attributes whose
 * prefix equals {@code prefix}.
 *
 * @param attributeSet node attribute set
 * @param prefix node attribute prefix
 * @return the original set when prefix is null or empty, otherwise a
 *         filtered set of node attributes
 */
public static Set<NodeAttribute> filterAttributesByPrefix(
    Set<NodeAttribute> attributeSet, String prefix) {
  boolean filterRequested = !Strings.isNullOrEmpty(prefix);
  if (filterRequested) {
    return attributeSet.stream()
        .filter(candidate -> prefix.equals(candidate.getAttributePrefix()))
        .collect(Collectors.toSet());
  }
  // Nothing to filter on: hand back the caller's set untouched.
  return attributeSet;
}
}

View File

@ -2902,6 +2902,20 @@
</property>
<!-- Distributed Node Attributes Configuration -->
<property>
<description>
This property determines which provider will be plugged by the
node manager to collect node-attributes. Administrators can
configure "config", "script" or the class name of the provider.
Configured class needs to extend
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider.
If "config" is configured, then "ConfigurationNodeAttributesProvider" and if
"script" is configured, then "ScriptBasedNodeAttributesProvider"
will be used.
</description>
<name>yarn.nodemanager.node-attributes.provider</name>
</property>
<property>
<description>
The node attribute script NM runs to collect node attributes.
@ -2939,6 +2953,16 @@
<value>1200000</value>
</property>
<property>
<description>
When "yarn.nodemanager.node-attributes.provider" is configured with
"config" then ConfigurationNodeAttributesProvider fetches node attributes
from this parameter.
</description>
<name>yarn.nodemanager.node-attributes.provider.configured-node-attributes</name>
<value></value>
</property>
<property>
<description>
Timeout in seconds for YARN node graceful decommission.

View File

@ -66,6 +66,9 @@
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -123,6 +126,7 @@ public int getExitCode() {
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private NodeLabelsProvider nodeLabelsProvider;
private NodeAttributesProvider nodeAttributesProvider;
private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
@ -162,14 +166,45 @@ public static long getNMStartupTime() {
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, nodeLabelsProvider);
metrics);
}
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
NodeLabelsProvider nodeLabelsProvider) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, nodeLabelsProvider);
/**
 * Creates the node attributes provider selected by
 * {@link YarnConfiguration#NM_NODE_ATTRIBUTES_PROVIDER_CONFIG}.
 *
 * <p>The configured value is trimmed and lower-cased before it is compared
 * against the whitelisted names "config" and "script"; any other value is
 * treated as the class name of a {@link NodeAttributesProvider}
 * implementation and instantiated through its no-arg constructor.
 *
 * @param conf configuration to read the provider setting from
 * @return the provider instance, or null when no provider is configured
 * @throws IOException if the configured provider class cannot be loaded
 *           or instantiated
 */
protected NodeAttributesProvider createNodeAttributesProvider(
    Configuration conf) throws IOException {
  String providerString =
      conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);
  if (providerString == null || providerString.trim().length() == 0) {
    // Distributed node attributes are not configured on this node.
    return null;
  }
  NodeAttributesProvider attributesProvider;
  // Lower-case with a fixed locale so that the whitelist match does not
  // depend on the JVM default locale (e.g. the Turkish dotless 'i').
  switch (providerString.trim().toLowerCase(java.util.Locale.ROOT)) {
  case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
    attributesProvider = new ConfigurationNodeAttributesProvider();
    break;
  case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
    attributesProvider = new ScriptBasedNodeAttributesProvider();
    break;
  default:
    try {
      Class<? extends NodeAttributesProvider> attributesProviderClass =
          conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,
              null, NodeAttributesProvider.class);
      attributesProvider = attributesProviderClass.newInstance();
    } catch (InstantiationException | IllegalAccessException
        | RuntimeException e) {
      LOG.error("Failed to create NodeAttributesProvider"
          + " based on Configuration", e);
      throw new IOException(
          "Failed to create NodeAttributesProvider : "
              + e.getMessage(), e);
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Distributed Node Attributes is enabled"
        + " with provider class as : "
        + attributesProvider.getClass().toString());
  }
  return attributesProvider;
}
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@ -182,10 +217,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
return provider;
}
switch (providerString.trim().toLowerCase()) {
case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
provider = new ConfigurationNodeLabelsProvider();
break;
case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER:
case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
provider = new ScriptBasedNodeLabelsProvider();
break;
default:
@ -407,16 +442,19 @@ protected void serviceInit(Configuration conf) throws Exception {
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setDeletionService(del);
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (null == nodeLabelsProvider) {
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
} else {
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (nodeLabelsProvider != null) {
addIfService(nodeLabelsProvider);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
nodeLabelsProvider);
nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
}
nodeAttributesProvider = createNodeAttributesProvider(conf);
if (nodeAttributesProvider != null) {
addIfService(nodeAttributesProvider);
nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}
nodeResourceMonitor = createNodeResourceMonitor();

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
public interface NodeStatusUpdater extends Service {
@ -59,4 +61,16 @@ public interface NodeStatusUpdater extends Service {
* @param ex exception that makes the node unhealthy
*/
void reportException(Exception ex);
/**
* Sets the node attributes provider used by the node manager.
* @param provider the node attributes provider to use
*/
void setNodeAttributesProvider(NodeAttributesProvider provider);
/**
* Sets the node labels provider used by the node manager.
* @param provider the node labels provider to use
*/
void setNodeLabelsProvider(NodeLabelsProvider provider);
}

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -85,6 +86,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
@ -152,21 +154,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private NMNodeLabelsHandler nodeLabelsHandler;
private final NodeLabelsProvider nodeLabelsProvider;
private NMNodeAttributesHandler nodeAttributesHandler;
private NodeLabelsProvider nodeLabelsProvider;
private NodeAttributesProvider nodeAttributesProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
this(context, dispatcher, healthChecker, metrics, null);
}
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
this.context = context;
this.dispatcher = dispatcher;
this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
@ -175,6 +172,16 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
new ArrayList<LogAggregationReport>();
}
@Override
public void setNodeAttributesProvider(NodeAttributesProvider provider) {
this.nodeAttributesProvider = provider;
}
@Override
public void setNodeLabelsProvider(NodeLabelsProvider provider) {
this.nodeLabelsProvider = provider;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
@ -214,7 +221,11 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
nodeLabelsHandler =
createNMNodeLabelsHandler(nodeLabelsProvider);
nodeAttributesHandler =
createNMNodeAttributesHandler(nodeAttributesProvider);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@ -856,6 +867,43 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler(
}
}
/**
 * Returns a handler based on the given node attributes provider.
 * Returns null if no provider is configured.
 *
 * @param provider the node attributes provider, may be null
 * @return attributes handler wrapping {@code provider}, or null when
 *         {@code provider} is null
 */
private NMNodeAttributesHandler createNMNodeAttributesHandler(
    NodeAttributesProvider provider) {
  // Wrap the argument that was just null-checked instead of reading the
  // enclosing field, so the check and the wrapped object always agree.
  return provider == null ? null :
      new NMDistributedNodeAttributesHandler(provider);
}
// Source of the node attributes that the status updater attaches to each
// heartbeat request sent to the resource manager.
private interface NMNodeAttributesHandler {
/**
* Supplies the attributes to be reported in the next heartbeat.
*
* @return the node attributes of this node manager.
*/
Set<NodeAttribute> getNodeAttributesForHeartbeat();
}
/**
 * Heartbeat attributes handler that serves whatever descriptors the
 * wrapped {@link NodeAttributesProvider} currently holds.
 */
private static class NMDistributedNodeAttributesHandler
    implements NMNodeAttributesHandler {

  /** Provider queried on every heartbeat. */
  private final NodeAttributesProvider attributesProvider;

  /**
   * @param provider provider whose descriptors are reported
   */
  protected NMDistributedNodeAttributesHandler(
      NodeAttributesProvider provider) {
    attributesProvider = provider;
  }

  @Override
  public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
    Set<NodeAttribute> currentAttributes =
        attributesProvider.getDescriptors();
    return currentAttributes;
  }
}
private static interface NMNodeLabelsHandler {
/**
* validates nodeLabels From Provider and returns it to the caller. Also
@ -1071,6 +1119,9 @@ public void run() {
NodeHeartbeatResponse response = null;
Set<NodeLabel> nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
Set<NodeAttribute> nodeAttributesForHeartbeat =
nodeAttributesHandler == null ? null :
nodeAttributesHandler.getNodeAttributesForHeartbeat();
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
@ -1079,6 +1130,7 @@ public void run() {
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
nodeAttributesForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteringCollectors());

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.TimerTask;
import java.util.Set;
/**
 * Node attributes provider that is driven by the YARN configuration.
 *
 * <p>The timer task created by this provider builds a fresh
 * {@link YarnConfiguration} on every run, reads
 * {@link YarnConfiguration#NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES} from it
 * and publishes the parsed result as the provider's descriptors.
 */
public class ConfigurationNodeAttributesProvider
    extends NodeAttributesProvider {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class);

  public ConfigurationNodeAttributesProvider() {
    super("Configuration Based Node Attributes Provider");
  }

  /**
   * Reads the fetch interval from the configuration and hands it to the
   * base class before continuing with the regular service initialization.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    long fetchIntervalMs = conf.getLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
        YarnConfiguration
            .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS);
    setIntervalTime(fetchIntervalMs);
    super.serviceInit(conf);
  }

  @Override
  public TimerTask createTimerTask() {
    return new ConfigurationMonitorTimerTask();
  }

  @Override
  protected void cleanUp() throws Exception {
    // Nothing to cleanup
  }

  /**
   * Turns the configured attribute string into node attributes.
   *
   * @param config raw value of the configured node attributes, may be null
   * @return the parsed attributes; currently always an empty set
   * @throws IOException declared for the future parser implementation
   */
  // TODO parse attributes from configuration
  @VisibleForTesting
  public Set<NodeAttribute> parseAttributes(String config)
      throws IOException {
    Set<NodeAttribute> parsed = new HashSet<>();
    return parsed;
  }

  /**
   * Looks up the configured attributes in {@code conf}, parses them and
   * stores the outcome as this provider's descriptors.
   */
  private void updateNodeAttributesFromConfig(Configuration conf)
      throws IOException {
    String rawAttributes = conf.get(
        YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null);
    Set<NodeAttribute> refreshed = parseAttributes(rawAttributes);
    setDescriptors(refreshed);
  }

  /**
   * Periodic task that re-reads the configuration; failures are logged and
   * do not propagate out of the task.
   */
  private class ConfigurationMonitorTimerTask extends TimerTask {
    @Override
    public void run() {
      try {
        Configuration latestConf = new YarnConfiguration();
        updateNodeAttributesFromConfig(latestConf);
      } catch (Exception e) {
        LOG.error("Failed to update node attributes from "
            + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e);
      }
    }
  }
}

View File

@ -160,7 +160,7 @@ public void testCreationOfNodeLabelsProviderService()
// With valid whitelisted configurations
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER);
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);

View File

@ -225,11 +225,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
NodeLabelsProvider labelsProvider) {
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, labelsProvider) {
metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
@ -325,11 +324,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
NodeLabelsProvider labelsProvider) {
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, labelsProvider) {
metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Test;
import org.junit.Assert;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
/**
 * Test class for the configuration based node attributes provider.
 */
public class TestConfigurationNodeAttributesProvider {

  private static File testRootDir = new File("target",
      TestConfigurationNodeAttributesProvider.class.getName() + "-localDir")
      .getAbsoluteFile();

  private ConfigurationNodeAttributesProvider nodeAttributesProvider;

  @BeforeClass
  public static void create() {
    testRootDir.mkdirs();
  }

  @Before
  public void setup() {
    nodeAttributesProvider = new ConfigurationNodeAttributesProvider();
  }

  @After
  public void tearDown() throws Exception {
    if (nodeAttributesProvider != null) {
      nodeAttributesProvider.close();
      nodeAttributesProvider.stop();
    }
  }

  @AfterClass
  public static void remove() throws Exception {
    if (testRootDir.exists()) {
      FileContext.getLocalFSFileContext()
          .delete(new Path(testRootDir.getAbsolutePath()), true);
    }
  }

  /**
   * Verifies that a changed attribute set becomes visible through the
   * provider only after the configured fetch interval has elapsed.
   */
  @Test(timeout=30000L)
  public void testNodeAttributesFetchInterval()
      throws IOException, InterruptedException {
    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
    expectedAttributes1.add(NodeAttribute
        .newInstance("test.io", "host",
            NodeAttributeType.STRING, "host1"));

    Configuration conf = new Configuration();
    // Set fetch interval to 1s for testing
    conf.setLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000);
    ConfigurationNodeAttributesProvider spyProvider =
        Mockito.spy(nodeAttributesProvider);
    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
        .thenReturn(expectedAttributes1);

    spyProvider.init(conf);
    spyProvider.start();

    // Verify init value is honored.
    Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors());

    // Configuration provider provides a different set of attributes.
    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
    expectedAttributes2.add(NodeAttribute
        .newInstance("test.io", "os",
            NodeAttributeType.STRING, "windows"));
    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
        .thenReturn(expectedAttributes2);

    // Since we set fetch interval to 1s, it needs to wait for 1s until
    // the updated attributes is updated to the provider. So we are expecting
    // to see some old values for a short window.
    int numOfOldValue = 0;
    int numOfNewValue = 0;
    // Run 5 times in 500ms interval
    final int samples = 5;
    for (int i = 0; i < samples; i++) {
      Set<NodeAttribute> current = spyProvider.getDescriptors();
      Assert.assertEquals(1, current.size());
      String attributeName = current.iterator().next().getAttributeName();
      if ("host".equals(attributeName)) {
        numOfOldValue++;
      } else if ("os".equals(attributeName)) {
        numOfNewValue++;
      }
      Thread.sleep(500);
    }
    // We should either see the old value or the new value.
    Assert.assertEquals(samples, numOfNewValue + numOfOldValue);
    // Both values should be more than 0.
    Assert.assertTrue(numOfOldValue > 0);
    Assert.assertTrue(numOfNewValue > 0);
  }

  /**
   * Verifies that with a fetch interval of -1 the provider keeps the
   * attributes it obtained at start-up and never picks up later changes.
   */
  @Test
  public void testDisableFetchNodeAttributes() throws IOException,
      InterruptedException {
    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
    expectedAttributes1.add(NodeAttribute
        .newInstance("test.io", "host",
            NodeAttributeType.STRING, "host1"));

    Configuration conf = new Configuration();
    // Set fetch interval to -1 to disable refresh.
    conf.setLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
    ConfigurationNodeAttributesProvider spyProvider =
        Mockito.spy(nodeAttributesProvider);
    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
        .thenReturn(expectedAttributes1);
    spyProvider.init(conf);
    spyProvider.start();

    Assert.assertEquals(expectedAttributes1,
        spyProvider.getDescriptors());

    // The configuration added another attribute,
    // as we disabled the fetch interval, this value cannot be
    // updated to the provider.
    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
    expectedAttributes2.add(NodeAttribute
        .newInstance("test.io", "os",
            NodeAttributeType.STRING, "windows"));
    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
        .thenReturn(expectedAttributes2);

    // Wait a few seconds until we get the value update, expecting a failure.
    try {
      GenericTestUtils.waitFor(() -> {
        Set<NodeAttribute> attributes = spyProvider.getDescriptors();
        return "os".equalsIgnoreCase(attributes
            .iterator().next().getAttributeName());
      }, 500, 1000);
    } catch (Exception e) {
      // Make sure we get the timeout exception.
      Assert.assertTrue(e instanceof TimeoutException);
      return;
    }

    Assert.fail("Expecting a failure in previous check!");
  }
}

View File

@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.CollectionUtils;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -51,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -646,6 +648,34 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.rmContext.getNodeManagerQueueLimitCalculator()
.createContainerQueuingLimit());
}
// 8. Get node's attributes and update node-to-attributes mapping
// in RMNodeAttributeManager.
Set<NodeAttribute> nodeAttributes = request.getNodeAttributes();
if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
nodeAttributes.forEach(nodeAttribute ->
LOG.debug(nodeId.toString() + " ATTRIBUTE : "
+ nodeAttribute.toString()));
// Validate attributes
if (!nodeAttributes.stream().allMatch(
nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
.equals(nodeAttribute.getAttributePrefix()))) {
// All attributes must be in same prefix: nm.yarn.io.
// Since we have the checks in NM to make sure attributes reported
// in HB are with correct prefix, so it should not reach here.
LOG.warn("Reject invalid node attributes from host: "
+ nodeId.toString() + ", attributes in HB must have prefix "
+ NodeAttribute.PREFIX_DISTRIBUTED);
} else {
// Replace all distributed node attributes associated with this host
// with the new reported attributes in node attribute manager.
this.rmContext.getNodeAttributesManager()
.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
ImmutableMap.of(nodeId.getHost(), nodeAttributes));
}
}
return nodeHeartBeatResponse;
}

View File

@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -126,7 +127,8 @@ protected void initNodeAttributeStore(Configuration conf) throws Exception {
private void internalUpdateAttributesOnNodes(
Map<String, Map<NodeAttribute, AttributeValue>> nodeAttributeMapping,
AttributeMappingOperationType op,
Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded) {
Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded,
String attributePrefix) {
try {
writeLock.lock();
@ -156,8 +158,9 @@ private void internalUpdateAttributesOnNodes(
break;
case REPLACE:
clusterAttributes.putAll(newAttributesToBeAdded);
replaceNodeToAttribute(nodeHost, node.getAttributes(), attributes);
node.replaceAttributes(attributes);
replaceNodeToAttribute(nodeHost, attributePrefix,
node.getAttributes(), attributes);
node.replaceAttributes(attributes, attributePrefix);
break;
default:
break;
@ -199,15 +202,23 @@ private void removeNodeFromAttributes(String nodeHost,
/**
 * Records {@code nodeHost} against every attribute in the given mapping.
 *
 * <p>An attribute that is not yet known to {@code clusterAttributes} is
 * registered first, so the node is always added to the attribute it maps to.
 *
 * @param nodeHost host name of the node
 * @param attributeMappings attributes (and their values) mapped to the node
 */
private void addNodeToAttribute(String nodeHost,
    Map<NodeAttribute, AttributeValue> attributeMappings) {
  for (NodeAttribute attribute : attributeMappings.keySet()) {
    RMNodeAttribute rmNodeAttribute = clusterAttributes.get(attribute);
    if (rmNodeAttribute == null) {
      // First time this attribute is seen: register it before use instead
      // of dereferencing a missing entry.
      rmNodeAttribute = new RMNodeAttribute(attribute);
      clusterAttributes.put(attribute, rmNodeAttribute);
    }
    rmNodeAttribute.addNode(nodeHost);
  }
}
private void replaceNodeToAttribute(String nodeHost,
private void replaceNodeToAttribute(String nodeHost, String prefix,
Map<NodeAttribute, AttributeValue> oldAttributeMappings,
Map<NodeAttribute, AttributeValue> newAttributeMappings) {
if (oldAttributeMappings != null) {
removeNodeFromAttributes(nodeHost, oldAttributeMappings.keySet());
Set<NodeAttribute> toRemoveAttributes =
NodeLabelUtil.filterAttributesByPrefix(
oldAttributeMappings.keySet(), prefix);
removeNodeFromAttributes(nodeHost, toRemoveAttributes);
}
addNodeToAttribute(nodeHost, newAttributeMappings);
}
@ -432,8 +443,19 @@ public void removeAttributes(
}
public void replaceAttributes(
Map<NodeAttribute, AttributeValue> attributesMapping) {
Map<NodeAttribute, AttributeValue> attributesMapping, String prefix) {
if (Strings.isNullOrEmpty(prefix)) {
this.attributes.clear();
} else {
Iterator<Entry<NodeAttribute, AttributeValue>> it =
this.attributes.entrySet().iterator();
while (it.hasNext()) {
Entry<NodeAttribute, AttributeValue> current = it.next();
if (prefix.equals(current.getKey().getAttributePrefix())) {
it.remove();
}
}
}
this.attributes.putAll(attributesMapping);
}
@ -506,9 +528,10 @@ protected void handleStoreEvent(NodeAttributesStoreEvent event) {
}
@Override
public void replaceNodeAttributes(
public void replaceNodeAttributes(String prefix,
Map<String, Set<NodeAttribute>> nodeAttributeMapping) throws IOException {
processMapping(nodeAttributeMapping, AttributeMappingOperationType.REPLACE);
processMapping(nodeAttributeMapping,
AttributeMappingOperationType.REPLACE, prefix);
}
@Override
@ -526,12 +549,19 @@ public void removeNodeAttributes(
private void processMapping(
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
AttributeMappingOperationType mappingType) throws IOException {
processMapping(nodeAttributeMapping, mappingType, null);
}
private void processMapping(
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
AttributeMappingOperationType mappingType, String attributePrefix)
throws IOException {
Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded =
new HashMap<>();
Map<String, Map<NodeAttribute, AttributeValue>> validMapping =
validate(nodeAttributeMapping, newAttributesToBeAdded, false);
internalUpdateAttributesOnNodes(validMapping, mappingType,
newAttributesToBeAdded);
newAttributesToBeAdded, attributePrefix);
}
}

View File

@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilderFactory;
@ -64,12 +65,16 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -817,6 +822,79 @@ protected RMNodeLabelsManager createNodeLabelManager() {
rm.stop();
}
/**
 * Registers a node manager on "host2", reports a distributed "host"
 * attribute through two consecutive heartbeats (values "host2" then
 * "host3") and checks after each heartbeat that the RM's attribute manager
 * holds exactly that single attribute for the node.
 */
@Test
public void testNodeHeartbeatWithNodeAttributes() throws Exception {
  writeToHostsFile("host2");
  Configuration rmConf = new Configuration();
  rmConf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  rm = new MockRM(rmConf);
  rm.start();

  // Register the node manager with the RM.
  ResourceTrackerService tracker = rm.getResourceTrackerService();
  NodeId node = NodeId.newInstance("host2", 1234);
  RegisterNodeManagerRequest registration =
      Records.newRecord(RegisterNodeManagerRequest.class);
  registration.setResource(BuilderUtils.newResource(1024, 1));
  registration.setNodeId(node);
  registration.setHttpPort(1234);
  registration.setNMVersion(YarnVersionInfo.getVersion());
  RegisterNodeManagerResponse registered =
      tracker.registerNodeManager(registration);

  Set<NodeAttribute> reported = new HashSet<>();
  reported.add(NodeAttribute.newInstance(
      NodeAttribute.PREFIX_DISTRIBUTED, "host",
      NodeAttributeType.STRING, "host2"));

  // First heartbeat carries the initial attribute.
  NodeHeartbeatRequest heartbeat =
      Records.newRecord(NodeHeartbeatRequest.class);
  NodeStatus status = getNodeStatusObject(node);
  int firstResponseId = status.getResponseId();
  heartbeat.setNodeStatus(status);
  heartbeat.setLastKnownNMTokenMasterKey(
      registered.getNMTokenMasterKey());
  heartbeat.setLastKnownContainerTokenMasterKey(
      registered.getContainerTokenMasterKey());
  heartbeat.setNodeAttributes(reported);
  tracker.nodeHeartbeat(heartbeat);

  // The RM must now know about the reported attribute.
  NodeAttributesManager manager =
      rm.getRMContext().getNodeAttributesManager();
  assertSingleHostAttribute(manager, node.getHost(), "host2");

  // Second heartbeat carries an updated value for the same attribute.
  reported.clear();
  reported.add(NodeAttribute.newInstance(
      NodeAttribute.PREFIX_DISTRIBUTED, "host",
      NodeAttributeType.STRING, "host3"));
  status = getNodeStatusObject(node);
  status.setResponseId(firstResponseId + 1);
  heartbeat.setNodeStatus(status);
  heartbeat.setNodeAttributes(reported);
  tracker.nodeHeartbeat(heartbeat);

  // The RM must have picked up the updated attribute value.
  assertSingleHostAttribute(manager, node.getHost(), "host3");
}

/**
 * Asserts that the given host has exactly one attribute, that it is the
 * STRING attribute named "host", and that it carries the expected value.
 */
private void assertSingleHostAttribute(NodeAttributesManager manager,
    String hostName, String expectedValue) throws Exception {
  Map<NodeAttribute, AttributeValue> found =
      manager.getAttributesForNode(hostName);
  Assert.assertEquals(1, found.size());
  NodeAttribute only = found.keySet().iterator().next();
  Assert.assertEquals("host", only.getAttributeName());
  Assert.assertEquals(expectedValue, only.getAttributeValue());
  Assert.assertEquals(NodeAttributeType.STRING, only.getAttributeType());
}
@Test
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
writeToHostsFile("host2");

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
@ -255,4 +257,101 @@ public void testRemoveNodeAttributes() throws IOException {
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
Assert.assertEquals(2, allAttributesPerPrefix.size());
}
/**
 * Exercises prefix-scoped replacement of node attributes: replacing under
 * the distributed prefix must leave attributes of other prefixes alone,
 * while replacing with a {@code null} prefix swaps out everything on the
 * node.
 */
@Test
public void testReplaceNodeAttributes() throws IOException {
  final String host = HOSTNAMES[0];
  final String distPrefix = NodeAttribute.PREFIX_DISTRIBUTED;

  // Seed host1 with 3 attributes under the first test prefix:
  //   yarn.test1.io/A1=host1_v1_1 ... yarn.test1.io/A3=host1_v1_3
  Map<String, Set<NodeAttribute>> additions = new HashMap<>();
  additions.put(host,
      createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
  attributesManager.addNodeAttributes(additions);
  Assert.assertEquals(3,
      attributesManager.getAttributesForNode(host).size());

  // Add 10 distributed attributes to host1:
  //   nn.yarn.io/dist-node-attribute1=dist_v1_1
  //   ...
  //   nn.yarn.io/dist-node-attribute10=dist_v1_10
  additions.clear();
  additions.put(host, createAttributesForTest(distPrefix, 10,
      "dist-node-attribute", "dist_v1"));
  attributesManager.addNodeAttributes(additions);
  Assert.assertEquals(13,
      attributesManager.getAttributesForNode(host).size());
  Set<NodeAttribute> clusterView =
      attributesManager.getClusterNodeAttributes(
          Sets.newHashSet(distPrefix, PREFIXES[0]));
  Assert.assertEquals(13, clusterView.size());

  // Replace within the distributed prefix: same attribute names,
  // new values, and only 5 of them.
  Set<NodeAttribute> replacement = createAttributesForTest(
      distPrefix, 5, "dist-node-attribute", "dist_v2");
  attributesManager.replaceNodeAttributes(distPrefix,
      ImmutableMap.of(host, replacement));
  Map<NodeAttribute, AttributeValue> onNode =
      attributesManager.getAttributesForNode(host);
  Assert.assertEquals(8, onNode.size());
  clusterView = attributesManager.getClusterNodeAttributes(
      Sets.newHashSet(distPrefix, PREFIXES[0]));
  Assert.assertEquals(8, clusterView.size());

  // Exactly 5 distributed attributes remain ...
  Set<NodeAttribute> byPrefix =
      NodeLabelUtil.filterAttributesByPrefix(onNode.keySet(), distPrefix);
  Assert.assertEquals(5, byPrefix.size());
  // ... and every one of them carries a dist_v2 value.
  for (NodeAttribute distributed : byPrefix) {
    Assert.assertTrue(
        distributed.getAttributeValue().startsWith("dist_v2"));
  }

  // The 3 yarn.test1.io attributes were not touched.
  byPrefix = NodeLabelUtil.filterAttributesByPrefix(
      onNode.keySet(), PREFIXES[0]);
  Assert.assertEquals(3, byPrefix.size());

  // Replace within the distributed prefix again, this time with a
  // different attribute name.
  replacement = createAttributesForTest(
      distPrefix, 1, "dist-node-attribute-v2", "dist_v3");
  attributesManager.replaceNodeAttributes(distPrefix,
      ImmutableMap.of(host, replacement));
  onNode = attributesManager.getAttributesForNode(host);
  Assert.assertEquals(4, onNode.size());
  clusterView = attributesManager.getClusterNodeAttributes(
      Sets.newHashSet(distPrefix));
  Assert.assertEquals(1, clusterView.size());
  NodeAttribute remaining = clusterView.iterator().next();
  Assert.assertEquals("dist-node-attribute-v2_0",
      remaining.getAttributeName());
  Assert.assertEquals(distPrefix, remaining.getAttributePrefix());
  Assert.assertEquals("dist_v3_0", remaining.getAttributeValue());

  // Replace everything on the node by passing no prefix.
  Map<String, Set<NodeAttribute>> fullReplacement = new HashMap<>();
  fullReplacement.put(host,
      createAttributesForTest(PREFIXES[1], 2, "B", "B_v1"));
  attributesManager.replaceNodeAttributes(null, fullReplacement);
  onNode = attributesManager.getAttributesForNode(host);
  Assert.assertEquals(2, onNode.size());
  clusterView = attributesManager.getClusterNodeAttributes(
      Sets.newHashSet(PREFIXES[1]));
  Assert.assertEquals(2, clusterView.size());
  clusterView = attributesManager.getClusterNodeAttributes(
      Sets.newHashSet(distPrefix));
  Assert.assertEquals(0, clusterView.size());
}
}