YARN-11390. TestResourceTrackerService.testNodeRemovalNormally: Shutdown nodes should be 0 now expected: <1> but was: <0> (#5190)

Reviewed-by: Peter Szucs
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
(cherry picked from commit ee7d1787cd)
This commit is contained in:
K0K0V0K 2022-12-08 18:52:19 +01:00 committed by Chris Nauroth
parent dafc9ef8b6
commit 8b748c1cb8

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.net.ServerSocketUtil;
@ -54,13 +55,16 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer; import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory; import javax.xml.transform.TransformerFactory;
@ -2284,8 +2288,7 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
} }
//Test decommed/ing node that transitions to untracked,timer should remove //Test decommed/ing node that transitions to untracked,timer should remove
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful);
maxThreadSleeptime, doGraceful);
rm.stop(); rm.stop();
} }
@ -2293,41 +2296,41 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
// max allowed length. // max allowed length.
private void testNodeRemovalUtilDecomToUntracked( private void testNodeRemovalUtilDecomToUntracked(
RMContext rmContext, Configuration conf, RMContext rmContext, Configuration conf,
MockNM nm1, MockNM nm2, MockNM nm3, MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful
long maxThreadSleeptime, boolean doGraceful) throws Exception { ) throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
String ip = NetUtils.normalizeHostName("localhost"); String ip = NetUtils.normalizeHostName("localhost");
CountDownLatch latch = new CountDownLatch(1);
writeToHostsFile("host1", ip, "host2"); writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2"); writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf); refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true); //nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true); nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); Supplier<RMNode> nodeSupplier = doGraceful
RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : ? () -> rmContext.getRMNodes().get(nm2.getNodeId())
rmContext.getInactiveRMNodes().get(nm2.getNodeId()); : () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!", pollingAssert(() -> nodeSupplier.get() != null,
rmNode, null); "Timer for this node was not canceled!");
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", final List<NodeState> expectedStates = Arrays.asList(
(rmNode.getState() == NodeState.DECOMMISSIONED) || NodeState.DECOMMISSIONED,
(rmNode.getState() == NodeState.DECOMMISSIONING)); NodeState.DECOMMISSIONING
);
pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()),
"Node should be in one of these states: " + expectedStates);
writeToHostsFile("host1", ip); writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, ""); writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf); refreshNodesOption(doGraceful, conf);
nm2.nodeHeartbeat(true); nm2.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); pollingAssert(() -> nodeSupplier.get() == null,
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : "Node should have been forgotten!");
rmContext.getInactiveRMNodes().get(nm2.getNodeId()); pollingAssert(metrics::getNumDecommisionedNMs, 0,
Assert.assertEquals("Node should have been forgotten!", "metrics#getNumDecommisionedNMs should be 0 now");
rmNode, null); pollingAssert(metrics::getNumShutdownNMs, 0,
Assert.assertEquals("Shutdown nodes should be 0 now", "metrics#getNumShutdownNMs should be 0 now");
metrics.getNumDecommisionedNMs(), 0); pollingAssert(metrics::getNumActiveNMs, 2,
Assert.assertEquals("Shutdown nodes should be 0 now", "metrics#getNumActiveNMs should be 2 now");
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
} }
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception { private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
@ -2898,6 +2901,18 @@ protected ResourceTrackerService createResourceTrackerService() {
mockRM.stop(); mockRM.stop();
} }
private void pollingAssert(Supplier<Boolean> supplier, String message)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(supplier,
100, 10_000, message);
}
private <T> void pollingAssert(Supplier<T> supplier, T expected, String message)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> Objects.equals(supplier.get(), expected),
100, 10_000, message);
}
/** /**
* A no-op implementation of NodeAttributeStore for testing * A no-op implementation of NodeAttributeStore for testing
*/ */