YARN-7388. TestAMRestart should be scheduler agnostic.
This commit is contained in:
parent
6c32ddad30
commit
a1382a18df
@ -1361,6 +1361,14 @@ public long getMaximumApplicationLifetime(String queueName) {
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kill a RMContainer. This is meant to be called in tests only to simulate
|
||||||
|
* AM container failures.
|
||||||
|
* @param container the container to kill
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public abstract void killContainer(RMContainer container);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update internal state of the scheduler. This can be useful for scheduler
|
* Update internal state of the scheduler. This can be useful for scheduler
|
||||||
* implementations that maintain some state that needs to be periodically
|
* implementations that maintain some state that needs to be periodically
|
||||||
|
@ -101,6 +101,19 @@ public static ContainerStatus createAbnormalContainerStatus(
|
|||||||
ContainerExitStatus.ABORTED, diagnostics);
|
ContainerExitStatus.ABORTED, diagnostics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility to create a {@link ContainerStatus} for killed containers.
|
||||||
|
* @param containerId {@link ContainerId} of the killed container.
|
||||||
|
* @param diagnostics diagnostic message
|
||||||
|
* @return <code>ContainerStatus</code> for a killed container
|
||||||
|
*/
|
||||||
|
public static ContainerStatus createKilledContainerStatus(
|
||||||
|
ContainerId containerId, String diagnostics) {
|
||||||
|
return createAbnormalContainerStatus(containerId,
|
||||||
|
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility to create a {@link ContainerStatus} during exceptional
|
* Utility to create a {@link ContainerStatus} during exceptional
|
||||||
* circumstances.
|
* circumstances.
|
||||||
|
@ -1759,6 +1759,12 @@ public void markContainerForPreemption(ApplicationAttemptId aid,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@Override
|
||||||
|
public void killContainer(RMContainer container) {
|
||||||
|
markContainerForKillable(container);
|
||||||
|
}
|
||||||
|
|
||||||
public void markContainerForKillable(
|
public void markContainerForKillable(
|
||||||
RMContainer killableContainer) {
|
RMContainer killableContainer) {
|
||||||
try {
|
try {
|
||||||
|
@ -786,6 +786,16 @@ public Resource getNormalizedResource(Resource requestedResource) {
|
|||||||
incrAllocation);
|
incrAllocation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@Override
|
||||||
|
public void killContainer(RMContainer container) {
|
||||||
|
ContainerStatus status = SchedulerUtils.createKilledContainerStatus(
|
||||||
|
container.getContainerId(),
|
||||||
|
"Killed by RM to simulate an AM container failure");
|
||||||
|
LOG.info("Killing container " + container);
|
||||||
|
completedContainer(container, status, RMContainerEventType.KILL);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Allocation allocate(ApplicationAttemptId appAttemptId,
|
public Allocation allocate(ApplicationAttemptId appAttemptId,
|
||||||
List<ResourceRequest> ask, List<ContainerId> release,
|
List<ResourceRequest> ask, List<ContainerId> release,
|
||||||
|
@ -975,6 +975,16 @@ protected synchronized void nodeUpdate(RMNode nm) {
|
|||||||
updateAvailableResourcesMetrics();
|
updateAvailableResourcesMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@Override
|
||||||
|
public void killContainer(RMContainer container) {
|
||||||
|
ContainerStatus status = SchedulerUtils.createKilledContainerStatus(
|
||||||
|
container.getContainerId(),
|
||||||
|
"Killed by RM to simulate an AM container failure");
|
||||||
|
LOG.info("Killing container " + container);
|
||||||
|
completedContainer(container, status, RMContainerEventType.KILL);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void recoverContainersOnNode(
|
public synchronized void recoverContainersOnNode(
|
||||||
List<NMContainerStatus> containerReports, RMNode nm) {
|
List<NMContainerStatus> containerReports, RMNode nm) {
|
||||||
|
@ -51,10 +51,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -376,8 +374,6 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
|
|||||||
@Test(timeout = 100000)
|
@Test(timeout = 100000)
|
||||||
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
@ -389,12 +385,12 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
RMApp app1 = rm1.submitApp(200);
|
RMApp app1 = rm1.submitApp(200);
|
||||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
CapacityScheduler scheduler =
|
AbstractYarnScheduler scheduler =
|
||||||
(CapacityScheduler) rm1.getResourceScheduler();
|
(AbstractYarnScheduler) rm1.getResourceScheduler();
|
||||||
ContainerId amContainer =
|
ContainerId amContainer =
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
// Preempt the next attempt;
|
// Preempt the next attempt;
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
@ -414,7 +410,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
// Preempt the second attempt.
|
// Preempt the second attempt.
|
||||||
ContainerId amContainer2 =
|
ContainerId amContainer2 =
|
||||||
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
|
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
|
||||||
|
|
||||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
@ -503,8 +499,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
@Test(timeout = 100000)
|
@Test(timeout = 100000)
|
||||||
public void testMaxAttemptOneMeansOne() throws Exception {
|
public void testMaxAttemptOneMeansOne() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
@ -516,12 +510,12 @@ public void testMaxAttemptOneMeansOne() throws Exception {
|
|||||||
RMApp app1 = rm1.submitApp(200);
|
RMApp app1 = rm1.submitApp(200);
|
||||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
CapacityScheduler scheduler =
|
AbstractYarnScheduler scheduler =
|
||||||
(CapacityScheduler) rm1.getResourceScheduler();
|
(AbstractYarnScheduler) rm1.getResourceScheduler();
|
||||||
ContainerId amContainer =
|
ContainerId amContainer =
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
// Preempt the attempt;
|
// Preempt the attempt;
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
||||||
@ -539,8 +533,6 @@ public void testMaxAttemptOneMeansOne() throws Exception {
|
|||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||||
|
|
||||||
@ -556,8 +548,8 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
|||||||
RMApp app1 = rm1.submitApp(200);
|
RMApp app1 = rm1.submitApp(200);
|
||||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
CapacityScheduler scheduler =
|
AbstractYarnScheduler scheduler =
|
||||||
(CapacityScheduler) rm1.getResourceScheduler();
|
(AbstractYarnScheduler) rm1.getResourceScheduler();
|
||||||
ContainerId amContainer =
|
ContainerId amContainer =
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||||
|
|
||||||
@ -577,7 +569,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
|||||||
|
|
||||||
// Forcibly preempt the am container;
|
// Forcibly preempt the am container;
|
||||||
amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
||||||
scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||||
|
|
||||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||||
@ -619,8 +611,6 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
|||||||
public void testRMRestartOrFailoverNotCountedForAMFailures()
|
public void testRMRestartOrFailoverNotCountedForAMFailures()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||||
|
|
||||||
@ -631,8 +621,8 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
|
|||||||
MockRM rm1 = new MockRM(conf);
|
MockRM rm1 = new MockRM(conf);
|
||||||
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
||||||
rm1.start();
|
rm1.start();
|
||||||
CapacityScheduler scheduler =
|
AbstractYarnScheduler scheduler =
|
||||||
(CapacityScheduler) rm1.getResourceScheduler();
|
(AbstractYarnScheduler) rm1.getResourceScheduler();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
@ -694,8 +684,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
|
|||||||
@Test (timeout = 120000)
|
@Test (timeout = 120000)
|
||||||
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
||||||
ResourceScheduler.class);
|
|
||||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user