YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang

This commit is contained in:
Jason Lowe 2018-01-25 17:47:19 -06:00
parent 16be42d309
commit ff8378eb1b
12 changed files with 125 additions and 128 deletions

View File

@ -71,7 +71,7 @@ public class NMSimulator extends TaskRunner.Task {
// resource manager // resource manager
private ResourceManager rm; private ResourceManager rm;
// heart beat response id // heart beat response id
private int RESPONSE_ID = 1; private int responseId = 0;
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class); private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
public void init(String nodeIdStr, Resource nodeResource, public void init(String nodeIdStr, Resource nodeResource,
@ -131,7 +131,7 @@ public void middleStep() throws Exception {
ns.setContainersStatuses(generateContainerStatusList()); ns.setContainersStatuses(generateContainerStatusList());
ns.setNodeId(node.getNodeID()); ns.setNodeId(node.getNodeID());
ns.setKeepAliveApplications(new ArrayList<ApplicationId>()); ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
ns.setResponseId(RESPONSE_ID ++); ns.setResponseId(responseId++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
beatRequest.setNodeStatus(ns); beatRequest.setNodeStatus(ns);
NodeHeartbeatResponse beatResponse = NodeHeartbeatResponse beatResponse =

View File

@ -144,8 +144,8 @@ public List<ApplicationId> getRunningApps() {
return runningApplications; return runningApplications;
} }
public void updateNodeHeartbeatResponseForCleanup( public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse response) { NodeHeartbeatResponse response) {
} }
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@ -178,13 +178,6 @@ public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET; return RMNodeLabelsManager.EMPTY_STRING_SET;
} }
@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
}
@Override @Override
public List<Container> pullNewlyIncreasedContainers() { public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -127,9 +127,9 @@ public List<ApplicationId> getRunningApps() {
} }
@Override @Override
public void updateNodeHeartbeatResponseForCleanup( public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse nodeHeartbeatResponse) { NodeHeartbeatResponse nodeHeartbeatResponse) {
node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse);
} }
@Override @Override
@ -167,12 +167,6 @@ public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET; return RMNodeLabelsManager.EMPTY_STRING_SET;
} }
@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public List<Container> pullNewlyIncreasedContainers() { public List<Container> pullNewlyIncreasedContainers() {

View File

@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -81,6 +82,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -403,14 +405,37 @@ public RegisterNodeManagerResponse registerNodeManager(
} else { } else {
LOG.info("Reconnect from the node at: " + host); LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId); this.nmLivelinessMonitor.unregister(nodeId);
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse(); if (CollectionUtils.isEmpty(request.getRunningApplications())
this.rmContext && rmNode.getState() != NodeState.DECOMMISSIONING
.getDispatcher() && rmNode.getHttpPort() != oldNode.getHttpPort()) {
.getEventHandler() // Reconnected node differs, so replace old node and start new node
.handle( switch (rmNode.getState()) {
new RMNodeReconnectEvent(nodeId, rmNode, request case RUNNING:
.getRunningApplications(), request.getNMContainerStatuses())); ClusterMetrics.getMetrics().decrNumActiveNodes();
break;
case UNHEALTHY:
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected Rmnode state");
}
this.rmContext.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));
this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeStartedEvent(nodeId, null, null));
} else {
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeReconnectEvent(nodeId, rmNode,
request.getRunningApplications(),
request.getNMContainerStatuses()));
}
} }
// On every node manager register we will be clearing NMToken keys if // On every node manager register we will be clearing NMToken keys if
// present for any running application. // present for any running application.
@ -508,12 +533,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse if (getNextResponseId(
.getResponseId()) { remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node " LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
return lastNodeHeartbeatResponse; return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
.getResponseId()) { .getResponseId()) {
String message = String message =
"Too far behind rm response id:" "Too far behind rm response id:"
@ -549,13 +575,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }
// Heartbeat response // Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils NodeHeartbeatResponse nodeHeartBeatResponse =
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse. YarnServerBuilderUtils.newNodeHeartbeatResponse(
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
nextHeartBeatInterval); NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse);
@ -573,7 +597,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 4. Send status to RMNode, saving the latest response. // 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent = RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); new RMNodeStatusEvent(nodeId, remoteNodeStatus);
if (request.getLogAggregationReportsForApps() != null if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) { && !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request nodeStatusEvent.setLogAggregationReportsForApps(request
@ -614,6 +638,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nodeHeartBeatResponse; return nodeHeartBeatResponse;
} }
/**
 * Computes the response id that follows {@code responseId}.
 *
 * <p>The id is incremented and then its sign bit is cleared, so the
 * successor of {@link Integer#MAX_VALUE} is {@code 0} rather than a
 * negative number.
 *
 * @param responseId the current response id
 * @return the next response id, always in {@code [0, Integer.MAX_VALUE]}
 */
