MAPREDUCE-3760. Changed active nodes list to not contain unhealthy nodes on the webUI and metrics. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1240421 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-02-04 01:03:51 +00:00
parent 94242c9385
commit 3343494d6c
9 changed files with 209 additions and 80 deletions

View File

@ -677,6 +677,9 @@ Release 0.23.1 - Unreleased
output is recovered and thus reduce the unnecessarily bloated recovery
time. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3760. Changed active nodes list to not contain unhealthy nodes
on the webUI and metrics. (vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -38,7 +38,7 @@ public class ClusterMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
@Metric("# of active NMs") MutableGaugeInt numNMs;
@Metric("# of active NMs") MutableGaugeInt numActiveNMs;
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@ -74,7 +74,7 @@ private static void registerMetrics() {
//Active Nodemanagers
public int getNumActiveNMs() {
return numNMs.value();
return numActiveNMs.value();
}
//Decommisioned NMs
@ -128,17 +128,12 @@ public void incrNumRebootedNMs() {
/**
 * Decrements the {@code numRebootedNMs} metric by one.
 */
public void decrNumRebootedNMs() {
numRebootedNMs.decr();
}
public void removeNode(RMNodeEventType nodeEventType) {
numNMs.decr();
switch(nodeEventType){
case DECOMMISSION: incrDecommisionedNMs(); break;
case EXPIRE: incrNumLostNMs();break;
case REBOOTING: incrNumRebootedNMs();break;
}
/**
 * Increments {@code numActiveNMs}, the gauge published as
 * "# of active NMs", by one.
 */
public void incrNumActiveNodes() {
numActiveNMs.incr();
}
public void addNode() {
numNMs.incr();
/**
 * Decrements {@code numActiveNMs}, the gauge published as
 * "# of active NMs", by one.
 */
public void decrNumActiveNodes() {
numActiveNMs.decr();
}
}

View File

@ -64,6 +64,7 @@
*/
@Private
@Unstable
@SuppressWarnings("unchecked")
public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private static final Log LOG = LogFactory.getLog(RMNodeImpl.class);
@ -116,11 +117,14 @@ RMNodeEventType.STARTED, new AddNodeTransition())
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(RMNodeState.DECOMMISSIONED))
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
RMNodeEventType.EXPIRE, new RemoveNodeTransition())
RMNodeEventType.EXPIRE,
new DeactivateNodeTransition(RMNodeState.LOST))
.addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
RMNodeEventType.REBOOTING, new RemoveNodeTransition())
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(RMNodeState.REBOOTED))
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
@ -304,26 +308,50 @@ public void handle(RMNodeEvent event) {
writeLock.unlock();
}
}
/**
 * Updates the cluster metrics for a node that is becoming active again.
 * The active-node count is always incremented; in addition, the counter
 * matching the state the node is leaving (lost, rebooted, decommissioned or
 * unhealthy) is decremented. Any other previous state only affects the
 * active-node count.
 *
 * @param previousNodeState the state the node was in before rejoining
 */
private void updateMetricsForRejoinedNode(RMNodeState previousNodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
// The node counts as active again regardless of where it came from.
metrics.incrNumActiveNodes();
// Undo the bookkeeping that was done when the node left the active set.
switch (previousNodeState) {
case LOST:
metrics.decrNumLostNMs();
break;
case REBOOTED:
metrics.decrNumRebootedNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
}
}
/**
 * Updates the cluster metrics for a node that is leaving the active set.
 * The active-node count is always decremented; in addition, the counter
 * matching the state the node ends up in (decommissioned, lost, rebooted or
 * unhealthy) is incremented. Any other final state only affects the
 * active-node count.
 *
 * @param finalState the state the node is moving to
 */
private void updateMetricsForDeactivatedNode(RMNodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
// The node no longer counts as active, whatever the reason.
metrics.decrNumActiveNodes();
// Record the node under the counter for the state it is entering.
switch (finalState) {
case DECOMMISSIONED:
metrics.incrDecommisionedNMs();
break;
case LOST:
metrics.incrNumLostNMs();
break;
case REBOOTED:
metrics.incrNumRebootedNMs();
break;
case UNHEALTHY:
metrics.incrNumUnhealthyNMs();
break;
}
}
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
private void updateMetrics(RMNodeState nodeState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (nodeState) {
case LOST:
metrics.decrNumLostNMs();
break;
case REBOOTED:
metrics.decrNumRebootedNMs();
break;
case DECOMMISSIONED:
metrics.decrDecommisionedNMs();
break;
}
}
@SuppressWarnings("unchecked")
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
@ -333,12 +361,14 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
RMNode node = rmNode.context.getInactiveRMNodes().get(host);
// Old node rejoining
RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
rmNode.context.getInactiveRMNodes().remove(host);
updateMetrics(node.getState());
rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
}
ClusterMetrics.getMetrics().addNode();
}
}
@ -362,28 +392,33 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
public static class RemoveNodeTransition
public static class DeactivateNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@SuppressWarnings("unchecked")
private final RMNodeState finalState;
public DeactivateNodeTransition(RMNodeState finalState) {
this.finalState = finalState;
}
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
// Remove the node from the system.
// Deactivate the node
rmNode.context.getRMNodes().remove(rmNode.nodeId);
LOG.info("Removed Node " + rmNode.nodeId);
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
//Update the metrics
ClusterMetrics.getMetrics().removeNode(event.getType());
//Update the metrics
rmNode.updateMetricsForDeactivatedNode(finalState);
}
}
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
@SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
@ -399,7 +434,8 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
// Update metrics
rmNode.updateMetricsForDeactivatedNode(RMNodeState.UNHEALTHY);
return RMNodeState.UNHEALTHY;
}
@ -458,11 +494,9 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
public static class StatusUpdateWhenUnHealthyTransition
implements
public static class StatusUpdateWhenUnHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
@SuppressWarnings("unchecked")
@Override
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
@ -474,7 +508,8 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
// Update metrics
rmNode.updateMetricsForRejoinedNode(RMNodeState.UNHEALTHY);
return RMNodeState.RUNNING;
}

