MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a decommissioned node to shutdown. Contributed by Devaraj K.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190467 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-28 17:31:06 +00:00
parent da1db28e93
commit cbdb07f4ca
23 changed files with 864 additions and 206 deletions

View File

@ -1847,6 +1847,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv)
MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a
decommissioned node to shutdown. (Devaraj K via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -24,7 +24,7 @@
public interface HeartbeatResponse {
int getResponseId();
boolean getReboot();
NodeAction getNodeAction();
List<ContainerId> getContainersToCleanupList();
ContainerId getContainerToCleanup(int index);
@ -35,7 +35,7 @@ public interface HeartbeatResponse {
int getApplicationsToCleanupCount();
void setResponseId(int responseId);
void setReboot(boolean reboot);
void setNodeAction(NodeAction action);
void addAllContainersToCleanup(List<ContainerId> containers);
void addContainerToCleanup(ContainerId container);

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
/**
* The NodeManager is instructed to perform the given action.
*
*/
public enum NodeAction {
NORMAL, REBOOT, SHUTDOWN
}

View File

@ -23,4 +23,8 @@ public interface RegistrationResponse {
public abstract ByteBuffer getSecretKey();
public abstract void setSecretKey(ByteBuffer secretKey);
public abstract NodeAction getNodeAction();
public abstract void setNodeAction(NodeAction nodeAction);
}

View File

@ -32,11 +32,12 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
public class HeartbeatResponsePBImpl extends
ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
HeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
@ -100,16 +101,24 @@ public void setResponseId(int responseId) {
builder.setResponseId((responseId));
}
@Override
public boolean getReboot() {
public NodeAction getNodeAction() {
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot());
if(!p.hasNodeAction()) {
return null;
}
return (convertFromProtoFormat(p.getNodeAction()));
}
@Override
public void setReboot(boolean reboot) {
public void setNodeAction(NodeAction nodeAction) {
maybeInitBuilder();
builder.setReboot((reboot));
if (nodeAction == null) {
builder.clearNodeAction();
return;
}
builder.setNodeAction(convertToProtoFormat(nodeAction));
}
@Override
public List<ContainerId> getContainersToCleanupList() {
initContainersToCleanup();
@ -297,6 +306,11 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
private NodeAction convertFromProtoFormat(NodeActionProto p) {
return NodeAction.valueOf(p.name());
}
private NodeActionProto convertToProtoFormat(NodeAction t) {
return NodeActionProto.valueOf(t.name());
}
}

View File

@ -21,17 +21,15 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
public class RegistrationResponsePBImpl extends ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
public class RegistrationResponsePBImpl extends
ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
RegistrationResponseProto.Builder builder = null;
boolean viaProto = false;
@ -98,4 +96,31 @@ public void setSecretKey(ByteBuffer secretKey) {
this.secretKey = secretKey;
}
@Override
public NodeAction getNodeAction() {
RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
if(!p.hasNodeAction()) {
return null;
}
return convertFromProtoFormat(p.getNodeAction());
}
@Override
public void setNodeAction(NodeAction nodeAction) {
maybeInitBuilder();
if (nodeAction == null) {
builder.clearNodeAction();
return;
}
builder.setNodeAction(convertToProtoFormat(nodeAction));
}
private NodeAction convertFromProtoFormat(NodeActionProto p) {
return NodeAction.valueOf(p.name());
}
private NodeActionProto convertToProtoFormat(NodeAction t) {
return NodeActionProto.valueOf(t.name());
}
}

View File

@ -23,6 +23,12 @@ option java_generate_equals_and_hash = true;
import "yarn_protos.proto";
enum NodeActionProto {
NORMAL = 0;
REBOOT = 1;
SHUTDOWN = 2;
}
message NodeStatusProto {
optional NodeIdProto node_id = 1;
optional int32 response_id = 2;
@ -32,11 +38,12 @@ message NodeStatusProto {
message RegistrationResponseProto {
optional bytes secret_key = 1;
optional NodeActionProto nodeAction = 2;
}
message HeartbeatResponseProto {
optional int32 response_id = 1;
optional bool reboot = 2;
optional NodeActionProto nodeAction = 2;
repeated ContainerIdProto containers_to_cleanup = 3;
repeated ApplicationIdProto applications_to_cleanup = 4;
}

View File

@ -50,9 +50,11 @@
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.util.Records;
public class NodeManager extends CompositeService {
public class NodeManager extends CompositeService implements
ServiceStateChangeListener {
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
@ -124,6 +126,8 @@ public void init(Configuration conf) {
createNodeStatusUpdater(context, dispatcher, healthChecker,
this.containerTokenSecretManager);
nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
@ -206,6 +210,16 @@ public NodeHealthStatus getNodeHealthStatus() {
}
}
@Override
public void stateChanged(Service service) {
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
&& STATE.STOPPED.equals(service.getServiceState())) {
stop();
}
}
public static void main(String[] args) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
try {
@ -220,5 +234,4 @@ public static void main(String[] args) {
System.exit(-1);
}
}
}

View File

@ -30,8 +30,8 @@
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -45,11 +45,11 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -160,6 +160,12 @@ private void registerWithRM() throws YarnRemoteException {
request.setNodeId(this.nodeId);
RegistrationResponse regResponse =
this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
throw new YarnException(
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
}
if (UserGroupInformation.isSecurityEnabled()) {
this.secretKeyBytes = regResponse.getSecretKey().array();
}
@ -248,10 +254,25 @@ public void run() {
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
LOG
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
" hence shutting down.");
NodeStatusUpdaterImpl.this.stop();
break;
}
if (response.getNodeAction() == NodeAction.REBOOT) {
LOG.info("Node is out of sync with ResourceManager,"
+ " hence shutting down.");
NodeStatusUpdaterImpl.this.stop();
break;
}
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanupList();
@ -269,7 +290,6 @@ public void run() {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
break;
}
}
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@ -85,10 +86,15 @@ public class TestNodeStatusUpdater {
volatile Error nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = new YarnConfiguration();
private NodeManager nm;
@After
public void tearDown() {
this.registeredNodes.clear();
heartBeatID = 0;
if (nm != null) {
nm.stop();
}
DefaultMetricsSystem.shutdown();
}
@ -220,6 +226,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@ -232,7 +239,41 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@Override
protected ResourceTracker getRMClient() {
return new MyResourceTracker(this.context);
return resourceTracker;
}
}
//
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
RegistrationResponse regResponse = recordFactory
.newRecordInstance(RegistrationResponse.class);
regResponse.setNodeAction(registerNodeAction );
response.setRegistrationResponse(regResponse);
return response;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction);
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
return nhResponse;
}
}
@ -249,7 +290,7 @@ public void deleteBaseDir() throws IOException {
@Test
public void testNMRegistration() throws InterruptedException {
final NodeManager nm = new NodeManager() {
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@ -295,15 +336,86 @@ public void run() {
Assert.fail("NodeManager failed to start");
}
while (heartBeatID <= 3) {
waitCount = 0;
while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID <= 3);
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
this.registeredNodes.size());
nm.stop();
}
@Test
public void testNodeDecommision() throws Exception {
nm = getNodeManager(NodeAction.SHUTDOWN);
YarnConfiguration conf = createNMConfig();
nm.init(conf);
Assert.assertEquals(STATE.INITED, nm.getServiceState());
nm.start();
int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID < 1);
// NM takes a while to reach the STOPPED state.
waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
}
@Test
public void testNodeReboot() throws Exception {
nm = getNodeManager(NodeAction.REBOOT);
YarnConfiguration conf = createNMConfig();
nm.init(conf);
Assert.assertEquals(STATE.INITED, nm.getServiceState());
nm.start();
int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID < 1);
// NM takes a while to reach the STOPPED state.
waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
}
@Test
public void testNMShutdownForRegistrationFailure() {
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
nodeStatusUpdater.resourceTracker = myResourceTracker2;
return nodeStatusUpdater;
}
};
verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
}
/**
* Verifies that if for some reason NM fails to start ContainerManager RPC
* server, RM is oblivious to NM's presence. The behaviour is like this
@ -314,7 +426,7 @@ public void run() {
@Test
public void testNoRegistrationWhenNMServicesFail() {
final NodeManager nm = new NodeManager() {
nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@ -341,16 +453,22 @@ public void start() {
}
};
verifyNodeStartFailure("Starting of RPC Server failed");
}
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
try {
nm.start();
Assert.fail("NM should have failed to start. Didn't get exception!!");
} catch (Exception e) {
Assert.assertEquals("Starting of RPC Server failed", e.getCause()
Assert.assertEquals(errMessage, e.getCause()
.getMessage());
}
// the state change to stopped occurs only if the startup is success, else
// state change doesn't occur
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
.getServiceState());
@ -371,4 +489,21 @@ private YarnConfiguration createNMConfig() {
.toUri().getPath());
return conf;
}
private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
return new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
context, dispatcher, healthChecker, metrics,
containerTokenSecretManager);
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
};
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@InterfaceAudience.Private
@Metrics(context="yarn")
public class ClusterMetrics {
@Metric("# of NMs") MutableGaugeInt numNMs;
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableCounterInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
"Metrics for the Yarn Cluster");
private static volatile ClusterMetrics INSTANCE = null;
private static MetricsRegistry registry;
public static ClusterMetrics getMetrics() {
if(INSTANCE == null){
synchronized (ClusterMetrics.class) {
if(INSTANCE == null){
INSTANCE = new ClusterMetrics();
registerMetrics();
}
}
}
return INSTANCE;
}
private static void registerMetrics() {
registry = new MetricsRegistry(RECORD_INFO);
registry.tag(RECORD_INFO, "ResourceManager");
MetricsSystem ms = DefaultMetricsSystem.instance();
if (ms != null) {
ms.register("ClusterMetrics", "Metrics for the Yarn Cluster", INSTANCE);
}
}
//Total Nodemanagers
public int getNumNMs() {
return numNMs.value();
}
//Decommisioned NMs
public int getNumDecommisionedNMs() {
return numDecommissionedNMs.value();
}
public void incrDecommisionedNMs() {
numDecommissionedNMs.incr();
}
//Lost NMs
public int getNumLostNMs() {
return numLostNMs.value();
}
public void incrNumLostNMs() {
numLostNMs.incr();
}
//Unhealthy NMs
public int getUnhealthyNMs() {
return numUnhealthyNMs.value();
}
public void incrNumUnhealthyNMs() {
numUnhealthyNMs.incr();
}
public void decrNumUnhealthyNMs() {
numUnhealthyNMs.decr();
}
//Rebooted NMs
public int getNumRebootedNMs() {
return numRebootedNMs.value();
}
public void incrNumRebootedNMs() {
numRebootedNMs.incr();
}
public void removeNode(RMNodeEventType nodeEventType) {
numNMs.decr();
switch(nodeEventType){
case DECOMMISSION: incrDecommisionedNMs(); break;
case EXPIRE: incrNumLostNMs();break;
case REBOOTING: incrNumRebootedNMs();break;
}
}
public void addNode() {
numNMs.incr();
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -37,7 +36,6 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -45,6 +43,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
@ -76,11 +75,19 @@ public class ResourceTrackerService extends AbstractService implements
private static final NodeHeartbeatResponse reboot = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
static {
HeartbeatResponse rebootResp = recordFactory
.newRecordInstance(HeartbeatResponse.class);
rebootResp.setReboot(true);
rebootResp.setNodeAction(NodeAction.REBOOT);
reboot.setHeartbeatResponse(rebootResp);
HeartbeatResponse decommissionedResp = recordFactory
.newRecordInstance(HeartbeatResponse.class);
decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
shutDown.setHeartbeatResponse(decommissionedResp);
}
public ResourceTrackerService(RMContext rmContext,
@ -139,6 +146,7 @@ public synchronized void stop() {
super.stop();
}
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException {
@ -149,49 +157,54 @@ public RegisterNodeManagerResponse registerNodeManager(
int httpPort = request.getHttpPort();
Resource capability = request.getResource();
try {
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
LOG.info("Disallowed NodeManager from " + host);
throw new IOException("Disallowed NodeManager from " + host);
}
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
httpPort, resolve(host), capability);
if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
throw new IOException("Duplicate registration from the node!");
}
this.nmLivelinessMonitor.register(nodeId);
LOG.info("NodeManager from node " + host +
"(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
+ "registered with capability: " + capability.getMemory()
+ ", assigned nodeId " + nodeId);
RegistrationResponse regResponse = recordFactory.newRecordInstance(
RegistrationResponse.class);
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
RegistrationResponse regResponse = recordFactory
.newRecordInstance(RegistrationResponse.class);
SecretKey secretKey = this.containerTokenSecretManager
.createAndGetSecretKey(nodeId.toString());
regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
LOG.info("Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.");
regResponse.setNodeAction(NodeAction.SHUTDOWN);
response.setRegistrationResponse(regResponse);
return response;
} catch (IOException ioe) {
LOG.info("Exception in node registration from " + nodeId.getHost(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
}
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability);
if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
LOG.info("Duplicate registration from the node at: " + host
+ ", Sending SHUTDOWN Signal to the NodeManager");
regResponse.setNodeAction(NodeAction.SHUTDOWN);
response.setRegistrationResponse(regResponse);
return response;
}
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
this.nmLivelinessMonitor.register(nodeId);
LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
+ " httpPort: " + httpPort + ") " + "registered with capability: "
+ capability.getMemory() + ", assigned nodeId " + nodeId);
regResponse.setNodeAction(NodeAction.NORMAL);
response.setRegistrationResponse(regResponse);
return response;
}
@SuppressWarnings("unchecked")
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException {
NodeStatus remoteNodeStatus = request.getNodeStatus();
try {
/**
* Here is the node heartbeat sequence...
* 1. Check if it's a registered node
@ -207,6 +220,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
// Updating the metrics directly as reboot event cannot be
// triggered on a null rmNode
ClusterMetrics.getMetrics().incrNumRebootedNMs();
return reboot;
}
@ -215,28 +232,28 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 2. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
LOG.info("Disallowed NodeManager nodeId: " + nodeId +
" hostname: " + rmNode.getNodeAddress());
throw new IOException("Disallowed NodeManager nodeId: " +
remoteNodeStatus.getNodeId());
LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress());
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
return shutDown;
}
NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
HeartbeatResponse lastHeartbeatResponse = rmNode
.getLastHeartBeatResponse();
HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node " +
rmNode.getNodeAddress());
LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress());
nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
return nodeHeartBeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
.getResponseId()) {
LOG.info("Too far behind rm response id:" +
lastHeartbeatResponse.getResponseId() + " nm response id:"
LOG.info("Too far behind rm response id:"
+ lastHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId());
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
@ -247,10 +264,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// Heartbeat response
HeartbeatResponse latestResponse = recordFactory
.newRecordInstance(HeartbeatResponse.class);
latestResponse
.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
latestResponse.setNodeAction(NodeAction.NORMAL);
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
@ -259,11 +276,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse;
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
}
public void recover(RMState state) {

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
public enum RMNodeEventType {
STARTED,
// Source: AdminService
DECOMMISSION,

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@ -107,9 +108,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
= new StateMachineFactory<RMNodeImpl,
RMNodeState,
RMNodeEventType,
RMNodeEvent>(RMNodeState.RUNNING)
RMNodeEvent>(RMNodeState.NEW)
//Transitions from RUNNING state
.addTransition(RMNodeState.NEW, RMNodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(RMNodeState.RUNNING,
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
@ -158,8 +161,6 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.stateMachine = stateMachineFactory.make(this);
context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(this));
}
@Override
@ -311,6 +312,21 @@ public void handle(RMNodeEvent event) {
}
}
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
ClusterMetrics.getMetrics().addNode();
}
}
public static class CleanUpAppTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@ -335,6 +351,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
public static class RemoveNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
@ -345,11 +362,14 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId);
//Update the metrics
ClusterMetrics.getMetrics().removeNode(event.getType());
}
}
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
@SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
@ -365,6 +385,7 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
return RMNodeState.UNHEALTHY;
}
@ -402,6 +423,7 @@ public static class StatusUpdateWhenUnHealthyTransition
implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
@SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
@ -413,6 +435,7 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
return RMNodeState.RUNNING;
}

View File

@ -19,5 +19,5 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
public enum RMNodeState {
RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -60,6 +61,7 @@ protected void render(Block html) {
ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
int appsSubmitted = metrics.getAppsSubmitted();
int reservedGB = metrics.getReservedGB();
@ -68,29 +70,12 @@ protected void render(Block html) {
int containersAllocated = metrics.getAllocatedContainers();
int totalGB = availableGB + reservedGB + allocatedGB;
ConcurrentMap<NodeId,RMNode> nodes = rmContext.getRMNodes();
int totalNodes = nodes.size();
int lostNodes = 0;
int unhealthyNodes = 0;
int decommissionedNodes = 0;
for(RMNode node: nodes.values()) {
if(node == null || node.getState() == null) {
lostNodes++;
continue;
}
switch(node.getState()) {
case DECOMMISSIONED:
decommissionedNodes++;
break;
case LOST:
lostNodes++;
break;
case UNHEALTHY:
unhealthyNodes++;
break;
//RUNNING noop
}
}
int totalNodes = clusterMetrics.getNumNMs();
int lostNodes = clusterMetrics.getNumLostNMs();
int unhealthyNodes = clusterMetrics.getUnhealthyNMs();
int decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
int rebootedNodes = clusterMetrics.getNumRebootedNMs();
DIV<Hamlet> div = html.div().$class("metrics");
@ -106,6 +91,7 @@ protected void render(Block html) {
th().$class("ui-state-default")._("Decommissioned Nodes")._().
th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._().
th().$class("ui-state-default")._("Rebooted Nodes")._().
_().
_().
tbody().$class("ui-widget-content").
@ -116,9 +102,10 @@ protected void render(Block html) {
td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)).
td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)).
td().a(url("nodes"),String.valueOf(totalNodes))._().
td().a(url("nodes/DECOMMISSIONED"),String.valueOf(decommissionedNodes))._().
td().a(url("nodes/LOST"),String.valueOf(lostNodes))._().
td().a(url("nodes/UNHEALTHY"),String.valueOf(unhealthyNodes))._().
td().a(url("nodes/decommissioned"),String.valueOf(decommissionedNodes))._().
td().a(url("nodes/lost"),String.valueOf(lostNodes))._().
td().a(url("nodes/unhealthy"),String.valueOf(unhealthyNodes))._().
td().a(url("nodes/rebooted"),String.valueOf(rebootedNodes))._().
_().
_()._();

View File

@ -63,7 +63,7 @@ public void containerStatus(Container container) throws Exception {
new HashMap<ApplicationId, List<ContainerStatus>>();
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
nodeHeartbeat(conts, true);
nodeHeartbeat(conts, true,nodeId);
}
public NodeId registerNode() throws Exception {
@ -83,11 +83,11 @@ public NodeId registerNode() throws Exception {
}
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setNodeId(nodeId);

View File

@ -221,6 +221,10 @@ public void stop() {
};
}
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Override
protected void startWepApp() {
//override to disable webapp

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Test;
public class TestResourceTrackerService {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
private MockRM rm;
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
/**
* decommissioning using a include hosts file
*/
@Test
public void testDecommissionWithIncludeHosts() throws Exception {
writeToHostsFile("host1", "host2");
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
int initialMetricCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
writeToHostsFile("host1");
rm.getNodesListManager().refreshNodes();
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
.equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++initialMetricCount);
}
/**
* decommissioning using a exclude hosts file
*/
@Test
public void testDecommissionWithExcludeHosts() throws Exception {
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
.getAbsolutePath());
writeToHostsFile("");
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
int initialMetricCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
writeToHostsFile("host2");
rm.getNodesListManager().refreshNodes();
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("The decommisioned metrics are not updated",
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
checkDecommissionedNMCount(rm, ++initialMetricCount);
}
@Test
public void testNodeRegistrationFailure() throws Exception {
writeToHostsFile("host1");
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
.getAbsolutePath());
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
NodeId nodeId = Records.newRecord(NodeId.class);
nodeId.setHost("host2");
nodeId.setPort(1234);
req.setNodeId(nodeId);
req.setHttpPort(1234);
// trying to register a invalid node.
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response.getRegistrationResponse().getNodeAction());
}
@Test
public void testReboot() throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService());
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true,
recordFactory.newRecordInstance(NodeId.class));
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount);
}
private void checkRebootedNMCount(MockRM rm2, int count)
throws InterruptedException {
int waitCount = 0;
while (ClusterMetrics.getMetrics().getNumRebootedNMs() != count
&& waitCount++ < 20) {
synchronized (this) {
wait(100);
}
}
Assert.assertEquals("The rebooted metrics are not updated", count,
ClusterMetrics.getMetrics().getNumRebootedNMs());
}
@Test
public void testUnhealthyNodeStatus() throws Exception {
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
.getAbsolutePath());
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());
// node healthy
nm1.nodeHeartbeat(true);
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
// node healthy again
nm1.nodeHeartbeat(true);
checkUnealthyNMCount(rm, nm1, false, 0);
}
private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
int count) throws Exception {
int waitCount = 0;
while(rm.getRMContext().getRMNodes().get(nm1.getNodeId())
.getNodeHealthStatus().getIsNodeHealthy() == health
&& waitCount++ < 20) {
synchronized (this) {
wait(100);
}
}
Assert.assertFalse(rm.getRMContext().getRMNodes().get(nm1.getNodeId())
.getNodeHealthStatus().getIsNodeHealthy() == health);
Assert.assertEquals("Unhealthy metrics not incremented", count,
ClusterMetrics.getMetrics().getUnhealthyNMs());
}
private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) {
TEMP_DIR.mkdirs();
hostFile.createNewFile();
}
FileOutputStream fStream = null;
try {
fStream = new FileOutputStream(hostFile);
for (int i = 0; i < hosts.length; i++) {
fStream.write(hosts[i].getBytes());
fStream.write("\n".getBytes());
}
} finally {
if (fStream != null) {
IOUtils.closeStream(fStream);
fStream = null;
}
}
}
private void checkDecommissionedNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;
while (ClusterMetrics.getMetrics().getNumDecommisionedNMs() != count
&& waitCount++ < 20) {
synchronized (this) {
wait(100);
}
}
Assert.assertEquals(count, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs());
Assert.assertEquals("The decommisioned metrics are not updated", count,
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
}
@After
public void tearDown() {
if (hostFile != null && hostFile.exists()) {
hostFile.delete();
}
if (rm != null) {
rm.stop();
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@ -34,12 +32,13 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@ -55,8 +54,6 @@ public class TestNMExpiry {
ResourceTrackerService resourceTrackerService;
ContainerTokenSecretManager containerTokenSecretManager =
new ContainerTokenSecretManager();
AtomicInteger test = new AtomicInteger();
AtomicInteger notify = new AtomicInteger();
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
@ -68,22 +65,6 @@ public void init(Configuration conf) {
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
super.init(conf);
}
@Override
protected void expire(NodeId id) {
LOG.info("Expired " + id);
if (test.addAndGet(1) == 2) {
try {
/* delay atleast 2 seconds to make sure the 3rd one does not expire
*
*/
Thread.sleep(2000);
} catch(InterruptedException ie){}
synchronized(notify) {
notify.addAndGet(1);
notify.notifyAll();
}
}
}
}
@Before
@ -91,12 +72,12 @@ public void setUp() {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());
dispatcher.register(RMNodeEventType.class,
new InlineDispatcher.EmptyEventHandler());
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
null, null);
new NodeEventDispatcher(context));
NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
dispatcher);
nmLivelinessMonitor.init(conf);
@ -167,6 +148,14 @@ public void testNMExpiry() throws Exception {
request2.setResource(capability);
resourceTrackerService.registerNodeManager(request2);
int waitCount = 0;
while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){
synchronized (this) {
wait(100);
}
}
Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs());
request3 = recordFactory
.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId3 = Records.newRecord(NodeId.class);
@ -175,20 +164,13 @@ public void testNMExpiry() throws Exception {
request3.setNodeId(nodeId3);
request3.setHttpPort(0);
request3.setResource(capability);
RegistrationResponse thirdNodeRegResponse = resourceTrackerService
resourceTrackerService
.registerNodeManager(request3).getRegistrationResponse();
/* test to see if hostanme 3 does not expire */
stopT = false;
new ThirdNodeHeartBeatThread().start();
int timeOut = 0;
synchronized (notify) {
while (notify.get() == 0 && timeOut++ < 30) {
notify.wait(1000);
}
}
Assert.assertEquals(2, test.get());
Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs());
stopT = true;
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -130,6 +131,6 @@ public void testRPCResponseId() throws IOException {
nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
.getHeartbeatResponse();
Assert.assertTrue(response.getReboot() == true);
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
}
}

View File

@ -43,7 +43,7 @@ public void testNodesBlockRender() throws Exception {
final int numberOfNodesPerRack = 2;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 9;
final int numberOfThInMetricsTable = 10;
final int numberOfActualTableHeaders = 10;
Injector injector = WebAppTests.createMockInjector(RMContext.class,