private int getNextResponseId(int responseId) {
  // The increment may overflow to Integer.MIN_VALUE; masking with
  // Integer.MAX_VALUE drops the sign bit and wraps the sequence back to 0.
  final int incremented = responseId + 1;
  return incremented & Integer.MAX_VALUE;
}
private void setAppCollectorsMapToResponse( private void setAppCollectorsMapToResponse(
List<ApplicationId> runningApps, NodeHeartbeatResponse response) { List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new

View File

@ -141,10 +141,11 @@ public interface RMNode {
/** /**
* Update a {@link NodeHeartbeatResponse} with the list of containers and * Update a {@link NodeHeartbeatResponse} with the list of containers and
* applications to clean up for this node. * applications to clean up for this node, and the containers to be updated.
*
* @param response the {@link NodeHeartbeatResponse} to update * @param response the {@link NodeHeartbeatResponse} to update
*/ */
void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response);
public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
@ -167,13 +168,7 @@ public interface RMNode {
* @return labels in this node * @return labels in this node
*/ */
public Set<String> getNodeLabels(); public Set<String> getNodeLabels();
/**
* Update containers to be updated
*/
void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers(); public List<Container> pullNewlyIncreasedContainers();
OpportunisticContainersStatus getOpportunisticContainersStatus(); OpportunisticContainersStatus getOpportunisticContainersStatus();

View File

@ -598,7 +598,8 @@ public List<ContainerId> getContainersToCleanUp() {
}; };
@Override @Override
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse response) {
this.writeLock.lock(); this.writeLock.lock();
try { try {
@ -613,38 +614,30 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
this.finishedApplications.clear(); this.finishedApplications.clear();
this.containersToSignal.clear(); this.containersToSignal.clear();
this.containersToBeRemovedFromNM.clear(); this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
}
};
@VisibleForTesting
public Collection<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers.values();
}
@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
this.writeLock.lock();
try {
response.addAllContainersToUpdate(toBeUpdatedContainers.values()); response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear(); toBeUpdatedContainers.clear();
// NOTE: This is required for backward compatibility. // NOTE: This is required for backward compatibility.
response.addAllContainersToDecrease(toBeDecreasedContainers.values()); response.addAllContainersToDecrease(toBeDecreasedContainers.values());
toBeDecreasedContainers.clear(); toBeDecreasedContainers.clear();
// Synchronously update the last response in rmNode with updated
// responseId
this.latestNodeHeartBeatResponse = response;
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
} }
};
@VisibleForTesting
public Collection<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers.values();
} }
@Override @Override
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
this.readLock.lock(); this.readLock.lock();
try { try {
return this.latestNodeHeartBeatResponse; return this.latestNodeHeartBeatResponse;
} finally { } finally {
@ -818,7 +811,6 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
private static NodeHealthStatus updateRMNodeFromStatusEvents( private static NodeHealthStatus updateRMNodeFromStatusEvents(
RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) {
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(remoteNodeHealthStatus rmNode.setLastHealthReportTime(remoteNodeHealthStatus
@ -912,21 +904,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode)); new NodeAddedSchedulerEvent(rmNode));
} }
} else {
// Reconnected node differs, so replace old node and start new node
switch (rmNode.getState()) {
case RUNNING:
ClusterMetrics.getMetrics().decrNumActiveNodes();
break;
case UNHEALTHY:
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected Rmnode state");
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
} }
} else { } else {

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -35,20 +34,16 @@
public class RMNodeStatusEvent extends RMNodeEvent { public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeStatus nodeStatus; private final NodeStatus nodeStatus;
private final NodeHeartbeatResponse latestResponse;
private List<LogAggregationReport> logAggregationReportsForApps; private List<LogAggregationReport> logAggregationReportsForApps;
public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) {
NodeHeartbeatResponse latestResponse) { this(nodeId, nodeStatus, null);
this(nodeId, nodeStatus, latestResponse, null);
} }
public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
NodeHeartbeatResponse latestResponse,
List<LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE); super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeStatus = nodeStatus; this.nodeStatus = nodeStatus;
this.latestResponse = latestResponse;
this.logAggregationReportsForApps = logAggregationReportsForApps; this.logAggregationReportsForApps = logAggregationReportsForApps;
} }
@ -60,10 +55,6 @@ public List<ContainerStatus> getContainers() {
return this.nodeStatus.getContainersStatuses(); return this.nodeStatus.getContainersStatuses();
} }
public NodeHeartbeatResponse getLatestResponse() {
return this.latestResponse;
}
public List<ApplicationId> getKeepAliveAppIds() { public List<ApplicationId> getKeepAliveAppIds() {
return this.nodeStatus.getKeepAliveApplications(); return this.nodeStatus.getKeepAliveApplications();
} }

