YARN-4939. The decommissioning Node should keep alive during NM restart. Contributed by sandflee.

This commit is contained in:
Junping Du 2016-07-08 04:14:53 -07:00
parent 673e5e02fe
commit 30ee57ceb1
3 changed files with 37 additions and 10 deletions

View File

@ -74,6 +74,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) ||
acceptedStates.contains(NodeState.DECOMMISSIONING) ||
acceptedStates.contains(NodeState.UNHEALTHY)) {
for (RMNode rmNode : context.getRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {

View File

@ -336,7 +336,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
if (!this.nodesListManager.isValidNode(host) &&
!isNodeInDecommissioning(nodeId)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";

View File

@ -20,41 +20,40 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -195,6 +194,32 @@ public void testCompareRMNodeAfterReconnect() throws Exception {
scheduler.stop();
}
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testDecommissioningNodeReconnect()
throws Exception {
MockRM rm = new MockRM();
rm.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm1.registerNode();
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
rm.getRMContext().getNodesListManager().getHostsReader().
getExcludedHosts().add("127.0.0.1");
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMNodeEvent(nm1.getNodeId(),
RMNodeEventType.GRACEFUL_DECOMMISSION));
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
MockNM nm2 =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
RegisterNodeManagerResponse response = nm2.registerNode();
// not SHUTDOWN
Assert.assertTrue(response.getNodeAction().equals(NodeAction.NORMAL));
rm.stop();
}
@Test(timeout = 10000)
public void testRMNodeStatusAfterReconnect() throws Exception {
// The node(127.0.0.1:1234) reconnected with RM. When it registered with