diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5c9cafd53f..7e5d0f9eb8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -238,6 +238,9 @@ Release 2.6.0 - UNRELEASED YARN-2001. Added a time threshold for RM to wait before starting container allocations after restart/failover. (Jian He via vinodkv) + YARN-1372. Ensure all completed containers are reported to the AMs across + RM restart. (Anubhav Dhoot via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58280..9887acccb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,6 +30,7 @@ public interface NodeHeartbeatResponse { NodeAction getNodeAction(); List getContainersToCleanup(); + List getFinishedContainersPulledByAM(); List getApplicationsToCleanup(); @@ -43,6 +44,10 @@ public interface NodeHeartbeatResponse { void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); + + // This tells NM to remove finished containers only after the AM + // has actually received it in a previous allocate response + void addFinishedContainersPulledByAM(List containers); void addAllApplicationsToCleanup(List applications); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95afe9..e9296f4323 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase containersToCleanup = null; + private List finishedContainersPulledByAM = null; private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -73,6 +74,9 @@ private void mergeLocalToBuilder() { if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } + if (this.finishedContainersPulledByAM != null) { + addFinishedContainersPulledByAMToProto(); + } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( convertToProtoFormat(this.containerTokenMasterKey)); @@ -199,6 +203,12 @@ public List getContainersToCleanup() { return this.containersToCleanup; } + @Override + public List getFinishedContainersPulledByAM() { + initFinishedContainersPulledByAM(); + return this.finishedContainersPulledByAM; + } + private void initContainersToCleanup() { if (this.containersToCleanup != null) { return; @@ -212,6 +222,19 @@ private void initContainersToCleanup() { } } + private void initFinishedContainersPulledByAM() { + if (this.finishedContainersPulledByAM != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getFinishedContainersPulledByAmList(); + this.finishedContainersPulledByAM = new ArrayList(); + + for (ContainerIdProto c : list) { + this.finishedContainersPulledByAM.add(convertFromProtoFormat(c)); + } + } + @Override public void addAllContainersToCleanup( final List containersToCleanup) { @@ -221,6 +244,15 @@ public void addAllContainersToCleanup( this.containersToCleanup.addAll(containersToCleanup); } + @Override + public void addFinishedContainersPulledByAM( + final List finishedContainersPulledByAM) { + if (finishedContainersPulledByAM == null) + return; + initFinishedContainersPulledByAM(); + this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM); + } + private void addContainersToCleanupToProto() { maybeInitBuilder(); builder.clearContainersToCleanup(); @@ -256,6 +288,41 @@ public void remove() { builder.addAllContainersToCleanup(iterable); } + private void addFinishedContainersPulledByAMToProto() { + maybeInitBuilder(); + builder.clearFinishedContainersPulledByAm(); + if (finishedContainersPulledByAM == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = finishedContainersPulledByAM.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllFinishedContainersPulledByAm(iterable); + } + @Override public List getApplicationsToCleanup() { initApplicationsToCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e782..600f54d647 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto finished_containers_pulled_by_am = 9; } message NMContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a479be29f7..43770c188c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -311,7 +311,7 @@ public void run() { public static class NMContext implements Context { private NodeId nodeId = null; - private final ConcurrentMap applications = + protected final ConcurrentMap applications = new ConcurrentHashMap(); protected final ConcurrentMap containers = new ConcurrentSkipListMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b52b0fbf6e..b9feacb6e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -104,11 +104,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Duration for which to track recently stopped container. private long durationToTrackStoppedContainers; - // This is used to track the current completed containers when nodeheartBeat - // is called. These completed containers will be removed from NM context after - // nodeHeartBeat succeeds and the response from the nodeHeartBeat is - // processed. - private final Set previousCompletedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -125,7 +120,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); - this.previousCompletedContainers = new HashSet(); } @Override @@ -331,7 +325,7 @@ private List createKeepAliveApplicationList() { return appList; } - private NodeStatus getNodeStatus(int responseId) { + private NodeStatus getNodeStatus(int responseId) throws IOException { NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); @@ -352,11 +346,18 @@ private NodeStatus getNodeStatus(int responseId) { // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the - // recentlyStoppedContainers and previousCompletedContainers collections. + // recentlyStoppedContainers collections. @VisibleForTesting - protected List getContainerStatuses() { + protected List getContainerStatuses() throws IOException { List containerStatuses = new ArrayList(); for (Container container : this.context.getContainers().values()) { + ContainerId containerId = container.getContainerId(); + ApplicationId applicationId = container.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!this.context.getApplications().containsKey(applicationId)) { + context.getContainers().remove(containerId); + continue; + } org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); containerStatuses.add(containerStatus); @@ -381,10 +382,17 @@ private List getRunningApplications() { } // These NMContainerStatus are sent on NM registration and used by YARN only. - private List getNMContainerStatuses() { + private List getNMContainerStatuses() throws IOException { List containerStatuses = new ArrayList(); for (Container container : this.context.getContainers().values()) { + ContainerId containerId = container.getContainerId(); + ApplicationId applicationId = container.getContainerId() + .getApplicationAttemptId().getApplicationId(); + if (!this.context.getApplications().containsKey(applicationId)) { + context.getContainers().remove(containerId); + continue; + } NMContainerStatus status = container.getNMContainerStatus(); containerStatuses.add(status); @@ -402,26 +410,30 @@ private List getNMContainerStatuses() { @Override public void addCompletedContainer(ContainerId containerId) { - synchronized (previousCompletedContainers) { - previousCompletedContainers.add(containerId); - } synchronized (recentlyStoppedContainers) { removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); + if (!recentlyStoppedContainers.containsKey(containerId)) { + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); + } } } - private void removeCompletedContainersFromContext() { - synchronized (previousCompletedContainers) { - if (!previousCompletedContainers.isEmpty()) { - for (ContainerId containerId : previousCompletedContainers) { - this.context.getContainers().remove(containerId); - } - LOG.info("Removed completed containers from NM context: " - + previousCompletedContainers); - previousCompletedContainers.clear(); - } + @VisibleForTesting + @Private + public void removeCompletedContainersFromContext( + ListcontainerIds) throws IOException { + Set removedContainers = new HashSet(); + + // If the AM has pulled the completedContainer it can be removed + for (ContainerId containerId : containerIds) { + context.getContainers().remove(containerId); + removedContainers.add(containerId); + } + + if (!removedContainers.isEmpty()) { + LOG.info("Removed completed containers from NM context: " + + removedContainers); } } @@ -454,7 +466,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) { return recentlyStoppedContainers.containsKey(containerId); } } - + @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -472,11 +484,13 @@ public void removeVeryOldStoppedContainersFromCache() { while (i.hasNext()) { ContainerId cid = i.next(); if (recentlyStoppedContainers.get(cid) < currentTime) { - i.remove(); - try { - context.getNMStateStore().removeContainer(cid); - } catch (IOException e) { - LOG.error("Unable to remove container " + cid + " in store", e); + if (!context.getContainers().containsKey(cid)) { + i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container " + cid + " in store", e); + } } } else { break; @@ -542,7 +556,9 @@ public void run() { // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM // when NM re-registers with RM. - removeCompletedContainersFromContext(); + // Only remove the cleanedup containers that are acked + removeCompletedContainersFromContext(response + .getFinishedContainersPulledByAM()); lastHeartBeatID = response.getResponseId(); List containersToCleanup = response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index acda2a9970..85bafb3dee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -247,6 +248,10 @@ public RegisterNodeManagerResponse registerNodeManager( // put the completed container into the context getNMContext().getContainers().put( testCompleteContainer.getContainerId(), container); + getNMContext().getApplications().put( + testCompleteContainer.getContainerId() + .getApplicationAttemptId().getApplicationId(), + mock(Application.class)); } else { // second register contains the completed container info. List statuses = @@ -382,9 +387,17 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { if (containersShouldBePreserved) { Assert.assertFalse(containers.isEmpty()); Assert.assertTrue(containers.containsKey(existingCid)); + Assert.assertEquals(ContainerState.RUNNING, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); } else { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + // ensure that containers are empty or are completed before + // restart nodeStatusUpdater + if (!containers.isEmpty()) { + Assert.assertEquals(ContainerState.COMPLETE, + containers.get(existingCid) + .cloneAndGetContainerStatus().getState()); + } } super.rebootNodeStatusUpdaterAndRegisterWithRM(); } @@ -465,7 +478,12 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { try { // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); + if (!containers.isEmpty()) { + for (Container container: containers.values()) { + Assert.assertEquals(ContainerState.COMPLETE, + container.cloneAndGetContainerStatus().getState()); + } + } super.rebootNodeStatusUpdaterAndRegisterWithRM(); // After this point new containers are free to be launched, except // containers from previous RM diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f2a3a4a8c0..8fb51a3105 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -180,7 +181,7 @@ private Map> getAppToContainerStatusMap( Map> map = new HashMap>(); for (ContainerStatus cs : containers) { - ApplicationId applicationId = + ApplicationId applicationId = cs.getContainerId().getApplicationAttemptId().getApplicationId(); List appContainers = map.get(applicationId); if (appContainers == null) { @@ -205,10 +206,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); - + ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); - + if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); @@ -419,7 +420,7 @@ protected void stopRMProxy() { } private class MyNodeManager extends NodeManager { - + private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -433,7 +434,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() { return this.nodeStatusUpdater; } } - + private class MyNodeManager2 extends NodeManager { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; @@ -467,7 +468,7 @@ protected void serviceStop() throws Exception { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } - // + // private class MyResourceTracker2 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -478,7 +479,7 @@ private class MyResourceTracker2 implements ResourceTracker { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -493,7 +494,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { NodeStatus nodeStatus = request.getNodeStatus(); nodeStatus.setResponseId(heartBeatID++); - + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); @@ -501,7 +502,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nhResponse; } } - + private class MyResourceTracker3 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -513,7 +514,7 @@ private class MyResourceTracker3 implements ResourceTracker { MyResourceTracker3(Context context) { this.context = context; } - + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -564,6 +565,14 @@ private class MyResourceTracker4 implements ResourceTracker { public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL; private Context context; + private final ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + private final ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + private final ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + private final ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); public MyResourceTracker4(Context context) { this.context = context; @@ -583,6 +592,8 @@ public RegisterNodeManagerResponse registerNodeManager( @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { + List finishedContainersPulledByAM = new ArrayList + (); try { if (heartBeatID == 0) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() @@ -594,10 +605,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) Assert.assertEquals(statuses.size(), 2); Assert.assertEquals(context.getContainers().size(), 2); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( @@ -619,23 +626,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // test passes. throw new YarnRuntimeException("Lost the heartbeat response"); - } else if (heartBeatID == 2) { + } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); Assert.assertEquals(statuses.size(), 4); Assert.assertEquals(context.getContainers().size(), 4); - ContainerStatus containerStatus2 = - createContainerStatus(2, ContainerState.RUNNING); - ContainerStatus containerStatus3 = - createContainerStatus(3, ContainerState.COMPLETE); - ContainerStatus containerStatus4 = - createContainerStatus(4, ContainerState.RUNNING); - ContainerStatus containerStatus5 = - createContainerStatus(5, ContainerState.COMPLETE); - - boolean container2Exist = false, container3Exist = false, container4Exist = - false, container5Exist = false; + boolean container2Exist = false, container3Exist = false, + container4Exist = false, container5Exist = false; for (ContainerStatus status : statuses) { if (status.getContainerId().equals( containerStatus2.getContainerId())) { @@ -664,6 +662,24 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist); + + if (heartBeatID == 3) { + finishedContainersPulledByAM.add(containerStatus3.getContainerId()); + } + } else if (heartBeatID == 4) { + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 3); + Assert.assertEquals(context.getContainers().size(), 3); + + boolean container3Exist = false; + for (ContainerStatus status : statuses) { + if (status.getContainerId().equals( + containerStatus3.getContainerId())) { + container3Exist = true; + } + } + Assert.assertFalse(container3Exist); } } catch (AssertionError error) { error.printStackTrace(); @@ -676,6 +692,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, null, null, null, 1000L); + nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM); return nhResponse; } } @@ -686,7 +703,7 @@ private class MyResourceTracker5 implements ResourceTracker { public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - + RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction ); @@ -694,7 +711,7 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNMTokenMasterKey(createMasterKey()); return response; } - + @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { @@ -767,11 +784,11 @@ public void deleteBaseDir() throws IOException { lfs.delete(new Path(basedir.getPath()), true); } - @Test(timeout = 90000) - public void testRecentlyFinishedContainers() throws Exception { - NodeManager nm = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - conf.set( + @Test(timeout = 90000) + public void testRecentlyFinishedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, "10000"); nm.init(conf); @@ -780,27 +797,112 @@ public void testRecentlyFinishedContainers() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); - ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - - + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class)); + nodeStatusUpdater.addCompletedContainer(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - + + nm.getNMContext().getContainers().remove(cId); long time1 = System.currentTimeMillis(); int waitInterval = 15; while (waitInterval-- > 0 && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { - nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); + nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); Thread.sleep(1000); } - long time2 = System.currentTimeMillis(); + long time2 = System.currentTimeMillis(); // By this time the container will be removed from cache. need to verify. - Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); - Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); - } - + Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); + Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000); + } + @Test(timeout = 90000) + public void testRemovePreviousCompletedContainersFromContext() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + List ackedContainers = new ArrayList(); + ackedContainers.add(cId); + + nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers); + Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty()); + } + + @Test + public void testCleanedupApplicationContainerCleanup() throws IOException { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "1000000"); + nm.init(conf); + + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + + ContainerId cId = ContainerId.newInstance(appAttemptId, 1); + Token containerToken = + BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", + BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container anyCompletedContainer = new ContainerImpl(conf, null, + null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(containerToken)) { + + @Override + public ContainerState getCurrentState() { + return ContainerState.COMPLETE; + } + }; + + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + nm.getNMContext().getContainers().put(cId, anyCompletedContainer); + + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); + + nm.getNMContext().getApplications().remove(appId); + nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList + ()); + Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); + } + @Test public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @@ -860,7 +962,7 @@ public void run() { nm.stop(); } - + @Test public void testStopReentrant() throws Exception { final AtomicInteger numCleanups = new AtomicInteger(0); @@ -875,7 +977,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - + @Override protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, @@ -897,7 +999,7 @@ public void cleanUpApplicationsOnNMShutDown() { YarnConfiguration conf = createNMConfig(); nm.init(conf); nm.start(); - + int waitCount = 0; while (heartBeatID < 1 && waitCount++ != 200) { Thread.sleep(500); @@ -906,7 +1008,7 @@ public void cleanUpApplicationsOnNMShutDown() { // Meanwhile call stop directly as the shutdown hook would nm.stop(); - + // NM takes a while to reach the STOPPED state. waitCount = 0; while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { @@ -1172,9 +1274,13 @@ protected NMContext createNMContext( nm.start(); int waitCount = 0; - while (heartBeatID <= 3 && waitCount++ != 20) { + while (heartBeatID <= 4 && waitCount++ != 20) { Thread.sleep(500); } + if (heartBeatID <= 4) { + Assert.fail("Failed to get all heartbeats in time, " + + "heartbeatID:" + heartBeatID); + } if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } @@ -1182,7 +1288,7 @@ protected NMContext createNMContext( } @Test(timeout = 200000) - public void testNodeStatusUpdaterRetryAndNMShutdown() + public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception { final long connectionWaitSecs = 1000; final long connectionRetryIntervalMs = 1000; @@ -1190,7 +1296,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, connectionWaitSecs); conf.setLong(YarnConfiguration - .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, connectionRetryIntervalMs); conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); @@ -1281,30 +1387,36 @@ public ConcurrentMap getContainers() { } else if (heartBeatID == 1) { ContainerStatus containerStatus2 = createContainerStatus(2, ContainerState.RUNNING); - Container container2 = getMockContainer(containerStatus2); - containers.put(containerStatus2.getContainerId(), container2); + putMockContainer(containerStatus2); ContainerStatus containerStatus3 = createContainerStatus(3, ContainerState.COMPLETE); - Container container3 = getMockContainer(containerStatus3); - containers.put(containerStatus3.getContainerId(), container3); + putMockContainer(containerStatus3); return containers; } else if (heartBeatID == 2) { ContainerStatus containerStatus4 = createContainerStatus(4, ContainerState.RUNNING); - Container container4 = getMockContainer(containerStatus4); - containers.put(containerStatus4.getContainerId(), container4); + putMockContainer(containerStatus4); ContainerStatus containerStatus5 = createContainerStatus(5, ContainerState.COMPLETE); - Container container5 = getMockContainer(containerStatus5); - containers.put(containerStatus5.getContainerId(), container5); + putMockContainer(containerStatus5); + return containers; + } else if (heartBeatID == 3 || heartBeatID == 4) { return containers; } else { containers.clear(); return containers; } } + + private void putMockContainer(ContainerStatus containerStatus) { + Container container = getMockContainer(containerStatus); + containers.put(containerStatus.getContainerId(), container); + applications.putIfAbsent(containerStatus.getContainerId() + .getApplicationAttemptId().getApplicationId(), + mock(Application.class)); + } } public static ContainerStatus createContainerStatus(int id, @@ -1345,7 +1457,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception { throw e; } } - + // the service should be stopped Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm .getServiceState()); @@ -1364,7 +1476,7 @@ private YarnConfiguration createNMConfig() { } conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); - conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); @@ -1372,7 +1484,7 @@ private YarnConfiguration createNMConfig() { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); return conf; } - + private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4798120c0d..4222888272 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -198,7 +198,7 @@ protected void serviceStop() throws Exception { */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleNMContainerStatus(NMContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -229,7 +229,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) { containerStatus.getContainerExitStatus()); // sending master container finished event. RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, status); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status, + nodeId); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -324,7 +325,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); + handleNMContainerStatus(status, nodeId); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ff520be127..0b8f321222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1181,7 +1181,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { int numberOfFailure = app.getNumFailedAppAttempts(); if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { - boolean transferStateFromPreviousAttempt = false; + boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = failedEvent.getTransferStateFromPreviousAttempt(); @@ -1191,11 +1191,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // Transfer the state from the previous attempt to the current attempt. // Note that the previous failed attempt may still be collecting the // container events from the scheduler and update its data structures - // before the new attempt is created. - if (transferStateFromPreviousAttempt) { - ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromPreviousAttempt(oldAttempt); - } + // before the new attempt is created. We always transferState for + // finished containers so that they can be acked to NM, + // but when pulling finished container we will check this flag again. + ((RMAppAttemptImpl) app.currentAttempt) + .transferStateFromPreviousAttempt(oldAttempt); return initialState; } else { if (numberOfFailure >= app.maxAppAttempts) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 943a5e53d4..cf8c2bbc86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; +import java.util.concurrent.ConcurrentMap; import javax.crypto.SecretKey; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -120,13 +122,28 @@ public interface RMAppAttempt extends EventHandler { List pullJustFinishedContainers(); /** - * Return the list of last set of finished containers. This does not reset the - * finished containers. - * @return the list of just finished contianers, this does not reset the + * Returns a reference to the map of last set of finished containers to the + * corresponding node. This does not reset the finished containers. + * @return the list of just finished containers, this does not reset the * finished containers. */ + ConcurrentMap> + getJustFinishedContainersReference(); + + /** + * Return the list of last set of finished containers. This does not reset + * the finished containers. + * @return the list of just finished containers + */ List getJustFinishedContainers(); + /** + * The map of conatiners per Node that are already sent to the AM. + * @return map of per node list of finished container status sent to AM + */ + ConcurrentMap> + getFinishedContainersSentToAMReference(); + /** * The container on which the Application Master is running. * @return the {@link Container} on which the application master is running. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 7ca57ee018..d75a871695 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -24,9 +24,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -83,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -129,9 +134,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - private List justFinishedContainers = - new ArrayList(); + + private ConcurrentMap> + justFinishedContainers = + new ConcurrentHashMap>(); + // Tracks the previous finished containers that are waiting to be + // verified as received by the AM. If the AM sends the next allocate + // request it implicitly acks this list. + private ConcurrentMap> + finishedContainersSentToAM = + new ConcurrentHashMap>(); private Container masterContainer; private float progress = 0; @@ -627,9 +639,27 @@ public float getProgress() { } } + @VisibleForTesting @Override public List getJustFinishedContainers() { this.readLock.lock(); + try { + List returnList = new ArrayList(); + for (Collection containerStatusList : + justFinishedContainers.values()) { + returnList.addAll(containerStatusList); + } + return returnList; + } finally { + this.readLock.unlock(); + } + } + + @Override + public ConcurrentMap> + getJustFinishedContainersReference + () { + this.readLock.lock(); try { return this.justFinishedContainers; } finally { @@ -637,15 +667,68 @@ public List getJustFinishedContainers() { } } + @Override + public ConcurrentMap> + getFinishedContainersSentToAMReference() { + this.readLock.lock(); + try { + return this.finishedContainersSentToAM; + } finally { + this.readLock.unlock(); + } + } + @Override public List pullJustFinishedContainers() { this.writeLock.lock(); try { - List returnList = new ArrayList( - this.justFinishedContainers.size()); - returnList.addAll(this.justFinishedContainers); - this.justFinishedContainers.clear(); + List returnList = new ArrayList(); + + // A new allocate means the AM received the previously sent + // finishedContainers. We can ack this to NM now + for (NodeId nodeId:finishedContainersSentToAM.keySet()) { + + // Clear and get current values + List currentSentContainers = + finishedContainersSentToAM + .put(nodeId, new ArrayList()); + List containerIdList = new ArrayList + (currentSentContainers.size()); + for (ContainerStatus containerStatus:currentSentContainers) { + containerIdList.add(containerStatus.getContainerId()); + } + eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent( + nodeId, containerIdList)); + } + + // Mark every containerStatus as being sent to AM though we may return + // only the ones that belong to the current attempt + boolean keepContainersAcressAttempts = this.submissionContext + .getKeepContainersAcrossApplicationAttempts(); + for (NodeId nodeId:justFinishedContainers.keySet()) { + + // Clear and get current values + List finishedContainers = justFinishedContainers.put + (nodeId, new ArrayList()); + + if (keepContainersAcressAttempts) { + returnList.addAll(finishedContainers); + } else { + // Filter out containers from previous attempt + for (ContainerStatus containerStatus: finishedContainers) { + if (containerStatus.getContainerId().getApplicationAttemptId() + .equals(this.getAppAttemptId())) { + returnList.add(containerStatus); + } + } + } + + finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList + ()); + finishedContainersSentToAM.get(nodeId).addAll(finishedContainers); + } + return returnList; } finally { this.writeLock.unlock(); @@ -732,7 +815,7 @@ public void recover(RMState state) throws Exception { } setMasterContainer(attemptState.getMasterContainer()); recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), - attemptState.getState()); + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -744,7 +827,9 @@ public void recover(RMState state) throws Exception { } public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { - this.justFinishedContainers = attempt.getJustFinishedContainers(); + this.justFinishedContainers = attempt.getJustFinishedContainersReference(); + this.finishedContainersSentToAM = + attempt.getFinishedContainersSentToAMReference(); } private void recoverAppAttemptCredentials(Credentials appAttemptTokens, @@ -1507,6 +1592,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); + // Add all finished containers so that they can be acked to NM + addJustFinishedContainer(appAttempt, containerFinishedEvent); + // Is this container the AmContainer? If the finished container is same as // the AMContainer, AppAttempt fails if (appAttempt.masterContainer != null @@ -1519,12 +1607,18 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINAL_SAVING; } - // Normal container.Put it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); return this.currentState; } } + private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, + RMAppAttemptContainerFinishedEvent containerFinishedEvent) { + appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent + .getNodeId(), new ArrayList()); + appAttempt.justFinishedContainers.get(containerFinishedEvent + .getNodeId()).add(containerFinishedEvent.getContainerStatus()); + } + private static final class ContainerFinishedAtFinalStateTransition extends BaseTransition { @Override @@ -1533,10 +1627,8 @@ private static final class ContainerFinishedAtFinalStateTransition RMAppAttemptContainerFinishedEvent containerFinishedEvent = (RMAppAttemptContainerFinishedEvent) event; - ContainerStatus containerStatus = - containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + addJustFinishedContainer(appAttempt, containerFinishedEvent); } } @@ -1569,6 +1661,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); + // Add all finished containers so that they can be acked to NM. + addJustFinishedContainer(appAttempt, containerFinishedEvent); + // Is this container the ApplicationMaster container? if (appAttempt.masterContainer.getId().equals( containerStatus.getContainerId())) { @@ -1576,8 +1671,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, appAttempt, containerFinishedEvent); return RMAppAttemptState.FINISHED; } - // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + return RMAppAttemptState.FINISHING; } } @@ -1592,6 +1686,9 @@ private static class ContainerFinishedAtFinalSavingTransition extends ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); + // Add all finished containers so that they can be acked to NM. + addJustFinishedContainer(appAttempt, containerFinishedEvent); + // If this is the AM container, it means the AM container is finished, // but we are not yet acknowledged that the final state has been saved. // Thus, we still return FINAL_SAVING state here. @@ -1611,8 +1708,6 @@ private static class ContainerFinishedAtFinalSavingTransition extends appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); return; } - // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); } } @@ -1629,7 +1724,7 @@ public AMFinishedAfterFinalSavingTransition( transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent); new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt, - event); + event); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 3660597d3f..39c6f29063 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -20,21 +20,27 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { private final ContainerStatus containerStatus; + private final NodeId nodeId; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - ContainerStatus containerStatus) { + ContainerStatus containerStatus, NodeId nodeId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getContainerStatus() { return this.containerStatus; } + public NodeId getNodeId() { + return this.nodeId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 885e864124..479734a653 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -78,13 +78,13 @@ RMContainerEventType.RESERVED, new ContainerReservedTransition()) RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state - .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, + .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, + .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, RMContainerEventType.KILL) // nothing to do - .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, RMContainerEventType.RELEASED) // nothing to do @@ -100,7 +100,7 @@ RMContainerEventType.KILL, new FinishedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED, new LaunchedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -495,7 +495,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { updateAttemptMetrics(container); container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus(), + container.getAllocatedNode())); container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index c0096b9b90..b4d0b8bce2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -40,6 +40,9 @@ public enum RMNodeEventType { CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + // Source: RMAppAttempt + FINISHED_CONTAINERS_PULLED_BY_AM, + // Source: NMLivelinessMonitor EXPIRE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java new file mode 100644 index 0000000000..a4fb70764c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.List; + +// Happens after an implicit ack from AM that the container completion has +// been notified successfully to the AM +public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent { + + private List containers; + + public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId, + List containers) { + super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM); + this.containers = containers; + } + + public List getContainers() { + return this.containers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 12659587e5..f0ae826ee5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -112,6 +112,10 @@ public class RMNodeImpl implements RMNode, EventHandler { private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that were notified to AM about their completion */ + private final Set finishedContainersPulledByAM = + new HashSet(); + /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); @@ -135,7 +139,7 @@ RMNodeEventType.STARTED, new AddNodeTransition()) new UpdateNodeResourceWhenUnusableTransition()) //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, @@ -151,6 +155,9 @@ RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, @@ -158,23 +165,30 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) //Transitions from REBOOTED state .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) //Transitions from DECOMMISSIONED state .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) - + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + //Transitions from LOST state .addTransition(NodeState.LOST, NodeState.LOST, - RMNodeEventType.RESOURCE_UPDATE, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), - RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) + RMNodeEventType.STATUS_UPDATE, + new StatusUpdateWhenUnHealthyTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, RMNodeEventType.DECOMMISSION, new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) @@ -192,7 +206,10 @@ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) - + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new FinishedContainersPulledByAMTransition()) + // create the topology tables .installTopology(); @@ -365,8 +382,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); + response.addFinishedContainersPulledByAM( + new ArrayList(this.finishedContainersPulledByAM)); this.containersToClean.clear(); this.finishedApplications.clear(); + this.finishedContainersPulledByAM.clear(); } finally { this.writeLock.unlock(); } @@ -652,6 +672,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class FinishedContainersPulledByAMTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.finishedContainersPulledByAM.addAll((( + RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition { @@ -726,7 +756,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { new ArrayList(); for (ContainerStatus remoteContainer : statusEvent.getContainers()) { ContainerId containerId = remoteContainer.getContainerId(); - + // Don't bother with containers already scheduled for cleanup, or for // applications already killed. The scheduler doens't need to know any // more about this container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 115f0b40a3..877a12215e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -491,7 +491,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null @@ -502,7 +502,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM @@ -515,7 +515,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } @@ -530,7 +530,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index fcb4e450b0..ba592fc976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -98,6 +98,9 @@ public void testAMRestartWithExistingContainers() throws Exception { Thread.sleep(200); } + ContainerId amContainerId = ContainerId.newInstance(am1 + .getApplicationAttemptId(), 1); + // launch the 2nd container, for testing running container transferred. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); ContainerId containerId2 = @@ -196,11 +199,15 @@ public void testAMRestartWithExistingContainers() throws Exception { // completed containerId4 is also transferred to the new attempt. RMAppAttempt newAttempt = app1.getRMAppAttempt(am2.getApplicationAttemptId()); - // 4 containers finished, acquired/allocated/reserved/completed. - waitForContainersToFinish(4, newAttempt); + // 4 containers finished, acquired/allocated/reserved/completed + AM + // container. + waitForContainersToFinish(5, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = - false, container6Exists = false; + false, container6Exists = false, amContainerExists = false; for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { + if(status.getContainerId().equals(amContainerId)) { + amContainerExists = true; + } if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. @@ -220,8 +227,11 @@ public void testAMRestartWithExistingContainers() throws Exception { container6Exists = true; } } - Assert.assertTrue(container3Exists && container4Exists && container5Exists - && container6Exists); + Assert.assertTrue(amContainerExists); + Assert.assertTrue(container3Exists); + Assert.assertTrue(container4Exists); + Assert.assertTrue(container5Exists); + Assert.assertTrue(container6Exists); // New SchedulerApplicationAttempt also has the containers info. rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); @@ -240,14 +250,14 @@ public void testAMRestartWithExistingContainers() throws Exception { // all 4 normal containers finished. System.out.println("New attempt's just finished containers: " + newAttempt.getJustFinishedContainers()); - waitForContainersToFinish(5, newAttempt); + waitForContainersToFinish(6, newAttempt); rm1.stop(); } private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; - while (attempt.getJustFinishedContainers().size() != expectedNum + while (attempt.getJustFinishedContainers().size() < expectedNum && count < 500) { Thread.sleep(100); count++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b8e6f434e7..15028f9b00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -28,6 +28,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -35,6 +36,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -91,6 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -151,6 +158,7 @@ public class TestRMAppAttemptTransitions { private NMTokenSecretManagerInRM nmTokenManager = spy(new NMTokenSecretManagerInRM(conf)); private boolean transferStateFromPreviousAttempt = false; + private EventHandler rmnodeEventHandler; private final class TestApplicationAttemptEventDispatcher implements EventHandler { @@ -203,7 +211,7 @@ public void handle(AMLauncherEvent event) { applicationMasterLauncher.handle(event); } } - + private static int appId = 1; private ApplicationSubmissionContext submissionContext = null; @@ -268,6 +276,9 @@ public void setUp() throws Exception { rmDispatcher.register(AMLauncherEventType.class, new TestAMLauncherEventDispatcher()); + rmnodeEventHandler = mock(RMNodeImpl.class); + rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler); + rmDispatcher.init(conf); rmDispatcher.start(); @@ -575,6 +586,8 @@ private void testAppAttemptFinishedState(Container container, } assertEquals(finishedContainerCount, applicationAttempt .getJustFinishedContainers().size()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); @@ -704,7 +717,8 @@ private void testUnmanagedAMSuccess(String url) { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -752,10 +766,11 @@ public void testUsageReport() { when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L); when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L); sendAttemptUpdateSavedEvent(applicationAttempt); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( attemptId, ContainerStatus.newInstance( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null); @@ -857,8 +872,9 @@ public void testAMCrashAtScheduled() { SchedulerUtils.LOST_CONTAINER); // send CONTAINER_FINISHED event at SCHEDULED state, // The state should be FINAL_SAVING with previous state SCHEDULED + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, anyNodeId)); // createApplicationAttemptState will return previous state (SCHEDULED), // if the current state is FINAL_SAVING. assertEquals(YarnApplicationAttemptState.SCHEDULED, @@ -904,8 +920,9 @@ public void testAMCrashAtAllocated() { ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, anyNodeId)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -928,16 +945,17 @@ public void testRunningToFailed() { ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, anyNodeId)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. assertEquals(RMAppAttemptState.FINAL_SAVING, - applicationAttempt.getAppAttemptState()); + applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -947,7 +965,7 @@ public void testRunningToFailed() { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -971,10 +989,11 @@ public void testRunningToKilled() { // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Killed state. assertEquals(RMAppAttemptState.FINAL_SAVING, - applicationAttempt.getAppAttemptState()); + applicationAttempt.getAppAttemptState()); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -984,7 +1003,7 @@ public void testRunningToKilled() { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(1,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -1144,13 +1163,14 @@ public void testFinishingToFinishing() { unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, diagnostics); // container must be AM container to move from FINISHING to FINISHED + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle( new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), anyNodeId)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1165,13 +1185,14 @@ public void testSuccessfulFinishingToFinished() { String diagnostics = "Successful"; unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, diagnostics); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle( new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), anyNodeId)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0, false); + diagnostics, 1, false); } // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before @@ -1195,15 +1216,16 @@ public void testSuccessfulFinishingToFinished() { assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); // Container_finished event comes before Attempt_Saved event. + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved sendAttemptUpdateSavedEvent(applicationAttempt); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0, false); + diagnostics, 1, false); } // While attempt is at FINAL_SAVING, Expire event may come before @@ -1235,6 +1257,71 @@ public void testFinalSavingToFinishedWithExpire() { diagnostics, 0, false); } + @Test + public void testFinishedContainer() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + // Complete one container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn( + containerId1); + when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), + container1.getNodeId())); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + + // Verify justFinishedContainers + Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers() + .size()); + Assert.assertEquals(container1.getId(), applicationAttempt + .getJustFinishedContainers().get(0).getContainerId()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); + + // Verify finishedContainersSentToAM gets container after pull + List containerStatuses = applicationAttempt + .pullJustFinishedContainers(); + Assert.assertEquals(1, containerStatuses.size()); + Mockito.verify(rmnodeEventHandler, never()).handle(Mockito + .any(RMNodeEvent.class)); + Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt) + .size()); + + // Verify container is acked to NM via the RMNodeEvent after second pull + containerStatuses = applicationAttempt.pullJustFinishedContainers(); + Assert.assertEquals(0, containerStatuses.size()); + Mockito.verify(rmnodeEventHandler).handle(captor.capture()); + Assert.assertEquals(container1.getId(), captor.getValue().getContainers() + .get(0)); + Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); + } + + private static List getFinishedContainersSentToAM( + RMAppAttempt applicationAttempt) { + List containers = new ArrayList(); + for (List containerStatuses: applicationAttempt + .getFinishedContainersSentToAMReference().values()) { + containers.addAll(containerStatuses); + } + return containers; + } + // this is to test user can get client tokens only after the client token // master key is saved in the state store and also registered in // ClientTokenSecretManager @@ -1281,8 +1368,9 @@ public void testFailedToFailed() { ContainerStatus.newInstance(amContainer.getId(), ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, anyNodeId)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1293,15 +1381,21 @@ public void testFailedToFailed() { verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); // failed attempt captured the container finished event. - assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); ContainerStatus cs2 = ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); - assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); - assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + appAttemptId, cs2, anyNodeId)); + assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); + boolean found = false; + for (ContainerStatus containerStatus:applicationAttempt + .getJustFinishedContainers()) { + if (cs2.getContainerId().equals(containerStatus.getContainerId())) { + found = true; + } + } + assertTrue(found); } @@ -1322,8 +1416,9 @@ scheduler, masterService, submissionContext, new Configuration(), ContainerStatus.newInstance(amContainer.getId(), ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, anyNodeId)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b3dc35f3cf..a0d5d84bdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.