View File

@ -131,7 +131,7 @@ public void containerIncreaseStatus(Container container) throws Exception {
container.getResource()); container.getResource());
List<Container> increasedConts = Collections.singletonList(container); List<Container> increasedConts = Collections.singletonList(container);
nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
true, ++responseId); true, responseId);
} }
public void addRegisteringCollector(ApplicationId appId, public void addRegisteringCollector(ApplicationId appId,
@ -190,12 +190,13 @@ public RegisterNodeManagerResponse registerNode(
} }
} }
} }
responseId = 0;
return registrationResponse; return registrationResponse;
} }
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(Collections.<ContainerStatus>emptyList(), return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
Collections.<Container>emptyList(), isHealthy, ++responseId); Collections.<Container>emptyList(), isHealthy, responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@ -208,12 +209,12 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
containerStatusList.add(containerStatus); containerStatusList.add(containerStatus);
Log.getLog().info("ContainerStatus: " + containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus);
return nodeHeartbeat(containerStatusList, return nodeHeartbeat(containerStatusList,
Collections.<Container>emptyList(), true, ++responseId); Collections.<Container>emptyList(), true, responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception { List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
return nodeHeartbeat(conts, isHealthy, ++responseId); return nodeHeartbeat(conts, isHealthy, responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@ -229,7 +230,7 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
public NodeHeartbeatResponse nodeHeartbeat( public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception { List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(), return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
isHealthy, ++responseId); isHealthy, responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats, public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
@ -265,7 +266,8 @@ public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
NodeHeartbeatResponse heartbeatResponse = NodeHeartbeatResponse heartbeatResponse =
resourceTracker.nodeHeartbeat(req); resourceTracker.nodeHeartbeat(req);
responseId = heartbeatResponse.getResponseId();
MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey();
if (masterKeyFromRM != null if (masterKeyFromRM != null
&& masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey
@ -303,4 +305,8 @@ public Resource getCapability() {
public String getVersion() { public String getVersion() {
return version; return version;
} }
/**
 * Overrides the response id tracked by this mock node manager.
 *
 * <p>The heartbeat helpers in this class pass the tracked id along with
 * each heartbeat, so this lets a test choose the id that the next
 * heartbeat will carry.
 *
 * @param id the response id to store
 */
public void setResponseId(int id) {
  responseId = id;
}
} }

View File

@ -205,7 +205,8 @@ public List<ApplicationId> getRunningApps() {
} }
@Override @Override
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse response) {
} }
@Override @Override
@ -245,12 +246,6 @@ public Set<String> getNodeLabels() {
return CommonNodeLabelsManager.EMPTY_STRING_SET; return CommonNodeLabelsManager.EMPTY_STRING_SET;
} }
@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
}
@Override @Override
public List<Container> pullNewlyIncreasedContainers() { public List<Container> pullNewlyIncreasedContainers() {
return Collections.emptyList(); return Collections.emptyList();

View File

@ -164,15 +164,12 @@ public void tearDown() throws Exception {
private RMNodeStatusEvent getMockRMNodeStatusEvent( private RMNodeStatusEvent getMockRMNodeStatusEvent(
List<ContainerStatus> containerStatus) { List<ContainerStatus> containerStatus) {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true); Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy(); doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus(); doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
if (containerStatus != null) { if (containerStatus != null) {
doReturn(containerStatus).when(event).getContainers(); doReturn(containerStatus).when(event).getContainers();
@ -181,15 +178,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent(
} }
private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true); Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy(); doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus(); doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(getAppIdList()).when(event).getKeepAliveAppIds(); doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
return event; return event;
@ -202,15 +196,12 @@ private List<ApplicationId> getAppIdList() {
} }
private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true); Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy(); doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus(); doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(null).when(event).getKeepAliveAppIds(); doReturn(null).when(event).getKeepAliveAppIds();
return event; return event;
@ -646,7 +637,7 @@ public void testUpdateHeartbeatResponseForCleanup() {
Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getContainersToCleanUp().size());
Assert.assertEquals(1, node.getAppsToCleanup().size()); Assert.assertEquals(1, node.getAppsToCleanup().size());
NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class); NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
node.updateNodeHeartbeatResponseForCleanup(hbrsp); node.setAndUpdateNodeHeartbeatResponse(hbrsp);
Assert.assertEquals(0, node.getContainersToCleanUp().size()); Assert.assertEquals(0, node.getContainersToCleanUp().size());
Assert.assertEquals(0, node.getAppsToCleanup().size()); Assert.assertEquals(0, node.getAppsToCleanup().size());
Assert.assertEquals(1, hbrsp.getContainersToCleanup().size()); Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
@ -1108,7 +1099,8 @@ public void testForHandlingDuplicatedCompltedContainers() {
NodeHeartbeatResponse hbrsp = NodeHeartbeatResponse hbrsp =
Records.newRecord(NodeHeartbeatResponse.class); Records.newRecord(NodeHeartbeatResponse.class);
node.updateNodeHeartbeatResponseForCleanup(hbrsp); node.setAndUpdateNodeHeartbeatResponse(hbrsp);
Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
Assert.assertEquals(0, node.getCompletedContainers().size()); Assert.assertEquals(0, node.getCompletedContainers().size());
} }

