YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. (Junping Du via wangda)

This commit is contained in:
Wangda Tan 2015-09-18 10:04:17 -07:00
parent 3f42753102
commit 9bc913a35c
7 changed files with 510 additions and 166 deletions

View File

@ -197,6 +197,9 @@ Release 2.8.0 - UNRELEASED
YARN-4034. Render cluster Max Priority in scheduler metrics in RM web YARN-4034. Render cluster Max Priority in scheduler metrics in RM web
UI. (Rohith Sharma K S via jianhe) UI. (Rohith Sharma K S via jianhe)
YARN-3212. RMNode State Transition Update with DECOMMISSIONING state.
(Junping Du via wangda)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -399,7 +399,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
NodeId nodeId = entry.getKey(); NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) { if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT)); new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
} else { } else {
// Recommissioning the nodes // Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING if (entry.getValue().getState() == NodeState.DECOMMISSIONING

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -399,8 +400,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeId nodeId = remoteNodeStatus.getNodeId(); NodeId nodeId = remoteNodeStatus.getNodeId();
// 1. Check if it's a valid (i.e. not excluded) node // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
if (!this.nodesListManager.isValidNode(nodeId.getHost())) { // in decommissioning.
if (!this.nodesListManager.isValidNode(nodeId.getHost())
&& !isNodeInDecommissioning(nodeId)) {
String message = String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: " "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost(); + nodeId.getHost();
@ -486,6 +489,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nodeHeartBeatResponse; return nodeHeartBeatResponse;
} }
/**
* Check if node in decommissioning state.
* @param nodeId
*/
private boolean isNodeInDecommissioning(NodeId nodeId) {
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode != null &&
rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
return true;
}
return false;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public UnRegisterNodeManagerResponse unRegisterNodeManager( public UnRegisterNodeManagerResponse unRegisterNodeManager(

View File

@ -24,7 +24,7 @@ public enum RMNodeEventType {
// Source: AdminService // Source: AdminService
DECOMMISSION, DECOMMISSION,
DECOMMISSION_WITH_TIMEOUT, GRACEFUL_DECOMMISSION,
RECOMMISSION, RECOMMISSION,
// Source: AdminService, ResourceTrackerService // Source: AdminService, ResourceTrackerService

View File

@ -154,10 +154,15 @@ RMNodeEventType.STARTED, new AddNodeTransition())
//Transitions from RUNNING state //Transitions from RUNNING state
.addTransition(NodeState.RUNNING, .addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION, RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.RUNNING,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.RUNNING, NodeState.LOST, .addTransition(NodeState.RUNNING, NodeState.LOST,
RMNodeEventType.EXPIRE, RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST)) new DeactivateNodeTransition(NodeState.LOST))
@ -171,7 +176,7 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition()) new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
@ -192,6 +197,45 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition()) new AddContainersToBeRemovedFromNMTransition())
//Transitions from DECOMMISSIONING state
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
RMNodeEventType.RECOMMISSION,
new RecommissionNodeTransition(NodeState.RUNNING))
.addTransition(NodeState.DECOMMISSIONING,
EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.DECOMMISSIONING,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.DECOMMISSIONING, NodeState.LOST,
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.DECOMMISSIONING, EnumSet.of(
NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenRunningTransition())
//Transitions from LOST state //Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST, .addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.RESOURCE_UPDATE, RMNodeEventType.RESOURCE_UPDATE,
@ -208,20 +252,25 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION, RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONING,
RMNodeEventType.GRACEFUL_DECOMMISSION,
new DecommissioningNodeTransition(NodeState.UNHEALTHY,
NodeState.DECOMMISSIONING))
.addTransition(NodeState.UNHEALTHY, NodeState.LOST, .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
RMNodeEventType.EXPIRE, RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(NodeState.LOST)) new DeactivateNodeTransition(NodeState.LOST))
.addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED, .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
RMNodeEventType.REBOOTING, RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED)) new DeactivateNodeTransition(NodeState.REBOOTED))
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY),
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition()) new AddContainersToBeRemovedFromNMTransition())
@ -291,6 +340,11 @@ public int getHttpPort() {
return httpPort; return httpPort;
} }
// Test only
public void setHttpPort(int port) {
this.httpPort = port;
}
@Override @Override
public NodeId getNodeID() { public NodeId getNodeID() {
return this.nodeId; return this.nodeId;
@ -501,6 +555,15 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
} }
} }
// Treats nodes in decommissioning as active nodes
// TODO we may want to differentiate active nodes and decommissioning node in
// metrics later.
private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.incrNumActiveNodes();
metrics.decrNumUnhealthyNMs();
}
private void updateMetricsForDeactivatedNode(NodeState initialState, private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) { NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
@ -509,6 +572,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
case RUNNING: case RUNNING:
metrics.decrNumActiveNodes(); metrics.decrNumActiveNodes();
break; break;
case DECOMMISSIONING:
metrics.decrNumActiveNodes();
break;
case UNHEALTHY: case UNHEALTHY:
metrics.decrNumUnhealthyNMs(); metrics.decrNumUnhealthyNMs();
break; break;
@ -608,10 +674,10 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
} }
public static class ReconnectNodeTransition implements public static class ReconnectNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> { MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override @Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
RMNode newNode = reconnectEvent.getReconnectedNode(); RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
@ -622,6 +688,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// No application running on the node, so send node-removal event with // No application running on the node, so send node-removal event with
// cleaning up old container info. // cleaning up old container info.
if (noRunningApps) { if (noRunningApps) {
if (rmNode.getState() == NodeState.DECOMMISSIONING) {
// When node in decommissioning, and no running apps on this node,
// it will return as decommissioned state.
deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
rmNode.nodeUpdateQueue.clear(); rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode)); new NodeRemovedSchedulerEvent(rmNode));
@ -652,6 +724,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null)); new RMNodeStartedEvent(newNode.getNodeID(), null, null));
} }
} else { } else {
rmNode.httpPort = newNode.getHttpPort(); rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress(); rmNode.httpAddress = newNode.getHttpAddress();
@ -678,11 +751,14 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
.newInstance(newNode.getTotalCapability(), -1))); .newInstance(newNode.getTotalCapability(), -1)));
} }
} }
return rmNode.getState();
} }
private void handleNMContainerStatus( private void handleNMContainerStatus(
List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) { List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
if (nmContainerStatuses != null) {
List<ContainerStatus> containerStatuses = List<ContainerStatus> containerStatuses =
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
@ -690,6 +766,7 @@ private void handleNMContainerStatus(
} }
rmnode.handleContainerStatus(containerStatuses); rmnode.handleContainerStatus(containerStatuses);
} }
}
private ContainerStatus createContainerStatus( private ContainerStatus createContainerStatus(
NMContainerStatus remoteContainer) { NMContainerStatus remoteContainer) {
@ -770,6 +847,33 @@ public DeactivateNodeTransition(NodeState finalState) {
@Override @Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeImpl.deactivateNode(rmNode, finalState);
}
}
/**
* Put a node in deactivated (decommissioned) status.
* @param rmNode
* @param finalState
*/
public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
reportNodeUnusable(rmNode, finalState);
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
}
/**
* Report node is UNUSABLE and update metrics.
* @param rmNode
* @param finalState
*/
public static void reportNodeUnusable(RMNodeImpl rmNode,
NodeState finalState) {
// Inform the scheduler // Inform the scheduler
rmNode.nodeUpdateQueue.clear(); rmNode.nodeUpdateQueue.clear();
// If the current state is NodeState.UNHEALTHY // If the current state is NodeState.UNHEALTHY
@ -784,17 +888,53 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
new NodesListManagerEvent( new NodesListManagerEvent(
NodesListManagerEventType.NODE_UNUSABLE, rmNode)); NodesListManagerEventType.NODE_UNUSABLE, rmNode));
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
//Update the metrics //Update the metrics
rmNode.updateMetricsForDeactivatedNode(initialState, finalState); rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
} }
/**
* The transition to put node in decommissioning state.
*/
public static class DecommissioningNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private final NodeState initState;
private final NodeState finalState;
public DecommissioningNodeTransition(NodeState initState,
NodeState finalState) {
this.initState = initState;
this.finalState = finalState;
} }
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
if (initState.equals(NodeState.UNHEALTHY)) {
rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
}
// TODO (in YARN-3223) Keep NM's available resource to be 0
}
}
public static class RecommissionNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private final NodeState finalState;
public RecommissionNodeTransition(NodeState finalState) {
this.finalState = finalState;
}
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
"recommissioned back to RUNNING.");
// TODO handle NM resource resume in YARN-3223.
}
}
/**
* Status update transition when node is healthy.
*/
public static class StatusUpdateWhenHealthyTransition implements public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> { MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override @Override
@ -810,21 +950,40 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime( rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime()); remoteNodeHealthStatus.getLastHealthReportTime());
NodeState initialState = rmNode.getState();
boolean isNodeDecommissioning =
initialState.equals(NodeState.DECOMMISSIONING);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) { if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " LOG.info("Node " + rmNode.nodeId +
+ remoteNodeHealthStatus.getHealthReport()); " reported UNHEALTHY with details: " +
rmNode.nodeUpdateQueue.clear(); remoteNodeHealthStatus.getHealthReport());
// Inform the scheduler // if a node in decommissioning receives an unhealthy report,
rmNode.context.getDispatcher().getEventHandler().handle( // it will keep decommissioning.
new NodeRemovedSchedulerEvent(rmNode)); if (isNodeDecommissioning) {
rmNode.context.getDispatcher().getEventHandler().handle( return NodeState.DECOMMISSIONING;
new NodesListManagerEvent( } else {
NodesListManagerEventType.NODE_UNUSABLE, rmNode)); reportNodeUnusable(rmNode, NodeState.UNHEALTHY);
// Update metrics
rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
NodeState.UNHEALTHY);
return NodeState.UNHEALTHY; return NodeState.UNHEALTHY;
} }
}
if (isNodeDecommissioning) {
List<ApplicationId> runningApps = rmNode.getRunningApps();
List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
// no running (and keeping alive) app on this node, get it
// decommissioned.
// TODO may need to check no container is being scheduled on this node
// as well.
if ((runningApps == null || runningApps.size() == 0)
&& (keepAliveApps == null || keepAliveApps.size() == 0)) {
RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
return NodeState.DECOMMISSIONED;
}
// TODO (in YARN-3223) if node in decommissioning, get node resource
// updated if container get finished (keep available resource to be 0)
}
rmNode.handleContainerStatus(statusEvent.getContainers()); rmNode.handleContainerStatus(statusEvent.getContainers());
@ -848,7 +1007,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
statusEvent.getKeepAliveAppIds()); statusEvent.getKeepAliveAppIds());
} }
return NodeState.RUNNING; return initialState;
} }
} }
@ -861,7 +1020,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime( rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime()); remoteNodeHealthStatus.getLastHealthReportTime());

