diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 50780b3cd4..18f0381cca 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -47,6 +47,9 @@ Release 2.0.4-beta - UNRELEASED YARN-406. Fix TestRackResolver to function in networks where "host1" resolves to a valid host. (Hitesh Shah via sseth) + YARN-376. Fixes a bug which would prevent the NM knowing about completed + containers and applications. (Jason Lowe via sseth) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index de5fcc68ce..7aaa066845 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -262,8 +262,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) HeartbeatResponse latestResponse = recordFactory .newRecordInstance(HeartbeatResponse.class); latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1); - latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp()); - latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup()); + rmNode.updateHeartbeatResponseForCleanup(latestResponse); latestResponse.setNodeAction(NodeAction.NORMAL); // Check if node's masterKey needs to be updated and if the currentKey has diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 4b1f8f9e6a..dc7295af08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -105,6 +105,13 @@ public interface RMNode { public List getAppsToCleanup(); + /** + * Update a {@link HeartbeatResponse} with the list of containers and + * applications to clean up for this node. + * @param response the {@link HeartbeatResponse} to update + */ + public void updateHeartbeatResponseForCleanup(HeartbeatResponse response); + public HeartbeatResponse getLastHeartBeatResponse(); /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3799526590..23a4599139 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -303,6 +303,21 @@ public List getContainersToCleanUp() { } }; + @Override + public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) { + this.writeLock.lock(); + + try { + response.addAllContainersToCleanup( + new ArrayList(this.containersToClean)); + response.addAllApplicationsToCleanup(this.finishedApplications); + this.containersToClean.clear(); + this.finishedApplications.clear(); + } finally { + this.writeLock.unlock(); + } + }; + @Override public HeartbeatResponse getLastHeartBeatResponse() { @@ -564,12 +579,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( statusEvent.getKeepAliveAppIds()); - // HeartBeat processing from our end is done, as node pulls the following - // lists before sending status-updates. Clear data-structures - // TODO: These lists could go to the NM multiple times, or never. - rmNode.containersToClean.clear(); - rmNode.finishedApplications.clear(); - return NodeState.RUNNING; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 37e1017038..2c0fb6cf1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -186,6 +186,10 @@ public List getAppsToCleanup() { return null; } + @Override + public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) { + } + @Override public HeartbeatResponse getLastHeartBeatResponse() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 982d2af506..e165e417cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -30,6 +30,7 @@ import junit.framework.Assert; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -51,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -299,6 +302,39 @@ public void testUnhealthyRebooting() { Assert.assertEquals(NodeState.REBOOTED, node.getState()); } + @Test(timeout=20000) + public void testUpdateHeartbeatResponseForCleanup() { + RMNodeImpl node = getRunningNode(); + NodeId nodeId = node.getNodeID(); + + // Expire a container + ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); + node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId)); + Assert.assertEquals(1, node.getContainersToCleanUp().size()); + + // Finish an application + ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1); + node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId)); + Assert.assertEquals(1, node.getAppsToCleanup().size()); + + // Verify status update does not clear containers/apps to cleanup + // but updating heartbeat response for cleanup does + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(); + node.handle(statusEvent); + Assert.assertEquals(1, node.getContainersToCleanUp().size()); + Assert.assertEquals(1, node.getAppsToCleanup().size()); + HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class); + node.updateHeartbeatResponseForCleanup(hbrsp); + Assert.assertEquals(0, node.getContainersToCleanUp().size()); + Assert.assertEquals(0, node.getAppsToCleanup().size()); + Assert.assertEquals(1, hbrsp.getContainersToCleanupCount()); + Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0)); + Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount()); + Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0)); + } + private RMNodeImpl getRunningNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,