YARN-9413. Queue resource leak after app fail for CapacityScheduler. Contributed by Tao Yang.
commit ec143cbf67
parent 1b9ba0ebb2
@@ -1502,7 +1502,8 @@ public void transition(RMAppAttemptImpl appAttempt,
         && !appAttempt.submissionContext.getUnmanagedAM()) {
       int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
           .getNumFailedAppAttempts();
-      if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
+      if (appAttempt.rmApp.getMaxAppAttempts() > 1
+          && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
         keepContainersAcrossAppAttempts = true;
       }
     }
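For quick orientation, the guard this hunk introduces can be read as a standalone predicate. The sketch below is illustrative only, not ResourceManager code; the class and method names are hypothetical, and it simply restates the patched condition: containers of a failed attempt are worth carrying over only when another attempt can actually follow, so am-max-attempts = 1 is now excluded.

// Illustrative sketch only; not part of the patch. Names are hypothetical.
public final class KeepContainersGuard {

  private KeepContainersGuard() {
  }

  // Mirrors the patched condition in RMAppAttemptImpl: keep containers only
  // when the app is managed, more than one attempt is configured, and the
  // failure count has not yet reached that limit.
  public static boolean shouldKeepContainers(boolean unmanagedAm,
      int numFailedAttempts, int maxAppAttempts) {
    return !unmanagedAm
        && maxAppAttempts > 1
        && numFailedAttempts < maxAppAttempts;
  }

  public static void main(String[] args) {
    // am-max-attempts = 1: the pre-patch check (numFailedAttempts <
    // maxAppAttempts alone) would still have kept containers for an app
    // that can never be retried.
    System.out.println(shouldKeepContainers(false, 0, 1)); // prints false
    // am-max-attempts = 2 with one failure so far: keeping containers is
    // still useful because another attempt will be launched.
    System.out.println(shouldKeepContainers(false, 1, 2)); // prints true
  }
}

Under the old condition a failed single-attempt app could be told to keep its containers even though no further attempt would consume them, which appears to be the queue-usage leak the new testQueueResourceDoesNotLeak test below reproduces.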
@@ -60,6 +60,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -381,8 +385,10 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
     rm1.stop();
   }
 
-  // AM container preempted, nm disk failure
-  // should not be counted towards AM max retry count.
+  /**
+   * AM container preempted, nm disk failure
+   * should not be counted towards AM max retry count.
+   */
   @Test(timeout = 100000)
   public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -408,7 +414,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am1.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     ApplicationStateData appState =
         ((MemoryRMStateStore) rm1.getRMStateStore()).getState()
@@ -428,7 +434,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am2.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     MockAM am3 =
         rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
@@ -450,7 +456,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
         am3.getApplicationAttemptId());
 
-    Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
+    Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
     Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
         appState.getAttempt(am3.getApplicationAttemptId())
             .getAMContainerExitStatus());
@@ -539,9 +545,11 @@ public void testMaxAttemptOneMeansOne() throws Exception {
     rm1.stop();
   }
 
-  // Test RM restarts after AM container is preempted, new RM should not count
-  // AM preemption failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test RM restarts after AM container is preempted, new RM should not count
+   * AM preemption failure towards the max-retry-account and should be able to
+   * re-launch the AM.
+   */
   @Test(timeout = 60000)
   public void testPreemptedAMRestartOnRMRestart() throws Exception {
     getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
@@ -624,9 +632,11 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
     rm2.stop();
   }
 
-  // Test regular RM restart/failover, new RM should not count
-  // AM failure towards the max-retry-account and should be able to
-  // re-launch the AM.
+  /**
+   * Test regular RM restart/failover, new RM should not count
+   * AM failure towards the max-retry-account and should be able to
+   * re-launch the AM.
+   */
   @Test(timeout = 50000)
   public void testRMRestartOrFailoverNotCountedForAMFailures()
       throws Exception {
@@ -944,9 +954,11 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
     rm1.stop();
   }
 
-  // Test restarting AM launched with the KeepContainers and AM reset window.
-  // after AM reset window, even if AM who was the last is failed,
-  // all containers are launched by previous AM should be kept.
+  /**
+   * Test restarting AM launched with the KeepContainers and AM reset window.
+   * after AM reset window, even if AM who was the last is failed,
+   * all containers are launched by previous AM should be kept.
+   */
   @Test (timeout = 20000)
   public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
       throws Exception {
@@ -1014,23 +1026,25 @@ public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
     rm1.stop();
   }
 
-  // Test to verify that the containers of previous attempt are returned in
-  // the RM response to the heartbeat of AM if these containers were not
-  // recovered by the time AM registered.
-  //
-  // 1. App is started with 2 containers running on 2 different nodes-
-  // container 2 on the NM1 node and container 3 on the NM2 node.
-  // 2. Fail the AM of the application.
-  // 3. Simulate RM restart.
-  // 4. NM1 connects to the restarted RM immediately. It sends the RM the status
-  // of container 2.
-  // 5. 2nd attempt of the app is launched and the app master registers with RM.
-  // 6. Verify that app master receives container 2 in the RM response to
-  // register request.
-  // 7. NM2 connects to the RM after a delay. It sends the RM the status of
-  // container 3.
-  // 8. Verify that the app master receives container 3 in the RM response to
-  // its heartbeat.
+  /**
+   * Test to verify that the containers of previous attempt are returned in
+   * the RM response to the heartbeat of AM if these containers were not
+   * recovered by the time AM registered.
+   *
+   * 1. App is started with 2 containers running on 2 different nodes-
+   * container 2 on the NM1 node and container 3 on the NM2 node.
+   * 2. Fail the AM of the application.
+   * 3. Simulate RM restart.
+   * 4. NM1 connects to the restarted RM immediately. It sends the RM the status
+   * of container 2.
+   * 5. 2nd attempt of the app is launched and the app master registers with RM.
+   * 6. Verify that app master receives container 2 in the RM response to
+   * register request.
+   * 7. NM2 connects to the RM after a delay. It sends the RM the status of
+   * container 3.
+   * 8. Verify that the app master receives container 3 in the RM response to
+   * its heartbeat.
+   */
   @Test(timeout = 200000)
   public void testContainersFromPreviousAttemptsWithRMRestart()
       throws Exception {
@@ -1167,4 +1181,73 @@ public void testContainersFromPreviousAttemptsWithRMRestart()
     rm2.stop();
     rm1.stop();
   }
+
+  /**
+   * Test to verify that there is no queue resource leak after app fail.
+   *
+   * 1. Submit an app which is configured to keep containers across app
+   * attempts and should fail after AM finished (am-max-attempts=1).
+   * 2. App is started with 2 containers running on NM1 node.
+   * 3. Preempt the AM of the application which should not count towards max
+   * attempt retry but app will fail immediately.
+   * 4. Verify that the used resource of queue should be cleaned up normally
+   * after app fail.
+   */
+  @Test(timeout = 30000)
+  public void testQueueResourceDoesNotLeak() throws Exception {
+    getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    getConf()
+        .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MockRM rm1 = new MockRM(getConf());
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200, 0, true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    allocateContainers(nm1, am1, 1);
+
+    // launch the 2nd container, for testing running container transferred.
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
+        ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
+    ContainerId amContainer =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    // Preempt AM container
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
+
+    rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
+        am1.getApplicationAttemptId());
+
+    Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
+
+    // AM should not be restarted.
+    rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
+
+    // After app1 failed, used resource of this queue should
+    // be cleaned up, otherwise resource leak happened.
+    if (getSchedulerType() == SchedulerType.CAPACITY) {
+      LeafQueue queue =
+          (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getMemorySize());
+      Assert.assertEquals(0,
+          queue.getQueueResourceUsage().getUsed().getVirtualCores());
+    } else if (getSchedulerType() == SchedulerType.FAIR) {
+      FSLeafQueue queue = ((FairScheduler) scheduler).getQueueManager()
+          .getLeafQueue("root.default", false);
+      Assert.assertEquals(0, queue.getResourceUsage().getMemorySize());
+      Assert.assertEquals(0, queue.getResourceUsage().getVirtualCores());
+    }
+
+    rm1.stop();
+  }
 }