diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1701e7f0e7..2d6de3750e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -179,6 +179,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private long launchAMEndTime = 0; private long scheduledTime = 0; private long containerAllocatedTime = 0; + private boolean nonWorkPreservingAMContainerFinished = false; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -853,7 +854,7 @@ public List pullJustFinishedContainers() { // A new allocate means the AM received the previously sent // finishedContainers. We can ack this to NM now - sendFinishedContainersToNM(); + sendFinishedContainersToNM(finishedContainersSentToAM); // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt @@ -1980,12 +1981,13 @@ private void sendFinishedAMContainerToNM(NodeId nodeId, } // Ack NM to remove finished containers from context. - private void sendFinishedContainersToNM() { - for (NodeId nodeId : finishedContainersSentToAM.keySet()) { + private void sendFinishedContainersToNM( + Map> finishedContainers) { + for (NodeId nodeId : finishedContainers.keySet()) { // Clear and get current values List currentSentContainers = - finishedContainersSentToAM.put(nodeId, new ArrayList<>()); + finishedContainers.put(nodeId, new ArrayList<>()); List containerIdList = new ArrayList<>(currentSentContainers.size()); for (ContainerStatus containerStatus : currentSentContainers) { @@ -1994,7 +1996,7 @@ private void sendFinishedContainersToNM() { eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, containerIdList)); } - this.finishedContainersSentToAM.clear(); + finishedContainers.clear(); } // Add am container to the list so that am container instance will be @@ -2020,7 +2022,16 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt, appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList<>()); appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); - appAttempt.sendFinishedContainersToNM(); + appAttempt.sendFinishedContainersToNM( + appAttempt.finishedContainersSentToAM); + // there might be some completed containers that have not been pulled + // by the AM heartbeat, explicitly add them for cleanup. + appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + + // mark the fact that AM container has finished so that future finished + // containers will be cleaned up without the engagement of AM containers + // (through heartbeat) + appAttempt.nonWorkPreservingAMContainerFinished = true; } else { appAttempt.sendFinishedAMContainerToNM(nodeId, containerStatus.getContainerId()); @@ -2048,6 +2059,11 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, .getNodeId(), new ArrayList<>()); appAttempt.justFinishedContainers.get(containerFinishedEvent .getNodeId()).add(containerFinishedEvent.getContainerStatus()); + + if (appAttempt.nonWorkPreservingAMContainerFinished) { + // AM container has finished, so no more AM heartbeats to do the cleanup. + appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + } } private static final class ContainerFinishedAtFinalStateTransition diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index e2f80cafd4..4e5ff3f768 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -643,6 +643,8 @@ private Container allocateApplicationAttempt() { RMContainer rmContainer = mock(RMContainerImpl.class); when(scheduler.getRMContainer(container.getId())). thenReturn(rmContainer); + when(container.getNodeId()).thenReturn( + BuilderUtils.newNodeId("localhost", 0)); applicationAttempt.handle( new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), @@ -1530,6 +1532,119 @@ public void testFinishedContainer() { .handle(Mockito.any(RMNodeEvent.class)); } + /** + * Check a completed container that is not yet pulled by AM heartbeat, + * is ACKed to NM for cleanup when the AM container exits. + */ + @Test + public void testFinishedContainerNotBeingPulledByAMHeartbeat() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), amContainer.getNodeId())); + + // Complete a non-AM container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn( + containerId1); + when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + // Verify justFinishedContainers + ArgumentCaptor captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers() + .size()); + Assert.assertEquals(container1.getId(), applicationAttempt + .getJustFinishedContainers().get(0).getContainerId()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + + // finish AM container to emulate AM exit event + containerStatus1 = mock(ContainerStatus.class); + ContainerId amContainerId = amContainer.getId(); + when(containerStatus1.getContainerId()).thenReturn(amContainerId); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + amContainer.getNodeId())); + + Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture()); + List containerPulledEvents = + captor.getAllValues(); + // Verify AM container is acked to NM via the RMNodeEvent immediately + Assert.assertEquals(amContainer.getId(), + containerPulledEvents.get(0).getContainers().get(0)); + // Verify the non-AM container is acked to NM via the RMNodeEvent + Assert.assertEquals(container1.getId(), + containerPulledEvents.get(1).getContainers().get(0)); + Assert.assertTrue("No container shall be added to justFinishedContainers" + + " as soon as AM container exits", + applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + } + + /** + * Check a completed container is ACKed to NM for cleanup after the AM + * container has exited. + */ + @Test + public void testFinishedContainerAfterAMExit() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + // finish AM container to emulate AM exit event + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + ContainerId amContainerId = amContainer.getId(); + when(containerStatus1.getContainerId()).thenReturn(amContainerId); + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), + amContainer.getNodeId())); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + amContainer.getNodeId())); + + // Verify AM container is acked to NM via the RMNodeEvent immediately + ArgumentCaptor captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + Mockito.verify(rmnodeEventHandler).handle(captor.capture()); + Assert.assertEquals(amContainer.getId(), + captor.getValue().getContainers().get(0)); + + // Complete a non-AM container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn(containerId1); + when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + // Verify container is acked to NM via the RMNodeEvent immediately + captor = ArgumentCaptor.forClass( + RMNodeFinishedContainersPulledByAMEvent.class); + Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture()); + Assert.assertEquals(container1.getId(), + captor.getAllValues().get(1).getContainers().get(0)); + Assert.assertTrue("No container shall be added to justFinishedContainers" + + " after AM container exited", + applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + } + private static List getFinishedContainersSentToAM( RMAppAttempt applicationAttempt) { List containers = new ArrayList();