diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c075aacbee..fc901066a3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1847,6 +1847,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv) + MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a + decommissioned node to shutdown. (Devaraj K via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java index 8ca390bb1f..50e45d49f8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java @@ -24,7 +24,7 @@ public interface HeartbeatResponse { int getResponseId(); - boolean getReboot(); + NodeAction getNodeAction(); List getContainersToCleanupList(); ContainerId getContainerToCleanup(int index); @@ -35,7 +35,7 @@ public interface HeartbeatResponse { int getApplicationsToCleanupCount(); void setResponseId(int responseId); - void setReboot(boolean reboot); + void setNodeAction(NodeAction action); void addAllContainersToCleanup(List containers); void addContainerToCleanup(ContainerId container); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java new file mode 100644 index 0000000000..4d8246e712 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +/** + * The NodeManager is instructed to perform the given action. + * + */ + +public enum NodeAction { + NORMAL, REBOOT, SHUTDOWN +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java index a237d90cfc..f4bb31a2c6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java @@ -23,4 +23,8 @@ public interface RegistrationResponse { public abstract ByteBuffer getSecretKey(); public abstract void setSecretKey(ByteBuffer secretKey); + + public abstract NodeAction getNodeAction(); + + public abstract void setNodeAction(NodeAction nodeAction); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java index 1b8c5989f6..7cf7ac8bcb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/HeartbeatResponsePBImpl.java @@ -32,11 +32,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; - - -public class HeartbeatResponsePBImpl extends ProtoBase implements HeartbeatResponse { +public class HeartbeatResponsePBImpl extends + ProtoBase implements HeartbeatResponse { HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance(); HeartbeatResponseProto.Builder builder = null; boolean viaProto = false; @@ -100,16 +101,24 @@ public void setResponseId(int responseId) { builder.setResponseId((responseId)); } @Override - public boolean getReboot() { + public NodeAction getNodeAction() { HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - return (p.getReboot()); + if(!p.hasNodeAction()) { + return null; + } + return (convertFromProtoFormat(p.getNodeAction())); } @Override - public void setReboot(boolean reboot) { + public void setNodeAction(NodeAction nodeAction) { maybeInitBuilder(); - builder.setReboot((reboot)); + if (nodeAction == null) { + builder.clearNodeAction(); + return; + } + builder.setNodeAction(convertToProtoFormat(nodeAction)); } + @Override public List getContainersToCleanupList() { initContainersToCleanup(); @@ -296,7 +305,12 @@ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdProto convertToProtoFormat(ApplicationId t) { return ((ApplicationIdPBImpl)t).getProto(); } - - - + + private NodeAction convertFromProtoFormat(NodeActionProto p) { + return NodeAction.valueOf(p.name()); + } + + private NodeActionProto convertToProtoFormat(NodeAction t) { + return NodeActionProto.valueOf(t.name()); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java index 9a9a063218..21d708b14b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/RegistrationResponsePBImpl.java @@ -21,17 +21,15 @@ import java.nio.ByteBuffer; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; - - -public class RegistrationResponsePBImpl extends ProtoBase implements RegistrationResponse { +public class RegistrationResponsePBImpl extends + ProtoBase implements RegistrationResponse { RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance(); RegistrationResponseProto.Builder builder = null; boolean viaProto = false; @@ -98,4 +96,31 @@ public void setSecretKey(ByteBuffer secretKey) { this.secretKey = secretKey; } + @Override + public NodeAction getNodeAction() { + RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasNodeAction()) { + return null; + } + return convertFromProtoFormat(p.getNodeAction()); + } + + @Override + public void setNodeAction(NodeAction nodeAction) { + maybeInitBuilder(); + if (nodeAction == null) { + builder.clearNodeAction(); + return; + } + builder.setNodeAction(convertToProtoFormat(nodeAction)); + } + + private NodeAction convertFromProtoFormat(NodeActionProto p) { + return NodeAction.valueOf(p.name()); + } + + private NodeActionProto convertToProtoFormat(NodeAction t) { + return NodeActionProto.valueOf(t.name()); + } + } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 5198c5743f..b2e995f45a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -23,6 +23,12 @@ option java_generate_equals_and_hash = true; import "yarn_protos.proto"; +enum NodeActionProto { + NORMAL = 0; + REBOOT = 1; + SHUTDOWN = 2; +} + message NodeStatusProto { optional NodeIdProto node_id = 1; optional int32 response_id = 2; @@ -32,11 +38,12 @@ message NodeStatusProto { message RegistrationResponseProto { optional bytes secret_key = 1; + optional NodeActionProto nodeAction = 2; } message HeartbeatResponseProto { optional int32 response_id = 1; - optional bool reboot = 2; + optional NodeActionProto nodeAction = 2; repeated ContainerIdProto containers_to_cleanup = 3; repeated ApplicationIdProto applications_to_cleanup = 4; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 399625ab5b..319d5a04c8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.service.ServiceStateChangeListener; import org.apache.hadoop.yarn.util.Records; -public class NodeManager extends CompositeService { +public class NodeManager extends CompositeService implements + ServiceStateChangeListener { private static final Log LOG = LogFactory.getLog(NodeManager.class); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected ContainerTokenSecretManager containerTokenSecretManager; @@ -123,6 +125,8 @@ public void init(Configuration conf) { NodeStatusUpdater nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, healthChecker, this.containerTokenSecretManager); + + nodeStatusUpdater.register(this); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); @@ -206,6 +210,16 @@ public NodeHealthStatus getNodeHealthStatus() { } } + + @Override + public void stateChanged(Service service) { + // Shutdown the Nodemanager when the NodeStatusUpdater is stopped. + if (NodeStatusUpdaterImpl.class.getName().equals(service.getName()) + && STATE.STOPPED.equals(service.getServiceState())) { + stop(); + } + } + public static void main(String[] args) { StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); try { @@ -220,5 +234,4 @@ public static void main(String[] args) { System.exit(-1); } } - } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 07902f606e..94396088ca 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,8 +30,8 @@ import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -160,6 +160,12 @@ private void registerWithRM() throws YarnRemoteException { request.setNodeId(this.nodeId); RegistrationResponse regResponse = this.resourceTracker.registerNodeManager(request).getRegistrationResponse(); + // if the Resourcemanager instructs NM to shutdown. + if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) { + throw new YarnException( + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed"); + } + if (UserGroupInformation.isSecurityEnabled()) { this.secretKeyBytes = regResponse.getSecretKey().array(); } @@ -248,10 +254,25 @@ public void run() { NodeStatus nodeStatus = getNodeStatus(); nodeStatus.setResponseId(lastHeartBeatID); - NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class); + NodeHeartbeatRequest request = recordFactory + .newRecordInstance(NodeHeartbeatRequest.class); request.setNodeStatus(nodeStatus); HeartbeatResponse response = resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); + if (response.getNodeAction() == NodeAction.SHUTDOWN) { + LOG + .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," + + " hence shutting down."); + NodeStatusUpdaterImpl.this.stop(); + break; + } + if (response.getNodeAction() == NodeAction.REBOOT) { + LOG.info("Node is out of sync with ResourceManager," + + " hence shutting down."); + NodeStatusUpdaterImpl.this.stop(); + break; + } + lastHeartBeatID = response.getResponseId(); List containersToCleanup = response .getContainersToCleanupList(); @@ -269,7 +290,6 @@ public void run() { // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); - break; } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index dffe7d68d1..a0a5c55795 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; @@ -85,10 +86,15 @@ public class TestNodeStatusUpdater { volatile Error nmStartError = null; private final List registeredNodes = new ArrayList(); private final Configuration conf = new YarnConfiguration(); + private NodeManager nm; @After public void tearDown() { this.registeredNodes.clear(); + heartBeatID = 0; + if (nm != null) { + nm.stop(); + } DefaultMetricsSystem.shutdown(); } @@ -220,6 +226,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker = new MyResourceTracker(this.context); private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, @@ -232,10 +239,44 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, @Override protected ResourceTracker getRMClient() { - return new MyResourceTracker(this.context); + return resourceTracker; } } + + // + private class MyResourceTracker2 implements ResourceTracker { + public NodeAction heartBeatNodeAction = NodeAction.NORMAL; + public NodeAction registerNodeAction = NodeAction.NORMAL; + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + RegistrationResponse regResponse = recordFactory + .newRecordInstance(RegistrationResponse.class); + regResponse.setNodeAction(registerNodeAction ); + response.setRegistrationResponse(regResponse); + return response; + } + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + HeartbeatResponse response = recordFactory + .newRecordInstance(HeartbeatResponse.class); + response.setResponseId(heartBeatID); + response.setNodeAction(heartBeatNodeAction); + + NodeHeartbeatResponse nhResponse = recordFactory + .newRecordInstance(NodeHeartbeatResponse.class); + nhResponse.setHeartbeatResponse(response); + return nhResponse; + } + } + @Before public void clearError() { nmStartError = null; @@ -249,7 +290,7 @@ public void deleteBaseDir() throws IOException { @Test public void testNMRegistration() throws InterruptedException { - final NodeManager nm = new NodeManager() { + nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, @@ -295,14 +336,85 @@ public void run() { Assert.fail("NodeManager failed to start"); } - while (heartBeatID <= 3) { + waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { Thread.sleep(500); } + Assert.assertFalse(heartBeatID <= 3); Assert.assertEquals("Number of registered NMs is wrong!!", 1, this.registeredNodes.size()); nm.stop(); } + + @Test + public void testNodeDecommision() throws Exception { + nm = getNodeManager(NodeAction.SHUTDOWN); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + Assert.assertEquals(STATE.INITED, nm.getServiceState()); + nm.start(); + + int waitCount = 0; + while (heartBeatID < 1 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID < 1); + + // NM takes a while to reach the STOPPED state. + waitCount = 0; + while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { + LOG.info("Waiting for NM to stop.."); + Thread.sleep(1000); + } + + Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); + } + + @Test + public void testNodeReboot() throws Exception { + nm = getNodeManager(NodeAction.REBOOT); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + Assert.assertEquals(STATE.INITED, nm.getServiceState()); + nm.start(); + + int waitCount = 0; + while (heartBeatID < 1 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID < 1); + + // NM takes a while to reach the STOPPED state. + waitCount = 0; + while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { + LOG.info("Waiting for NM to stop.."); + Thread.sleep(1000); + } + + Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); + } + + @Test + public void testNMShutdownForRegistrationFailure() { + + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { + MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, + containerTokenSecretManager); + MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); + myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; + nodeStatusUpdater.resourceTracker = myResourceTracker2; + return nodeStatusUpdater; + } + }; + verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: " + + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed"); + } /** * Verifies that if for some reason NM fails to start ContainerManager RPC @@ -314,7 +426,7 @@ public void run() { @Test public void testNoRegistrationWhenNMServicesFail() { - final NodeManager nm = new NodeManager() { + nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, @@ -341,16 +453,22 @@ public void start() { } }; + verifyNodeStartFailure("Starting of RPC Server failed"); + } + + private void verifyNodeStartFailure(String errMessage) { YarnConfiguration conf = createNMConfig(); nm.init(conf); try { nm.start(); Assert.fail("NM should have failed to start. Didn't get exception!!"); } catch (Exception e) { - Assert.assertEquals("Starting of RPC Server failed", e.getCause() + Assert.assertEquals(errMessage, e.getCause() .getMessage()); } - + + // the state change to stopped occurs only if the startup is success, else + // state change doesn't occur Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm .getServiceState()); @@ -371,4 +489,21 @@ private YarnConfiguration createNMConfig() { .toUri().getPath()); return conf; } + + private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { + return new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { + MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, + containerTokenSecretManager); + MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); + myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; + myNodeStatusUpdater.resourceTracker = myResourceTracker2; + return myNodeStatusUpdater; + } + }; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java new file mode 100644 index 0000000000..90820c3998 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class ClusterMetrics { + + @Metric("# of NMs") MutableGaugeInt numNMs; + @Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs; + @Metric("# of lost NMs") MutableCounterInt numLostNMs; + @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs; + @Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs; + + private static final MetricsInfo RECORD_INFO = info("ClusterMetrics", + "Metrics for the Yarn Cluster"); + + private static volatile ClusterMetrics INSTANCE = null; + private static MetricsRegistry registry; + + public static ClusterMetrics getMetrics() { + if(INSTANCE == null){ + synchronized (ClusterMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new ClusterMetrics(); + registerMetrics(); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("ClusterMetrics", "Metrics for the Yarn Cluster", INSTANCE); + } + } + + //Total Nodemanagers + public int getNumNMs() { + return numNMs.value(); + } + + //Decommisioned NMs + public int getNumDecommisionedNMs() { + return numDecommissionedNMs.value(); + } + + public void incrDecommisionedNMs() { + numDecommissionedNMs.incr(); + } + + //Lost NMs + public int getNumLostNMs() { + return numLostNMs.value(); + } + + public void incrNumLostNMs() { + numLostNMs.incr(); + } + + //Unhealthy NMs + public int getUnhealthyNMs() { + return numUnhealthyNMs.value(); + } + + public void incrNumUnhealthyNMs() { + numUnhealthyNMs.incr(); + } + + public void decrNumUnhealthyNMs() { + numUnhealthyNMs.decr(); + } + + //Rebooted NMs + public int getNumRebootedNMs() { + return numRebootedNMs.value(); + } + + public void incrNumRebootedNMs() { + numRebootedNMs.incr(); + } + + public void removeNode(RMNodeEventType nodeEventType) { + numNMs.decr(); + switch(nodeEventType){ + case DECOMMISSION: incrDecommisionedNMs(); break; + case EXPIRE: incrNumLostNMs();break; + case REBOOTING: incrNumRebootedNMs();break; + } + } + + public void addNode() { + numNMs.incr(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 29997fdab8..a01a0bf42e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState; @@ -76,11 +75,19 @@ public class ResourceTrackerService extends AbstractService implements private static final NodeHeartbeatResponse reboot = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); + private static final NodeHeartbeatResponse shutDown = recordFactory + .newRecordInstance(NodeHeartbeatResponse.class); + static { HeartbeatResponse rebootResp = recordFactory .newRecordInstance(HeartbeatResponse.class); - rebootResp.setReboot(true); + rebootResp.setNodeAction(NodeAction.REBOOT); reboot.setHeartbeatResponse(rebootResp); + + HeartbeatResponse decommissionedResp = recordFactory + .newRecordInstance(HeartbeatResponse.class); + decommissionedResp.setNodeAction(NodeAction.SHUTDOWN); + shutDown.setHeartbeatResponse(decommissionedResp); } public ResourceTrackerService(RMContext rmContext, @@ -139,6 +146,7 @@ public synchronized void stop() { super.stop(); } + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnRemoteException { @@ -149,121 +157,125 @@ public RegisterNodeManagerResponse registerNodeManager( int httpPort = request.getHttpPort(); Resource capability = request.getResource(); - try { - // Check if this node is a 'valid' node - if (!this.nodesListManager.isValidNode(host)) { - LOG.info("Disallowed NodeManager from " + host); - throw new IOException("Disallowed NodeManager from " + host); - } + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + RegistrationResponse regResponse = recordFactory + .newRecordInstance(RegistrationResponse.class); + SecretKey secretKey = this.containerTokenSecretManager + .createAndGetSecretKey(nodeId.toString()); + regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded())); - RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, - httpPort, resolve(host), capability); - - if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) { - throw new IOException("Duplicate registration from the node!"); - } - - this.nmLivelinessMonitor.register(nodeId); - - LOG.info("NodeManager from node " + host + - "(cmPort: " + cmPort + " httpPort: " + httpPort + ") " - + "registered with capability: " + capability.getMemory() - + ", assigned nodeId " + nodeId); - - RegistrationResponse regResponse = recordFactory.newRecordInstance( - RegistrationResponse.class); - SecretKey secretKey = this.containerTokenSecretManager - .createAndGetSecretKey(nodeId.toString()); - regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded())); - - RegisterNodeManagerResponse response = recordFactory - .newRecordInstance(RegisterNodeManagerResponse.class); + // Check if this node is a 'valid' node + if (!this.nodesListManager.isValidNode(host)) { + LOG.info("Disallowed NodeManager from " + host + + ", Sending SHUTDOWN signal to the NodeManager."); + regResponse.setNodeAction(NodeAction.SHUTDOWN); response.setRegistrationResponse(regResponse); return response; - } catch (IOException ioe) { - LOG.info("Exception in node registration from " + nodeId.getHost(), ioe); - throw RPCUtil.getRemoteException(ioe); } + + RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, + resolve(host), capability); + + if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) { + LOG.info("Duplicate registration from the node at: " + host + + ", Sending SHUTDOWN Signal to the NodeManager"); + regResponse.setNodeAction(NodeAction.SHUTDOWN); + response.setRegistrationResponse(regResponse); + return response; + } + + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + + this.nmLivelinessMonitor.register(nodeId); + + LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort + + " httpPort: " + httpPort + ") " + "registered with capability: " + + capability.getMemory() + ", assigned nodeId " + nodeId); + + regResponse.setNodeAction(NodeAction.NORMAL); + response.setRegistrationResponse(regResponse); + return response; } + @SuppressWarnings("unchecked") @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { NodeStatus remoteNodeStatus = request.getNodeStatus(); - try { - /** - * Here is the node heartbeat sequence... - * 1. Check if it's a registered node - * 2. Check if it's a valid (i.e. not excluded) node - * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat - * 4. Send healthStatus to RMNode - */ + /** + * Here is the node heartbeat sequence... + * 1. Check if it's a registered node + * 2. Check if it's a valid (i.e. not excluded) node + * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat + * 4. Send healthStatus to RMNode + */ + + NodeId nodeId = remoteNodeStatus.getNodeId(); + + // 1. Check if it's a registered node + RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); + if (rmNode == null) { + /* node does not exist */ + LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); - NodeId nodeId = remoteNodeStatus.getNodeId(); - - // 1. Check if it's a registered node - RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); - if (rmNode == null) { - /* node does not exist */ - LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); - return reboot; - } - - // Send ping - this.nmLivelinessMonitor.receivedPing(nodeId); - - // 2. Check if it's a valid (i.e. not excluded) node - if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { - LOG.info("Disallowed NodeManager nodeId: " + nodeId + - " hostname: " + rmNode.getNodeAddress()); - throw new IOException("Disallowed NodeManager nodeId: " + - remoteNodeStatus.getNodeId()); - } - - NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory - .newRecordInstance(NodeHeartbeatResponse.class); - - // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat - HeartbeatResponse lastHeartbeatResponse = rmNode - .getLastHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse - .getResponseId()) { - LOG.info("Received duplicate heartbeat from node " + - rmNode.getNodeAddress()); - nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse); - return nodeHeartBeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse - .getResponseId()) { - LOG.info("Too far behind rm response id:" + - lastHeartbeatResponse.getResponseId() + " nm response id:" - + remoteNodeStatus.getResponseId()); - // TODO: Just sending reboot is not enough. Think more. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); - return reboot; - } - - // Heartbeat response - HeartbeatResponse latestResponse = recordFactory - .newRecordInstance(HeartbeatResponse.class); - latestResponse - .setResponseId(lastHeartbeatResponse.getResponseId() + 1); - latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp()); - latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup()); - - // 4. Send status to RMNode, saving the latest response. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), latestResponse)); - - nodeHeartBeatResponse.setHeartbeatResponse(latestResponse); - return nodeHeartBeatResponse; - } catch (IOException ioe) { - LOG.info("Exception in heartbeat from node " + - request.getNodeStatus().getNodeId(), ioe); - throw RPCUtil.getRemoteException(ioe); + // Updating the metrics directly as reboot event cannot be + // triggered on a null rmNode + ClusterMetrics.getMetrics().incrNumRebootedNMs(); + return reboot; } + + // Send ping + this.nmLivelinessMonitor.receivedPing(nodeId); + + // 2. Check if it's a valid (i.e. not excluded) node + if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { + LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: " + + rmNode.getNodeAddress()); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); + return shutDown; + } + + NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory + .newRecordInstance(NodeHeartbeatResponse.class); + + // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat + HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse(); + if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse + .getResponseId()) { + LOG.info("Received duplicate heartbeat from node " + + rmNode.getNodeAddress()); + nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse); + return nodeHeartBeatResponse; + } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse + .getResponseId()) { + LOG.info("Too far behind rm response id:" + + lastHeartbeatResponse.getResponseId() + " nm response id:" + + remoteNodeStatus.getResponseId()); + // TODO: Just sending reboot is not enough. Think more. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); + return reboot; + } + + // Heartbeat response + HeartbeatResponse latestResponse = recordFactory + .newRecordInstance(HeartbeatResponse.class); + latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1); + latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp()); + latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup()); + latestResponse.setNodeAction(NodeAction.NORMAL); + + // 4. Send status to RMNode, saving the latest response. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), + remoteNodeStatus.getContainersStatuses(), latestResponse)); + + nodeHeartBeatResponse.setHeartbeatResponse(latestResponse); + return nodeHeartBeatResponse; } public void recover(RMState state) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index de0d551ae5..d562836101 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; public enum RMNodeEventType { + + STARTED, + // Source: AdminService DECOMMISSION, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3b3864a541..30109edbc0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -107,9 +108,11 @@ public class RMNodeImpl implements RMNode, EventHandler { = new StateMachineFactory(RMNodeState.RUNNING) + RMNodeEvent>(RMNodeState.NEW) //Transitions from RUNNING state + .addTransition(RMNodeState.NEW, RMNodeState.RUNNING, + RMNodeEventType.STARTED, new AddNodeTransition()) .addTransition(RMNodeState.RUNNING, EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) @@ -158,8 +161,6 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.stateMachine = stateMachineFactory.make(this); - context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(this)); } @Override @@ -311,6 +312,21 @@ public void handle(RMNodeEvent event) { } } + public static class AddNodeTransition implements + SingleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + // Inform the scheduler + + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeAddedSchedulerEvent(rmNode)); + + ClusterMetrics.getMetrics().addNode(); + } + } + public static class CleanUpAppTransition implements SingleArcTransition { @@ -335,6 +351,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public static class RemoveNodeTransition implements SingleArcTransition { + @SuppressWarnings("unchecked") @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler @@ -345,11 +362,14 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getRMNodes().remove(rmNode.nodeId); LOG.info("Removed Node " + rmNode.nodeId); + //Update the metrics + ClusterMetrics.getMetrics().removeNode(event.getType()); } } public static class StatusUpdateWhenHealthyTransition implements MultipleArcTransition { + @SuppressWarnings("unchecked") @Override public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { @@ -365,6 +385,7 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); + ClusterMetrics.getMetrics().incrNumUnhealthyNMs(); return RMNodeState.UNHEALTHY; } @@ -402,6 +423,7 @@ public static class StatusUpdateWhenUnHealthyTransition implements MultipleArcTransition { + @SuppressWarnings("unchecked") @Override public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; @@ -413,6 +435,7 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { if (remoteNodeHealthStatus.getIsNodeHealthy()) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); return RMNodeState.RUNNING; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java index ab63421b61..5466cd6326 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeState.java @@ -19,5 +19,5 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; public enum RMNodeState { - RUNNING, UNHEALTHY, DECOMMISSIONED, LOST + NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java index 7e0150ab9e..dd2644eb3f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java @@ -22,6 +22,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -60,6 +61,7 @@ protected void render(Block html) { ResourceScheduler rs = rm.getResourceScheduler(); QueueMetrics metrics = rs.getRootQueueMetrics(); + ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); int appsSubmitted = metrics.getAppsSubmitted(); int reservedGB = metrics.getReservedGB(); @@ -67,30 +69,13 @@ protected void render(Block html) { int allocatedGB = metrics.getAllocatedGB(); int containersAllocated = metrics.getAllocatedContainers(); int totalGB = availableGB + reservedGB + allocatedGB; - - ConcurrentMap nodes = rmContext.getRMNodes(); - int totalNodes = nodes.size(); - int lostNodes = 0; - int unhealthyNodes = 0; - int decommissionedNodes = 0; - for(RMNode node: nodes.values()) { - if(node == null || node.getState() == null) { - lostNodes++; - continue; - } - switch(node.getState()) { - case DECOMMISSIONED: - decommissionedNodes++; - break; - case LOST: - lostNodes++; - break; - case UNHEALTHY: - unhealthyNodes++; - break; - //RUNNING noop - } - } + + int totalNodes = clusterMetrics.getNumNMs(); + int lostNodes = clusterMetrics.getNumLostNMs(); + int unhealthyNodes = clusterMetrics.getUnhealthyNMs(); + int decommissionedNodes = clusterMetrics.getNumDecommisionedNMs(); + int rebootedNodes = clusterMetrics.getNumRebootedNMs(); + DIV div = html.div().$class("metrics"); @@ -106,6 +91,7 @@ protected void render(Block html) { th().$class("ui-state-default")._("Decommissioned Nodes")._(). th().$class("ui-state-default")._("Lost Nodes")._(). th().$class("ui-state-default")._("Unhealthy Nodes")._(). + th().$class("ui-state-default")._("Rebooted Nodes")._(). _(). _(). tbody().$class("ui-widget-content"). @@ -116,9 +102,10 @@ protected void render(Block html) { td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)). td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)). td().a(url("nodes"),String.valueOf(totalNodes))._(). - td().a(url("nodes/DECOMMISSIONED"),String.valueOf(decommissionedNodes))._(). - td().a(url("nodes/LOST"),String.valueOf(lostNodes))._(). - td().a(url("nodes/UNHEALTHY"),String.valueOf(unhealthyNodes))._(). + td().a(url("nodes/decommissioned"),String.valueOf(decommissionedNodes))._(). + td().a(url("nodes/lost"),String.valueOf(lostNodes))._(). + td().a(url("nodes/unhealthy"),String.valueOf(unhealthyNodes))._(). + td().a(url("nodes/rebooted"),String.valueOf(rebootedNodes))._(). _(). _()._(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index d3b4393810..d5e87f137c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -63,7 +63,7 @@ public void containerStatus(Container container) throws Exception { new HashMap>(); conts.put(container.getId().getApplicationAttemptId().getApplicationId(), Arrays.asList(new ContainerStatus[] { container.getContainerStatus() })); - nodeHeartbeat(conts, true); + nodeHeartbeat(conts, true,nodeId); } public NodeId registerNode() throws Exception { @@ -83,11 +83,11 @@ public NodeId registerNode() throws Exception { } public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { - return nodeHeartbeat(new HashMap>(), b); + return nodeHeartbeat(new HashMap>(), b,nodeId); } public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { + List> conts, boolean isHealthy, NodeId nodeId) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setNodeId(nodeId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index f522b78c1a..4a840c36a1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -220,6 +220,10 @@ public void stop() { } }; } + + public NodesListManager getNodesListManager() { + return this.nodesListManager; + } @Override protected void startWepApp() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index cc6d174f41..df1d8923f4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java new file mode 100644 index 0000000000..e39655a7c8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Test; + +public class TestResourceTrackerService { + + private final static File TEMP_DIR = new File(System.getProperty( + "test.build.data", "/tmp"), "decommision"); + private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); + private MockRM rm; + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + /** + * decommissioning using a include hosts file + */ + @Test + public void testDecommissionWithIncludeHosts() throws Exception { + + writeToHostsFile("host1", "host2"); + Configuration conf = new Configuration(); + conf.set("yarn.resourcemanager.nodes.include-path", hostFile + .getAbsolutePath()); + + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + int initialMetricCount = ClusterMetrics.getMetrics() + .getNumDecommisionedNMs(); + + HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + + writeToHostsFile("host1"); + + rm.getNodesListManager().refreshNodes(); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + Assert + .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN + .equals(nodeHeartbeat.getNodeAction())); + + checkDecommissionedNMCount(rm, ++initialMetricCount); + } + + /** + * decommissioning using a exclude hosts file + */ + @Test + public void testDecommissionWithExcludeHosts() throws Exception { + Configuration conf = new Configuration(); + conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + + int initialMetricCount = ClusterMetrics.getMetrics() + .getNumDecommisionedNMs(); + + HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + + writeToHostsFile("host2"); + + rm.getNodesListManager().refreshNodes(); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue("The decommisioned metrics are not updated", + NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction())); + checkDecommissionedNMCount(rm, ++initialMetricCount); + } + + @Test + public void testNodeRegistrationFailure() throws Exception { + writeToHostsFile("host1"); + Configuration conf = new Configuration(); + conf.set("yarn.resourcemanager.nodes.include-path", hostFile + .getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = Records.newRecord( + RegisterNodeManagerRequest.class); + NodeId nodeId = Records.newRecord(NodeId.class); + nodeId.setHost("host2"); + nodeId.setPort(1234); + req.setNodeId(nodeId); + req.setHttpPort(1234); + // trying to register a invalid node. + RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); + Assert.assertEquals(NodeAction.SHUTDOWN,response.getRegistrationResponse().getNodeAction()); + } + + @Test + public void testReboot() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService()); + + int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs(); + HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + + nodeHeartbeat = nm2.nodeHeartbeat( + new HashMap>(), true, + recordFactory.newRecordInstance(NodeId.class)); + Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); + checkRebootedNMCount(rm, ++initialMetricCount); + } + + private void checkRebootedNMCount(MockRM rm2, int count) + throws InterruptedException { + + int waitCount = 0; + while (ClusterMetrics.getMetrics().getNumRebootedNMs() != count + && waitCount++ < 20) { + synchronized (this) { + wait(100); + } + } + Assert.assertEquals("The rebooted metrics are not updated", count, + ClusterMetrics.getMetrics().getNumRebootedNMs()); + } + + @Test + public void testUnhealthyNodeStatus() throws Exception { + Configuration conf = new Configuration(); + conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile + .getAbsolutePath()); + + MockRM rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs()); + // node healthy + nm1.nodeHeartbeat(true); + + // node unhealthy + nm1.nodeHeartbeat(false); + checkUnealthyNMCount(rm, nm1, true, 1); + + // node healthy again + nm1.nodeHeartbeat(true); + checkUnealthyNMCount(rm, nm1, false, 0); + } + + private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health, + int count) throws Exception { + + int waitCount = 0; + while(rm.getRMContext().getRMNodes().get(nm1.getNodeId()) + .getNodeHealthStatus().getIsNodeHealthy() == health + && waitCount++ < 20) { + synchronized (this) { + wait(100); + } + } + Assert.assertFalse(rm.getRMContext().getRMNodes().get(nm1.getNodeId()) + .getNodeHealthStatus().getIsNodeHealthy() == health); + Assert.assertEquals("Unhealthy metrics not incremented", count, + ClusterMetrics.getMetrics().getUnhealthyNMs()); + } + + private void writeToHostsFile(String... hosts) throws IOException { + if (!hostFile.exists()) { + TEMP_DIR.mkdirs(); + hostFile.createNewFile(); + } + FileOutputStream fStream = null; + try { + fStream = new FileOutputStream(hostFile); + for (int i = 0; i < hosts.length; i++) { + fStream.write(hosts[i].getBytes()); + fStream.write("\n".getBytes()); + } + } finally { + if (fStream != null) { + IOUtils.closeStream(fStream); + fStream = null; + } + } + } + + private void checkDecommissionedNMCount(MockRM rm, int count) + throws InterruptedException { + int waitCount = 0; + while (ClusterMetrics.getMetrics().getNumDecommisionedNMs() != count + && waitCount++ < 20) { + synchronized (this) { + wait(100); + } + } + Assert.assertEquals(count, ClusterMetrics.getMetrics() + .getNumDecommisionedNMs()); + Assert.assertEquals("The decommisioned metrics are not updated", count, + ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + } + + @After + public void tearDown() { + if (hostFile != null && hostFile.exists()) { + hostFile.delete(); + } + if (rm != null) { + rm.stop(); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index 7411748b06..792a096d72 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; -import java.util.concurrent.atomic.AtomicInteger; - import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -34,12 +32,13 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; -import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -55,8 +54,6 @@ public class TestNMExpiry { ResourceTrackerService resourceTrackerService; ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager(); - AtomicInteger test = new AtomicInteger(); - AtomicInteger notify = new AtomicInteger(); private class TestNmLivelinessMonitor extends NMLivelinessMonitor { public TestNmLivelinessMonitor(Dispatcher dispatcher) { @@ -68,22 +65,6 @@ public void init(Configuration conf) { conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000); super.init(conf); } - @Override - protected void expire(NodeId id) { - LOG.info("Expired " + id); - if (test.addAndGet(1) == 2) { - try { - /* delay atleast 2 seconds to make sure the 3rd one does not expire - * - */ - Thread.sleep(2000); - } catch(InterruptedException ie){} - synchronized(notify) { - notify.addAndGet(1); - notify.notifyAll(); - } - } - } } @Before @@ -91,12 +72,12 @@ public void setUp() { Configuration conf = new Configuration(); // Dispatcher that processes events inline Dispatcher dispatcher = new InlineDispatcher(); + RMContext context = new RMContextImpl(new MemStore(), dispatcher, null, + null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, - new InlineDispatcher.EmptyEventHandler()); - RMContext context = new RMContextImpl(new MemStore(), dispatcher, null, - null, null); + new NodeEventDispatcher(context)); NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor( dispatcher); nmLivelinessMonitor.init(conf); @@ -166,6 +147,14 @@ public void testNMExpiry() throws Exception { request2.setHttpPort(0); request2.setResource(capability); resourceTrackerService.registerNodeManager(request2); + + int waitCount = 0; + while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){ + synchronized (this) { + wait(100); + } + } + Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs()); request3 = recordFactory .newRecordInstance(RegisterNodeManagerRequest.class); @@ -175,20 +164,13 @@ public void testNMExpiry() throws Exception { request3.setNodeId(nodeId3); request3.setHttpPort(0); request3.setResource(capability); - RegistrationResponse thirdNodeRegResponse = resourceTrackerService + resourceTrackerService .registerNodeManager(request3).getRegistrationResponse(); /* test to see if hostanme 3 does not expire */ stopT = false; new ThirdNodeHeartBeatThread().start(); - int timeOut = 0; - synchronized (notify) { - while (notify.get() == 0 && timeOut++ < 30) { - notify.wait(1000); - } - } - Assert.assertEquals(2, test.get()); - + Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs()); stopT = true; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index ca025eafa5..e5e4a71d03 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -130,6 +131,6 @@ public void testRPCResponseId() throws IOException { nodeStatus.setResponseId(0); response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest) .getHeartbeatResponse(); - Assert.assertTrue(response.getReboot() == true); + Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction())); } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java index bfea484477..4a264fd24b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java @@ -43,7 +43,7 @@ public void testNodesBlockRender() throws Exception { final int numberOfNodesPerRack = 2; // Number of Actual Table Headers for NodesPage.NodesBlock might change in // future. In that case this value should be adjusted to the new value. - final int numberOfThInMetricsTable = 9; + final int numberOfThInMetricsTable = 10; final int numberOfActualTableHeaders = 10; Injector injector = WebAppTests.createMockInjector(RMContext.class,