YARN-11390. TestResourceTrackerService.testNodeRemovalNormally: Shutdown nodes should be 0 now expected: <1> but was: <0> (#5190)
Reviewed-by: Peter Szucs Signed-off-by: Chris Nauroth <cnauroth@apache.org>
This commit is contained in:
parent
0a4528cd7f
commit
ee7d1787cd
@ -56,13 +56,16 @@
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import javax.xml.parsers.DocumentBuilderFactory;
|
import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
import javax.xml.transform.Transformer;
|
import javax.xml.transform.Transformer;
|
||||||
import javax.xml.transform.TransformerFactory;
|
import javax.xml.transform.TransformerFactory;
|
||||||
@ -2345,8 +2348,7 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Test decommed/ing node that transitions to untracked,timer should remove
|
//Test decommed/ing node that transitions to untracked,timer should remove
|
||||||
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3,
|
testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful);
|
||||||
maxThreadSleeptime, doGraceful);
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2354,41 +2356,41 @@ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
|
|||||||
// max allowed length.
|
// max allowed length.
|
||||||
private void testNodeRemovalUtilDecomToUntracked(
|
private void testNodeRemovalUtilDecomToUntracked(
|
||||||
RMContext rmContext, Configuration conf,
|
RMContext rmContext, Configuration conf,
|
||||||
MockNM nm1, MockNM nm2, MockNM nm3,
|
MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful
|
||||||
long maxThreadSleeptime, boolean doGraceful) throws Exception {
|
) throws Exception {
|
||||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||||
String ip = NetUtils.normalizeHostName("localhost");
|
String ip = NetUtils.normalizeHostName("localhost");
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
writeToHostsFile("host1", ip, "host2");
|
writeToHostsFile("host1", ip, "host2");
|
||||||
writeToHostsFile(excludeHostFile, "host2");
|
writeToHostsFile(excludeHostFile, "host2");
|
||||||
refreshNodesOption(doGraceful, conf);
|
refreshNodesOption(doGraceful, conf);
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
//nm2.nodeHeartbeat(true);
|
//nm2.nodeHeartbeat(true);
|
||||||
nm3.nodeHeartbeat(true);
|
nm3.nodeHeartbeat(true);
|
||||||
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
Supplier<RMNode> nodeSupplier = doGraceful
|
||||||
RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
? () -> rmContext.getRMNodes().get(nm2.getNodeId())
|
||||||
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
: () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
||||||
Assert.assertNotEquals("Timer for this node was not canceled!",
|
pollingAssert(() -> nodeSupplier.get() != null,
|
||||||
rmNode, null);
|
"Timer for this node was not canceled!");
|
||||||
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
|
final List<NodeState> expectedStates = Arrays.asList(
|
||||||
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
|
NodeState.DECOMMISSIONED,
|
||||||
(rmNode.getState() == NodeState.DECOMMISSIONING));
|
NodeState.DECOMMISSIONING
|
||||||
|
);
|
||||||
|
pollingAssert(() -> expectedStates.contains(nodeSupplier.get().getState()),
|
||||||
|
"Node should be in one of these states: " + expectedStates);
|
||||||
|
|
||||||
|
|
||||||
writeToHostsFile("host1", ip);
|
writeToHostsFile("host1", ip);
|
||||||
writeToHostsFile(excludeHostFile, "");
|
writeToHostsFile(excludeHostFile, "");
|
||||||
refreshNodesOption(doGraceful, conf);
|
refreshNodesOption(doGraceful, conf);
|
||||||
nm2.nodeHeartbeat(true);
|
nm2.nodeHeartbeat(true);
|
||||||
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
pollingAssert(() -> nodeSupplier.get() == null,
|
||||||
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
"Node should have been forgotten!");
|
||||||
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
pollingAssert(metrics::getNumDecommisionedNMs, 0,
|
||||||
Assert.assertEquals("Node should have been forgotten!",
|
"metrics#getNumDecommisionedNMs should be 0 now");
|
||||||
rmNode, null);
|
pollingAssert(metrics::getNumShutdownNMs, 0,
|
||||||
Assert.assertEquals("Shutdown nodes should be 0 now",
|
"metrics#getNumShutdownNMs should be 0 now");
|
||||||
metrics.getNumDecommisionedNMs(), 0);
|
pollingAssert(metrics::getNumActiveNMs, 2,
|
||||||
Assert.assertEquals("Shutdown nodes should be 0 now",
|
"metrics#getNumActiveNMs should be 2 now");
|
||||||
metrics.getNumShutdownNMs(), 0);
|
|
||||||
Assert.assertEquals("Active nodes should be 2",
|
|
||||||
metrics.getNumActiveNMs(), 2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
|
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
|
||||||
@ -2959,6 +2961,18 @@ protected ResourceTrackerService createResourceTrackerService() {
|
|||||||
mockRM.stop();
|
mockRM.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void pollingAssert(Supplier<Boolean> supplier, String message)
|
||||||
|
throws InterruptedException, TimeoutException {
|
||||||
|
GenericTestUtils.waitFor(supplier,
|
||||||
|
100, 10_000, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void pollingAssert(Supplier<T> supplier, T expected, String message)
|
||||||
|
throws InterruptedException, TimeoutException {
|
||||||
|
GenericTestUtils.waitFor(() -> Objects.equals(supplier.get(), expected),
|
||||||
|
100, 10_000, message);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A no-op implementation of NodeAttributeStore for testing
|
* A no-op implementation of NodeAttributeStore for testing
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user