From b2f1ec312ee431aef762cfb49cb29cd6f4661e86 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 3 Mar 2015 16:25:57 -0800 Subject: [PATCH] YARN-3222. Fixed NPE on RMNodeImpl#ReconnectNodeTransition when a node is reconnected with a different port. Contributed by Rohith Sharmaks --- .../resourcemanager/rmnode/RMNodeImpl.java | 34 ++++++++++--------- .../yarn/server/resourcemanager/MockNM.java | 6 +++- .../TestResourceTrackerService.java | 17 +++++++++- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9701775994..c556b80469 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -571,12 +571,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - + if (rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY + if (rmNode.getState().equals(NodeState.RUNNING)) { + // Only add new node if old state is RUNNING rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(newNode)); } @@ -599,30 +599,32 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { rmNode.httpPort = newNode.getHttpPort(); rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.totalCapability = newNode.getTotalCapability(); + boolean isCapabilityChanged = false; + if (rmNode.getTotalCapability() != newNode.getTotalCapability()) { + rmNode.totalCapability = newNode.getTotalCapability(); + isCapabilityChanged = true; + } handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - } - if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(newNode.getTotalCapability(), -1))); + if (isCapabilityChanged + && rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); + } } - } private void handleNMContainerStatus( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 5f53805c21..c917f7976b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -51,7 +51,7 @@ public class MockNM { private final int memory; private final int vCores; private ResourceTrackerService resourceTracker; - private final int httpPort = 2; + private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; @@ -87,6 +87,10 @@ public int getHttpPort() { return httpPort; } + public void setHttpPort(int port) { + httpPort = port; + } + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 7c128481c7..a904dc0af4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -623,7 +624,7 @@ protected Dispatcher createDispatcher() { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); - + // reconnect of node with changed capability and running applications List runningApps = new ArrayList(); runningApps.add(ApplicationId.newInstance(1, 0)); @@ -633,6 +634,20 @@ protected Dispatcher createDispatcher() { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + + // reconnect healthy node changing http port + nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + nm1.setHttpPort(3); + nm1.registerNode(); + dispatcher.await(); + response = nm1.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); + dispatcher.await(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + Assert.assertEquals(3, rmNode.getHttpPort()); + Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + } private void writeToHostsFile(String... hosts) throws IOException {