YARN-4169. Fix racing condition of TestNodeStatusUpdaterForLabels. (Naganarasimha G R via wangda)

This commit is contained in:
Wangda Tan 2015-10-26 16:36:34 -07:00
parent 399ad00915
commit 6f606214e7
6 changed files with 143 additions and 110 deletions

View File

@ -1014,6 +1014,9 @@ Release 2.8.0 - UNRELEASED
YARN-4284. condition for AM blacklisting is too narrow (Sangjin Lee via
jlowe)
YARN-4169. Fix racing condition of TestNodeStatusUpdaterForLabels.
(Naganarasimha G R via wangda)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -22,8 +22,8 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
@ -33,47 +33,48 @@
import com.google.common.collect.Sets;
public class NodeLabelTestBase {
public static void assertMapEquals(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (NodeId k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
assertCollectionEquals(m1.get(k), m2.get(k));
public static void assertMapEquals(Map<NodeId, Set<String>> expected,
ImmutableMap<NodeId, Set<String>> actual) {
Assert.assertEquals(expected.size(), actual.size());
for (NodeId k : expected.keySet()) {
Assert.assertTrue(actual.containsKey(k));
assertCollectionEquals(expected.get(k), actual.get(k));
}
}
public static void assertLabelInfoMapEquals(Map<NodeId, Set<NodeLabel>> m1,
ImmutableMap<NodeId, Set<NodeLabel>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (NodeId k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
assertNLCollectionEquals(m1.get(k), m2.get(k));
public static void assertLabelInfoMapEquals(
Map<NodeId, Set<NodeLabel>> expected,
ImmutableMap<NodeId, Set<NodeLabel>> actual) {
Assert.assertEquals(expected.size(), actual.size());
for (NodeId k : expected.keySet()) {
Assert.assertTrue(actual.containsKey(k));
assertNLCollectionEquals(expected.get(k), actual.get(k));
}
}
public static void assertLabelsToNodesEquals(Map<String, Set<NodeId>> m1,
ImmutableMap<String, Set<NodeId>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (String k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
Set<NodeId> s1 = new HashSet<NodeId>(m1.get(k));
Set<NodeId> s2 = new HashSet<NodeId>(m2.get(k));
Assert.assertEquals(s1, s2);
Assert.assertTrue(s1.containsAll(s2));
public static void assertLabelsToNodesEquals(
Map<String, Set<NodeId>> expected,
ImmutableMap<String, Set<NodeId>> actual) {
Assert.assertEquals(expected.size(), actual.size());
for (String k : expected.keySet()) {
Assert.assertTrue(actual.containsKey(k));
Set<NodeId> expectedS1 = new HashSet<>(expected.get(k));
Set<NodeId> actualS2 = new HashSet<>(actual.get(k));
Assert.assertEquals(expectedS1, actualS2);
Assert.assertTrue(expectedS1.containsAll(actualS2));
}
}
public static ImmutableMap<String, Set<NodeId>> transposeNodeToLabels(
Map<NodeId, Set<String>> mapNodeToLabels) {
Map<String, Set<NodeId>> mapLabelsToNodes =
new HashMap<String, Set<NodeId>>();
Map<String, Set<NodeId>> mapLabelsToNodes = new HashMap<>();
for(Entry<NodeId, Set<String>> entry : mapNodeToLabels.entrySet()) {
NodeId node = entry.getKey();
Set<String> setLabels = entry.getValue();
for(String label : setLabels) {
Set<NodeId> setNode = mapLabelsToNodes.get(label);
if (setNode == null) {
setNode = new HashSet<NodeId>();
setNode = new HashSet<>();
}
setNode.add(NodeId.newInstance(node.getHost(), node.getPort()));
mapLabelsToNodes.put(label, setNode);
@ -82,28 +83,39 @@ public static ImmutableMap<String, Set<NodeId>> transposeNodeToLabels(
return ImmutableMap.copyOf(mapLabelsToNodes);
}
public static void assertMapContains(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
for (NodeId k : m2.keySet()) {
Assert.assertTrue(m1.containsKey(k));
assertCollectionEquals(m1.get(k), m2.get(k));
public static void assertMapContains(Map<NodeId, Set<String>> expected,
ImmutableMap<NodeId, Set<String>> actual) {
for (NodeId k : actual.keySet()) {
Assert.assertTrue(expected.containsKey(k));
assertCollectionEquals(expected.get(k), actual.get(k));
}
}
public static void assertCollectionEquals(Collection<String> c1,
Collection<String> c2) {
Set<String> s1 = new HashSet<String>(c1);
Set<String> s2 = new HashSet<String>(c2);
Assert.assertEquals(s1, s2);
Assert.assertTrue(s1.containsAll(s2));
public static void assertCollectionEquals(Collection<String> expected,
Collection<String> actual) {
if (expected == null) {
Assert.assertNull(actual);
} else {
Assert.assertNotNull(actual);
}
Set<String> expectedSet = new HashSet<>(expected);
Set<String> actualSet = new HashSet<>(actual);
Assert.assertEquals(expectedSet, actualSet);
Assert.assertTrue(expectedSet.containsAll(actualSet));
}
public static void assertNLCollectionEquals(Collection<NodeLabel> c1,
Collection<NodeLabel> c2) {
Set<NodeLabel> s1 = new HashSet<NodeLabel>(c1);
Set<NodeLabel> s2 = new HashSet<NodeLabel>(c2);
Assert.assertEquals(s1, s2);
Assert.assertTrue(s1.containsAll(s2));
public static void assertNLCollectionEquals(Collection<NodeLabel> expected,
Collection<NodeLabel> actual) {
if (expected == null) {
Assert.assertNull(actual);
} else {
Assert.assertNotNull(actual);
}
Set<NodeLabel> expectedSet = new HashSet<>(expected);
Set<NodeLabel> actualSet = new HashSet<>(actual);
Assert.assertEquals(expectedSet, actualSet);
Assert.assertTrue(expectedSet.containsAll(actualSet));
}
@SuppressWarnings("unchecked")
@ -112,12 +124,11 @@ public static <E> Set<E> toSet(E... elements) {
return set;
}
@SuppressWarnings("unchecked")
public static Set<NodeLabel> toNodeLabelSet(String... nodeLabelsStr) {
if (null == nodeLabelsStr) {
return null;
}
Set<NodeLabel> labels = new HashSet<NodeLabel>();
Set<NodeLabel> labels = new HashSet<>();
for (String label : nodeLabelsStr) {
labels.add(NodeLabel.newInstance(label));
}
@ -137,14 +148,15 @@ public NodeId toNodeId(String str) {
}
public static void assertLabelsInfoToNodesEquals(
Map<NodeLabel, Set<NodeId>> m1, ImmutableMap<NodeLabel, Set<NodeId>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (NodeLabel k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
Set<NodeId> s1 = new HashSet<NodeId>(m1.get(k));
Set<NodeId> s2 = new HashSet<NodeId>(m2.get(k));
Assert.assertEquals(s1, s2);
Assert.assertTrue(s1.containsAll(s2));
Map<NodeLabel, Set<NodeId>> expected,
ImmutableMap<NodeLabel, Set<NodeId>> actual) {
Assert.assertEquals(expected.size(), actual.size());
for (NodeLabel k : expected.keySet()) {
Assert.assertTrue(actual.containsKey(k));
Set<NodeId> expectedS1 = new HashSet<>(expected.get(k));
Set<NodeId> actualS2 = new HashSet<>(actual.get(k));
Assert.assertEquals(expectedS1, actualS2);
Assert.assertTrue(expectedS1.containsAll(actualS2));
}
}
}

View File

@ -280,18 +280,18 @@ public void testRemovelabelWithNodes() throws Exception {
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
assertMapEquals(mgr.getNodeLabels(),
ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1"));
assertCollectionEquals(Arrays.asList("p1"), mgr.lastRemovedlabels);
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3"));
Assert.assertTrue(mgr.getNodeLabels().isEmpty());
Assert.assertTrue(mgr.getClusterNodeLabelNames().isEmpty());
assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3"));
assertCollectionEquals(Arrays.asList("p2", "p3"), mgr.lastRemovedlabels);
}
@Test(timeout = 5000)
public void testTrimLabelsWhenAddRemoveNodeLabels() throws IOException {
mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet(" p1"));
assertCollectionEquals(mgr.getClusterNodeLabelNames(), toSet("p1"));
assertCollectionEquals(toSet("p1"), mgr.getClusterNodeLabelNames());
mgr.removeFromClusterNodeLabels(toSet("p1 "));
Assert.assertTrue(mgr.getClusterNodeLabelNames().isEmpty());
}

View File

@ -632,6 +632,11 @@ public void sendOutofBandHeartBeat() {
}
}
@VisibleForTesting
Thread.State getStatusUpdaterThreadState() {
return statusUpdater.getState();
}
public boolean isContainerRecentlyStopped(ContainerId containerId) {
synchronized (recentlyStoppedContainers) {
return recentlyStoppedContainers.containsKey(containerId);

View File

@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.util.Set;
@ -105,29 +106,19 @@ public RegisterNodeManagerResponse registerNodeManager(
return response;
}
public void waitTillHeartbeat() {
public void waitTillHeartbeat() throws InterruptedException {
if (receivedNMHeartbeat) {
return;
}
int i = 10;
int i = 15;
while (!receivedNMHeartbeat && i > 0) {
synchronized (ResourceTrackerForLabels.class) {
if (!receivedNMHeartbeat) {
try {
System.out
.println("In ResourceTrackerForLabels waiting for heartbeat : "
+ System.currentTimeMillis());
ResourceTrackerForLabels.class.wait(500l);
// to avoid race condition, i.e. sendOutofBandHeartBeat can be
// sent before NSU thread has gone to sleep, hence we wait and try
// to resend heartbeat again
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
ResourceTrackerForLabels.class.wait(500l);
i--;
} catch (InterruptedException e) {
Assert.fail("Exception caught while waiting for Heartbeat");
e.printStackTrace();
}
System.out
.println("In ResourceTrackerForLabels waiting for heartbeat : "
+ System.currentTimeMillis());
ResourceTrackerForLabels.class.wait(200);
i--;
}
}
}
@ -136,18 +127,13 @@ public void waitTillHeartbeat() {
}
}
public void waitTillRegister() {
public void waitTillRegister() throws InterruptedException {
if (receivedNMRegister) {
return;
}
while (!receivedNMRegister) {
synchronized (ResourceTrackerForLabels.class) {
try {
ResourceTrackerForLabels.class.wait();
} catch (InterruptedException e) {
Assert.fail("Exception caught while waiting for register");
e.printStackTrace();
}
}
}
}
@ -213,7 +199,7 @@ private YarnConfiguration createNMConfigForDistributeNodeLabels() {
return conf;
}
@Test
@Test(timeout=20000)
public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException,
IOException {
final ResourceTrackerForLabels resourceTracker =
@ -251,8 +237,8 @@ protected void stopRMProxy() {
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
resourceTracker.waitTillRegister();
assertNLCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef.getNodeLabels());
assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(),
resourceTracker.labels);
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
@ -260,15 +246,14 @@ protected void stopRMProxy() {
// heartbeat with updated labels
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNLCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef
.getNodeLabels());
assertNLCollectionEquals(dummyLabelsProviderRef.getNodeLabels(),
resourceTracker.labels);
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat without updating labels
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
resourceTracker.resetNMHeartbeatReceiveFlag();
assertNull(
@ -277,7 +262,7 @@ protected void stopRMProxy() {
// provider return with null labels
dummyLabelsProviderRef.setNodeLabels(null);
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNotNull(
"If provider sends null then empty label set should be sent and not null",
@ -292,7 +277,7 @@ protected void stopRMProxy() {
int nonNullLabels = 0;
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P1"));
for (int i = 0; i < 5; i++) {
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
if (null == resourceTracker.labels) {
nullLabels++;
@ -311,7 +296,7 @@ protected void stopRMProxy() {
nm.stop();
}
@Test
@Test(timeout=20000)
public void testInvalidNodeLabelsFromProvider() throws InterruptedException,
IOException {
final ResourceTrackerForLabels resourceTracker =
@ -353,7 +338,7 @@ protected void stopRMProxy() {
// heartbeat with invalid labels
dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P"));
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertNull("On Invalid Labels we need to retain earlier labels, HB "
+ "needs to send null", resourceTracker.labels);
@ -362,10 +347,40 @@ protected void stopRMProxy() {
// on next heartbeat same invalid labels will be given by the provider, but
// again label validation check and reset RM with empty labels set should
// not happen
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
resourceTracker.resetNMHeartbeatReceiveFlag();
assertNull("NodeStatusUpdater need not send repeatedly empty labels on "
+ "invalid labels from provider ", resourceTracker.labels);
resourceTracker.resetNMHeartbeatReceiveFlag();
}
/**
* This is to avoid race condition in the test case. NodeStatusUpdater
* heartbeat thread after sending the heartbeat needs some time to process the
* response and then go wait state. But in the test case once the main test
* thread returns back after resourceTracker.waitTillHeartbeat() we proceed
* with next sendOutofBandHeartBeat before heartbeat thread is blocked on
* wait.
* @throws InterruptedException
* @throws IOException
*/
private void sendOutofBandHeartBeat()
throws InterruptedException, IOException {
int i = 0;
do {
State statusUpdaterThreadState = ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater())
.getStatusUpdaterThreadState();
if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING)
|| statusUpdaterThreadState.equals(Thread.State.WAITING)) {
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
break;
}
if (++i <= 10) {
Thread.sleep(50);
} else {
throw new IOException(
"Waited for 500 ms but NodeStatusUpdaterThread not in waiting state");
}
} while (true);
}
}

View File

@ -538,40 +538,39 @@ public void testGetLabelsOnNodesWhenNodeActiveDeactive() throws Exception {
// Active/Deactive a node directly assigned label, should not remove from
// node->label map
mgr.activateNode(toNodeId("n1:1"), SMALL_RESOURCE);
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:1")),
toSet("p1"));
assertCollectionEquals(toSet("p1"),
mgr.getNodeLabels().get(toNodeId("n1:1")));
mgr.deactivateNode(toNodeId("n1:1"));
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:1")),
toSet("p1"));
assertCollectionEquals(toSet("p1"),
mgr.getNodeLabels().get(toNodeId("n1:1")));
// Host will not affected
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")),
toSet("p2"));
assertCollectionEquals(toSet("p2"),
mgr.getNodeLabels().get(toNodeId("n1")));
// Active/Deactive a node doesn't directly assigned label, should remove
// from node->label map
mgr.activateNode(toNodeId("n1:2"), SMALL_RESOURCE);
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:2")),
toSet("p2"));
assertCollectionEquals(toSet("p2"),
mgr.getNodeLabels().get(toNodeId("n1:2")));
mgr.deactivateNode(toNodeId("n1:2"));
Assert.assertNull(mgr.getNodeLabels().get(toNodeId("n1:2")));
// Host will not affected too
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")),
toSet("p2"));
assertCollectionEquals(toSet("p2"),
mgr.getNodeLabels().get(toNodeId("n1")));
// When we change label on the host after active a node without directly
// assigned label, such node will still be removed after deactive
// Active/Deactive a node doesn't directly assigned label, should remove
// from node->label map
mgr.activateNode(toNodeId("n1:2"), SMALL_RESOURCE);
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1:2")),
toSet("p3"));
assertCollectionEquals(toSet("p3"),
mgr.getNodeLabels().get(toNodeId("n1:2")));
mgr.deactivateNode(toNodeId("n1:2"));
Assert.assertNull(mgr.getNodeLabels().get(toNodeId("n1:2")));
// Host will not affected too
assertCollectionEquals(mgr.getNodeLabels().get(toNodeId("n1")),
toSet("p3"));
assertCollectionEquals(toSet("p3"),
mgr.getNodeLabels().get(toNodeId("n1")));
}
private void checkNodeLabelInfo(List<RMNodeLabel> infos, String labelName, int activeNMs, int memory) {
@ -674,5 +673,4 @@ public void testLabelsToNodesOnNodeActiveDeactive() throws Exception {
assertLabelsToNodesEquals(
mgr.getLabelsToNodes(), transposeNodeToLabels(mgr.getNodeLabels()));
}
}