View File

@ -801,7 +801,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Records.newRecord(NodeHeartbeatRequest.class); Records.newRecord(NodeHeartbeatRequest.class);
heartbeatReq.setNodeLabels(null); // Node heartbeat label update heartbeatReq.setNodeLabels(null); // Node heartbeat label update
nodeStatusObject = getNodeStatusObject(nodeId); nodeStatusObject = getNodeStatusObject(nodeId);
nodeStatusObject.setResponseId(responseId+2); nodeStatusObject.setResponseId(responseId+1);
heartbeatReq.setNodeStatus(nodeStatusObject); heartbeatReq.setNodeStatus(nodeStatusObject);
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
.getNMTokenMasterKey()); .getNMTokenMasterKey());
@ -1128,8 +1128,7 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
"", System.currentTimeMillis()); "", System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
statusList, null, nodeHealth, null, null, null); statusList, null, nodeHealth, null, null, null);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
nodeHeartbeat1));
Assert.assertEquals(1, node1.getRunningApps().size()); Assert.assertEquals(1, node1.getRunningApps().size());
Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));
@ -1145,8 +1144,7 @@ public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
statusList.add(status2); statusList.add(status2);
nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
statusList, null, nodeHealth, null, null, null); statusList, null, nodeHealth, null, null, null);
node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus, node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus));
nodeHeartbeat2));
Assert.assertEquals(1, node2.getRunningApps().size()); Assert.assertEquals(1, node2.getRunningApps().size());
Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
@ -2290,4 +2288,31 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
} }
} }
} }
/**
 * Checks that heartbeats keep being accepted when the response id passes
 * {@link Integer#MAX_VALUE}: the id is expected to wrap to 0 and then
 * continue with 1, with the node action staying {@code NORMAL} throughout.
 */
