From 86d024ef2a5844ac723c7b597689afa4b598ee27 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Tue, 27 Feb 2018 18:46:16 +0800 Subject: [PATCH] YARN-7965. NodeAttributeManager add/get API is not working properly. Contributed by Weiwei Yang. --- .../nodelabels/NodeAttributesManager.java | 8 +- .../nodelabels/NodeAttributesManagerImpl.java | 38 ++- .../nodelabels/TestNodeAttributesManager.java | 258 ++++++++++++++++++ 3 files changed, 288 insertions(+), 16 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java index 63f3dcfcc5..effda9b670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeAttributesManager.java @@ -69,10 +69,14 @@ public abstract class NodeAttributesManager extends AbstractService { Map> nodeAttributeMapping) throws IOException; /** + * Returns a set of node attributes whose prefix is one of the given + * prefixes; if the prefix set is null or empty, all attributes are returned; + * if prefix set is given but no mapping could be found, an empty set + * is returned. + * * @param prefix set of prefix string's for which the attributes needs to * returned - * @return set of node Attributes objects for the specified set of prefixes, - * else return all + * @return set of node Attributes */ public abstract Set getClusterNodeAttributes( Set prefix); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index 2e63a7ccd4..a902ac68fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -141,6 +142,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { Host node = nodeCollections.get(nodeHost); if (node == null) { node = new Host(nodeHost); + nodeCollections.put(nodeHost, node); } switch (op) { case REMOVE: @@ -181,8 +183,16 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { private void removeNodeFromAttributes(String nodeHost, Set attributeMappings) { - for (NodeAttribute attribute : attributeMappings) { - clusterAttributes.get(attribute).removeNode(nodeHost); + for (NodeAttribute rmAttribute : attributeMappings) { + RMNodeAttribute host = clusterAttributes.get(rmAttribute); + if (host != null) { + host.removeNode(nodeHost); + // If there is no other host has such attribute, + // remove it from the global mapping. + if (host.getAssociatedNodeIds().isEmpty()) { + clusterAttributes.remove(rmAttribute); + } + } } } @@ -305,19 +315,19 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { @Override public Set getClusterNodeAttributes(Set prefix) { Set attributes = new HashSet<>(); - try { - readLock.lock(); - attributes.addAll(clusterAttributes.keySet()); - } finally { - readLock.unlock(); + KeySetView allAttributes = + clusterAttributes.keySet(); + // Return all if prefix is not given. + if (prefix == null || prefix.isEmpty()) { + attributes.addAll(allAttributes); + return attributes; } - if (prefix != null && prefix.isEmpty()) { - Iterator iterator = attributes.iterator(); - while (iterator.hasNext()) { - NodeAttribute attribute = iterator.next(); - if (!prefix.contains(attribute.getAttributePrefix())) { - iterator.remove(); - } + // Try search attributes by prefix and return valid ones. + Iterator iterator = allAttributes.iterator(); + while (iterator.hasNext()) { + NodeAttribute current = iterator.next(); + if (prefix.contains(current.getAttributePrefix())) { + attributes.add(current); } } return attributes; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java new file mode 100644 index 0000000000..b639a741c6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestNodeAttributesManager.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.nodelabels.AttributeValue; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.junit.Assert; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Unit tests for node attribute manager. + */ +public class TestNodeAttributesManager { + + private NodeAttributesManager attributesManager; + private final static String[] PREFIXES = + new String[] {"yarn.test1.io", "yarn.test2.io", "yarn.test3.io"}; + private final static String[] HOSTNAMES = + new String[] {"host1", "host2", "host3"}; + + @Before + public void init() { + Configuration conf = new Configuration(); + attributesManager = new NodeAttributesManagerImpl(); + attributesManager.init(conf); + attributesManager.start(); + } + + @After + public void cleanUp() { + if (attributesManager != null) { + attributesManager.stop(); + } + } + + private Set createAttributesForTest(String attributePrefix, + int numOfAttributes, String attributeNamePrefix, + String attributeValuePrefix) { + Set attributes = new HashSet<>(); + for (int i = 0; i< numOfAttributes; i++) { + NodeAttribute attribute = NodeAttribute.newInstance( + attributePrefix, attributeNamePrefix + "_" + i, + NodeAttributeType.STRING, attributeValuePrefix + "_" + i); + attributes.add(attribute); + } + return attributes; + } + + private boolean sameAttributeSet(Set set1, + Set set2) { + return Sets.difference(set1, set2).isEmpty(); + } + + @Test + public void testAddNodeAttributes() throws IOException { + Map> toAddAttributes = new HashMap<>(); + Map nodeAttributes; + + // Add 3 attributes to host1 + // yarn.test1.io/A1=host1_v1_1 + // yarn.test1.io/A2=host1_v1_2 + // yarn.test1.io/A3=host1_v1_3 + toAddAttributes.put(HOSTNAMES[0], + createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1")); + + attributesManager.addNodeAttributes(toAddAttributes); + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + + Assert.assertEquals(3, nodeAttributes.size()); + Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[0]), + nodeAttributes.keySet())); + + // Add 2 attributes to host2 + // yarn.test1.io/A1=host2_v1_1 + // yarn.test1.io/A2=host2_v1_2 + toAddAttributes.clear(); + toAddAttributes.put(HOSTNAMES[1], + createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1")); + attributesManager.addNodeAttributes(toAddAttributes); + + // Verify host1 attributes are still valid. + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(3, nodeAttributes.size()); + + // Verify new added host2 attributes are correctly updated. + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]); + Assert.assertEquals(2, nodeAttributes.size()); + Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[1]), + nodeAttributes.keySet())); + + // Cluster wide, it only has 3 attributes. + // yarn.test1.io/A1 + // yarn.test1.io/A2 + // yarn.test1.io/A3 + Set clusterAttributes = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0])); + Assert.assertEquals(3, clusterAttributes.size()); + + // Query for attributes under a non-exist prefix, + // ensure it returns an empty set. + clusterAttributes = attributesManager + .getClusterNodeAttributes(Sets.newHashSet("non_exist_prefix")); + Assert.assertEquals(0, clusterAttributes.size()); + + // Not provide any prefix, ensure it returns all attributes. + clusterAttributes = attributesManager.getClusterNodeAttributes(null); + Assert.assertEquals(3, clusterAttributes.size()); + + // Add some other attributes with different prefixes on host1 and host2. + toAddAttributes.clear(); + + // Host1 + // yarn.test2.io/A_1=host1_v2_1 + // ... + // yarn.test2.io/A_10=host1_v2_10 + toAddAttributes.put(HOSTNAMES[0], + createAttributesForTest(PREFIXES[1], 10, "C", "host1_v2")); + // Host2 + // yarn.test2.io/C_1=host1_v2_1 + // ... + // yarn.test2.io/C_20=host1_v2_20 + toAddAttributes.put(HOSTNAMES[1], + createAttributesForTest(PREFIXES[1], 20, "C", "host1_v2")); + attributesManager.addNodeAttributes(toAddAttributes); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(13, nodeAttributes.size()); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]); + Assert.assertEquals(22, nodeAttributes.size()); + } + + @Test + public void testRemoveNodeAttributes() throws IOException { + Map> toAddAttributes = new HashMap<>(); + Map> toRemoveAttributes = new HashMap<>(); + Set allAttributesPerPrefix = new HashSet<>(); + Map nodeAttributes; + + // Host1 ----------------------- + // yarn.test1.io + // A1=host1_v1_1 + // A2=host1_v1_2 + // A3=host1_v1_3 + // yarn.test2.io + // B1=host1_v2_1 + // ... + // B5=host5_v2_5 + // Host2 ----------------------- + // yarn.test1.io + // A1=host2_v1_1 + // A2=host2_v1_2 + // yarn.test3.io + // C1=host2_v3_1 + // c2=host2_v3_2 + Set host1set = new HashSet<>(); + Set host1set1 = + createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"); + Set host1set2 = + createAttributesForTest(PREFIXES[1], 5, "B", "host1_v1"); + host1set.addAll(host1set1); + host1set.addAll(host1set2); + + Set host2set = new HashSet<>(); + Set host2set1 = + createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1"); + Set host2set2 = + createAttributesForTest(PREFIXES[2], 2, "C", "host2_v3"); + host2set.addAll(host2set1); + host2set.addAll(host2set2); + + toAddAttributes.put(HOSTNAMES[0], host1set); + toAddAttributes.put(HOSTNAMES[1], host2set); + attributesManager.addNodeAttributes(toAddAttributes); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(8, nodeAttributes.size()); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]); + Assert.assertEquals(4, nodeAttributes.size()); + + allAttributesPerPrefix = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0])); + Assert.assertEquals(3, allAttributesPerPrefix.size()); + allAttributesPerPrefix = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1])); + Assert.assertEquals(5, allAttributesPerPrefix.size()); + allAttributesPerPrefix = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[2])); + Assert.assertEquals(2, allAttributesPerPrefix.size()); + + // Remove "yarn.test1.io/A_2" from host1 + Set attributes2rm1 = new HashSet<>(); + attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "A_2", + NodeAttributeType.STRING, "anyValue")); + toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1); + attributesManager.removeNodeAttributes(toRemoveAttributes); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(7, nodeAttributes.size()); + + // Remove again, but give a non-exist attribute name + attributes2rm1.clear(); + toRemoveAttributes.clear(); + attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "non_exist_name", + NodeAttributeType.STRING, "anyValue")); + toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1); + attributesManager.removeNodeAttributes(toRemoveAttributes); + + nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]); + Assert.assertEquals(7, nodeAttributes.size()); + + // Remove "yarn.test1.io/A_2" from host2 too, + // by then there will be no such attribute exist in the cluster. + Set attributes2rm2 = new HashSet<>(); + attributes2rm2.add(NodeAttribute.newInstance(PREFIXES[0], "A_2", + NodeAttributeType.STRING, "anyValue")); + toRemoveAttributes.clear(); + toRemoveAttributes.put(HOSTNAMES[1], attributes2rm2); + attributesManager.removeNodeAttributes(toRemoveAttributes); + + // Make sure cluster wide attributes are still consistent. + // Since both host1 and host2 doesn't have "yarn.test1.io/A_2", + // get all attributes under prefix "yarn.test1.io" should only return + // us A_1 and A_3. + allAttributesPerPrefix = attributesManager + .getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0])); + Assert.assertEquals(2, allAttributesPerPrefix.size()); + } +}