diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8fe686deb1..32bf7dc7ca 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -197,6 +197,9 @@ Release 2.8.0 - UNRELEASED YARN-4034. Render cluster Max Priority in scheduler metrics in RM web UI. (Rohith Sharma K S via jianhe) + YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. + (Junping Du via wangda) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index abea85e908..1e8b98a010 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -399,7 +399,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException, NodeId nodeId = entry.getKey(); if (!isValidNode(nodeId.getHost())) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT)); + new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION)); } else { // Recommissioning the nodes if (entry.getValue().getState() == NodeState.DECOMMISSIONING diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 100e991621..7e774c5fbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -399,8 +400,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeId nodeId = remoteNodeStatus.getNodeId(); - // 1. Check if it's a valid (i.e. not excluded) node - if (!this.nodesListManager.isValidNode(nodeId.getHost())) { + // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is + // in decommissioning. 
+ if (!this.nodesListManager.isValidNode(nodeId.getHost()) + && !isNodeInDecommissioning(nodeId)) { String message = "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + nodeId.getHost(); @@ -486,6 +489,19 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + /** + * Check if node in decommissioning state. + * @param nodeId + */ + private boolean isNodeInDecommissioning(NodeId nodeId) { + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode != null && + rmNode.getState().equals(NodeState.DECOMMISSIONING)) { + return true; + } + return false; + } + @SuppressWarnings("unchecked") @Override public UnRegisterNodeManagerResponse unRegisterNodeManager( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index 27ba1c0ec5..ad360360d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -24,7 +24,7 @@ public enum RMNodeEventType { // Source: AdminService DECOMMISSION, - DECOMMISSION_WITH_TIMEOUT, + GRACEFUL_DECOMMISSION, RECOMMISSION, // Source: AdminService, ResourceTrackerService diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7a1ba74191..391b6ff854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -144,101 +144,150 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType, RMNodeEvent>(NodeState.NEW) - //Transitions from NEW state - .addTransition(NodeState.NEW, NodeState.RUNNING, - RMNodeEventType.STARTED, new AddNodeTransition()) - .addTransition(NodeState.NEW, NodeState.NEW, - RMNodeEventType.RESOURCE_UPDATE, - new UpdateNodeResourceWhenUnusableTransition()) + //Transitions from NEW state + .addTransition(NodeState.NEW, NodeState.RUNNING, + RMNodeEventType.STARTED, new AddNodeTransition()) + .addTransition(NodeState.NEW, NodeState.NEW, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) - //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, - EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), - RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) - .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, - RMNodeEventType.DECOMMISSION, - new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) - .addTransition(NodeState.RUNNING, NodeState.LOST, - RMNodeEventType.EXPIRE, - new 
DeactivateNodeTransition(NodeState.LOST)) - .addTransition(NodeState.RUNNING, NodeState.REBOOTED, - RMNodeEventType.REBOOTING, - new DeactivateNodeTransition(NodeState.REBOOTED)) - .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) - .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) - .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new AddContainersToBeRemovedFromNMTransition()) - .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) - .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) - .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, - RMNodeEventType.SHUTDOWN, - new DeactivateNodeTransition(NodeState.SHUTDOWN)) + //Transitions from RUNNING state + .addTransition(NodeState.RUNNING, + EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), + RMNodeEventType.STATUS_UPDATE, + new StatusUpdateWhenHealthyTransition()) + .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) + .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONING, + RMNodeEventType.GRACEFUL_DECOMMISSION, + new DecommissioningNodeTransition(NodeState.RUNNING, + NodeState.DECOMMISSIONING)) + .addTransition(NodeState.RUNNING, NodeState.LOST, + RMNodeEventType.EXPIRE, + new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.RUNNING, NodeState.REBOOTED, + RMNodeEventType.REBOOTING, + new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING), + RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) + .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, + RMNodeEventType.SHUTDOWN, + new DeactivateNodeTransition(NodeState.SHUTDOWN)) - //Transitions from REBOOTED state - .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, - RMNodeEventType.RESOURCE_UPDATE, - new UpdateNodeResourceWhenUnusableTransition()) - - //Transitions from DECOMMISSIONED state - .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, - RMNodeEventType.RESOURCE_UPDATE, - new UpdateNodeResourceWhenUnusableTransition()) - .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, - RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new AddContainersToBeRemovedFromNMTransition()) + //Transitions from REBOOTED state + .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) - //Transitions from LOST state - .addTransition(NodeState.LOST, NodeState.LOST, - RMNodeEventType.RESOURCE_UPDATE, - new UpdateNodeResourceWhenUnusableTransition()) - .addTransition(NodeState.LOST, NodeState.LOST, - 
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new AddContainersToBeRemovedFromNMTransition()) + //Transitions from DECOMMISSIONED state + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) - //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, - EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), - RMNodeEventType.STATUS_UPDATE, - new StatusUpdateWhenUnHealthyTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, - RMNodeEventType.DECOMMISSION, - new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) - .addTransition(NodeState.UNHEALTHY, NodeState.LOST, - RMNodeEventType.EXPIRE, - new DeactivateNodeTransition(NodeState.LOST)) - .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED, - RMNodeEventType.REBOOTING, - new DeactivateNodeTransition(NodeState.REBOOTED)) - .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, - RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, - RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, - RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, - RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, - RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new AddContainersToBeRemovedFromNMTransition()) - .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN, - RMNodeEventType.SHUTDOWN, - new DeactivateNodeTransition(NodeState.SHUTDOWN)) + //Transitions from DECOMMISSIONING state + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING, + RMNodeEventType.RECOMMISSION, + new RecommissionNodeTransition(NodeState.RUNNING)) + .addTransition(NodeState.DECOMMISSIONING, + EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED), + RMNodeEventType.STATUS_UPDATE, + new StatusUpdateWhenHealthyTransition()) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.GRACEFUL_DECOMMISSION, + new DecommissioningNodeTransition(NodeState.DECOMMISSIONING, + NodeState.DECOMMISSIONING)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.LOST, + RMNodeEventType.EXPIRE, + new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED, + RMNodeEventType.REBOOTING, + new DeactivateNodeTransition(NodeState.REBOOTED)) - //Transitions from SHUTDOWN state - .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN, - RMNodeEventType.RESOURCE_UPDATE, - new UpdateNodeResourceWhenUnusableTransition()) - .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN, - RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, - new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) - // create the topology tables - .installTopology(); + // TODO (in YARN-3223) update resource when container finished. 
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + // TODO (in YARN-3223) update resource when container finished. + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.DECOMMISSIONING, EnumSet.of( + NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED), + RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenRunningTransition()) + + //Transitions from LOST state + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.LOST, NodeState.LOST, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) + + //Transitions from UNHEALTHY state + .addTransition(NodeState.UNHEALTHY, + EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), + RMNodeEventType.STATUS_UPDATE, + new StatusUpdateWhenUnHealthyTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, + RMNodeEventType.DECOMMISSION, + new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) + .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONING, + RMNodeEventType.GRACEFUL_DECOMMISSION, + new DecommissioningNodeTransition(NodeState.UNHEALTHY, + NodeState.DECOMMISSIONING)) + .addTransition(NodeState.UNHEALTHY, NodeState.LOST, + RMNodeEventType.EXPIRE, + new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED, + RMNodeEventType.REBOOTING, + new DeactivateNodeTransition(NodeState.REBOOTED)) + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY), + RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN, + RMNodeEventType.SHUTDOWN, + new DeactivateNodeTransition(NodeState.SHUTDOWN)) + + //Transitions from SHUTDOWN state + .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenUnusableTransition()) + .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN, + RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, + new AddContainersToBeRemovedFromNMTransition()) + + // create the topology tables + .installTopology(); private final StateMachine stateMachine; @@ -265,7 +314,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.writeLock = lock.writeLock(); this.stateMachine = stateMachineFactory.make(this); - + this.nodeUpdateQueue = new ConcurrentLinkedQueue(); this.containerAllocationExpirer = context.getContainerAllocationExpirer(); @@ -291,6 +340,11 @@ public int getHttpPort() { return httpPort; } + // Test only + public void setHttpPort(int port) { + this.httpPort 
= port; + } + @Override public NodeId getNodeID() { return this.nodeId; @@ -497,23 +551,35 @@ private void updateMetricsForRejoinedNode(NodeState previousNodeState) { metrics.decrNumShutdownNMs(); break; default: - LOG.debug("Unexpected previous node state"); + LOG.debug("Unexpected previous node state"); } } + // Treats nodes in decommissioning as active nodes + // TODO we may want to differentiate active nodes and decommissioning node in + // metrics later. + private void updateMetricsForGracefulDecommissionOnUnhealthyNode() { + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + metrics.incrNumActiveNodes(); + metrics.decrNumUnhealthyNMs(); + } + private void updateMetricsForDeactivatedNode(NodeState initialState, NodeState finalState) { ClusterMetrics metrics = ClusterMetrics.getMetrics(); switch (initialState) { - case RUNNING: - metrics.decrNumActiveNodes(); - break; - case UNHEALTHY: - metrics.decrNumUnhealthyNMs(); - break; - default: - LOG.debug("Unexpected inital state"); + case RUNNING: + metrics.decrNumActiveNodes(); + break; + case DECOMMISSIONING: + metrics.decrNumActiveNodes(); + break; + case UNHEALTHY: + metrics.decrNumUnhealthyNMs(); + break; + default: + LOG.debug("Unexpected inital state"); } switch (finalState) { @@ -608,10 +674,10 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } public static class ReconnectNodeTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); @@ -622,6 +688,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // No application running on the node, so send node-removal event with // cleaning up old container info. if (noRunningApps) { + if (rmNode.getState() == NodeState.DECOMMISSIONING) { + // When node in decommissioning, and no running apps on this node, + // it will return as decommissioned state. 
+ deactivateNode(rmNode, NodeState.DECOMMISSIONED); + return NodeState.DECOMMISSIONED; + } rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); @@ -652,6 +724,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDispatcher().getEventHandler().handle( new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } + } else { rmNode.httpPort = newNode.getHttpPort(); rmNode.httpAddress = newNode.getHttpAddress(); @@ -678,17 +751,21 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption .newInstance(newNode.getTotalCapability(), -1))); } + } + return rmNode.getState(); } private void handleNMContainerStatus( List nmContainerStatuses, RMNodeImpl rmnode) { - List containerStatuses = - new ArrayList(); - for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { - containerStatuses.add(createContainerStatus(nmContainerStatus)); + if (nmContainerStatuses != null) { + List containerStatuses = + new ArrayList(); + for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { + containerStatuses.add(createContainerStatus(nmContainerStatus)); + } + rmnode.handleContainerStatus(containerStatuses); } - rmnode.handleContainerStatus(containerStatuses); } private ContainerStatus createContainerStatus( @@ -770,31 +847,94 @@ public DeactivateNodeTransition(NodeState finalState) { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Inform the scheduler - rmNode.nodeUpdateQueue.clear(); - // If the current state is NodeState.UNHEALTHY - // Then node is already been removed from the - // Scheduler - NodeState initialState = rmNode.getState(); - if (!initialState.equals(NodeState.UNHEALTHY)) { - rmNode.context.getDispatcher().getEventHandler() - .handle(new NodeRemovedSchedulerEvent(rmNode)); - } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_UNUSABLE, rmNode)); - - // Deactivate the node - rmNode.context.getRMNodes().remove(rmNode.nodeId); - LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " - + finalState); - rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); - - //Update the metrics - rmNode.updateMetricsForDeactivatedNode(initialState, finalState); + RMNodeImpl.deactivateNode(rmNode, finalState); } } + /** + * Put a node in deactivated (decommissioned) status. + * @param rmNode + * @param finalState + */ + public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) { + + reportNodeUnusable(rmNode, finalState); + + // Deactivate the node + rmNode.context.getRMNodes().remove(rmNode.nodeId); + LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " + + finalState); + rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); + } + + /** + * Report node is UNUSABLE and update metrics. 
+ * @param rmNode + * @param finalState + */ + public static void reportNodeUnusable(RMNodeImpl rmNode, + NodeState finalState) { + // Inform the scheduler + rmNode.nodeUpdateQueue.clear(); + // If the current state is NodeState.UNHEALTHY + // Then node is already been removed from the + // Scheduler + NodeState initialState = rmNode.getState(); + if (!initialState.equals(NodeState.UNHEALTHY)) { + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeRemovedSchedulerEvent(rmNode)); + } + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_UNUSABLE, rmNode)); + + //Update the metrics + rmNode.updateMetricsForDeactivatedNode(initialState, finalState); + } + + /** + * The transition to put node in decommissioning state. + */ + public static class DecommissioningNodeTransition + implements SingleArcTransition { + private final NodeState initState; + private final NodeState finalState; + + public DecommissioningNodeTransition(NodeState initState, + NodeState finalState) { + this.initState = initState; + this.finalState = finalState; + } + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING."); + if (initState.equals(NodeState.UNHEALTHY)) { + rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode(); + } + // TODO (in YARN-3223) Keep NM's available resource to be 0 + } + } + + public static class RecommissionNodeTransition + implements SingleArcTransition { + + private final NodeState finalState; + public RecommissionNodeTransition(NodeState finalState) { + this.finalState = finalState; + } + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " + + "recommissioned back to RUNNING."); + // TODO handle NM resource resume in YARN-3223. + } + } + + /** + * Status update transition when node is healthy. + */ public static class StatusUpdateWhenHealthyTransition implements MultipleArcTransition { @Override @@ -805,25 +945,44 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Switch the last heartbeatresponse. rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); - NodeHealthStatus remoteNodeHealthStatus = + NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); + NodeState initialState = rmNode.getState(); + boolean isNodeDecommissioning = + initialState.equals(NodeState.DECOMMISSIONING); if (!remoteNodeHealthStatus.getIsNodeHealthy()) { - LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " - + remoteNodeHealthStatus.getHealthReport()); - rmNode.nodeUpdateQueue.clear(); - // Inform the scheduler - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_UNUSABLE, rmNode)); - // Update metrics - rmNode.updateMetricsForDeactivatedNode(rmNode.getState(), - NodeState.UNHEALTHY); - return NodeState.UNHEALTHY; + LOG.info("Node " + rmNode.nodeId + + " reported UNHEALTHY with details: " + + remoteNodeHealthStatus.getHealthReport()); + // if a node in decommissioning receives an unhealthy report, + // it will keep decommissioning. 
+ if (isNodeDecommissioning) { + return NodeState.DECOMMISSIONING; + } else { + reportNodeUnusable(rmNode, NodeState.UNHEALTHY); + return NodeState.UNHEALTHY; + } + } + if (isNodeDecommissioning) { + List runningApps = rmNode.getRunningApps(); + + List keepAliveApps = statusEvent.getKeepAliveAppIds(); + + // no running (and keeping alive) app on this node, get it + // decommissioned. + // TODO may need to check no container is being scheduled on this node + // as well. + if ((runningApps == null || runningApps.size() == 0) + && (keepAliveApps == null || keepAliveApps.size() == 0)) { + RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); + return NodeState.DECOMMISSIONED; + } + + // TODO (in YARN-3223) if node in decommissioning, get node resource + // updated if container get finished (keep available resource to be 0) } rmNode.handleContainerStatus(statusEvent.getContainers()); @@ -848,7 +1007,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { statusEvent.getKeepAliveAppIds()); } - return NodeState.RUNNING; + return initialState; } } @@ -857,11 +1016,12 @@ public static class StatusUpdateWhenUnHealthyTransition implements @Override public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { - RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; + RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event; // Switch the last heartbeatresponse. rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); - NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); + NodeHealthStatus remoteNodeHealthStatus = + statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 61c6166af4..a6e15757cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -29,7 +29,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Random; +import org.apache.hadoop.net.Node; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -75,7 +77,7 @@ public class TestRMNodeTransitions { RMNodeImpl node; - + private RMContext rmContext; private YarnScheduler scheduler; @@ -168,6 +170,42 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent( return event; } + private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { + NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); + + NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); + Boolean yes = new Boolean(true); + doReturn(yes).when(healthStatus).getIsNodeHealthy(); + + RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); + doReturn(healthStatus).when(event).getNodeHealthStatus(); + 
doReturn(response).when(event).getLatestResponse(); + doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); + doReturn(getAppIdList()).when(event).getKeepAliveAppIds(); + return event; + } + + private List getAppIdList() { + List appIdList = new ArrayList(); + appIdList.add(BuilderUtils.newApplicationId(0, 0)); + return appIdList; + } + + private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { + NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); + + NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); + Boolean yes = new Boolean(true); + doReturn(yes).when(healthStatus).getIsNodeHealthy(); + + RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); + doReturn(healthStatus).when(event).getNodeHealthStatus(); + doReturn(response).when(event).getLatestResponse(); + doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); + doReturn(null).when(event).getKeepAliveAppIds(); + return event; + } + @Test (timeout = 5000) public void testExpiredContainer() { // Start the node @@ -195,7 +233,33 @@ public void testExpiredContainer() { */ verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class)); } - + + @Test + public void testStatusUpdateOnDecommissioningNode(){ + RMNodeImpl node = getDecommissioningNode(); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + // Verify node in DECOMMISSIONING won't be changed by status update + // with running apps + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps(); + node.handle(statusEvent); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + + // Verify node in DECOMMISSIONING will be changed by status update + // without running apps + statusEvent = getMockRMNodeStatusEventWithoutRunningApps(); + node.handle(statusEvent); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); + } + + @Test + public void testRecommissionNode(){ + RMNodeImpl node = getDecommissioningNode(); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.RECOMMISSION)); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + } + @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node @@ -253,9 +317,9 @@ public void testContainerUpdate() throws InterruptedException{ Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0) .getContainerId()); Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1) - .getContainerId()); + .getContainerId()); } - + @Test (timeout = 5000) public void testStatusChange(){ //Start the node @@ -292,7 +356,7 @@ public void testStatusChange(){ node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE)); Assert.assertEquals(0, node.getQueueSize()); } - + @Test public void testRunningExpire() { RMNodeImpl node = getRunningNode(); @@ -375,7 +439,7 @@ public void testUnhealthyExpire() { initialRebooted, cm.getNumRebootedNMs()); Assert.assertEquals(NodeState.LOST, node.getState()); } - + @Test public void testUnhealthyExpireForSchedulerRemove() { RMNodeImpl node = getUnhealthyNode(); @@ -407,6 +471,28 @@ public void testRunningDecommission() { Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); } + @Test + public void testDecommissionOnDecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + 
int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.DECOMMISSION)); + Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned + 1, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); + } + @Test public void testUnhealthyDecommission() { RMNodeImpl node = getUnhealthyNode(); @@ -429,6 +515,30 @@ public void testUnhealthyDecommission() { Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); } + // Test Decommissioning on a unhealthy node will make it decommissioning. + @Test + public void testUnhealthyDecommissioning() { + RMNodeImpl node = getUnhealthyNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.GRACEFUL_DECOMMISSION)); + Assert.assertEquals("Active Nodes", initialActive + 1, + cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy - 1, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", initialDecommissioned, + cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + } + @Test public void testRunningRebooting() { RMNodeImpl node = getRunningNode(); @@ -567,6 +677,14 @@ private RMNodeImpl getRunningNode(String nmVersion, int port) { return node; } + private RMNodeImpl getDecommissioningNode() { + RMNodeImpl node = getRunningNode(); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.GRACEFUL_DECOMMISSION)); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + return node; + } + private RMNodeImpl getUnhealthyNode() { RMNodeImpl node = getRunningNode(); NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", @@ -577,20 +695,19 @@ private RMNodeImpl getUnhealthyNode() { return node; } - private RMNodeImpl getNewNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); return node; } - + private RMNodeImpl getNewNode(Resource capability) { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, capability, null); return node; } - + private RMNodeImpl getRebootedNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); Resource capability = Resource.newInstance(4096, 4); @@ -650,7 +767,39 @@ public void testReconnect() { Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, nodesListManagerEvent.getType()); } - + + @Test + public void testReconnectOnDecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + + // Reconnect event with running app + node.handle(new 
RMNodeReconnectEvent(node.getNodeID(), node, + getAppIdList(), null)); + // still decommissioning + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + + // Reconnect event without any running app + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null)); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); + } + + @Test + public void testReconnectWithNewPortOnDecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + Random r= new Random(); + node.setHttpPort(r.nextInt(10000)); + // Reconnect event with running app + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, + getAppIdList(), null)); + // still decommissioning + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + + node.setHttpPort(r.nextInt(10000)); + // Reconnect event without any running app + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null)); + Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState()); + } + @Test public void testResourceUpdateOnRunningNode() { RMNodeImpl node = getRunningNode(); @@ -658,18 +807,23 @@ public void testResourceUpdateOnRunningNode() { assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), - ResourceOption.newInstance(Resource.newInstance(2048, 2), + ResourceOption.newInstance(Resource.newInstance(2048, 2), ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); Resource newCapacity = node.getTotalCapability(); assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); - + Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertNotNull(nodesListManagerEvent); Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, nodesListManagerEvent.getType()); } - + + @Test + public void testDecommissioningOnRunningNode(){ + getDecommissioningNode(); + } + @Test public void testResourceUpdateOnNewNode() { RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4)); @@ -682,10 +836,10 @@ public void testResourceUpdateOnNewNode() { Resource newCapacity = node.getTotalCapability(); assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); - + Assert.assertEquals(NodeState.NEW, node.getState()); } - + @Test public void testResourceUpdateOnRebootedNode() { RMNodeImpl node = getRebootedNode(); @@ -702,6 +856,18 @@ public void testResourceUpdateOnRebootedNode() { Assert.assertEquals(NodeState.REBOOTED, node.getState()); } + // Test unhealthy report on a decommissioning node will make it + // keep decommissioning. 
+ @Test + public void testDecommissioningUnhealthy() { + RMNodeImpl node = getDecommissioningNode(); + NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", + System.currentTimeMillis()); + node.handle(new RMNodeStatusEvent(node.getNodeID(), status, + new ArrayList(), null, null)); + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + } + @Test public void testReconnnectUpdate() { final String nmVersion1 = "nm version 1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java index b70fdc100a..458b2401f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java @@ -43,8 +43,7 @@ public class TestNodesPage { final int numberOfNodesPerRack = 8; // The following is because of the way TestRMWebApp.mockRMContext creates // nodes. - final int numberOfLostNodesPerRack = numberOfNodesPerRack - / NodeState.values().length; + final int numberOfLostNodesPerRack = 1; // Number of Actual Table Headers for NodesPage.NodesBlock might change in // future. In that case this value should be adjusted to the new value.
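In summary, the patch above does two things around the new DECOMMISSIONING state: the ResourceTrackerService keeps accepting heartbeats from a host that has been moved to the exclude list as long as the node is DECOMMISSIONING, and the RMNodeImpl status-update transition moves a DECOMMISSIONING node to DECOMMISSIONED once a healthy heartbeat reports no running and no keep-alive applications. The following standalone Java sketch illustrates those two decisions; it deliberately uses hypothetical class, enum, and method names rather than the real YARN types (ResourceTrackerService, RMNodeImpl), so it is an approximation of the logic, not the actual implementation.

// Standalone sketch of the two decommissioning decisions added by this patch.
// All names here are illustrative; the real logic lives in
// ResourceTrackerService#nodeHeartbeat and RMNodeImpl's state transitions.
import java.util.Collections;
import java.util.List;

public class GracefulDecommissionSketch {

  enum NodeState { RUNNING, DECOMMISSIONING, DECOMMISSIONED, UNHEALTHY }

  // Roughly mirrors the nodeHeartbeat change: an excluded host is normally
  // rejected, but a node already in DECOMMISSIONING keeps heartbeating so its
  // running applications can finish.
  static boolean acceptHeartbeat(boolean hostInExcludeList, NodeState state) {
    return !hostInExcludeList || state == NodeState.DECOMMISSIONING;
  }

  // Roughly mirrors StatusUpdateWhenHealthyTransition for a DECOMMISSIONING
  // node: once there are no running apps and no keep-alive apps, the node is
  // considered drained and moves to DECOMMISSIONED; otherwise it stays put.
  static NodeState onHealthyStatusUpdate(NodeState current,
      List<String> runningApps, List<String> keepAliveApps) {
    if (current != NodeState.DECOMMISSIONING) {
      return current;
    }
    boolean drained = (runningApps == null || runningApps.isEmpty())
        && (keepAliveApps == null || keepAliveApps.isEmpty());
    return drained ? NodeState.DECOMMISSIONED : NodeState.DECOMMISSIONING;
  }

  public static void main(String[] args) {
    // Excluded host, but mid-decommission: the heartbeat is still accepted.
    System.out.println(acceptHeartbeat(true, NodeState.DECOMMISSIONING)); // true

    // A running app remains: the node stays DECOMMISSIONING.
    System.out.println(onHealthyStatusUpdate(NodeState.DECOMMISSIONING,
        Collections.singletonList("app_0001"), null)); // DECOMMISSIONING

    // Drained of apps: the node flips to DECOMMISSIONED.
    System.out.println(onHealthyStatusUpdate(NodeState.DECOMMISSIONING,
        Collections.emptyList(), null)); // DECOMMISSIONED
  }
}

As the TODOs in the patch note, resource accounting for a draining node (keeping its available resource at zero as containers finish) is deferred to YARN-3223, so this sketch only covers the state-transition side of graceful decommission.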