View File

@ -100,6 +100,12 @@ protected void render(Block html) {
if(!stateFilter.equals(state)) {
continue;
}
} else {
// No filter. User is asking for all nodes. Make sure you skip the
// unhealthy nodes.
if (ni.getState() == RMNodeState.UNHEALTHY) {
continue;
}
}
NodeInfo info = new NodeInfo(ni, sched);
int usedMemory = (int)info.getUsedMemory();

View File

@ -166,6 +166,12 @@ public NodesInfo getNodes(@QueryParam("state") String filterState,
if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
continue;
}
} else {
// No filter. User is asking for all nodes. Make sure you skip the
// unhealthy nodes.
if (ni.getState() == RMNodeState.UNHEALTHY) {
continue;
}
}
if ((healthState != null) && (!healthState.isEmpty())) {
LOG.info("heatlh state is : " + healthState);

View File

@ -51,18 +51,23 @@ public static List<RMNode> newNodes(int racks, int nodesPerRack,
List<RMNode> list = Lists.newArrayList();
for (int i = 0; i < racks; ++i) {
for (int j = 0; j < nodesPerRack; ++j) {
if (j == (nodesPerRack - 1)) {
// One unhealthy node per rack.
list.add(nodeInfo(i, perNode, RMNodeState.UNHEALTHY));
}
list.add(newNodeInfo(i, perNode));
}
}
return list;
}
public static List<RMNode> lostNodes(int racks, int nodesPerRack,
public static List<RMNode> deactivatedNodes(int racks, int nodesPerRack,
Resource perNode) {
List<RMNode> list = Lists.newArrayList();
for (int i = 0; i < racks; ++i) {
for (int j = 0; j < nodesPerRack; ++j) {
list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
RMNodeState[] allStates = RMNodeState.values();
list.add(nodeInfo(i, perNode, allStates[j % allStates.length]));
}
}
return list;
@ -198,15 +203,20 @@ private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState
final String httpAddress = httpAddr;
final NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class);
if (state != RMNodeState.UNHEALTHY) {
nodeHealthStatus.setIsNodeHealthy(true);
nodeHealthStatus.setHealthReport("HealthyMe");
}
return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
nodeHealthStatus, nid, hostName, state);
}
public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
/**
 * Builds a mock node for the given rack, capacity and state by delegating
 * to {@code buildRMNode}, passing {@code "N/A"} as the last argument.
 * NOTE(review): the last argument appears to be the node's HTTP address;
 * confirm against the signature of buildRMNode.
 *
 * @param rack    rack index forwarded to buildRMNode
 * @param perNode resource forwarded to buildRMNode
 * @param state   node state forwarded to buildRMNode
 * @return the node returned by buildRMNode
 */
