YARN-5037. Fix random failure of TestRMRestart#testQueueMetricsOnRMRestart (sandflee via Varun Saxena).
This commit is contained in:
parent
b191c6b202
commit
0fd3980a1f
@ -88,6 +88,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
@ -109,6 +110,7 @@ public class MockRM extends ResourceManager {
|
|||||||
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
||||||
private static final int SECOND = 1000;
|
private static final int SECOND = 1000;
|
||||||
private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND;
|
private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND;
|
||||||
|
private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND;
|
||||||
private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 10 * SECOND;
|
private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 10 * SECOND;
|
||||||
private static final int WAIT_MS_PER_LOOP = 10;
|
private static final int WAIT_MS_PER_LOOP = 10;
|
||||||
|
|
||||||
@ -1016,4 +1018,44 @@ public void signalToContainer(ContainerId containerId,
|
|||||||
SignalContainerRequest.newInstance(containerId, command);
|
SignalContainerRequest.newInstance(containerId, command);
|
||||||
client.signalToContainer(req);
|
client.signalToContainer(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until an app removed from scheduler.
|
||||||
|
* The timeout is 40 seconds.
|
||||||
|
* @param appId the id of an app
|
||||||
|
* @throws InterruptedException
|
||||||
|
* if interrupted while waiting for app removed
|
||||||
|
*/
|
||||||
|
public void waitForAppRemovedFromScheduler(ApplicationId appId)
|
||||||
|
throws InterruptedException {
|
||||||
|
waitForAppRemovedFromScheduler(appId, TIMEOUT_MS_FOR_APP_REMOVED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until an app is removed from scheduler.
|
||||||
|
* @param appId the id of an app
|
||||||
|
* @param timeoutMsecs the length of timeout in milliseconds
|
||||||
|
* @throws InterruptedException
|
||||||
|
* if interrupted while waiting for app removed
|
||||||
|
*/
|
||||||
|
public void waitForAppRemovedFromScheduler(ApplicationId appId,
|
||||||
|
long timeoutMsecs) throws InterruptedException {
|
||||||
|
int timeWaiting = 0;
|
||||||
|
|
||||||
|
Map<ApplicationId, SchedulerApplication> apps =
|
||||||
|
((AbstractYarnScheduler) getResourceScheduler())
|
||||||
|
.getSchedulerApplications();
|
||||||
|
while (apps.containsKey(appId)) {
|
||||||
|
if (timeWaiting >= timeoutMsecs) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("wait for app removed, " + appId);
|
||||||
|
Thread.sleep(WAIT_MS_PER_LOOP);
|
||||||
|
timeWaiting += WAIT_MS_PER_LOOP;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("app is not removed from scheduler (timeout).",
|
||||||
|
!apps.containsKey(appId));
|
||||||
|
LOG.info("app is removed from scheduler, " + appId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1909,6 +1909,10 @@ public void testQueueMetricsOnRMRestart() throws Exception {
|
|||||||
|
|
||||||
// finish the AMs
|
// finish the AMs
|
||||||
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
|
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
|
||||||
|
// now AppAttempt and App becomes FINISHED,
|
||||||
|
// we should also grant APP_ATTEMPT_REMOVE/APP_REMOVE event
|
||||||
|
// had processed by scheduler
|
||||||
|
rm2.waitForAppRemovedFromScheduler(loadedApp1.getApplicationId());
|
||||||
assertQueueMetrics(qm2, 1, 0, 0, 1);
|
assertQueueMetrics(qm2, 1, 0, 0, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1930,14 +1934,14 @@ private void resetQueueMetrics(QueueMetrics qm) {
|
|||||||
|
|
||||||
private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
|
private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
|
||||||
int appsPending, int appsRunning, int appsCompleted) {
|
int appsPending, int appsRunning, int appsCompleted) {
|
||||||
Assert.assertEquals(qm.getAppsSubmitted(),
|
Assert.assertEquals(appsSubmitted + appsSubmittedCarryOn,
|
||||||
appsSubmitted + appsSubmittedCarryOn);
|
qm.getAppsSubmitted());
|
||||||
Assert.assertEquals(qm.getAppsPending(),
|
Assert.assertEquals(appsPending + appsPendingCarryOn,
|
||||||
appsPending + appsPendingCarryOn);
|
qm.getAppsPending());
|
||||||
Assert.assertEquals(qm.getAppsRunning(),
|
Assert.assertEquals(appsRunning + appsRunningCarryOn,
|
||||||
appsRunning + appsRunningCarryOn);
|
qm.getAppsRunning());
|
||||||
Assert.assertEquals(qm.getAppsCompleted(),
|
Assert.assertEquals(appsCompleted + appsCompletedCarryOn,
|
||||||
appsCompleted + appsCompletedCarryOn);
|
qm.getAppsCompleted());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 60000)
|
@Test (timeout = 60000)
|
||||||
|
Loading…
Reference in New Issue
Block a user