MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes on the webapps. Contributed by Bhallamudi Venkata Siva Kamesh and Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236433 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
54f738575c
commit
3cdc100369
@ -229,6 +229,9 @@ Release 0.23.1 - Unreleased
|
||||
MAPREDUCE-3718. Change default AM heartbeat interval to 1 second. (Hitesh
|
||||
Shah via sseth)
|
||||
|
||||
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
|
||||
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
|
||||
|
@ -29,7 +29,6 @@
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
|
||||
@ -39,9 +38,9 @@ public class ClusterMetrics {
|
||||
|
||||
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
@Metric("# of NMs") MutableGaugeInt numNMs;
|
||||
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs;
|
||||
@Metric("# of lost NMs") MutableCounterInt numLostNMs;
|
||||
@Metric("# of active NMs") MutableGaugeInt numNMs;
|
||||
@Metric("# of decommissioned NMs") MutableGaugeInt numDecommissionedNMs;
|
||||
@Metric("# of lost NMs") MutableGaugeInt numLostNMs;
|
||||
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
|
||||
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
|
||||
|
||||
@ -73,8 +72,8 @@ private static void registerMetrics() {
|
||||
}
|
||||
}
|
||||
|
||||
//Total Nodemanagers
|
||||
public int getNumNMs() {
|
||||
//Active Nodemanagers
|
||||
public int getNumActiveNMs() {
|
||||
return numNMs.value();
|
||||
}
|
||||
|
||||
@ -87,6 +86,10 @@ public void incrDecommisionedNMs() {
|
||||
numDecommissionedNMs.incr();
|
||||
}
|
||||
|
||||
public void decrDecommisionedNMs() {
|
||||
numDecommissionedNMs.decr();
|
||||
}
|
||||
|
||||
//Lost NMs
|
||||
public int getNumLostNMs() {
|
||||
return numLostNMs.value();
|
||||
@ -96,6 +99,10 @@ public void incrNumLostNMs() {
|
||||
numLostNMs.incr();
|
||||
}
|
||||
|
||||
public void decrNumLostNMs() {
|
||||
numLostNMs.decr();
|
||||
}
|
||||
|
||||
//Unhealthy NMs
|
||||
public int getUnhealthyNMs() {
|
||||
return numUnhealthyNMs.value();
|
||||
@ -118,6 +125,10 @@ public void incrNumRebootedNMs() {
|
||||
numRebootedNMs.incr();
|
||||
}
|
||||
|
||||
public void decrNumRebootedNMs() {
|
||||
numRebootedNMs.decr();
|
||||
}
|
||||
|
||||
public void removeNode(RMNodeEventType nodeEventType) {
|
||||
numNMs.decr();
|
||||
switch(nodeEventType){
|
||||
|
@ -43,6 +43,8 @@ public interface RMContext {
|
||||
ApplicationsStore getApplicationsStore();
|
||||
|
||||
ConcurrentMap<ApplicationId, RMApp> getRMApps();
|
||||
|
||||
ConcurrentMap<String, RMNode> getInactiveRMNodes();
|
||||
|
||||
ConcurrentMap<NodeId, RMNode> getRMNodes();
|
||||
|
||||
|
@ -43,6 +43,9 @@ public class RMContextImpl implements RMContext {
|
||||
|
||||
private final ConcurrentMap<NodeId, RMNode> nodes
|
||||
= new ConcurrentHashMap<NodeId, RMNode>();
|
||||
|
||||
private final ConcurrentMap<String, RMNode> inactiveNodes
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
@ -83,6 +86,11 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
return this.nodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
|
||||
return this.inactiveNodes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerAllocationExpirer getContainerAllocationExpirer() {
|
||||
|
@ -220,10 +220,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
if (rmNode == null) {
|
||||
/* node does not exist */
|
||||
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
||||
|
||||
// Updating the metrics directly as reboot event cannot be
|
||||
// triggered on a null rmNode
|
||||
ClusterMetrics.getMetrics().incrNumRebootedNMs();
|
||||
return reboot;
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
|
||||
RMNodeEventType.DECOMMISSION, new RemoveNodeTransition())
|
||||
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
|
||||
RMNodeEventType.EXPIRE, new RemoveNodeTransition())
|
||||
.addTransition(RMNodeState.RUNNING, RMNodeState.LOST,
|
||||
.addTransition(RMNodeState.RUNNING, RMNodeState.REBOOTED,
|
||||
RMNodeEventType.REBOOTING, new RemoveNodeTransition())
|
||||
.addTransition(RMNodeState.RUNNING, RMNodeState.RUNNING,
|
||||
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
|
||||
@ -307,6 +307,21 @@ public void handle(RMNodeEvent event) {
|
||||
|
||||
public static class AddNodeTransition implements
|
||||
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||
|
||||
private void updateMetrics(RMNodeState nodeState) {
|
||||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
switch (nodeState) {
|
||||
case LOST:
|
||||
metrics.decrNumLostNMs();
|
||||
break;
|
||||
case REBOOTED:
|
||||
metrics.decrNumRebootedNMs();
|
||||
break;
|
||||
case DECOMMISSIONED:
|
||||
metrics.decrDecommisionedNMs();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
@ -315,6 +330,13 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(rmNode));
|
||||
|
||||
String host = rmNode.nodeId.getHost();
|
||||
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
|
||||
RMNode node = rmNode.context.getInactiveRMNodes().get(host);
|
||||
rmNode.context.getInactiveRMNodes().remove(host);
|
||||
updateMetrics(node.getState());
|
||||
}
|
||||
|
||||
ClusterMetrics.getMetrics().addNode();
|
||||
}
|
||||
@ -353,7 +375,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||
// Remove the node from the system.
|
||||
rmNode.context.getRMNodes().remove(rmNode.nodeId);
|
||||
LOG.info("Removed Node " + rmNode.nodeId);
|
||||
|
||||
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
|
||||
//Update the metrics
|
||||
ClusterMetrics.getMetrics().removeNode(event.getType());
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ protected void render(Block html) {
|
||||
th().$class("ui-state-default")._("Memory Used")._().
|
||||
th().$class("ui-state-default")._("Memory Total")._().
|
||||
th().$class("ui-state-default")._("Memory Reserved")._().
|
||||
th().$class("ui-state-default")._("Total Nodes")._().
|
||||
th().$class("ui-state-default")._("Active Nodes")._().
|
||||
th().$class("ui-state-default")._("Decommissioned Nodes")._().
|
||||
th().$class("ui-state-default")._("Lost Nodes")._().
|
||||
th().$class("ui-state-default")._("Unhealthy Nodes")._().
|
||||
@ -82,7 +82,7 @@ protected void render(Block html) {
|
||||
td(StringUtils.byteDesc(clusterMetrics.getAllocatedMB() * BYTES_IN_MB)).
|
||||
td(StringUtils.byteDesc(clusterMetrics.getTotalMB() * BYTES_IN_MB)).
|
||||
td(StringUtils.byteDesc(clusterMetrics.getReservedMB() * BYTES_IN_MB)).
|
||||
td().a(url("nodes"),String.valueOf(clusterMetrics.getTotalNodes()))._().
|
||||
td().a(url("nodes"),String.valueOf(clusterMetrics.getActiveNodes()))._().
|
||||
td().a(url("nodes/decommissioned"),String.valueOf(clusterMetrics.getDecommissionedNodes()))._().
|
||||
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
|
||||
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().
|
||||
|
@ -24,6 +24,8 @@
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
@ -36,6 +38,7 @@
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
@ -79,7 +82,19 @@ protected void render(Block html) {
|
||||
if(type != null && !type.isEmpty()) {
|
||||
stateFilter = RMNodeState.valueOf(type.toUpperCase());
|
||||
}
|
||||
for (RMNode ni : this.rmContext.getRMNodes().values()) {
|
||||
Collection<RMNode> rmNodes = this.rmContext.getRMNodes().values();
|
||||
boolean isInactive = false;
|
||||
if (stateFilter != null) {
|
||||
switch (stateFilter) {
|
||||
case DECOMMISSIONED:
|
||||
case LOST:
|
||||
case REBOOTED:
|
||||
rmNodes = this.rmContext.getInactiveRMNodes().values();
|
||||
isInactive = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (RMNode ni : rmNodes) {
|
||||
if(stateFilter != null) {
|
||||
RMNodeState state = ni.getState();
|
||||
if(!stateFilter.equals(state)) {
|
||||
@ -89,12 +104,17 @@ protected void render(Block html) {
|
||||
NodeInfo info = new NodeInfo(ni, sched);
|
||||
int usedMemory = (int)info.getUsedMemory();
|
||||
int availableMemory = (int)info.getAvailableMemory();
|
||||
tbody.tr().
|
||||
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr().
|
||||
td(info.getRack()).
|
||||
td(info.getState()).
|
||||
td(info.getNodeId()).
|
||||
td().a("http://" + info.getNodeHTTPAddress(), info.getNodeHTTPAddress())._().
|
||||
td(info.getHealthStatus()).
|
||||
td(info.getNodeId());
|
||||
if (isInactive) {
|
||||
row.td()._("N/A")._();
|
||||
} else {
|
||||
String httpAddress = info.getNodeHTTPAddress();
|
||||
row.td().a("http://" + httpAddress, httpAddress)._();
|
||||
}
|
||||
row.td(info.getHealthStatus()).
|
||||
td(Times.format(info.getLastHealthUpdate())).
|
||||
td(info.getHealthReport()).
|
||||
td(String.valueOf(info.getNumContainers())).
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
@ -68,6 +69,7 @@
|
||||
@Singleton
|
||||
@Path("/ws/v1/cluster")
|
||||
public class RMWebServices {
|
||||
private static final String EMPTY = "";
|
||||
private static final Log LOG = LogFactory.getLog(RMWebServices.class);
|
||||
private final ResourceManager rm;
|
||||
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||
@ -144,12 +146,23 @@ public NodesInfo getNodes(@QueryParam("state") String filterState,
|
||||
if (sched == null) {
|
||||
throw new NotFoundException("Null ResourceScheduler instance");
|
||||
}
|
||||
|
||||
Collection<RMNode> rmNodes = this.rm.getRMContext().getRMNodes().values();
|
||||
boolean isInactive = false;
|
||||
if (filterState != null && !filterState.isEmpty()) {
|
||||
RMNodeState nodeState = RMNodeState.valueOf(filterState.toUpperCase());
|
||||
switch (nodeState) {
|
||||
case DECOMMISSIONED:
|
||||
case LOST:
|
||||
case REBOOTED:
|
||||
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
|
||||
isInactive = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
NodesInfo allNodes = new NodesInfo();
|
||||
for (RMNode ni : this.rm.getRMContext().getRMNodes().values()) {
|
||||
for (RMNode ni : rmNodes) {
|
||||
NodeInfo nodeInfo = new NodeInfo(ni, sched);
|
||||
if (filterState != null) {
|
||||
RMNodeState.valueOf(filterState);
|
||||
if (!(nodeInfo.getState().equalsIgnoreCase(filterState))) {
|
||||
continue;
|
||||
}
|
||||
@ -165,6 +178,9 @@ public NodesInfo getNodes(@QueryParam("state") String filterState,
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (isInactive) {
|
||||
nodeInfo.setNodeHTTPAddress(EMPTY);
|
||||
}
|
||||
allNodes.add(nodeInfo);
|
||||
}
|
||||
return allNodes;
|
||||
@ -183,10 +199,19 @@ public NodeInfo getNode(@PathParam("nodeId") String nodeId) {
|
||||
}
|
||||
NodeId nid = ConverterUtils.toNodeId(nodeId);
|
||||
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
|
||||
boolean isInactive = false;
|
||||
if (ni == null) {
|
||||
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
||||
ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
|
||||
if (ni == null) {
|
||||
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
|
||||
}
|
||||
isInactive = true;
|
||||
}
|
||||
return new NodeInfo(ni, sched);
|
||||
NodeInfo nodeInfo = new NodeInfo(ni, sched);
|
||||
if (isInactive) {
|
||||
nodeInfo.setNodeHTTPAddress(EMPTY);
|
||||
}
|
||||
return nodeInfo;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -44,6 +44,7 @@ public class ClusterMetricsInfo {
|
||||
protected int unhealthyNodes;
|
||||
protected int decommissionedNodes;
|
||||
protected int rebootedNodes;
|
||||
protected int activeNodes;
|
||||
|
||||
public ClusterMetricsInfo() {
|
||||
} // JAXB needs this
|
||||
@ -59,12 +60,13 @@ public ClusterMetricsInfo(final ResourceManager rm, final RMContext rmContext) {
|
||||
this.allocatedMB = metrics.getAllocatedGB() * MB_IN_GB;
|
||||
this.containersAllocated = metrics.getAllocatedContainers();
|
||||
this.totalMB = availableMB + reservedMB + allocatedMB;
|
||||
this.totalNodes = clusterMetrics.getNumNMs();
|
||||
this.activeNodes = clusterMetrics.getNumActiveNMs();
|
||||
this.lostNodes = clusterMetrics.getNumLostNMs();
|
||||
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
|
||||
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
|
||||
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
|
||||
|
||||
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
|
||||
+ rebootedNodes;
|
||||
}
|
||||
|
||||
public int getAppsSubmitted() {
|
||||
@ -94,6 +96,10 @@ public long getTotalMB() {
|
||||
public int getTotalNodes() {
|
||||
return this.totalNodes;
|
||||
}
|
||||
|
||||
public int getActiveNodes() {
|
||||
return this.activeNodes;
|
||||
}
|
||||
|
||||
public int getLostNodes() {
|
||||
return this.lostNodes;
|
||||
|
@ -94,6 +94,10 @@ public String getNodeId() {
|
||||
public String getNodeHTTPAddress() {
|
||||
return this.nodeHTTPAddress;
|
||||
}
|
||||
|
||||
public void setNodeHTTPAddress(String nodeHTTPAddress) {
|
||||
this.nodeHTTPAddress = nodeHTTPAddress;
|
||||
}
|
||||
|
||||
public String getHealthStatus() {
|
||||
return this.healthStatus;
|
||||
|
@ -81,13 +81,20 @@ public NodeId registerNode() throws Exception {
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
|
||||
b, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(conts, isHealthy, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
|
||||
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
||||
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||
status.setResponseId(resId);
|
||||
status.setNodeId(nodeId);
|
||||
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
|
||||
status.setContainersStatuses(entry.getValue());
|
||||
@ -97,7 +104,6 @@ public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
healthStatus.setIsNodeHealthy(isHealthy);
|
||||
healthStatus.setLastHealthReportTime(1);
|
||||
status.setNodeHealthStatus(healthStatus);
|
||||
status.setResponseId(++responseId);
|
||||
req.setNodeStatus(status);
|
||||
return resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
|
||||
}
|
||||
|
@ -56,6 +56,17 @@ public static List<RMNode> newNodes(int racks, int nodesPerRack,
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
public static List<RMNode> lostNodes(int racks, int nodesPerRack,
|
||||
Resource perNode) {
|
||||
List<RMNode> list = Lists.newArrayList();
|
||||
for (int i = 0; i < racks; ++i) {
|
||||
for (int j = 0; j < nodesPerRack; ++j) {
|
||||
list.add(lostNodeInfo(i, perNode, RMNodeState.LOST));
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
public static NodeId newNodeID(String host, int port) {
|
||||
NodeId nid = recordFactory.newRecordInstance(NodeId.class);
|
||||
@ -82,92 +93,120 @@ public static Resource newAvailResource(Resource total, Resource used) {
|
||||
return rs;
|
||||
}
|
||||
|
||||
public static RMNode newNodeInfo(int rack, final Resource perNode) {
|
||||
private static class MockRMNodeImpl implements RMNode {
|
||||
private NodeId nodeId;
|
||||
private String hostName;
|
||||
private String nodeAddr;
|
||||
private String httpAddress;
|
||||
private int cmdPort;
|
||||
private Resource perNode;
|
||||
private String rackName;
|
||||
private NodeHealthStatus nodeHealthStatus;
|
||||
private RMNodeState state;
|
||||
|
||||
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
|
||||
Resource perNode, String rackName, NodeHealthStatus nodeHealthStatus,
|
||||
int cmdPort, String hostName, RMNodeState state) {
|
||||
this.nodeId = nodeId;
|
||||
this.nodeAddr = nodeAddr;
|
||||
this.httpAddress = httpAddress;
|
||||
this.perNode = perNode;
|
||||
this.rackName = rackName;
|
||||
this.nodeHealthStatus = nodeHealthStatus;
|
||||
this.cmdPort = cmdPort;
|
||||
this.hostName = hostName;
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeId getNodeID() {
|
||||
return this.nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHostName() {
|
||||
return this.hostName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCommandPort() {
|
||||
return this.cmdPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeAddress() {
|
||||
return this.nodeAddr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHttpAddress() {
|
||||
return this.httpAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeHealthStatus getNodeHealthStatus() {
|
||||
return this.nodeHealthStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getTotalCapability() {
|
||||
return this.perNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRackName() {
|
||||
return this.rackName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Node getNode() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMNodeState getState() {
|
||||
return this.state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerId> getContainersToCleanUp() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationId> getAppsToCleanup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
private static RMNode buildRMNode(int rack, final Resource perNode, RMNodeState state, String httpAddr) {
|
||||
final String rackName = "rack"+ rack;
|
||||
final int nid = NODE_ID++;
|
||||
final String hostName = "host"+ nid;
|
||||
final int port = 123;
|
||||
final NodeId nodeID = newNodeID(hostName, port);
|
||||
final String httpAddress = "localhost:0";
|
||||
final String httpAddress = httpAddr;
|
||||
final NodeHealthStatus nodeHealthStatus =
|
||||
recordFactory.newRecordInstance(NodeHealthStatus.class);
|
||||
final Resource used = newUsedResource(perNode);
|
||||
final Resource avail = newAvailResource(perNode, used);
|
||||
return new RMNode() {
|
||||
@Override
|
||||
public NodeId getNodeID() {
|
||||
return nodeID;
|
||||
}
|
||||
return new MockRMNodeImpl(nodeID, hostName, httpAddress, perNode, rackName,
|
||||
nodeHealthStatus, nid, hostName, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeAddress() {
|
||||
return hostName;
|
||||
}
|
||||
public static RMNode lostNodeInfo(int rack, final Resource perNode, RMNodeState state) {
|
||||
return buildRMNode(rack, perNode, state, "N/A");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHttpAddress() {
|
||||
return httpAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getTotalCapability() {
|
||||
return perNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRackName() {
|
||||
return rackName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Node getNode() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeHealthStatus getNodeHealthStatus() {
|
||||
return nodeHealthStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCommandPort() {
|
||||
return nid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHostName() {
|
||||
return hostName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMNodeState getState() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationId> getAppsToCleanup() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerId> getContainersToCleanUp() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
};
|
||||
public static RMNode newNodeInfo(int rack, final Resource perNode) {
|
||||
return buildRMNode(rack, perNode, null, "localhost:0");
|
||||
}
|
||||
}
|
||||
|
@ -130,6 +130,12 @@ public void sendNodeStarted(MockNM nm) throws Exception {
|
||||
nm.getNodeId());
|
||||
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
|
||||
}
|
||||
|
||||
public void sendNodeLost(MockNM nm) throws Exception {
|
||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||
nm.getNodeId());
|
||||
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
|
||||
}
|
||||
|
||||
public void NMwaitForState(NodeId nodeid, RMNodeState finalState)
|
||||
throws Exception {
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||
@ -100,8 +101,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
rmDispatcher.register(SchedulerEventType.class,
|
||||
new TestSchedulerEventDispatcher());
|
||||
|
||||
|
||||
node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
|
||||
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
@ -157,14 +157,14 @@ public void testReboot() throws Exception {
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService());
|
||||
MockNM nm2 = rm.registerNode("host2:1234", 2048);
|
||||
|
||||
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
|
||||
nodeHeartbeat = nm2.nodeHeartbeat(
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>(), true);
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
|
||||
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
|
||||
checkRebootedNMCount(rm, ++initialMetricCount);
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
|
||||
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@ -36,39 +37,65 @@
|
||||
* data for all the columns in the table as specified in the header.
|
||||
*/
|
||||
public class TestNodesPage {
|
||||
|
||||
final int numberOfRacks = 2;
|
||||
final int numberOfNodesPerRack = 2;
|
||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
final int numberOfThInMetricsTable = 10;
|
||||
final int numberOfActualTableHeaders = 10;
|
||||
|
||||
@Test
|
||||
public void testNodesBlockRender() throws Exception {
|
||||
final int numberOfRacks = 2;
|
||||
final int numberOfNodesPerRack = 2;
|
||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
final int numberOfThInMetricsTable = 10;
|
||||
final int numberOfActualTableHeaders = 10;
|
||||
|
||||
Injector injector = WebAppTests.createMockInjector(RMContext.class,
|
||||
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB),
|
||||
new Module() {
|
||||
private Injector injector;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
injector = WebAppTests.createMockInjector(RMContext.class, TestRMWebApp
|
||||
.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
|
||||
8 * TestRMWebApp.GiB), new Module() {
|
||||
@Override
|
||||
public void configure(Binder binder) {
|
||||
try {
|
||||
binder.bind(ResourceManager.class).toInstance(TestRMWebApp.mockRm(3,
|
||||
numberOfRacks, numberOfNodesPerRack, 8*TestRMWebApp.GiB));
|
||||
binder.bind(ResourceManager.class).toInstance(
|
||||
TestRMWebApp.mockRm(3, numberOfRacks, numberOfNodesPerRack,
|
||||
8 * TestRMWebApp.GiB));
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodesBlockRender() throws Exception {
|
||||
injector.getInstance(NodesBlock.class).render();
|
||||
PrintWriter writer = injector.getInstance(PrintWriter.class);
|
||||
WebAppTests.flushOutput(injector);
|
||||
|
||||
Mockito.verify(writer, Mockito.times(numberOfActualTableHeaders +
|
||||
numberOfThInMetricsTable)).print(
|
||||
"<th");
|
||||
Mockito.verify(writer,
|
||||
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
|
||||
.print("<th");
|
||||
Mockito.verify(
|
||||
writer,
|
||||
Mockito.times(numberOfRacks * numberOfNodesPerRack
|
||||
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print("<td");
|
||||
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
|
||||
"<td");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodesBlockRenderForLostNodes() {
|
||||
NodesBlock nodesBlock = injector.getInstance(NodesBlock.class);
|
||||
nodesBlock.set("node.state", "lost");
|
||||
nodesBlock.render();
|
||||
PrintWriter writer = injector.getInstance(PrintWriter.class);
|
||||
WebAppTests.flushOutput(injector);
|
||||
|
||||
Mockito.verify(writer,
|
||||
Mockito.times(numberOfActualTableHeaders + numberOfThInMetricsTable))
|
||||
.print("<th");
|
||||
Mockito.verify(
|
||||
writer,
|
||||
Mockito.times(numberOfRacks * numberOfNodesPerRack
|
||||
* numberOfActualTableHeaders + numberOfThInMetricsTable)).print(
|
||||
"<td");
|
||||
}
|
||||
}
|
||||
|
@ -120,12 +120,23 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes,
|
||||
for (RMNode node : nodes) {
|
||||
nodesMap.put(node.getNodeID(), node);
|
||||
}
|
||||
|
||||
final List<RMNode> lostNodes = MockNodes.lostNodes(racks, numNodes,
|
||||
newResource(mbsPerNode));
|
||||
final ConcurrentMap<String, RMNode> lostNodesMap = Maps.newConcurrentMap();
|
||||
for (RMNode node : lostNodes) {
|
||||
lostNodesMap.put(node.getHostName(), node);
|
||||
}
|
||||
return new RMContextImpl(new MemStore(), null, null, null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return applicationsMaps;
|
||||
}
|
||||
@Override
|
||||
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
|
||||
return lostNodesMap;
|
||||
}
|
||||
@Override
|
||||
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
return nodesMap;
|
||||
}
|
||||
|
@ -370,7 +370,8 @@ public void verifyClusterMetricsXML(String xml) throws JSONException,
|
||||
WebServicesTestUtils.getXmlInt(element, "lostNodes"),
|
||||
WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"),
|
||||
WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"),
|
||||
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"));
|
||||
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"),
|
||||
WebServicesTestUtils.getXmlInt(element, "activeNodes"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -378,7 +379,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
|
||||
Exception {
|
||||
assertEquals("incorrect number of elements", 1, json.length());
|
||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||
assertEquals("incorrect number of elements", 11, clusterinfo.length());
|
||||
assertEquals("incorrect number of elements", 12, clusterinfo.length());
|
||||
verifyClusterMetrics(clusterinfo.getInt("appsSubmitted"),
|
||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||
clusterinfo.getInt("allocatedMB"),
|
||||
@ -386,13 +387,13 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
|
||||
clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
|
||||
clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
|
||||
clusterinfo.getInt("decommissionedNodes"),
|
||||
clusterinfo.getInt("rebootedNodes"));
|
||||
clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"));
|
||||
}
|
||||
|
||||
public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
|
||||
int allocMB, int containersAlloc, int totalMB, int totalNodes,
|
||||
int lostNodes, int unhealthyNodes, int decommissionedNodes,
|
||||
int rebootedNodes) throws JSONException, Exception {
|
||||
int rebootedNodes, int activeNodes) throws JSONException, Exception {
|
||||
|
||||
ResourceScheduler rs = rm.getResourceScheduler();
|
||||
QueueMetrics metrics = rs.getRootQueueMetrics();
|
||||
@ -412,8 +413,11 @@ public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
|
||||
* MB_IN_GB, allocMB);
|
||||
assertEquals("containersAllocated doesn't match", 0, containersAlloc);
|
||||
assertEquals("totalMB doesn't match", totalMBExpect, totalMB);
|
||||
assertEquals("totalNodes doesn't match", clusterMetrics.getNumNMs(),
|
||||
totalNodes);
|
||||
assertEquals(
|
||||
"totalNodes doesn't match",
|
||||
clusterMetrics.getNumActiveNMs() + clusterMetrics.getNumLostNMs()
|
||||
+ clusterMetrics.getNumDecommisionedNMs()
|
||||
+ clusterMetrics.getNumRebootedNMs(), totalNodes);
|
||||
assertEquals("lostNodes doesn't match", clusterMetrics.getNumLostNMs(),
|
||||
lostNodes);
|
||||
assertEquals("unhealthyNodes doesn't match",
|
||||
@ -422,6 +426,8 @@ public void verifyClusterMetrics(int sub, int reservedMB, int availableMB,
|
||||
clusterMetrics.getNumDecommisionedNMs(), decommissionedNodes);
|
||||
assertEquals("rebootedNodes doesn't match",
|
||||
clusterMetrics.getNumRebootedNMs(), rebootedNodes);
|
||||
assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
|
||||
activeNodes);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -202,6 +202,69 @@ public void testNodesQueryStateInvalid() throws JSONException, Exception {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodesQueryStateLost() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 5120);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm2);
|
||||
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
|
||||
rm.sendNodeLost(nm1);
|
||||
rm.sendNodeLost(nm2);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").queryParam("state", RMNodeState.LOST.toString())
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject nodes = json.getJSONObject("nodes");
|
||||
assertEquals("incorrect number of elements", 1, nodes.length());
|
||||
JSONArray nodeArray = nodes.getJSONArray("node");
|
||||
assertEquals("incorrect number of elements", 2, nodeArray.length());
|
||||
for (int i = 0; i < nodeArray.length(); ++i) {
|
||||
JSONObject info = nodeArray.getJSONObject(i);
|
||||
String host = info.get("id").toString().split(":")[0];
|
||||
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host);
|
||||
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
|
||||
info.getString("nodeHTTPAddress"));
|
||||
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
|
||||
.toString(), info.getString("state"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleNodeQueryStateLost() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 5120);
|
||||
rm.sendNodeStarted(nm1);
|
||||
rm.sendNodeStarted(nm2);
|
||||
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.RUNNING);
|
||||
rm.NMwaitForState(nm2.getNodeId(), RMNodeState.RUNNING);
|
||||
rm.sendNodeLost(nm1);
|
||||
rm.sendNodeLost(nm2);
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("nodes").path("h2:1234").accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject info = json.getJSONObject("node");
|
||||
String id = info.get("id").toString();
|
||||
|
||||
assertEquals("Incorrect Node Information.", "h2:1234", id);
|
||||
|
||||
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2");
|
||||
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
|
||||
info.getString("nodeHTTPAddress"));
|
||||
WebServicesTestUtils.checkStringMatch("state",
|
||||
rmNode.getState().toString(), info.getString("state"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodesQueryHealthy() throws JSONException, Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user