public static RMNode nodeInfo(int rack, final Resource perNode,
RMNodeState state) {
return buildRMNode(rack, perNode, state, "N/A");
}
public static RMNode newNodeInfo(int rack, final Resource perNode) {
return buildRMNode(rack, perNode, null, "localhost:0");
return buildRMNode(rack, perNode, RMNodeState.RUNNING, "localhost:0");
}
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Before;
@ -39,7 +40,12 @@
public class TestNodesPage {
final int numberOfRacks = 2;
final int numberOfNodesPerRack = 2;
final int numberOfNodesPerRack = 6;
// The following is because of the way TestRMWebApp.mockRMContext creates
// nodes.
final int numberOfLostNodesPerRack = numberOfNodesPerRack
/ RMNodeState.values().length;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 10;
@ -49,20 +55,22 @@ public class TestNodesPage {
@Before
public void setUp() throws Exception {
injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp
.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB), new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(
TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
final RMContext mockRMContext =
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB);
injector =
WebAppTests.createMockInjector(RMContext.class, mockRMContext,
new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(
TestRMWebApp.mockRm(mockRMContext));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
}
@Test
@ -94,7 +102,7 @@ public void testNodesBlockRenderForLostNodes() {
.print("<th");
Mockito.verify(
writer,
Mockito.times(numberOfRacks * numberOfNodesPerRack
Mockito.times(numberOfRacks * numberOfLostNodesPerRack
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
"<td");
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -94,20 +95,38 @@ public void configure(Binder binder) {
}
@Test public void testNodesPage() {
// 10 nodes. Two of each type.
final RMContext rmContext = mockRMContext(3, 2, 12, 8*GiB);
Injector injector = WebAppTests.createMockInjector(RMContext.class,
mockRMContext(3, 1, 2, 8*GiB),
rmContext,
new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(mockRm(3, 1, 2, 8*GiB));
binder.bind(ResourceManager.class).toInstance(mockRm(rmContext));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
injector.getInstance(NodesPage.class).render();
// All nodes
NodesPage instance = injector.getInstance(NodesPage.class);
instance.render();
WebAppTests.flushOutput(injector);
// Unhealthy nodes
instance.moreParams().put(YarnWebParams.NODE_STATE,
RMNodeState.UNHEALTHY.toString());
instance.render();
WebAppTests.flushOutput(injector);
// Lost nodes
instance.moreParams().put(YarnWebParams.NODE_STATE,
RMNodeState.LOST.toString());
instance.render();
WebAppTests.flushOutput(injector);
}
public static RMContext mockRMContext(int numApps, int racks, int numNodes,
@ -125,11 +144,12 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
nodesMap.put(node.getNodeID(), node);
}
final List<RMNode> lostNodes = MockNodes.lostNodes(racks, numNodes,
newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> lostNodesMap = Maps.newConcurrentMap();
for (RMNode node : lostNodes) {
lostNodesMap.put(node.getHostName(), node);
final List<RMNode> deactivatedNodes =
MockNodes.deactivatedNodes(racks, numNodes, newResource(mbsPerNode));
final ConcurrentMap<String, RMNode> deactivatedNodesMap =
Maps.newConcurrentMap();
for (RMNode node : deactivatedNodes) {
deactivatedNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(new MemStore(), null, null, null, null) {
@Override
@ -138,7 +158,7 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return lostNodesMap;
return deactivatedNodesMap;
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
@ -149,9 +169,13 @@ public ConcurrentMap<NodeId, RMNode> getRMNodes() {
public static ResourceManager mockRm(int apps, int racks, int nodes,
int mbsPerNode) throws IOException {
ResourceManager rm = mock(ResourceManager.class);
RMContext rmContext = mockRMContext(apps, racks, nodes,
mbsPerNode);
mbsPerNode);
return mockRm(rmContext);
}
public static ResourceManager mockRm(RMContext rmContext) throws IOException {
ResourceManager rm = mock(ResourceManager.class);
ResourceScheduler rs = mockCapacityScheduler();
when(rm.getResourceScheduler()).thenReturn(rs);
when(rm.getRMContext()).thenReturn(rmContext);

View File

@ -55,6 +55,8 @@
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import clover.org.jfree.util.Log;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
@ -123,6 +125,46 @@ public void testNodesDefault() throws JSONException, Exception {
testNodesHelper("nodes/", "");
}
/**
 * Checks that the unfiltered "cluster/nodes" listing leaves out a node in the
 * UNHEALTHY state. Three nodes are set up: nm1 ends in RUNNING, nm2 stays in
 * NEW, and nm3 is driven to UNHEALTHY. The response is expected to list
 * exactly two nodes.
 */
@Test
public void testNodesDefaultWithUnHealthyNode() throws JSONException,
Exception {
WebResource r = resource();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:1235", 5121);
// Only nm1 is sent the started event; nm2 is expected to remain NEW.
rm.sendNodeStarted(nm1);
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.NEW);
// One unhealthy node which should not appear in the list after
// MAPREDUCE-3760.
MockNM nm3 = rm.registerNode("h3:1236", 5122);
rm.NMwaitForState(nm3.getNodeId(), RMNodeState.NEW);
rm.sendNodeStarted(nm3);
rm.NMwaitForState(nm3.getNodeId(), RMNodeState.RUNNING);
// Flip nm3's health status to unhealthy and feed it to the node as a status
// event, then wait for the node to report the UNHEALTHY state.
RMNodeImpl node = (RMNodeImpl) rm.getRMContext().getRMNodes()
.get(nm3.getNodeId());
NodeHealthStatus nodeHealth = node.getNodeHealthStatus();
nodeHealth.setHealthReport("test health report");
nodeHealth.setIsNodeHealthy(false);
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
new ArrayList<ContainerStatus>(), null, null));
rm.NMwaitForState(nm3.getNodeId(), RMNodeState.UNHEALTHY);
// Request the node list as JSON with no state filter.
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("nodes")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
// Expected shape: a single top-level "nodes" object holding a single
// "node" array.
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject nodes = json.getJSONObject("nodes");
assertEquals("incorrect number of elements", 1, nodes.length());
JSONArray nodeArray = nodes.getJSONArray("node");
// Just 2 nodes, leaving behind the unhealthy node.
assertEquals("incorrect number of elements", 2, nodeArray.length());
}
@Test
public void testNodesQueryState() throws JSONException, Exception {
WebResource r = resource();