diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 0ef82436c3..5817360a0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.net.ServerSocketUtil; @@ -54,13 +55,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerFactory; @@ -2284,8 +2288,7 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { } //Test decommed/ing node that transitions to untracked,timer should remove - testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, - maxThreadSleeptime, doGraceful); + testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful); rm.stop(); } @@ -2293,41 +2296,41 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception { // max allowed length. private void testNodeRemovalUtilDecomToUntracked( RMContext rmContext, Configuration conf, - MockNM nm1, MockNM nm2, MockNM nm3, - long maxThreadSleeptime, boolean doGraceful) throws Exception { + MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful + ) throws Exception { ClusterMetrics metrics = ClusterMetrics.getMetrics(); String ip = NetUtils.normalizeHostName("localhost"); - CountDownLatch latch = new CountDownLatch(1); writeToHostsFile("host1", ip, "host2"); writeToHostsFile(excludeHostFile, "host2"); refreshNodesOption(doGraceful, conf); nm1.nodeHeartbeat(true); //nm2.nodeHeartbeat(true); nm3.nodeHeartbeat(true); - latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); - RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : - rmContext.getInactiveRMNodes().get(nm2.getNodeId()); - Assert.assertNotEquals("Timer for this node was not canceled!", - rmNode, null); - Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", - (rmNode.getState() == NodeState.DECOMMISSIONED) || - (rmNode.getState() == NodeState.DECOMMISSIONING)); + Supplier nodeSupplier = doGraceful + ? () -> rmContext.getRMNodes().get(nm2.getNodeId()) + : () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + pollingAssert(() -> nodeSupplier.get() != null, + "Timer for this node was not canceled!"); + final List expectedStates = Arrays.asList( + NodeState.DECOMMISSIONED, + NodeState.DECOMMISSIONING + ); + pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()), + "Node should be in one of these states: " + expectedStates); + writeToHostsFile("host1", ip); writeToHostsFile(excludeHostFile, ""); refreshNodesOption(doGraceful, conf); nm2.nodeHeartbeat(true); - latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); - rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : - rmContext.getInactiveRMNodes().get(nm2.getNodeId()); - Assert.assertEquals("Node should have been forgotten!", - rmNode, null); - Assert.assertEquals("Shutdown nodes should be 0 now", - metrics.getNumDecommisionedNMs(), 0); - Assert.assertEquals("Shutdown nodes should be 0 now", - metrics.getNumShutdownNMs(), 0); - Assert.assertEquals("Active nodes should be 2", - metrics.getNumActiveNMs(), 2); + pollingAssert(() -> nodeSupplier.get() == null, + "Node should have been forgotten!"); + pollingAssert(metrics::getNumDecommisionedNMs, 0, + "metrics#getNumDecommisionedNMs should be 0 now"); + pollingAssert(metrics::getNumShutdownNMs, 0, + "metrics#getNumShutdownNMs should be 0 now"); + pollingAssert(metrics::getNumActiveNMs, 2, + "metrics#getNumActiveNMs should be 2 now"); } private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { @@ -2898,6 +2901,18 @@ protected ResourceTrackerService createResourceTrackerService() { mockRM.stop(); } + private void pollingAssert(Supplier supplier, String message) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(supplier, + 100, 10_000, message); + } + + private void pollingAssert(Supplier supplier, T expected, String message) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> Objects.equals(supplier.get(), expected), + 100, 10_000, message); + } + /** * A no-op implementation of NodeAttributeStore for testing */