YARN-4907. Make all MockRM#waitForState consistent. (Contributed by Yufei Gu via Daniel Templeton)

This commit is contained in:
Daniel Templeton 2016-10-31 13:18:33 -07:00
parent a9d68d2e8e
commit cc2c993a8a

View File

@ -272,9 +272,10 @@ public class MockRM extends ResourceManager {
public void waitForContainerToComplete(RMAppAttempt attempt, public void waitForContainerToComplete(RMAppAttempt attempt,
NMContainerStatus completedContainer) throws InterruptedException { NMContainerStatus completedContainer) throws InterruptedException {
while (true) { int timeWaiting = 0;
while (timeWaiting < TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
List<ContainerStatus> containers = attempt.getJustFinishedContainers(); List<ContainerStatus> containers = attempt.getJustFinishedContainers();
System.out.println("Received completed containers " + containers); LOG.info("Received completed containers " + containers);
for (ContainerStatus container : containers) { for (ContainerStatus container : containers) {
if (container.getContainerId().equals( if (container.getContainerId().equals(
completedContainer.getContainerId())) { completedContainer.getContainerId())) {
@ -282,6 +283,7 @@ public class MockRM extends ResourceManager {
} }
} }
Thread.sleep(WAIT_MS_PER_LOOP); Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
} }
} }
@ -289,11 +291,16 @@ public class MockRM extends ResourceManager {
MockNM nm) throws Exception { MockNM nm) throws Exception {
RMApp app = getRMContext().getRMApps().get(appId); RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull(app); Assert.assertNotNull(app);
int timeWaiting = 0;
while (app.getAppAttempts().size() != attemptSize) { while (app.getAppAttempts().size() != attemptSize) {
System.out.println("Application " + appId if (timeWaiting >= TIMEOUT_MS_FOR_ATTEMPT) {
break;
}
LOG.info("Application " + appId
+ " is waiting for AM to restart. Current has " + " is waiting for AM to restart. Current has "
+ app.getAppAttempts().size() + " attempts."); + app.getAppAttempts().size() + " attempts.");
Thread.sleep(WAIT_MS_PER_LOOP); Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP;
} }
return launchAndRegisterAM(app, this, nm); return launchAndRegisterAM(app, this, nm);
} }
@ -375,7 +382,7 @@ public class MockRM extends ResourceManager {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
} }
container = getResourceScheduler().getRMContainer(containerId); container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be " LOG.info("Waiting for container " + containerId + " to be "
+ containerState + ", container is null right now."); + containerState + ", container is null right now.");
Thread.sleep(WAIT_MS_PER_LOOP); Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP; timeWaiting += WAIT_MS_PER_LOOP;
@ -386,7 +393,7 @@ public class MockRM extends ResourceManager {
return false; return false;
} }
System.out.println("Container : " + containerId + " State is : " LOG.info("Container : " + containerId + " State is : "
+ container.getState() + " Waiting for state : " + containerState); + container.getState() + " Waiting for state : " + containerState);
for (MockNM nm : nms) { for (MockNM nm : nms) {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
@ -395,7 +402,7 @@ public class MockRM extends ResourceManager {
timeWaiting += WAIT_MS_PER_LOOP; timeWaiting += WAIT_MS_PER_LOOP;
} }
System.out.println("Container State is : " + container.getState()); LOG.info("Container State is : " + container.getState());
return true; return true;
} }
@ -724,13 +731,13 @@ public class MockRM extends ResourceManager {
break; break;
} }
System.out.println("Node State is : " + node.getState() LOG.info("Node State is : " + node.getState()
+ " Waiting for state : " + finalState); + " Waiting for state : " + finalState);
Thread.sleep(WAIT_MS_PER_LOOP); Thread.sleep(WAIT_MS_PER_LOOP);
timeWaiting += WAIT_MS_PER_LOOP; timeWaiting += WAIT_MS_PER_LOOP;
} }
System.out.println("Node " + nodeId + " State is : " + node.getState()); LOG.info("Node " + nodeId + " State is : " + node.getState());
Assert.assertEquals("Node state is not correct (timedout)", finalState, Assert.assertEquals("Node state is not correct (timedout)", finalState,
node.getState()); node.getState());
} }
@ -949,7 +956,7 @@ public class MockRM extends ResourceManager {
.getApplicationAttempt(attemptId) && tick < 50) { .getApplicationAttempt(attemptId) && tick < 50) {
Thread.sleep(100); Thread.sleep(100);
if (tick % 10 == 0) { if (tick % 10 == 0) {
System.out.println("waiting for SchedulerApplicationAttempt=" LOG.info("waiting for SchedulerApplicationAttempt="
+ attemptId + " added."); + attemptId + " added.");
} }
tick++; tick++;
@ -966,7 +973,7 @@ public class MockRM extends ResourceManager {
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception { throws Exception {
RMAppAttempt attempt = waitForAttemptScheduled(app, rm); RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
System.out.println("Launch AM " + attempt.getAppAttemptId()); LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
@ -979,7 +986,7 @@ public class MockRM extends ResourceManager {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
System.out.println("Launch AM " + attempt.getAppAttemptId()); LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
MockAM am = new MockAM(rm.getRMContext(), rm.masterService, MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
attempt.getAppAttemptId()); attempt.getAppAttemptId());
@ -1039,7 +1046,6 @@ public class MockRM extends ResourceManager {
client.signalToContainer(req); client.signalToContainer(req);
} }
/** /**
* Wait until an app removed from scheduler. * Wait until an app removed from scheduler.
* The timeout is 40 seconds. * The timeout is 40 seconds.
@ -1049,25 +1055,13 @@ public class MockRM extends ResourceManager {
*/ */
public void waitForAppRemovedFromScheduler(ApplicationId appId) public void waitForAppRemovedFromScheduler(ApplicationId appId)
throws InterruptedException { throws InterruptedException {
waitForAppRemovedFromScheduler(appId, TIMEOUT_MS_FOR_APP_REMOVED);
}
/**
* Wait until an app is removed from scheduler.
* @param appId the id of an app
* @param timeoutMsecs the length of timeout in milliseconds
* @throws InterruptedException
* if interrupted while waiting for app removed
*/
public void waitForAppRemovedFromScheduler(ApplicationId appId,
long timeoutMsecs) throws InterruptedException {
int timeWaiting = 0; int timeWaiting = 0;
Map<ApplicationId, SchedulerApplication> apps = Map<ApplicationId, SchedulerApplication> apps =
((AbstractYarnScheduler) getResourceScheduler()) ((AbstractYarnScheduler) getResourceScheduler())
.getSchedulerApplications(); .getSchedulerApplications();
while (apps.containsKey(appId)) { while (apps.containsKey(appId)) {
if (timeWaiting >= timeoutMsecs) { if (timeWaiting >= TIMEOUT_MS_FOR_APP_REMOVED) {
break; break;
} }
LOG.info("wait for app removed, " + appId); LOG.info("wait for app removed, " + appId);