diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1e1392ac37..54b8481206 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -111,6 +111,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3866. Fixed the bin/yarn script to not print the command line unnecessarily. (vinodkv) + MAPREDUCE-3730. Modified RM to allow restarted NMs to be able to join the + cluster without waiting for expiry. (Jason Lowe via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 75c91aa83f..d762766efc 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; @@ -177,17 +178,17 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, resolve(host), capability); - if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) { - LOG.info("Duplicate registration from the node at: " + host - + ", Sending SHUTDOWN Signal to the NodeManager"); - regResponse.setNodeAction(NodeAction.SHUTDOWN); - response.setRegistrationResponse(regResponse); - return response; + RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); + if (oldNode == null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + } else { + LOG.info("Reconnect from the node at: " + host); + this.nmLivelinessMonitor.unregister(nodeId); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeReconnectEvent(nodeId, rmNode)); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); - this.nmLivelinessMonitor.register(nodeId); LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index d562836101..ef644be700 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -28,6 +28,7 @@ public enum RMNodeEventType { // ResourceTrackerService STATUS_UPDATE, REBOOTING, + RECONNECTED, // Source: Application CLEANUP_APP, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9b8892a6dc..f0384da703 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -110,9 +110,11 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeEventType, RMNodeEvent>(RMNodeState.NEW) - //Transitions from RUNNING state + //Transitions from NEW state .addTransition(RMNodeState.NEW, RMNodeState.RUNNING, RMNodeEventType.STARTED, new AddNodeTransition()) + + //Transitions from RUNNING state .addTransition(RMNodeState.RUNNING, EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) @@ -129,11 +131,15 @@ RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) + .addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING, + RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) //Transitions from UNHEALTHY state .addTransition(RMNodeState.UNHEALTHY, EnumSet.of(RMNodeState.UNHEALTHY, RMNodeState.RUNNING), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) + .addTransition(RMNodeState.UNHEALTHY, RMNodeState.UNHEALTHY, + RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) // create the topology tables .installTopology(); @@ -372,6 +378,39 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class ReconnectNodeTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + // Kill containers since node is rejoining. + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeRemovedSchedulerEvent(rmNode)); + + RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode(); + if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) + && rmNode.getHttpPort() == newNode.getHttpPort()) { + // Reset heartbeat ID since node just restarted. + rmNode.getLastHeartBeatResponse().setResponseId(0); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeAddedSchedulerEvent(rmNode)); + } else { + // Reconnected node differs, so replace old node and start new node + switch (rmNode.getState()) { + case RUNNING: + ClusterMetrics.getMetrics().decrNumActiveNodes(); + break; + case UNHEALTHY: + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + break; + } + rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); + rmNode.context.getDispatcher().getEventHandler().handle( + new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); + } + } + } + public static class CleanUpAppTransition implements SingleArcTransition { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java new file mode 100644 index 0000000000..b1fa0ad8c0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java @@ -0,0 +1,34 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeReconnectEvent extends RMNodeEvent { + private RMNode reconnectedNode; + + public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) { + super(nodeId, RMNodeEventType.RECONNECTED); + reconnectedNode = newNode; + } + + public RMNode getReconnectedNode() { + return reconnectedNode; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index aa7d23ed91..0f90f6c4e2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -666,7 +666,10 @@ private synchronized void addNode(RMNode nodeManager) { private synchronized void removeNode(RMNode nodeInfo) { SchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); - Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability()); + if (node == null) { + return; + } + Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); root.updateClusterResource(clusterResource); --numNodeManagers; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 6fcf1fedd9..152668318d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -731,6 +731,9 @@ private synchronized void containerCompleted(RMContainer rmContainer, private synchronized void removeNode(RMNode nodeInfo) { SchedulerNode node = getNode(nodeInfo.getNodeID()); + if (node == null) { + return; + } // Kill running containers for(RMContainer container : node.getRunningContainers()) { containerCompleted(container, @@ -744,7 +747,7 @@ private synchronized void removeNode(RMNode nodeInfo) { this.nodes.remove(nodeInfo.getNodeID()); // Update cluster metrics - Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability()); + Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 05b17a367f..f30883f1ba 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -19,23 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.List; -import java.util.Map; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse; import com.google.common.collect.Lists; @@ -195,8 +190,12 @@ public HeartbeatResponse getLastHeartBeatResponse() { }; private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) { + return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++); + } + + private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr, int hostnum) { final String rackName = "rack"+ rack; - final int nid = NODE_ID++; + final int nid = hostnum; final String hostName = "host"+ nid; final int port = 123; final NodeId nodeID = newNodeID(hostName, port); @@ -219,4 +218,8 @@ public static RMNode nodeInfo(int rack, final Resource perNode, public static RMNode newNodeInfo(int rack, final Resource perNode) { return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0"); } + + public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 2cca6f09ad..e995e5153a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.ArrayList; import java.util.List; import junit.framework.Assert; @@ -27,10 +28,17 @@ import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -167,10 +175,37 @@ public void testNonDefaultMinimumAllocation() throws Exception { testMinimumAllocation(conf); } + @Test + public void testReconnectedNode() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setQueues("default", new String[] {"default"}); + conf.setCapacity("default", 100); + FifoScheduler fs = new FifoScheduler(); + fs.reinitialize(conf, null, null); + + RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); + RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); + + fs.handle(new NodeAddedSchedulerEvent(n1)); + fs.handle(new NodeAddedSchedulerEvent(n2)); + List emptyList = new ArrayList(); + fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList)); + Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); + + // reconnect n1 with downgraded memory + n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); + fs.handle(new NodeRemovedSchedulerEvent(n1)); + fs.handle(new NodeAddedSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList)); + + Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); + } + public static void main(String[] args) throws Exception { TestFifoScheduler t = new TestFifoScheduler(); t.test(); t.testDefaultMinimumAllocation(); t.testNonDefaultMinimumAllocation(); + t.testReconnectedNode(); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 8b3f4a08e9..7826819a11 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -31,12 +31,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.Records; import org.junit.After; import org.junit.Test; @@ -189,7 +194,7 @@ public void testUnhealthyNodeStatus() throws Exception { conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile .getAbsolutePath()); - MockRM rm = new MockRM(conf); + rm = new MockRM(conf); rm.start(); MockNM nm1 = rm.registerNode("host1:1234", 5120); @@ -223,6 +228,61 @@ private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health, ClusterMetrics.getMetrics().getUnhealthyNMs()); } + @Test + public void testReconnectNode() throws Exception { + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRM() { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 5120); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(false); + checkUnealthyNMCount(rm, nm2, true, 1); + final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs(); + QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); + Assert.assertEquals(5120 + 5120, metrics.getAvailableMB()); + + // reconnect of healthy node + nm1 = rm.registerNode("host1:1234", 5120); + HeartbeatResponse response = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); + dispatcher.await(); + Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); + checkUnealthyNMCount(rm, nm2, true, 1); + + // reconnect of unhealthy node + nm2 = rm.registerNode("host2:5678", 5120); + response = nm2.nodeHeartbeat(false); + Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); + dispatcher.await(); + Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); + checkUnealthyNMCount(rm, nm2, true, 1); + + // reconnect of node with changed capability + nm1 = rm.registerNode("host2:5678", 10240); + dispatcher.await(); + response = nm2.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); + Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); + } + private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) { TEMP_DIR.mkdirs(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index bcfd09d3c8..dbe21d1c6c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; @@ -41,12 +42,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private final int GB = 1024; private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; @@ -97,8 +101,6 @@ public void testCapacityScheduler() throws Exception { LOG.info("--- START: testCapacityScheduler ---"); - final int GB = 1024; - // Register node1 String host_0 = "host_0"; org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = @@ -340,4 +342,27 @@ public void testParseQueue() throws IOException { cs.reinitialize(conf, null, null); } + @Test + public void testReconnectedNode() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + CapacityScheduler cs = new CapacityScheduler(); + cs.reinitialize(csConf, null, null); + + RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); + RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); + + cs.handle(new NodeAddedSchedulerEvent(n1)); + cs.handle(new NodeAddedSchedulerEvent(n2)); + + Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); + + // reconnect n1 with downgraded memory + n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); + cs.handle(new NodeRemovedSchedulerEvent(n1)); + cs.handle(new NodeAddedSchedulerEvent(n1)); + + Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); + } }