diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8a2cfc8c50..b51d89e798 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1014,6 +1014,9 @@ Release 2.8.0 - UNRELEASED YARN-4284. condition for AM blacklisting is too narrow (Sangjin Lee via jlowe) + YARN-4169. Fix racing condition of TestNodeStatusUpdaterForLabels. + (Naganarasimha G R via wangda) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java index f834d542ee..1f64e5074f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java @@ -22,8 +22,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; @@ -33,47 +33,48 @@ import com.google.common.collect.Sets; public class NodeLabelTestBase { - public static void assertMapEquals(Map> m1, - ImmutableMap> m2) { - Assert.assertEquals(m1.size(), m2.size()); - for (NodeId k : m1.keySet()) { - Assert.assertTrue(m2.containsKey(k)); - assertCollectionEquals(m1.get(k), m2.get(k)); + public static void assertMapEquals(Map> expected, + ImmutableMap> actual) { + Assert.assertEquals(expected.size(), actual.size()); + for (NodeId k : expected.keySet()) { + Assert.assertTrue(actual.containsKey(k)); + assertCollectionEquals(expected.get(k), actual.get(k)); } } - public static void assertLabelInfoMapEquals(Map> m1, - ImmutableMap> m2) { - 
Assert.assertEquals(m1.size(), m2.size()); - for (NodeId k : m1.keySet()) { - Assert.assertTrue(m2.containsKey(k)); - assertNLCollectionEquals(m1.get(k), m2.get(k)); + public static void assertLabelInfoMapEquals( + Map> expected, + ImmutableMap> actual) { + Assert.assertEquals(expected.size(), actual.size()); + for (NodeId k : expected.keySet()) { + Assert.assertTrue(actual.containsKey(k)); + assertNLCollectionEquals(expected.get(k), actual.get(k)); } } - public static void assertLabelsToNodesEquals(Map> m1, - ImmutableMap> m2) { - Assert.assertEquals(m1.size(), m2.size()); - for (String k : m1.keySet()) { - Assert.assertTrue(m2.containsKey(k)); - Set s1 = new HashSet(m1.get(k)); - Set s2 = new HashSet(m2.get(k)); - Assert.assertEquals(s1, s2); - Assert.assertTrue(s1.containsAll(s2)); + public static void assertLabelsToNodesEquals( + Map> expected, + ImmutableMap> actual) { + Assert.assertEquals(expected.size(), actual.size()); + for (String k : expected.keySet()) { + Assert.assertTrue(actual.containsKey(k)); + Set expectedS1 = new HashSet<>(expected.get(k)); + Set actualS2 = new HashSet<>(actual.get(k)); + Assert.assertEquals(expectedS1, actualS2); + Assert.assertTrue(expectedS1.containsAll(actualS2)); } } public static ImmutableMap> transposeNodeToLabels( Map> mapNodeToLabels) { - Map> mapLabelsToNodes = - new HashMap>(); + Map> mapLabelsToNodes = new HashMap<>(); for(Entry> entry : mapNodeToLabels.entrySet()) { NodeId node = entry.getKey(); Set setLabels = entry.getValue(); for(String label : setLabels) { Set setNode = mapLabelsToNodes.get(label); if (setNode == null) { - setNode = new HashSet(); + setNode = new HashSet<>(); } setNode.add(NodeId.newInstance(node.getHost(), node.getPort())); mapLabelsToNodes.put(label, setNode); @@ -82,28 +83,39 @@ public static ImmutableMap> transposeNodeToLabels( return ImmutableMap.copyOf(mapLabelsToNodes); } - public static void assertMapContains(Map> m1, - ImmutableMap> m2) { - for (NodeId k : m2.keySet()) { - 
Assert.assertTrue(m1.containsKey(k)); - assertCollectionEquals(m1.get(k), m2.get(k)); + public static void assertMapContains(Map> expected, + ImmutableMap> actual) { + for (NodeId k : actual.keySet()) { + Assert.assertTrue(expected.containsKey(k)); + assertCollectionEquals(expected.get(k), actual.get(k)); } } - public static void assertCollectionEquals(Collection c1, - Collection c2) { - Set s1 = new HashSet(c1); - Set s2 = new HashSet(c2); - Assert.assertEquals(s1, s2); - Assert.assertTrue(s1.containsAll(s2)); + public static void assertCollectionEquals(Collection expected, + Collection actual) { + if (expected == null) { + Assert.assertNull(actual); + } else { + Assert.assertNotNull(actual); + } + Set expectedSet = new HashSet<>(expected); + Set actualSet = new HashSet<>(actual); + Assert.assertEquals(expectedSet, actualSet); + Assert.assertTrue(expectedSet.containsAll(actualSet)); } - public static void assertNLCollectionEquals(Collection c1, - Collection c2) { - Set s1 = new HashSet(c1); - Set s2 = new HashSet(c2); - Assert.assertEquals(s1, s2); - Assert.assertTrue(s1.containsAll(s2)); + public static void assertNLCollectionEquals(Collection expected, + Collection actual) { + if (expected == null) { + Assert.assertNull(actual); + } else { + Assert.assertNotNull(actual); + } + + Set expectedSet = new HashSet<>(expected); + Set actualSet = new HashSet<>(actual); + Assert.assertEquals(expectedSet, actualSet); + Assert.assertTrue(expectedSet.containsAll(actualSet)); } @SuppressWarnings("unchecked") @@ -112,12 +124,11 @@ public static Set toSet(E... elements) { return set; } - @SuppressWarnings("unchecked") public static Set toNodeLabelSet(String... 
nodeLabelsStr) { if (null == nodeLabelsStr) { return null; } - Set labels = new HashSet(); + Set labels = new HashSet<>(); for (String label : nodeLabelsStr) { labels.add(NodeLabel.newInstance(label)); } @@ -137,14 +148,15 @@ public NodeId toNodeId(String str) { } public static void assertLabelsInfoToNodesEquals( - Map> m1, ImmutableMap> m2) { - Assert.assertEquals(m1.size(), m2.size()); - for (NodeLabel k : m1.keySet()) { - Assert.assertTrue(m2.containsKey(k)); - Set s1 = new HashSet(m1.get(k)); - Set s2 = new HashSet(m2.get(k)); - Assert.assertEquals(s1, s2); - Assert.assertTrue(s1.containsAll(s2)); + Map> expected, + ImmutableMap> actual) { + Assert.assertEquals(expected.size(), actual.size()); + for (NodeLabel k : expected.keySet()) { + Assert.assertTrue(actual.containsKey(k)); + Set expectedS1 = new HashSet<>(expected.get(k)); + Set actualS2 = new HashSet<>(actual.get(k)); + Assert.assertEquals(expectedS1, actualS2); + Assert.assertTrue(expectedS1.containsAll(actualS2)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java index 95e90023bc..35bad4d6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java @@ -280,18 +280,18 @@ public void testRemovelabelWithNodes() throws Exception { mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1")); assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); - assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1")); + assertCollectionEquals(Arrays.asList("p1"), mgr.lastRemovedlabels); 
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3")); Assert.assertTrue(mgr.getNodeLabels().isEmpty()); Assert.assertTrue(mgr.getClusterNodeLabelNames().isEmpty()); - assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3")); + assertCollectionEquals(Arrays.asList("p2", "p3"), mgr.lastRemovedlabels); } @Test(timeout = 5000) public void testTrimLabelsWhenAddRemoveNodeLabels() throws IOException { mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet(" p1")); - assertCollectionEquals(mgr.getClusterNodeLabelNames(), toSet("p1")); + assertCollectionEquals(toSet("p1"), mgr.getClusterNodeLabelNames()); mgr.removeFromClusterNodeLabels(toSet("p1 ")); Assert.assertTrue(mgr.getClusterNodeLabelNames().isEmpty()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c43d467c42..3f8cf32fab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -632,6 +632,11 @@ public void sendOutofBandHeartBeat() { } } + @VisibleForTesting + Thread.State getStatusUpdaterThreadState() { + return statusUpdater.getState(); + } + public boolean isContainerRecentlyStopped(ContainerId containerId) { synchronized (recentlyStoppedContainers) { return recentlyStoppedContainers.containsKey(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index d72eeb0c14..563104e58b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.lang.Thread.State; import java.nio.ByteBuffer; import java.util.Set; @@ -105,29 +106,19 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } - public void waitTillHeartbeat() { + public void waitTillHeartbeat() throws InterruptedException { if (receivedNMHeartbeat) { return; } - int i = 10; + int i = 15; while (!receivedNMHeartbeat && i > 0) { synchronized (ResourceTrackerForLabels.class) { if (!receivedNMHeartbeat) { - try { - System.out - .println("In ResourceTrackerForLabels waiting for heartbeat : " - + System.currentTimeMillis()); - ResourceTrackerForLabels.class.wait(500l); - // to avoid race condition, i.e. 
sendOutofBandHeartBeat can be - // sent before NSU thread has gone to sleep, hence we wait and try - // to resend heartbeat again - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); - ResourceTrackerForLabels.class.wait(500l); - i--; - } catch (InterruptedException e) { - Assert.fail("Exception caught while waiting for Heartbeat"); - e.printStackTrace(); - } + System.out + .println("In ResourceTrackerForLabels waiting for heartbeat : " + + System.currentTimeMillis()); + ResourceTrackerForLabels.class.wait(200); + i--; } } } @@ -136,18 +127,13 @@ public void waitTillHeartbeat() { } } - public void waitTillRegister() { + public void waitTillRegister() throws InterruptedException { if (receivedNMRegister) { return; } while (!receivedNMRegister) { synchronized (ResourceTrackerForLabels.class) { - try { ResourceTrackerForLabels.class.wait(); - } catch (InterruptedException e) { - Assert.fail("Exception caught while waiting for register"); - e.printStackTrace(); - } } } } @@ -213,7 +199,7 @@ private YarnConfiguration createNMConfigForDistributeNodeLabels() { return conf; } - @Test + @Test(timeout=20000) public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException, IOException { final ResourceTrackerForLabels resourceTracker = @@ -251,8 +237,8 @@ protected void stopRMProxy() { resourceTracker.resetNMHeartbeatReceiveFlag(); nm.start(); resourceTracker.waitTillRegister(); - assertNLCollectionEquals(resourceTracker.labels, - dummyLabelsProviderRef.getNodeLabels()); + assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(), + resourceTracker.labels); resourceTracker.waitTillHeartbeat();// wait till the first heartbeat resourceTracker.resetNMHeartbeatReceiveFlag(); @@ -260,15 +246,14 @@ protected void stopRMProxy() { // heartbeat with updated labels dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P")); - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); - 
assertNLCollectionEquals(resourceTracker.labels, - dummyLabelsProviderRef - .getNodeLabels()); + assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(), + resourceTracker.labels); resourceTracker.resetNMHeartbeatReceiveFlag(); // heartbeat without updating labels - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); resourceTracker.resetNMHeartbeatReceiveFlag(); assertNull( @@ -277,7 +262,7 @@ protected void stopRMProxy() { // provider return with null labels dummyLabelsProviderRef.setNodeLabels(null); - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); assertNotNull( "If provider sends null then empty label set should be sent and not null", @@ -292,7 +277,7 @@ protected void stopRMProxy() { int nonNullLabels = 0; dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1")); for (int i = 0; i < 5; i++) { - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); if (null == resourceTracker.labels) { nullLabels++; @@ -311,7 +296,7 @@ protected void stopRMProxy() { nm.stop(); } - @Test + @Test(timeout=20000) public void testInvalidNodeLabelsFromProvider() throws InterruptedException, IOException { final ResourceTrackerForLabels resourceTracker = @@ -353,7 +338,7 @@ protected void stopRMProxy() { // heartbeat with invalid labels dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P")); - nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); assertNull("On Invalid Labels we need to retain earlier labels, HB " + "needs to send null", resourceTracker.labels); @@ -362,10 +347,40 @@ protected void stopRMProxy() { // on next heartbeat same invalid labels will be given by the provider, but // again label validation check and reset RM with empty labels set should // not happen - 
nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + sendOutofBandHeartBeat(); resourceTracker.waitTillHeartbeat(); - resourceTracker.resetNMHeartbeatReceiveFlag(); assertNull("NodeStatusUpdater need not send repeatedly empty labels on " + "invalid labels from provider ", resourceTracker.labels); + resourceTracker.resetNMHeartbeatReceiveFlag(); + } + + /** + * Avoids a race condition in the test case. After sending a heartbeat, the + * NodeStatusUpdater heartbeat thread needs some time to process the + * response and then enter the wait state. But once the main test thread + * returns from resourceTracker.waitTillHeartbeat(), the test would proceed + * with the next sendOutofBandHeartBeat before the heartbeat thread is + * blocked on wait. + * @throws InterruptedException + * @throws IOException + */ + private void sendOutofBandHeartBeat() + throws InterruptedException, IOException { + int i = 0; + do { + State statusUpdaterThreadState = ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater()) + .getStatusUpdaterThreadState(); + if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING) + || statusUpdaterThreadState.equals(Thread.State.WAITING)) { + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + break; + } + if (++i <= 10) { + Thread.sleep(50); + } else { + throw new IOException( + "Waited for 500 ms but NodeStatusUpdaterThread not in waiting state"); + } + } while (true); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java index 47e4830583..43fd588ca8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -538,40 +538,39 @@ public void testGetLabelsOnNodesWhenNodeActiveDeactive() throws Exception { // Active/Deactive a node directly assigned label, should not remove from // node->label map mgr.activateNode(toNodeId("n1:1"), SMALL_RESOURCE); - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:1")), - toSet("p1")); + assertCollectionEquals(toSet("p1"), + mgr.getNodeLabels().get(toNodeId("n1:1"))); mgr.deactivateNode(toNodeId("n1:1")); - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:1")), - toSet("p1")); + assertCollectionEquals(toSet("p1"), + mgr.getNodeLabels().get(toNodeId("n1:1"))); // Host will not affected - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")), - toSet("p2")); - + assertCollectionEquals(toSet("p2"), + mgr.getNodeLabels().get(toNodeId("n1"))); + // Active/Deactive a node doesn't directly assigned label, should remove // from node->label map mgr.activateNode(toNodeId("n1:2"), SMALL_RESOURCE); - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:2")), - toSet("p2")); + assertCollectionEquals(toSet("p2"), + mgr.getNodeLabels().get(toNodeId("n1:2"))); mgr.deactivateNode(toNodeId("n1:2")); Assert.assertNull(mgr.getNodeLabels().get(toNodeId("n1:2"))); // Host will not affected too - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")), - toSet("p2")); - + assertCollectionEquals(toSet("p2"), + mgr.getNodeLabels().get(toNodeId("n1"))); + // When we change label on the host after active a node without directly // assigned label, such node will still be removed after deactive // Active/Deactive a node doesn't directly assigned 
label, should remove // from node->label map mgr.activateNode(toNodeId("n1:2"), SMALL_RESOURCE); mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p3"))); - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:2")), - toSet("p3")); + assertCollectionEquals(toSet("p3"), + mgr.getNodeLabels().get(toNodeId("n1:2"))); mgr.deactivateNode(toNodeId("n1:2")); Assert.assertNull(mgr.getNodeLabels().get(toNodeId("n1:2"))); // Host will not affected too - assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")), - toSet("p3")); - + assertCollectionEquals(toSet("p3"), + mgr.getNodeLabels().get(toNodeId("n1"))); } private void checkNodeLabelInfo(List infos, String labelName, int activeNMs, int memory) { @@ -674,5 +673,4 @@ public void testLabelsToNodesOnNodeActiveDeactive() throws Exception { assertLabelsToNodesEquals( mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels())); } - }