diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 53f43e428b..7308fd8b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1361,6 +1361,14 @@ public long getMaximumApplicationLifetime(String queueName) { return -1; } + /** + * Kill a RMContainer. This is meant to be called in tests only to simulate + * AM container failures. + * @param container the container to kill + */ + @VisibleForTesting + public abstract void killContainer(RMContainer container); + /** * Update internal state of the scheduler. This can be useful for scheduler * implementations that maintain some state that needs to be periodically diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c558b8dd91..32f5824e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -101,6 +101,19 @@ public static ContainerStatus createAbnormalContainerStatus( ContainerExitStatus.ABORTED, diagnostics); } + + /** + * Utility to create a {@link ContainerStatus} for killed containers. + * @param containerId {@link ContainerId} of the killed container. + * @param diagnostics diagnostic message + * @return ContainerStatus for a killed container + */ + public static ContainerStatus createKilledContainerStatus( + ContainerId containerId, String diagnostics) { + return createAbnormalContainerStatus(containerId, + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics); + } + /** * Utility to create a {@link ContainerStatus} during exceptional * circumstances. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index db69042ff5..5e172b8e74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1759,6 +1759,12 @@ public void markContainerForPreemption(ApplicationAttemptId aid, } } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + markContainerForKillable(container); + } + public void markContainerForKillable( RMContainer killableContainer) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 8ea07ab8b1..7f1b91e32a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -786,6 +786,16 @@ public Resource getNormalizedResource(Resource requestedResource) { incrAllocation); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + ContainerStatus status = SchedulerUtils.createKilledContainerStatus( + container.getContainerId(), + "Killed by RM to simulate an AM container failure"); + LOG.info("Killing container " + container); + completedContainer(container, status, RMContainerEventType.KILL); + } + @Override public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 185d426d71..3288912836 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -975,6 +975,16 @@ protected synchronized void nodeUpdate(RMNode nm) { updateAvailableResourcesMetrics(); } + @VisibleForTesting + @Override + public void killContainer(RMContainer container) { + ContainerStatus status = SchedulerUtils.createKilledContainerStatus( + container.getContainerId(), + "Killed by RM to simulate an AM container failure"); + LOG.info("Killing container " + container); + completedContainer(container, status, RMContainerEventType.KILL); + } + @Override public synchronized void recoverContainersOnNode( List containerReports, RMNode nm) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 9d0d87979c..c43069bac0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -51,10 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -376,8 +374,6 @@ public void testNMTokensRebindOnAMRestart() throws Exception { @Test(timeout = 100000) public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -389,12 +385,12 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the next attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -414,7 +410,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -503,8 +499,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception { @Test(timeout = 100000) public void testMaxAttemptOneMeansOne() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); @@ -516,12 +510,12 @@ public void testMaxAttemptOneMeansOne() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler, @@ -539,8 +533,6 @@ public void testMaxAttemptOneMeansOne() throws Exception { @Test(timeout = 60000) public void testPreemptedAMRestartOnRMRestart() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -556,8 +548,8 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { RMApp app1 = rm1.submitApp(200); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); @@ -577,7 +569,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { // Forcibly preempt the am container; amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -619,8 +611,6 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception { public void testRMRestartOrFailoverNotCountedForAMFailures() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); @@ -631,8 +621,8 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() MockRM rm1 = new MockRM(conf); MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); rm1.start(); - CapacityScheduler scheduler = - (CapacityScheduler) rm1.getResourceScheduler(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm1.getResourceScheduler(); MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); nm1.registerNode(); @@ -694,8 +684,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() @Test (timeout = 120000) public void testRMAppAttemptFailuresValidityInterval() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);