diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8a9a55e9d0..1e2a293315 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -927,6 +927,20 @@ public void transferStateFromAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainersReference(); this.finishedContainersSentToAM = attempt.getFinishedContainersSentToAMReference(); + // container complete msg was moved from justFinishedContainers to + // finishedContainersSentToAM in ApplicationMasterService#allocate, + // if am crashed and not received this response, we should resend + // this msg again after am restart + if (!this.finishedContainersSentToAM.isEmpty()) { + for (NodeId nodeId : this.finishedContainersSentToAM.keySet()) { + List containerStatuses = + this.finishedContainersSentToAM.get(nodeId); + this.justFinishedContainers.putIfAbsent(nodeId, + new ArrayList()); + this.justFinishedContainers.get(nodeId).addAll(containerStatuses); + } + this.finishedContainersSentToAM.clear(); + } } private void recoverAppAttemptCredentials(Credentials appAttemptTokens, @@ -1845,13 +1859,13 @@ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt, } else { LOG.warn("No ContainerStatus in containerFinishedEvent"); } - finishedContainersSentToAM.putIfAbsent(nodeId, - new ArrayList()); - appAttempt.finishedContainersSentToAM.get(nodeId).add( - containerFinishedEvent.getContainerStatus()); if (!appAttempt.getSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { + .getKeepContainersAcrossApplicationAttempts()) { + finishedContainersSentToAM.putIfAbsent(nodeId, + new ArrayList()); + appAttempt.finishedContainersSentToAM.get(nodeId).add( + containerFinishedEvent.getContainerStatus()); appAttempt.sendFinishedContainersToNM(); } else { appAttempt.sendFinishedAMContainerToNM(nodeId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 16f3f60d4b..6cfd86807d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -907,4 +907,111 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { rm1.stop(); rm2.stop(); } + + private boolean isContainerIdInContainerStatus( + List containerStatuses, ContainerId containerId) { + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(containerId)) { + return true; + } + } + return false; + } + + @Test(timeout = 30000) + public void testAMRestartNotLostContainerCompleteMsg() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", -1, + null, "MAPREDUCE", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + allocateContainers(nm1, am1, 1); + + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // container complete + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 2, ContainerState.COMPLETE); + rm1.waitForState(nm1, containerId2, RMContainerState.COMPLETED); + + // make sure allocate() get complete container, + // before this msg pass to AM, AM may crash + while (true) { + AllocateResponse response = am1.allocate( + new ArrayList(), new ArrayList()); + List containerStatuses = + response.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus( + containerStatuses, containerId2) == false) { + Thread.sleep(100); + continue; + } + + // is containerId still in justFinishedContainer? + containerStatuses = + app1.getCurrentAppAttempt().getJustFinishedContainers(); + if (isContainerIdInContainerStatus(containerStatuses, + containerId2)) { + Assert.fail(); + } + break; + } + + // fail the AM by sending CONTAINER_FINISHED event without registering. + nm1.nodeHeartbeat( + am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + + // wait for app to start a new attempt. + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // assert this is a new AM. + ApplicationAttemptId newAttemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); + + // launch the new AM + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // whether new AM could get container complete msg + AllocateResponse allocateResponse = am2.allocate( + new ArrayList(), new ArrayList()); + List containerStatuses = + allocateResponse.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus(containerStatuses, + containerId2) == false) { + Assert.fail(); + } + containerStatuses = attempt2.getJustFinishedContainers(); + if (isContainerIdInContainerStatus(containerStatuses, containerId2)) { + Assert.fail(); + } + + // the second allocate should not get container complete msg + allocateResponse = am2.allocate( + new ArrayList(), new ArrayList()); + containerStatuses = + allocateResponse.getCompletedContainersStatuses(); + if (isContainerIdInContainerStatus(containerStatuses, containerId2)) { + Assert.fail(); + } + + rm1.stop(); + } }