diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 9e57fa7529..6dfcc84019 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -140,6 +140,13 @@ void containerCompleted(RMContainer rmContainer, Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); + // Remove from the list of containers + if (liveContainers.remove(containerId) == null) { + LOG.info("Additional complete request on completed container " + + rmContainer.getContainerId()); + return; + } + // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); @@ -151,8 +158,6 @@ void containerCompleted(RMContainer rmContainer, + " in state: " + rmContainer.getState() + " event:" + event); } - // Remove from the list of containers - liveContainers.remove(rmContainer.getContainerId()); untrackContainerForPreemption(rmContainer); Resource containerResource = rmContainer.getContainer().getResource(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ce32459f3c..da5d3ad601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -3552,7 +3554,58 @@ public void testUserAndQueueMaxRunningApps() throws Exception { verifyAppRunnable(attId5, false); verifyQueueNumRunnable("queue1", 2, 1); } - + + @Test + public void testMultipleCompletedEvent() throws Exception { + // Set up a fair scheduler + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.2"); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create a node + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + // Launch an app + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource( + attId1, "queue1", "user1", + Resource.newInstance(1024, 1)); + createSchedulingRequestExistingApplication( + 1024, 1, + RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(), attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + + RMContainer container = app1.getLiveContainersMap(). + values().iterator().next(); + scheduler.completedContainer(container, SchedulerUtils + .createAbnormalContainerStatus(container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + scheduler.completedContainer(container, SchedulerUtils + .createAbnormalContainerStatus(container.getContainerId(), + SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.FINISHED); + assertEquals(Resources.none(), app1.getResourceUsage()); + } + @Test public void testQueueMaxAMShare() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);