View File

@ -29,7 +29,9 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -168,6 +170,42 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent(
return event; return event;
} }
private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
return event;
}
private List<ApplicationId> getAppIdList() {
List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
appIdList.add(BuilderUtils.newApplicationId(0, 0));
return appIdList;
}
private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true);
doReturn(yes).when(healthStatus).getIsNodeHealthy();
RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
doReturn(null).when(event).getKeepAliveAppIds();
return event;
}
@Test (timeout = 5000) @Test (timeout = 5000)
public void testExpiredContainer() { public void testExpiredContainer() {
// Start the node // Start the node
@ -196,6 +234,32 @@ public void testExpiredContainer() {
verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class)); verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));
} }
@Test
public void testStatusUpdateOnDecommissioningNode(){
RMNodeImpl node = getDecommissioningNode();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Verify node in DECOMMISSIONING won't be changed by status update
// with running apps
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Verify node in DECOMMISSIONING will be changed by status update
// without running apps
statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
node.handle(statusEvent);
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testRecommissionNode(){
RMNodeImpl node = getDecommissioningNode();
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.RECOMMISSION));
Assert.assertEquals(NodeState.RUNNING, node.getState());
}
@Test (timeout = 5000) @Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{ public void testContainerUpdate() throws InterruptedException{
//Start the node //Start the node
@ -407,6 +471,28 @@ public void testRunningDecommission() {
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
} }
@Test
public void testDecommissionOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes",
initialDecommissioned + 1, cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test @Test
public void testUnhealthyDecommission() { public void testUnhealthyDecommission() {
RMNodeImpl node = getUnhealthyNode(); RMNodeImpl node = getUnhealthyNode();
@ -429,6 +515,30 @@ public void testUnhealthyDecommission() {
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
} }
// Test Decommissioning on a unhealthy node will make it decommissioning.
@Test
public void testUnhealthyDecommissioning() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
int initialActive = cm.getNumActiveNMs();
int initialLost = cm.getNumLostNMs();
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals("Active Nodes", initialActive + 1,
cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
initialUnhealthy - 1, cm.getUnhealthyNMs());
Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
cm.getNumDecommisionedNMs());
Assert.assertEquals("Rebooted Nodes",
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}
@Test @Test
public void testRunningRebooting() { public void testRunningRebooting() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
@ -567,6 +677,14 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) {
return node; return node;
} }
private RMNodeImpl getDecommissioningNode() {
RMNodeImpl node = getRunningNode();
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
return node;
}
private RMNodeImpl getUnhealthyNode() { private RMNodeImpl getUnhealthyNode() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
@ -577,7 +695,6 @@ private RMNodeImpl getUnhealthyNode() {
return node; return node;
} }
private RMNodeImpl getNewNode() { private RMNodeImpl getNewNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
@ -651,6 +768,38 @@ public void testReconnect() {
nodesListManagerEvent.getType()); nodesListManagerEvent.getType());
} }
@Test
public void testReconnectOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
// Reconnect event with running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
getAppIdList(), null));
// still decommissioning
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
// Reconnect event without any running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test
public void testReconnectWithNewPortOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Random r= new Random();
node.setHttpPort(r.nextInt(10000));
// Reconnect event with running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
getAppIdList(), null));
// still decommissioning
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
node.setHttpPort(r.nextInt(10000));
// Reconnect event without any running app
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
@Test @Test
public void testResourceUpdateOnRunningNode() { public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();
@ -670,6 +819,11 @@ public void testResourceUpdateOnRunningNode() {
nodesListManagerEvent.getType()); nodesListManagerEvent.getType());
} }
@Test
public void testDecommissioningOnRunningNode(){
getDecommissioningNode();
}
@Test @Test
public void testResourceUpdateOnNewNode() { public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4)); RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
@ -702,6 +856,18 @@ public void testResourceUpdateOnRebootedNode() {
Assert.assertEquals(NodeState.REBOOTED, node.getState()); Assert.assertEquals(NodeState.REBOOTED, node.getState());
} }
// Test unhealthy report on a decommissioning node will make it
// keep decommissioning.
@Test
public void testDecommissioningUnhealthy() {
RMNodeImpl node = getDecommissioningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
new ArrayList<ContainerStatus>(), null, null));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}
@Test @Test
public void testReconnnectUpdate() { public void testReconnnectUpdate() {
final String nmVersion1 = "nm version 1"; final String nmVersion1 = "nm version 1";

View File

@ -43,8 +43,7 @@ public class TestNodesPage {
final int numberOfNodesPerRack = 8; final int numberOfNodesPerRack = 8;
// The following is because of the way TestRMWebApp.mockRMContext creates // The following is because of the way TestRMWebApp.mockRMContext creates
// nodes. // nodes.
final int numberOfLostNodesPerRack = numberOfNodesPerRack final int numberOfLostNodesPerRack = 1;
/ NodeState.values().length;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in // Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value. // future. In that case this value should be adjusted to the new value.