YARN-10468. Fix TestNodeStatusUpdater timeouts and broken conditions (#2461)

This commit is contained in:
Ahmed Hussein 2020-11-24 13:09:30 -06:00 committed by GitHub
parent f813f14b5b
commit 569b20e31c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,7 +20,6 @@
import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -63,6 +62,7 @@
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -131,8 +131,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
/** Bytes in a GigaByte. */ /** Bytes in a GigaByte. */
private static final long GB = 1024L * 1024L * 1024L; private static final long GB = 1024L * 1024L * 1024L;
volatile int heartBeatID = 0; private volatile Throwable nmStartError = null;
volatile Throwable nmStartError = null; private AtomicInteger heartBeatID = new AtomicInteger(0);
private final List<NodeId> registeredNodes = new ArrayList<NodeId>(); private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private boolean triggered = false; private boolean triggered = false;
private NodeManager nm; private NodeManager nm;
@ -147,8 +147,12 @@ public void before() {
@After @After
public void tearDown() { public void tearDown() {
this.registeredNodes.clear(); this.registeredNodes.clear();
heartBeatID = 0; heartBeatID.set(0);
ServiceOperations.stop(nm); if (nm != null) {
ServiceOperations.stop(nm);
nm.waitForServiceToStop(10000);
}
assertionFailedInThread.set(false); assertionFailedInThread.set(false);
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
} }
@ -220,7 +224,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
EventHandler<Event> mockEventHandler = mock(EventHandler.class); EventHandler<Event> mockEventHandler = mock(EventHandler.class);
when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
NMStateStoreService stateStore = new NMNullStateStoreService(); NMStateStoreService stateStore = new NMNullStateStoreService();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID.getAndIncrement());
Map<ApplicationId, List<ContainerStatus>> appToContainers = Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
List<SignalContainerRequest> containersToSignal = null; List<SignalContainerRequest> containersToSignal = null;
@ -229,14 +233,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
ApplicationId appId2 = ApplicationId.newInstance(0, 2); ApplicationId appId2 = ApplicationId.newInstance(0, 2);
ContainerId firstContainerID = null; ContainerId firstContainerID = null;
if (heartBeatID == 1) { if (heartBeatID.get() == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM. // Give a container to the NM.
ApplicationAttemptId appAttemptID = ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId1, 0); ApplicationAttemptId.newInstance(appId1, 0);
firstContainerID = firstContainerID =
ContainerId.newContainerId(appAttemptID, heartBeatID); ContainerId.newContainerId(appAttemptID, heartBeatID.get());
ContainerLaunchContext launchContext = recordFactory ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class); .newRecordInstance(ContainerLaunchContext.class);
Resource resource = BuilderUtils.newResource(2, 1); Resource resource = BuilderUtils.newResource(2, 1);
@ -252,7 +256,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Container container = new ContainerImpl(conf, mockDispatcher, Container container = new ContainerImpl(conf, mockDispatcher,
launchContext, null, mockMetrics, containerToken, context); launchContext, null, mockMetrics, containerToken, context);
this.context.getContainers().put(firstContainerID, container); this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) { } else if (heartBeatID.get() == 2) {
// Checks on the RM end // Checks on the RM end
Assert.assertEquals("Number of applications should only be one!", 1, Assert.assertEquals("Number of applications should only be one!", 1,
nodeStatus.getContainersStatuses().size()); nodeStatus.getContainersStatuses().size());
@ -277,7 +281,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
ApplicationAttemptId appAttemptID = ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId2, 0); ApplicationAttemptId.newInstance(appId2, 0);
ContainerId secondContainerID = ContainerId secondContainerID =
ContainerId.newContainerId(appAttemptID, heartBeatID); ContainerId.newContainerId(appAttemptID, heartBeatID.get());
ContainerLaunchContext launchContext = recordFactory ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class); .newRecordInstance(ContainerLaunchContext.class);
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
@ -293,7 +297,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Container container = new ContainerImpl(conf, mockDispatcher, Container container = new ContainerImpl(conf, mockDispatcher,
launchContext, null, mockMetrics, containerToken, context); launchContext, null, mockMetrics, containerToken, context);
this.context.getContainers().put(secondContainerID, container); this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) { } else if (heartBeatID.get() == 3) {
// Checks on the RM end // Checks on the RM end
Assert.assertEquals("Number of applications should have two!", 2, Assert.assertEquals("Number of applications should have two!", 2,
appToContainers.size()); appToContainers.size());
@ -309,8 +313,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, newNodeHeartbeatResponse(heartBeatID.get(), null, null, null, null,
1000L); null, 1000L);
if (containersToSignal != null) { if (containersToSignal != null) {
nhResponse.addAllContainersToSignal(containersToSignal); nhResponse.addAllContainersToSignal(containersToSignal);
} }
@ -576,10 +580,10 @@ public RegisterNodeManagerResponse registerNodeManager(
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID.getAndIncrement());
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, newNodeHeartbeatResponse(heartBeatID.get(), heartBeatNodeAction, null,
null, null, null, 1000L); null, null, null, 1000L);
nhResponse.setDiagnosticsMessage(shutDownMessage); nhResponse.setDiagnosticsMessage(shutDownMessage);
return nhResponse; return nhResponse;
@ -623,9 +627,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
LOG.info("Got heartBeatId: [" + heartBeatID +"]"); LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID.getAndIncrement());
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, newNodeHeartbeatResponse(heartBeatID.get(), heartBeatNodeAction, null,
null, null, null, 1000L); null, null, null, 1000L);
if (nodeStatus.getKeepAliveApplications() != null if (nodeStatus.getKeepAliveApplications() != null
@ -639,7 +643,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
list.add(System.currentTimeMillis()); list.add(System.currentTimeMillis());
} }
} }
if (heartBeatID == 2) { if (heartBeatID.get() == 2) {
LOG.info("Sending FINISH_APP for application: [" + appId + "]"); LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class)); this.context.getApplications().put(appId, mock(Application.class));
nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId)); nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
@ -698,11 +702,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
List<ContainerId> finishedContainersPulledByAM = new ArrayList List<ContainerId> finishedContainersPulledByAM = new ArrayList
<ContainerId>(); <ContainerId>();
try { try {
if (heartBeatID == 0) { if (heartBeatID.get() == 0) {
Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses() Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses()
.size()); .size());
Assert.assertEquals(0, context.getContainers().size()); Assert.assertEquals(0, context.getContainers().size());
} else if (heartBeatID == 1) { } else if (heartBeatID.get() == 1) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
Assert.assertEquals(2, statuses.size()); Assert.assertEquals(2, statuses.size());
@ -712,14 +716,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
for (ContainerStatus status : statuses) { for (ContainerStatus status : statuses) {
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus2.getContainerId())) { containerStatus2.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus2.getState(),
containerStatus2.getState())); status.getState());
container2Exist = true; container2Exist = true;
} }
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus3.getContainerId())) { containerStatus3.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus3.getState(),
containerStatus3.getState())); status.getState());
container3Exist = true; container3Exist = true;
} }
} }
@ -729,7 +733,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
// test passes. // test passes.
throw new YarnRuntimeException("Lost the heartbeat response"); throw new YarnRuntimeException("Lost the heartbeat response");
} else if (heartBeatID == 2 || heartBeatID == 3) { } else if (heartBeatID.get() == 2 || heartBeatID.get() == 3) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
// NM should send completed containers on heartbeat 2, // NM should send completed containers on heartbeat 2,
@ -744,36 +748,36 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
for (ContainerStatus status : statuses) { for (ContainerStatus status : statuses) {
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus2.getContainerId())) { containerStatus2.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus2.getState(),
containerStatus2.getState())); status.getState());
container2Exist = true; container2Exist = true;
} }
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus3.getContainerId())) { containerStatus3.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus3.getState(),
containerStatus3.getState())); status.getState());
container3Exist = true; container3Exist = true;
} }
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus4.getContainerId())) { containerStatus4.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus4.getState(),
containerStatus4.getState())); status.getState());
container4Exist = true; container4Exist = true;
} }
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus5.getContainerId())) { containerStatus5.getContainerId())) {
Assert.assertTrue(status.getState().equals( Assert.assertEquals(containerStatus5.getState(),
containerStatus5.getState())); status.getState());
container5Exist = true; container5Exist = true;
} }
} }
Assert.assertTrue(container2Exist && container3Exist Assert.assertTrue(container2Exist && container3Exist
&& container4Exist && container5Exist); && container4Exist && container5Exist);
if (heartBeatID == 3) { if (heartBeatID.get() == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId()); finishedContainersPulledByAM.add(containerStatus3.getContainerId());
} }
} else if (heartBeatID == 4) { } else if (heartBeatID.get() == 4) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
Assert.assertEquals(2, statuses.size()); Assert.assertEquals(2, statuses.size());
@ -793,12 +797,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
error.printStackTrace(); error.printStackTrace();
assertionFailedInThread.set(true); assertionFailedInThread.set(true);
} finally { } finally {
heartBeatID++; heartBeatID.incrementAndGet();
} }
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID); nodeStatus.setResponseId(heartBeatID.get());
NodeHeartbeatResponse nhResponse = NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID.get(),
heartBeatNodeAction, null, null, null, null, 1000L); heartBeatNodeAction, null, null, null, null, 1000L);
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
Map<ApplicationId, ByteBuffer> appCredentials = Map<ApplicationId, ByteBuffer> appCredentials =
@ -839,8 +843,7 @@ public RegisterNodeManagerResponse registerNodeManager(
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
heartBeatID++; if (heartBeatID.incrementAndGet() == 1) {
if(heartBeatID == 1) {
// EOFException should be retried as well. // EOFException should be retried as well.
throw new EOFException("NodeHeartbeat exception"); throw new EOFException("NodeHeartbeat exception");
} }
@ -909,10 +912,10 @@ public RegisterNodeManagerResponse registerNodeManager(
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID.getAndIncrement());
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, newNodeHeartbeatResponse(heartBeatID.get(), NodeAction.NORMAL, null,
null, null, null, 1000L); null, null, null, 1000L);
return nhResponse; return nhResponse;
} }
@ -1141,7 +1144,7 @@ public ContainerState getCurrentState() {
} }
@Test @Test
public void testNMRegistration() throws InterruptedException, IOException { public void testNMRegistration() throws Exception {
nm = new NodeManager() { nm = new NodeManager() {
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -1161,43 +1164,32 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Assert.assertTrue("last service is NOT the node status updater", Assert.assertTrue("last service is NOT the node status updater",
lastService instanceof NodeStatusUpdater); lastService instanceof NodeStatusUpdater);
new Thread() { Thread starterThread = new Thread(() -> {
public void run() { try {
try { nm.start();
nm.start(); } catch (Throwable e) {
} catch (Throwable e) { TestNodeStatusUpdater.this.nmStartError = e;
TestNodeStatusUpdater.this.nmStartError = e; throw new YarnRuntimeException(e);
throw new YarnRuntimeException(e);
}
} }
}.start(); });
starterThread.start();
System.out.println(" ----- thread already started.." LOG.info(" ----- thread already started..{}", nm.getServiceState());
+ nm.getServiceState());
int waitCount = 0; starterThread.join(100000);
while (nm.getServiceState() == STATE.INITED && waitCount++ != 50) {
LOG.info("Waiting for NM to start.."); if (nmStartError != null) {
if (nmStartError != null) { LOG.error("Error during startup. ", nmStartError);
LOG.error("Error during startup. ", nmStartError); Assert.fail(nmStartError.getCause().getMessage());
Assert.fail(nmStartError.getCause().getMessage());
}
Thread.sleep(2000);
}
if (nm.getServiceState() != STATE.STARTED) {
// NM could have failed.
Assert.fail("NodeManager failed to start");
} }
waitCount = 0; GenericTestUtils.waitFor(
while (heartBeatID <= 3 && waitCount++ != 200) { () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() > 3,
Thread.sleep(1000); 50, 20000);
}
Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
nm.stop(); Assert.assertTrue(heartBeatID.get() > 3);
Assert.assertEquals("Number of registered NMs is wrong!!",
1, this.registeredNodes.size());
} }
@Test @Test
@ -1236,31 +1228,23 @@ public void cleanUpApplicationsOnNMShutDown() {
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED,
int waitCount = 0; 20, 10000);
while (heartBeatID < 1 && waitCount++ != 200) { GenericTestUtils.waitFor(
Thread.sleep(500); () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() >= 1,
} 50, 20000);
Assert.assertFalse(heartBeatID < 1); Assert.assertTrue(heartBeatID.get() >= 1);
// Meanwhile call stop directly as the shutdown hook would // Meanwhile call stop directly as the shutdown hook would
nm.stop(); nm.stop();
// NM takes a while to reach the STOPPED state. // NM takes a while to reach the STOPPED state.
waitCount = 0; nm.waitForServiceToStop(20000);
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
// It further takes a while after NM reached the STOPPED state. // It further takes a while after NM reached the STOPPED state.
waitCount = 0; GenericTestUtils.waitFor(() -> numCleanups.get() > 0, 20, 20000);
while (numCleanups.get() == 0 && waitCount++ != 20) {
LOG.info("Waiting for NM shutdown..");
Thread.sleep(1000);
}
Assert.assertEquals(1, numCleanups.get()); Assert.assertEquals(1, numCleanups.get());
} }
@ -1271,20 +1255,22 @@ public void testNodeDecommision() throws Exception {
nm.init(conf); nm.init(conf);
Assert.assertEquals(STATE.INITED, nm.getServiceState()); Assert.assertEquals(STATE.INITED, nm.getServiceState());
nm.start(); nm.start();
GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED,
int waitCount = 0; 20, 10000);
while (heartBeatID < 1 && waitCount++ != 200) { GenericTestUtils.waitFor(
Thread.sleep(500); () -> {
} if (nm.getServiceState() == STATE.STARTED) {
Assert.assertFalse(heartBeatID < 1); return (heartBeatID.get() >= 1
&& nm.getNMContext().getDecommissioned());
}
return true;
},
50, 200000);
Assert.assertTrue(heartBeatID.get() >= 1);
Assert.assertTrue(nm.getNMContext().getDecommissioned()); Assert.assertTrue(nm.getNMContext().getDecommissioned());
// NM takes a while to reach the STOPPED state. // NM takes a while to reach the STOPPED state.
waitCount = 0; nm.waitForServiceToStop(20000);
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
} }
@ -1529,9 +1515,14 @@ public void testApplicationKeepAlive() throws Exception {
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
// HB 2 -> app cancelled by RM. // HB 2 -> app cancelled by RM.
while (heartBeatID < 12) { GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED, 20,
Thread.sleep(1000l); 10000);
} GenericTestUtils.waitFor(
() -> nm.getServiceState() != STATE.STARTED
|| heartBeatID.get() >= 12,
100L, 60000000);
Assert.assertTrue(heartBeatID.get() >= 12);
MyResourceTracker3 rt = MyResourceTracker3 rt =
(MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
rt.context.getApplications().remove(rt.appId); rt.context.getApplications().remove(rt.appId);
@ -1539,14 +1530,18 @@ public void testApplicationKeepAlive() throws Exception {
int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]"); LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3); Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
while (heartBeatID < 20) { GenericTestUtils.waitFor(
Thread.sleep(1000l); () -> nm.getServiceState() != STATE.STARTED
} || heartBeatID.get() >= 20,
100L, 60000000);
Assert.assertTrue(heartBeatID.get() >= 20);
int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size(); int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2); Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
} finally { } finally {
if (nm.getServiceState() == STATE.STARTED) if (nm != null) {
nm.stop(); nm.stop();
nm.waitForServiceToStop(10000);
}
} }
} }
@ -1581,20 +1576,19 @@ protected NMContext createNMContext(
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
int waitCount = 0; GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED,
while (heartBeatID <= 4 && waitCount++ != 20) { 20, 10000);
Thread.sleep(500);
} GenericTestUtils.waitFor(
if (heartBeatID <= 4) { () -> nm.getServiceState() != STATE.STARTED || heartBeatID.get() > 4,
Assert.fail("Failed to get all heartbeats in time, " + 50, 20000);
"heartbeatID:" + heartBeatID); int hbID = heartBeatID.get();
} Assert.assertFalse("Failed to get all heartbeats in time, "
if(assertionFailedInThread.get()) { + "heartbeatID:" + hbID, hbID <= 4);
Assert.fail("ContainerStatus Backup failed"); Assert.assertFalse("ContainerStatus Backup failed",
} assertionFailedInThread.get());
Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps() Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
.get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1"))); .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
nm.stop();
} }
@Test(timeout = 200000) @Test(timeout = 200000)
@ -1631,13 +1625,12 @@ public void testNodeStatusUpdaterRetryAndNMShutdown()
Assert.assertFalse("Containers not cleaned up when NM stopped", Assert.assertFalse("Containers not cleaned up when NM stopped",
assertionFailedInThread.get()); assertionFailedInThread.get());
Assert.assertTrue(((MyNodeManager2) nm).isStopped); Assert.assertTrue(((MyNodeManager2) nm).isStopped);
Assert.assertTrue("calculate heartBeatCount based on" + Assert.assertEquals("calculate heartBeatCount based on" +
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); " connectionWaitSecs and RetryIntervalSecs", 2, heartBeatID.get());
} }
@Test @Test
public void testRMVersionLessThanMinimum() throws InterruptedException, public void testRMVersionLessThanMinimum() throws Exception {
IOException {
final AtomicInteger numCleanups = new AtomicInteger(0); final AtomicInteger numCleanups = new AtomicInteger(0);
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0"); conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0");
@ -1674,15 +1667,9 @@ public void cleanUpApplicationsOnNMShutDown() {
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
// NM takes a while to reach the STARTED state. // NM takes a while to reach the STARTED state.
int waitCount = 0; GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED,
while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) { 20, 200000);
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertTrue(nm.getServiceState() == STATE.STARTED);
nm.stop();
} }
@ -1712,37 +1699,20 @@ protected ContainerManagerImpl createContainerManager(Context context,
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
GenericTestUtils.waitFor(() -> nm.getServiceState() == STATE.STARTED,
20, 20000);
System.out.println(" ----- thread already started.." GenericTestUtils.waitFor(
+ nm.getServiceState()); () -> nm.getServiceState() != STATE.STARTED
|| heartBeatID.get() > 3,
int waitCount = 0; 50, 20000);
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { Assert.assertTrue(heartBeatID.get() > 3);
LOG.info("Waiting for NM to start..");
if (nmStartError != null) {
LOG.error("Error during startup. ", nmStartError);
Assert.fail(nmStartError.getCause().getMessage());
}
Thread.sleep(1000);
}
if (nm.getServiceState() != STATE.STARTED) {
// NM could have failed.
Assert.fail("NodeManager failed to start");
}
waitCount = 0;
while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1, Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size()); this.registeredNodes.size());
MyContainerManager containerManager = MyContainerManager containerManager =
(MyContainerManager)nm.getContainerManager(); (MyContainerManager)nm.getContainerManager();
Assert.assertTrue(containerManager.signaled); Assert.assertTrue(containerManager.signaled);
nm.stop();
} }
@Test @Test
@ -1823,38 +1793,48 @@ public void testUpdateNMResources() throws Exception {
LOG.info("Start the Node Manager"); LOG.info("Start the Node Manager");
NodeManager nodeManager = new NodeManager(); NodeManager nodeManager = new NodeManager();
YarnConfiguration nmConf = new YarnConfiguration(); YarnConfiguration nmConf = new YarnConfiguration();
nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, try {
resourceTracker.getListenerAddress()); nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); resourceTracker.getListenerAddress());
nodeManager.init(nmConf); nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
nodeManager.start(); nodeManager.init(nmConf);
nodeManager.start();
LOG.info("Initially the Node Manager should have the default resources"); LOG.info("Initially the Node Manager should have the default resources");
ContainerManager containerManager = nodeManager.getContainerManager(); ContainerManager containerManager = nodeManager.getContainerManager();
ContainersMonitor containerMonitor = ContainersMonitor containerMonitor =
containerManager.getContainersMonitor(); containerManager.getContainersMonitor();
assertEquals(8, containerMonitor.getVCoresAllocatedForContainers()); Assert.assertEquals(8,
assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); containerMonitor.getVCoresAllocatedForContainers());
Assert.assertEquals(8 * GB,
containerMonitor.getPmemAllocatedForContainers());
LOG.info("The first heartbeat should trigger a resource change to {}", LOG.info("The first heartbeat should trigger a resource change to {}",
resource); resource);
GenericTestUtils.waitFor( GenericTestUtils.waitFor(
() -> containerMonitor.getVCoresAllocatedForContainers() == 1, () -> containerMonitor.getVCoresAllocatedForContainers() == 1,
100, 2 * 1000); 100, 2 * 1000);
assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); Assert.assertEquals(8 * GB,
containerMonitor.getPmemAllocatedForContainers());
resource.setVirtualCores(5); resource.setVirtualCores(5);
resource.setMemorySize(4 * 1024); resource.setMemorySize(4 * 1024);
LOG.info("Change the resources to {}", resource); LOG.info("Change the resources to {}", resource);
GenericTestUtils.waitFor( GenericTestUtils.waitFor(
() -> containerMonitor.getVCoresAllocatedForContainers() == 5, () -> containerMonitor.getVCoresAllocatedForContainers() == 5,
100, 2 * 1000); 100, 2 * 1000);
assertEquals(4 * GB, containerMonitor.getPmemAllocatedForContainers()); Assert.assertEquals(4 * GB,
containerMonitor.getPmemAllocatedForContainers());
LOG.info("Cleanup"); } finally {
nodeManager.stop(); LOG.info("Cleanup");
nodeManager.close(); nodeManager.stop();
resourceTracker.stop(); try {
nodeManager.close();
} catch (IOException ex) {
LOG.error("Could not close the node manager", ex);
}
resourceTracker.stop();
}
} }
/** /**
@ -1908,9 +1888,9 @@ public MyNMContext(
@Override @Override
public ConcurrentMap<ContainerId, Container> getContainers() { public ConcurrentMap<ContainerId, Container> getContainers() {
if (heartBeatID == 0) { if (heartBeatID.get() == 0) {
return containers; return containers;
} else if (heartBeatID == 1) { } else if (heartBeatID.get() == 1) {
ContainerStatus containerStatus2 = ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING); createContainerStatus(2, ContainerState.RUNNING);
putMockContainer(containerStatus2); putMockContainer(containerStatus2);
@ -1919,7 +1899,7 @@ public ConcurrentMap<ContainerId, Container> getContainers() {
createContainerStatus(3, ContainerState.COMPLETE); createContainerStatus(3, ContainerState.COMPLETE);
putMockContainer(containerStatus3); putMockContainer(containerStatus3);
return containers; return containers;
} else if (heartBeatID == 2) { } else if (heartBeatID.get() == 2) {
ContainerStatus containerStatus4 = ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.RUNNING); createContainerStatus(4, ContainerState.RUNNING);
putMockContainer(containerStatus4); putMockContainer(containerStatus4);
@ -1928,7 +1908,7 @@ public ConcurrentMap<ContainerId, Container> getContainers() {
createContainerStatus(5, ContainerState.COMPLETE); createContainerStatus(5, ContainerState.COMPLETE);
putMockContainer(containerStatus5); putMockContainer(containerStatus5);
return containers; return containers;
} else if (heartBeatID == 3 || heartBeatID == 4) { } else if (heartBeatID.get() == 3 || heartBeatID.get() == 4) {
return containers; return containers;
} else { } else {
containers.clear(); containers.clear();
@ -1978,22 +1958,16 @@ private void verifyNodeStartFailure(String errMessage) throws Exception {
Assert.assertNotNull("nm is null", nm); Assert.assertNotNull("nm is null", nm);
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
try {
nm.start(); //the version in trunk looked in the cause for equality
Assert.fail("NM should have failed to start. Didn't get exception!!"); // and assumed failures were nested.
} catch (Exception e) { //this version assumes that error strings propagate to the base and
//the version in trunk looked in the cause for equality //use a contains() test only. It should be less brittle
// and assumed failures were nested. LambdaTestUtils.intercept(Exception.class, errMessage, () -> nm.start());
//this version assumes that error strings propagate to the base and
//use a contains() test only. It should be less brittle
if(!e.getMessage().contains(errMessage)) {
throw e;
}
}
// the service should be stopped // the service should be stopped
Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm Assert.assertEquals("NM state is wrong!", STATE.STOPPED,
.getServiceState()); nm.getServiceState());
Assert.assertEquals("Number of registered nodes is wrong!", 0, Assert.assertEquals("Number of registered nodes is wrong!", 0,
this.registeredNodes.size()); this.registeredNodes.size());