diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 6b19128008..ba0fd5617c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -71,7 +71,7 @@ public class NMSimulator extends TaskRunner.Task { // resource manager private ResourceManager rm; // heart beat response id - private int RESPONSE_ID = 1; + private int responseId = 0; private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class); public void init(String nodeIdStr, Resource nodeResource, @@ -131,7 +131,7 @@ public void middleStep() throws Exception { ns.setContainersStatuses(generateContainerStatusList()); ns.setNodeId(node.getNodeID()); ns.setKeepAliveApplications(new ArrayList()); - ns.setResponseId(RESPONSE_ID ++); + ns.setResponseId(responseId++); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); beatRequest.setNodeStatus(ns); NodeHeartbeatResponse beatResponse = diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index e71ddff2d0..1016ce1679 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -144,8 +144,8 @@ public List getRunningApps() { return runningApplications; } - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { @@ -178,13 +178,6 @@ public Set getNodeLabels() { return RMNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForUpdatedContainers( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - - } - @Override public List pullNewlyIncreasedContainers() { // TODO Auto-generated method stub diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 6b7ac3cc23..fdad826994 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -127,9 +127,9 @@ public List getRunningApps() { } @Override - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse nodeHeartbeatResponse) { - node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse nodeHeartbeatResponse) { + node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse); } @Override @@ -167,12 +167,6 @@ public Set getNodeLabels() { return RMNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForUpdatedContainers( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - } - @SuppressWarnings("unchecked") @Override public List pullNewlyIncreasedContainers() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a42d0533c5..9d95f636aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -403,14 +405,37 @@ public RegisterNodeManagerResponse registerNodeManager( } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); - // Reset heartbeat ID since node just restarted. - oldNode.resetLastNodeHeartBeatResponse(); - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeReconnectEvent(nodeId, rmNode, request - .getRunningApplications(), request.getNMContainerStatuses())); + + if (CollectionUtils.isEmpty(request.getRunningApplications()) + && rmNode.getState() != NodeState.DECOMMISSIONING + && rmNode.getHttpPort() != oldNode.getHttpPort()) { + // Reconnected node differs, so replace old node and start new node + switch (rmNode.getState()) { + case RUNNING: + ClusterMetrics.getMetrics().decrNumActiveNodes(); + break; + case UNHEALTHY: + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + break; + default: + LOG.debug("Unexpected Rmnode state"); + } + this.rmContext.getDispatcher().getEventHandler() + .handle(new NodeRemovedSchedulerEvent(rmNode)); + + this.rmContext.getRMNodes().put(nodeId, rmNode); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeStartedEvent(nodeId, null, null)); + + } else { + // Reset heartbeat ID since node just restarted. + oldNode.resetLastNodeHeartBeatResponse(); + + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications(), + request.getNMContainerStatuses())); + } } // On every node manager register we will be clearing NMToken keys if // present for any running application. @@ -508,12 +533,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse - .getResponseId()) { + if (getNextResponseId( + remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse + .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse + } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" @@ -549,13 +575,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response - NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils - .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. - getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); - rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - rmNode.updateNodeHeartbeatResponseForUpdatedContainers( - nodeHeartBeatResponse); + NodeHeartbeatResponse nodeHeartBeatResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse( + getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), + NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); + rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -573,7 +597,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 4. Send status to RMNode, saving the latest response. RMNodeStatusEvent nodeStatusEvent = - new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); + new RMNodeStatusEvent(nodeId, remoteNodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request @@ -614,6 +638,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + private int getNextResponseId(int responseId) { + // Loop between 0 and Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { Map liveAppCollectorsMap = new diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 328c040a27..a5615ef06e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -141,10 +141,11 @@ public interface RMNode { /** * Update a {@link NodeHeartbeatResponse} with the list of containers and - * applications to clean up for this node. + * applications to clean up for this node, and the containers to be updated. + * * @param response the {@link NodeHeartbeatResponse} to update */ - void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); + void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); @@ -167,13 +168,7 @@ public interface RMNode { * @return labels in this node */ public Set getNodeLabels(); - - /** - * Update containers to be updated - */ - void updateNodeHeartbeatResponseForUpdatedContainers( - NodeHeartbeatResponse response); - + public List pullNewlyIncreasedContainers(); OpportunisticContainersStatus getOpportunisticContainersStatus(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 2b013a04c5..da54eb93a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -598,7 +598,8 @@ public List getContainersToCleanUp() { }; @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { this.writeLock.lock(); try { @@ -613,38 +614,30 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response this.finishedApplications.clear(); this.containersToSignal.clear(); this.containersToBeRemovedFromNM.clear(); - } finally { - this.writeLock.unlock(); - } - }; - - @VisibleForTesting - public Collection getToBeUpdatedContainers() { - return toBeUpdatedContainers.values(); - } - - @Override - public void updateNodeHeartbeatResponseForUpdatedContainers( - NodeHeartbeatResponse response) { - this.writeLock.lock(); - - try { + response.addAllContainersToUpdate(toBeUpdatedContainers.values()); toBeUpdatedContainers.clear(); // NOTE: This is required for backward compatibility. response.addAllContainersToDecrease(toBeDecreasedContainers.values()); toBeDecreasedContainers.clear(); + + // Synchronously update the last response in rmNode with updated + // responseId + this.latestNodeHeartBeatResponse = response; } finally { this.writeLock.unlock(); } + }; + + @VisibleForTesting + public Collection getToBeUpdatedContainers() { + return toBeUpdatedContainers.values(); } @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { - this.readLock.lock(); - try { return this.latestNodeHeartBeatResponse; } finally { @@ -818,7 +811,6 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, private static NodeHealthStatus updateRMNodeFromStatusEvents( RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { // Switch the last heartbeatresponse. - rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime(remoteNodeHealthStatus @@ -912,21 +904,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - default: - LOG.debug("Unexpected Rmnode state"); - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index f9fe159f60..c79f2704a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -35,20 +34,16 @@ public class RMNodeStatusEvent extends RMNodeEvent { private final NodeStatus nodeStatus; - private final NodeHeartbeatResponse latestResponse; private List logAggregationReportsForApps; - public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse) { - this(nodeId, nodeStatus, latestResponse, null); + public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) { + this(nodeId, nodeStatus, null); } public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse, List logAggregationReportsForApps) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeStatus = nodeStatus; - this.latestResponse = latestResponse; this.logAggregationReportsForApps = logAggregationReportsForApps; } @@ -60,10 +55,6 @@ public List getContainers() { return this.nodeStatus.getContainersStatuses(); } - public NodeHeartbeatResponse getLatestResponse() { - return this.latestResponse; - } - public List getKeepAliveAppIds() { return this.nodeStatus.getKeepAliveApplications(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 05b51e3cc3..0a06e82578 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -131,7 +131,7 @@ public void containerIncreaseStatus(Container container) throws Exception { container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, ++responseId); + true, responseId); } public void addRegisteringCollector(ApplicationId appId, @@ -190,12 +190,13 @@ public RegisterNodeManagerResponse registerNode( } } } + responseId = 0; return registrationResponse; } public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.emptyList(), - Collections.emptyList(), isHealthy, ++responseId); + Collections.emptyList(), isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -208,12 +209,12 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, containerStatusList.add(containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.emptyList(), true, ++responseId); + Collections.emptyList(), true, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { - return nodeHeartbeat(conts, isHealthy, ++responseId); + return nodeHeartbeat(conts, isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.emptyList(), - isHealthy, ++responseId); + isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, @@ -265,7 +266,8 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); - + responseId = heartbeatResponse.getResponseId(); + MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); if (masterKeyFromRM != null && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey @@ -303,4 +305,8 @@ public Resource getCapability() { public String getVersion() { return version; } + + public void setResponseId(int id) { + this.responseId = id; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 317c6489f3..d6549b99c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -205,7 +205,8 @@ public List getRunningApps() { } @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } @Override @@ -245,12 +246,6 @@ public Set getNodeLabels() { return CommonNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForUpdatedContainers( - NodeHeartbeatResponse response) { - - } - @Override public List pullNewlyIncreasedContainers() { return Collections.emptyList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 36571233f0..487d226072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -164,15 +164,12 @@ public void tearDown() throws Exception { private RMNodeStatusEvent getMockRMNodeStatusEvent( List containerStatus) { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); if (containerStatus != null) { doReturn(containerStatus).when(event).getContainers(); @@ -181,15 +178,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent( } private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(getAppIdList()).when(event).getKeepAliveAppIds(); return event; @@ -202,15 +196,12 @@ private List getAppIdList() { } private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(null).when(event).getKeepAliveAppIds(); return event; @@ -646,7 +637,7 @@ public void testUpdateHeartbeatResponseForCleanup() { Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getAppsToCleanup().size()); NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class); - node.updateNodeHeartbeatResponseForCleanup(hbrsp); + node.setAndUpdateNodeHeartbeatResponse(hbrsp); Assert.assertEquals(0, node.getContainersToCleanUp().size()); Assert.assertEquals(0, node.getAppsToCleanup().size()); Assert.assertEquals(1, hbrsp.getContainersToCleanup().size()); @@ -1108,7 +1099,8 @@ public void testForHandlingDuplicatedCompltedContainers() { NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class); - node.updateNodeHeartbeatResponseForCleanup(hbrsp); + node.setAndUpdateNodeHeartbeatResponse(hbrsp); + Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); Assert.assertEquals(0, node.getCompletedContainers().size()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fc6326eb1e..96e4451c95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -801,7 +801,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Records.newRecord(NodeHeartbeatRequest.class); heartbeatReq.setNodeLabels(null); // Node heartbeat label update nodeStatusObject = getNodeStatusObject(nodeId); - nodeStatusObject.setResponseId(responseId+2); + nodeStatusObject.setResponseId(responseId+1); heartbeatReq.setNodeStatus(nodeStatusObject); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse .getNMTokenMasterKey()); @@ -1128,8 +1128,7 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { "", System.currentTimeMillis()); NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, statusList, null, nodeHealth, null, null, null); - node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, - nodeHeartbeat1)); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus)); Assert.assertEquals(1, node1.getRunningApps().size()); Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); @@ -1145,8 +1144,7 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception { statusList.add(status2); nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, statusList, null, nodeHealth, null, null, null); - node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus, - nodeHeartbeat2)); + node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus)); Assert.assertEquals(1, node2.getRunningApps().size()); Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); @@ -2290,4 +2288,31 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp() } } } + + @Test + public void testResponseIdOverflow() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + + // prepare the responseId that's about to overflow + RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); + + nm1.setResponseId(Integer.MAX_VALUE); + + // heartbeat twice and check responseId + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(0, nodeHeartbeat.getResponseId()); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(1, nodeHeartbeat.getResponseId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 677990b8c9..c2bc611780 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -172,7 +172,7 @@ public void testLogAggregationStatus() throws Exception { NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, new ArrayList(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp)); List node2ReportForApp = @@ -186,7 +186,7 @@ public void testLogAggregationStatus() throws Exception { NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, new ArrayList(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 @@ -223,7 +223,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_2); node1ReportForApp2.add(report1_2); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp2)); // verify that the log aggregation status for node1 @@ -291,7 +291,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationStatus.SUCCEEDED, "")); // For every logAggregationReport cached in memory, we can only save at most // 10 diagnostic messages/failure messages - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp3)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); @@ -335,7 +335,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationStatus.FAILED, ""); node2ReportForApp2.add(report2_2); node2ReportForApp2.add(report2_3); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp2)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport());