YARN-3980. Plumb resource-utilization info in node heartbeat through to the scheduler. (Inigo Goiri via kasha)

Karthik Kambatla 2015-11-24 10:05:12 +05:30
parent f80dc6f499
commit 52948bb20b
19 changed files with 522 additions and 69 deletions
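The patch carries the ResourceUtilization values reported in each NM heartbeat (NodeStatus) through RMNodeStatusEvent and RMNodeImpl and onto the SchedulerNode, as the file diffs below show. As rough orientation only, here is a condensed sketch of that path; UtilizationPlumbingSketch, NodeView, and onHeartbeat are illustrative names and not classes added by this commit.

import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;

public class UtilizationPlumbingSketch {
  /** Hypothetical stand-in for the RM- and scheduler-side views touched by the patch. */
  static class NodeView {
    volatile ResourceUtilization containersUtilization;
    volatile ResourceUtilization nodeUtilization;
  }

  /** On each heartbeat, copy the utilization the NM reported onto the RM's view of the node. */
  static void onHeartbeat(NodeStatus status, NodeView rmNode, NodeView schedulerNode) {
    // RMNodeImpl.setAggregatedContainersUtilization(...) / setNodeUtilization(...)
    rmNode.containersUtilization = status.getContainersUtilization();
    rmNode.nodeUtilization = status.getNodeUtilization();
    // The scheduler's nodeUpdate(...) then mirrors the values onto its SchedulerNode
    schedulerNode.containersUtilization = rmNode.containersUtilization;
    schedulerNode.nodeUtilization = rmNode.nodeUtilization;
  }
}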

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -188,6 +189,16 @@ public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub
return null;
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return null;
}
@Override
public ResourceUtilization getNodeUtilization() {
return null;
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -176,4 +177,14 @@ public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub
return null;
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return node.getAggregatedContainersUtilization();
}
@Override
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
}

View File

@ -568,6 +568,9 @@ Release 2.8.0 - UNRELEASED
YARN-3454. Add efficient merge operation to RLESparseResourceAllocation
(Carlo Curino via asuresh)
YARN-3980. Plumb resource-utilization info in node heartbeat through to the
scheduler. (Inigo Goiri via kasha)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -75,7 +75,7 @@ public int getVirtualMemory() {
@Override
public void setVirtualMemory(int vmem) {
maybeInitBuilder();
builder.setPmem(vmem);
builder.setVmem(vmem);
}
@Override

View File

@ -417,7 +417,8 @@ private List<ApplicationId> createKeepAliveApplicationList() {
return appList;
}
private NodeStatus getNodeStatus(int responseId) throws IOException {
@VisibleForTesting
protected NodeStatus getNodeStatus(int responseId) throws IOException {
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());

View File

@ -461,10 +461,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
remoteNodeStatus.getIncreasedContainers());
new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse);
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
/**
* Node managers information on available resources
@ -98,7 +99,19 @@ public interface RMNode {
* @return the total available resource.
*/
public Resource getTotalCapability();
/**
* The aggregated resource utilization of the containers.
* @return the aggregated resource utilization of the containers.
*/
public ResourceUtilization getAggregatedContainersUtilization();
/**
* The total resource utilization of the node.
* @return the total resource utilization of the node.
*/
public ResourceUtilization getNodeUtilization();
/**
* The rack name for this node manager.
* @return the rack name.
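The two getters added to RMNode above expose the latest reported utilization to any RM-side component. As an illustration only (logNodeUtilization and the println are hypothetical, not part of the patch), a consumer could read them like this:

// Illustrative only: read the new RMNode utilization getters.
static void logNodeUtilization(RMContext rmContext) {
  for (RMNode node : rmContext.getRMNodes().values()) {
    ResourceUtilization containers = node.getAggregatedContainersUtilization();
    ResourceUtilization host = node.getNodeUtilization();
    // Either may still be null before the node's first heartbeat with utilization.
    System.out.println(node.getNodeID() + " containers=" + containers
        + " node=" + host);
  }
}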

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@ -114,6 +115,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
private ResourceUtilization nodeUtilization;
private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
private final Set<ContainerId> launchedContainers =
@ -445,6 +451,49 @@ public String getNodeManagerVersion() {
return nodeManagerVersion;
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
this.readLock.lock();
try {
return this.containersUtilization;
} finally {
this.readLock.unlock();
}
}
public void setAggregatedContainersUtilization(
ResourceUtilization containersUtilization) {
this.writeLock.lock();
try {
this.containersUtilization = containersUtilization;
} finally {
this.writeLock.unlock();
}
}
@Override
public ResourceUtilization getNodeUtilization() {
this.readLock.lock();
try {
return this.nodeUtilization;
} finally {
this.readLock.unlock();
}
}
public void setNodeUtilization(ResourceUtilization nodeUtilization) {
this.writeLock.lock();
try {
this.nodeUtilization = nodeUtilization;
} finally {
this.writeLock.unlock();
}
}
@Override
public NodeState getState() {
this.readLock.lock();
@ -1006,6 +1055,9 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
rmNode.setAggregatedContainersUtilization(
statusEvent.getAggregatedContainersUtilization());
rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
NodeState initialState = rmNode.getState();
boolean isNodeDecommissioning =
initialState.equals(NodeState.DECOMMISSIONING);
@ -1083,6 +1135,9 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
rmNode.setAggregatedContainersUtilization(
statusEvent.getAggregatedContainersUtilization());
rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));

View File

@ -28,52 +28,35 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus;
private final List<ContainerStatus> containersCollection;
private final NodeStatus nodeStatus;
private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds;
private List<LogAggregationReport> logAggregationReportsForApps;
private final List<Container> nmReportedIncreasedContainers;
// Used by tests
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
NodeHeartbeatResponse latestResponse) {
this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
latestResponse, null);
this(nodeId, nodeStatus, latestResponse, null);
}
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
NodeHeartbeatResponse latestResponse,
List<Container> nmReportedIncreasedContainers) {
this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse,
null, nmReportedIncreasedContainers);
}
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
NodeHeartbeatResponse latestResponse,
List<LogAggregationReport> logAggregationReportsForApps,
List<Container> nmReportedIncreasedContainers) {
List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
this.keepAliveAppIds = keepAliveAppIds;
this.nodeStatus = nodeStatus;
this.latestResponse = latestResponse;
this.logAggregationReportsForApps = logAggregationReportsForApps;
this.nmReportedIncreasedContainers = nmReportedIncreasedContainers;
}
public NodeHealthStatus getNodeHealthStatus() {
return this.nodeHealthStatus;
return this.nodeStatus.getNodeHealthStatus();
}
public List<ContainerStatus> getContainers() {
return this.containersCollection;
return this.nodeStatus.getContainersStatuses();
}
public NodeHeartbeatResponse getLatestResponse() {
@ -81,7 +64,15 @@ public NodeHeartbeatResponse getLatestResponse() {
}
public List<ApplicationId> getKeepAliveAppIds() {
return this.keepAliveAppIds;
return this.nodeStatus.getKeepAliveApplications();
}
public ResourceUtilization getAggregatedContainersUtilization() {
return this.nodeStatus.getContainersUtilization();
}
public ResourceUtilization getNodeUtilization() {
return this.nodeStatus.getNodeUtilization();
}
public List<LogAggregationReport> getLogAggregationReportsForApps() {
@ -95,7 +86,7 @@ public void setLogAggregationReportsForApps(
@SuppressWarnings("unchecked")
public List<Container> getNMReportedIncreasedContainers() {
return nmReportedIncreasedContainers == null ? Collections.EMPTY_LIST
: nmReportedIncreasedContainers;
return this.nodeStatus.getIncreasedContainers() == null ?
Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
}
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@ -58,6 +59,10 @@ public abstract class SchedulerNode {
private Resource totalResourceCapability;
private RMContainer reservedContainer;
private volatile int numContainers;
private volatile ResourceUtilization containersUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
private volatile ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(0, 0, 0f);
/* set of containers that are allocated containers */
@ -339,4 +344,37 @@ public String getPartition() {
return this.labels.iterator().next();
}
}
/**
* Set the resource utilization of the containers in the node.
* @param containersUtilization Resource utilization of the containers.
*/
public void setAggregatedContainersUtilization(
ResourceUtilization containersUtilization) {
this.containersUtilization = containersUtilization;
}
/**
* Get the resource utilization of the containers in the node.
* @return Resource utilization of the containers.
*/
public ResourceUtilization getAggregatedContainersUtilization() {
return this.containersUtilization;
}
/**
* Set the resource utilization of the node. This includes the containers.
* @param nodeUtilization Resource utilization of the node.
*/
public void setNodeUtilization(ResourceUtilization nodeUtilization) {
this.nodeUtilization = nodeUtilization;
}
/**
* Get the resource utilization of the node.
* @return Resource utilization of the node.
*/
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
}
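These SchedulerNode accessors only store and expose the reported values; nothing in this commit acts on them yet. As a sketch of what a later policy might do with them, the hypothetical helper below (nonContainerPhysicalMemoryMB is not part of the patch) estimates physical memory used outside YARN containers:

// Hypothetical helper: physical memory (MB) used by processes outside YARN
// containers, derived from the two utilization reports plumbed through here.
static int nonContainerPhysicalMemoryMB(SchedulerNode node) {
  // Both fields default to zero utilization until a node update refreshes them.
  ResourceUtilization nodeUsed = node.getNodeUtilization();
  ResourceUtilization containersUsed = node.getAggregatedContainersUtilization();
  return nodeUsed.getPhysicalMemory() - containersUsed.getPhysicalMemory();
}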

View File

@ -1063,6 +1063,11 @@ private synchronized void nodeUpdate(RMNode nm) {
releaseResources);
schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm

View File

@ -1069,6 +1069,11 @@ private synchronized void nodeUpdate(RMNode nm) {
attemptScheduling(node);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
}

View File

@ -743,6 +743,10 @@ private synchronized void nodeUpdate(RMNode rmNode) {
completedContainer, RMContainerEventType.FINISHED);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
rmNode.getAggregatedContainersUtilization());
node.setNodeUtilization(rmNode.getNodeUtilization());
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -110,11 +111,14 @@ private static class MockRMNodeImpl implements RMNode {
private long lastHealthReportTime;
private NodeState state;
private Set<String> labels;
private ResourceUtilization containersUtilization;
private ResourceUtilization nodeUtilization;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
Set<String> labels) {
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
@ -126,6 +130,8 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
this.hostName = hostName;
this.state = state;
this.labels = labels;
this.containersUtilization = containersUtilization;
this.nodeUtilization = nodeUtilization;
}
@Override
@ -244,6 +250,16 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
public List<Container> pullNewlyIncreasedContainers() {
return Collections.emptyList();
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return this.containersUtilization;
}
@Override
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,
@ -254,18 +270,19 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, Set<String> labels) {
return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123,
labels);
labels, null, null);
}
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port) {
return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
null);
null, null, null);
}
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port,
Set<String> labels) {
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
final String rackName = "rack"+ rack;
final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid;
@ -277,7 +294,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
final String httpAddress = httpAddr;
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
rackName, healthReport, 0, nid, hostName, state, labels);
rackName, healthReport, 0, nid, hostName, state, labels,
containersUtilization, nodeUtilization);
}
public static RMNode nodeInfo(int rack, final Resource perNode,

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
@ -647,8 +648,9 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() {
statusList.add(status);
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
"", System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
statusList, null, null));
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null,
nodeHealth, null, null, null);
node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null));
Assert.assertEquals(1, node.getRunningApps().size());
@ -689,8 +691,9 @@ private RMNodeImpl getUnhealthyNode() {
RMNodeImpl node = getRunningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
new ArrayList<ContainerStatus>(), null, null));
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null, status, null, null, null);
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
return node;
}
@ -863,8 +866,9 @@ public void testDecommissioningUnhealthy() {
RMNodeImpl node = getDecommissioningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
new ArrayList<ContainerStatus>(), null, null));
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null, status, null, null, null);
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -163,9 +164,11 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
messageForNode1_1);
node1ReportForApp.add(report1);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp, null));
NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
node1ReportForApp));
List<LogAggregationReport> node2ReportForApp =
new ArrayList<LogAggregationReport>();
@ -175,9 +178,11 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode2_1);
node2ReportForApp.add(report2);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node2ReportForApp, null));
NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
new ArrayList<ContainerStatus>(), null,
NodeHealthStatus.newInstance(true, null, 0), null, null, null);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
node2ReportForApp));
// node1 and node2 has updated its log aggregation status
// verify that the log aggregation status for node1, node2
// has been changed
@ -213,9 +218,8 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationReport.newInstance(appId,
LogAggregationStatus.RUNNING, messageForNode1_2);
node1ReportForApp2.add(report1_2);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp2, null));
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
node1ReportForApp2));
// verify that the log aggregation status for node1
// has been changed
@ -282,9 +286,8 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationStatus.SUCCEEDED, ""));
// For every logAggregationReport cached in memory, we can only save at most
// 10 diagnostic messages/failure messages
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node1ReportForApp3, null));
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
node1ReportForApp3));
logAggregationStatus = rmApp.getLogAggregationReportsForApp();
Assert.assertEquals(2, logAggregationStatus.size());
@ -327,9 +330,8 @@ public void testLogAggregationStatus() throws Exception {
LogAggregationStatus.FAILED, "");
node2ReportForApp2.add(report2_2);
node2ReportForApp2.add(report2_3);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
null, node2ReportForApp2, null));
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
node2ReportForApp2));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
logAggregationStatus = rmApp.getLogAggregationReportsForApp();

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -142,8 +143,9 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException,
.get(nm3.getNodeId());
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
"test health report", System.currentTimeMillis());
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeHealth,
new ArrayList<ContainerStatus>(), null, null));
NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1,
new ArrayList<ContainerStatus>(), null, nodeHealth, null, null, null);
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null));
rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
ClientResponse response =

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
@ -594,19 +595,66 @@ protected synchronized void serviceStop() throws Exception {
}
}
private class CustomNodeManager extends NodeManager {
public class CustomNodeManager extends NodeManager {
protected NodeStatus nodeStatus;
public void setNodeStatus(NodeStatus status) {
this.nodeStatus = status;
}
/**
* Hook to allow modification/replacement of the NodeStatus.
* @param currentStatus Current status.
* @return New node status.
*/
protected NodeStatus getSimulatedNodeStatus(NodeStatus currentStatus) {
if(nodeStatus == null) {
return currentStatus;
} else {
// Increment the response ID; otherwise the RM drops the RMNodeStatusEvent
// as a duplicate heartbeat
nodeStatus.setResponseId(nodeStatus.getResponseId() + 1);
return nodeStatus;
}
}
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context,
dispatcher,
healthChecker,
metrics) {
// Allow simulation of the NodeStatus
@Override
protected NodeStatus getNodeStatus(int responseId) throws IOException {
return getSimulatedNodeStatus(super.getNodeStatus(responseId));
}
};
}
}
private class ShortCircuitedNodeManager extends CustomNodeManager {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics) {
return new NodeStatusUpdaterImpl(context,
dispatcher,
healthChecker,
metrics) {
// Allow simulation of the NodeStatus
@Override
protected NodeStatus getNodeStatus(int responseId) throws IOException {
return getSimulatedNodeStatus(super.getNodeStatus(responseId));
}
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt =

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.Before;
import org.junit.Test;
public class TestMiniYarnClusterNodeUtilization {
// Mini YARN cluster setup
private static final int NUM_RM = 1;
private static final int NUM_NM = 1;
// Values for the first round
private static final int CONTAINER_PMEM_1 = 1024;
private static final int CONTAINER_VMEM_1 = 2048;
private static final float CONTAINER_CPU_1 = 11.0f;
private static final int NODE_PMEM_1 = 10240;
private static final int NODE_VMEM_1 = 20480;
private static final float NODE_CPU_1 = 51.0f;
// Values for the second round
private static final int CONTAINER_PMEM_2 = 2048;
private static final int CONTAINER_VMEM_2 = 4096;
private static final float CONTAINER_CPU_2 = 22.0f;
private static final int NODE_PMEM_2 = 20480;
private static final int NODE_VMEM_2 = 40960;
private static final float NODE_CPU_2 = 61.0f;
private MiniYARNCluster cluster;
private CustomNodeManager nm;
private Configuration conf;
private NodeStatus nodeStatus;
@Before
public void setup() {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
String name = TestMiniYarnClusterNodeUtilization.class.getName();
cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
cluster.init(conf);
cluster.start();
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
nm = (CustomNodeManager)cluster.getNodeManager(0);
int responseId = 1;
nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1,
NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1);
nm.setNodeStatus(nodeStatus);
}
/**
* Simulate an NM heartbeat using the simulated NodeStatus fixture. Verify
* both the RMNode and SchedulerNode have been updated with the new
* utilization.
*/
@Test(timeout=60000)
public void testUpdateNodeUtilization()
throws InterruptedException, IOException, YarnException {
assertTrue("NMs fail to connect to the RM",
cluster.waitForNodeManagersToConnect(10000));
// Simulate heartbeat using NodeStatus fixture
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null);
ResourceTracker tracker =
ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
tracker.nodeHeartbeat(request);
// Give the heartbeat time to propagate to the RM
verifySimulatedUtilization();
// Alter utilization
int responseId = 10;
nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
nm.setNodeStatus(nodeStatus);
// Rebuild the request so the second heartbeat carries the updated status
request = NodeHeartbeatRequest.newInstance(nodeStatus, null, null, null);
tracker.nodeHeartbeat(request);
// Give the heartbeat time to propagate to the RM
verifySimulatedUtilization();
}
/**
* Trigger the NM to send a heartbeat using the simulated NodeStatus fixture.
* Verify both the RMNode and SchedulerNode have been updated with the new
* utilization.
*/
@Test(timeout=60000)
public void testMockNodeStatusHeartbeat()
throws InterruptedException, YarnException {
assertTrue("NMs fail to connect to the RM",
cluster.waitForNodeManagersToConnect(10000));
NodeStatusUpdater updater = nm.getNodeStatusUpdater();
updater.sendOutofBandHeartBeat();
// Give the heartbeat time to propagate to the RM
verifySimulatedUtilization();
// Alter utilization
int responseId = 20;
nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), responseId,
CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
nm.setNodeStatus(nodeStatus);
updater.sendOutofBandHeartBeat();
verifySimulatedUtilization();
}
/**
* Create a NodeStatus test vector.
* @param nodeId Node identifier.
* @param responseId Response identifier.
* @param containerPMem Physical memory of the container.
* @param containerVMem Virtual memory of the container.
* @param containerCPU CPU percentage of the container.
* @param nodePMem Physical memory of the node.
* @param nodeVMem Virtual memory of the node.
* @param nodeCPU CPU percentage of the node.
*/
private NodeStatus createNodeStatus(
NodeId nodeId,
int responseId,
int containerPMem,
int containerVMem,
float containerCPU,
int nodePMem,
int nodeVMem,
float nodeCPU) {
// Fake node status with fake utilization
ResourceUtilization containersUtilization =
ResourceUtilization.newInstance(containerPMem, containerVMem,
containerCPU);
ResourceUtilization nodeUtilization =
ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU);
NodeStatus status = NodeStatus.newInstance(
nodeId,
responseId,
new ArrayList<ContainerStatus>(),
null,
NodeHealthStatus.newInstance(true, null, 0),
containersUtilization,
nodeUtilization,
null);
return status;
}
/**
* Verify that both the RMNode and SchedulerNode have been updated with the
* utilization values from the current nodeStatus fixture.
*/
private void verifySimulatedUtilization() throws InterruptedException {
ResourceManager rm = cluster.getResourceManager(0);
RMContext rmContext = rm.getRMContext();
ResourceUtilization containersUtilization =
nodeStatus.getContainersUtilization();
ResourceUtilization nodeUtilization =
nodeStatus.getNodeUtilization();
// Give the heartbeat time to propagate to the RM (max 10 seconds):
// poll until the reported node utilization matches the fixture
boolean utilizationUpdated = false;
for (int i = 0; i < 100 && !utilizationUpdated; i++) {
for (RMNode ni : rmContext.getRMNodes().values()) {
if (nodeUtilization.equals(ni.getNodeUtilization())) {
utilizationUpdated = true;
break;
}
}
Thread.sleep(100);
}
// Verify the data is readable from the RM and scheduler nodes
for (RMNode ni : rmContext.getRMNodes().values()) {
ResourceUtilization cu = ni.getAggregatedContainersUtilization();
assertEquals("Containers Utillization not propagated to RMNode",
containersUtilization, cu);
ResourceUtilization nu = ni.getNodeUtilization();
assertEquals("Node Utillization not propagated to RMNode",
nodeUtilization, nu);
SchedulerNode scheduler =
rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
cu = scheduler.getAggregatedContainersUtilization();
assertEquals("Containers Utillization not propagated to SchedulerNode",
containersUtilization, cu);
nu = scheduler.getNodeUtilization();
assertEquals("Node Utillization not propagated to SchedulerNode",
nodeUtilization, nu);
}
}
}