@Test
public void testResponseIdOverflow() throws Exception {
  rm = new MockRM(new Configuration());
  rm.start();

  MockNM nodeManager = rm.registerNode("host1:1234", 5120);

  // A first ordinary heartbeat, before any id manipulation.
  NodeHeartbeatResponse initialResponse = nodeManager.nodeHeartbeat(true);
  Assert.assertEquals(NodeAction.NORMAL, initialResponse.getNodeAction());

  // Move both the RM-side record and the mock NM to the last id before
  // the overflow, so the next heartbeat has to wrap around.
  RMNode rmNode = rm.getRMContext().getRMNodes().get(nodeManager.getNodeId());
  rmNode.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE);
  nodeManager.setResponseId(Integer.MAX_VALUE);

  // First heartbeat after the boundary: the id wraps to 0.
  NodeHeartbeatResponse wrappedResponse = nodeManager.nodeHeartbeat(true);
  Assert.assertEquals(NodeAction.NORMAL, wrappedResponse.getNodeAction());
  Assert.assertEquals(0, wrappedResponse.getResponseId());

  // Second heartbeat: the sequence continues normally from 0.
  NodeHeartbeatResponse followUpResponse = nodeManager.nodeHeartbeat(true);
  Assert.assertEquals(NodeAction.NORMAL, followUpResponse.getNodeAction());
  Assert.assertEquals(1, followUpResponse.getResponseId());
}
} }

View File

@ -172,7 +172,7 @@ public void testLogAggregationStatus() throws Exception {
NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null, new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null); NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp)); node1ReportForApp));
List<LogAggregationReport> node2ReportForApp = List<LogAggregationReport> node2ReportForApp =
@ -186,7 +186,7 @@ public void testLogAggregationStatus() throws Exception {
NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null, new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null); NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp)); node2ReportForApp));
// node1 and node2 has updated its log aggregation status // node1 and node2 has updated its log aggregation status
// verify that the log aggregation status for node1, node2 // verify that the log aggregation status for node1, node2
@ -223,7 +223,7 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationReport.newInstance(appId, LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2); LogAggregationStatus.RUNNING, messageForNode1_2);
node1ReportForApp2.add(report1_2); node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp2)); node1ReportForApp2));
// verify that the log aggregation status for node1 // verify that the log aggregation status for node1
@ -291,7 +291,7 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationStatus.SUCCEEDED, "")); LogAggregationStatus.SUCCEEDED, ""));
// For every logAggregationReport cached in memory, we can only save at most // For every logAggregationReport cached in memory, we can only save at most
// 10 diagnostic messages/failure messages // 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1,
node1ReportForApp3)); node1ReportForApp3));
logAggregationStatus = rmApp.getLogAggregationReportsForApp(); logAggregationStatus = rmApp.getLogAggregationReportsForApp();
@ -335,7 +335,7 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationStatus.FAILED, ""); LogAggregationStatus.FAILED, "");
node2ReportForApp2.add(report2_2); node2ReportForApp2.add(report2_2);
node2ReportForApp2.add(report2_3); node2ReportForApp2.add(report2_3);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2,
node2ReportForApp2)); node2ReportForApp2));
Assert.assertEquals(LogAggregationStatus.FAILED, Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());