YARN-5356. NodeManager should communicate physical resource capability to ResourceManager. Contributed by Inigo Goiri

This commit is contained in:
Jason Lowe 2016-11-08 22:01:26 +00:00
parent dbb133ccfc
commit 3f93ac0733
14 changed files with 166 additions and 22 deletions

View File

@ -218,6 +218,11 @@ public void setUntrackedTimeStamp(long timeStamp) {
public Integer getDecommissioningTimeout() {
return null;
}
@Override
public Resource getPhysicalResource() {
return null;
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -207,4 +207,9 @@ public void setUntrackedTimeStamp(long timeStamp) {
public Integer getDecommissioningTimeout() {
return null;
}
@Override
public Resource getPhysicalResource() {
return null;
}
}

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SysInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Plugin to calculate resource information on the system.
@ -195,4 +196,42 @@ public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
return null;
}
/**
* Create the ResourceCalculatorPlugin for the containers monitor in the Node
* Manager and configure it. If the plugin is not configured, this method
* will try and return a memory calculator plugin available for this system.
*
* @param conf Configure the plugin with this.
* @return ResourceCalculatorPlugin or null if ResourceCalculatorPlugin is
* not available for current system.
*/
public static ResourceCalculatorPlugin getContainersMonitorPlugin(
Configuration conf) {
Class<? extends ResourceCalculatorPlugin> clazzNM = conf.getClass(
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
Class<? extends ResourceCalculatorPlugin> clazz = conf.getClass(
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, clazzNM,
ResourceCalculatorPlugin.class);
return ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
}
/**
* Create the ResourceCalculatorPlugin for the node resource monitor in the
* Node Manager and configure it. If the plugin is not configured, this
* method will try and return a memory calculator plugin available for this
* system.
*
* @param conf Configure the plugin with this.
* @return ResourceCalculatorPlugin or null if ResourceCalculatorPlugin is
* not available for current system.
*/
public static ResourceCalculatorPlugin getNodeResourceMonitorPlugin(
Configuration conf) {
Class<? extends ResourceCalculatorPlugin> clazz = conf.getClass(
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
return ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
}
}

View File

@ -41,6 +41,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, null);
}
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@ -50,6 +59,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
return request;
}
@ -88,4 +98,18 @@ public abstract void setContainerStatuses(
*/
public abstract void setRunningApplications(
List<ApplicationId> runningApplications);
/**
* Get the physical resources in the node to properly estimate resource
* utilization.
* @return Physical resources in the node.
*/
public abstract Resource getPhysicalResource();
/**
* Set the physical resources in the node to properly estimate resource
* utilization.
* @param physicalResource Physical resources in the node.
*/
public abstract void setPhysicalResource(Resource physicalResource);
}

View File

@ -56,6 +56,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List<ApplicationId> runningApplications = null;
private Set<NodeLabel> labels = null;
/** Physical resources in the node. */
private Resource physicalResource = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
}
@ -93,6 +96,9 @@ private synchronized void mergeLocalToBuilder() {
}
builder.setNodeLabels(newBuilder.build());
}
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
}
private synchronized void addNMContainerStatusesToProto() {
@ -270,6 +276,28 @@ public synchronized void setContainerStatuses(
this.containerStatuses.addAll(containerReports);
}
@Override
public synchronized Resource getPhysicalResource() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.physicalResource != null) {
return this.physicalResource;
}
if (!p.hasPhysicalResource()) {
return null;
}
this.physicalResource = convertFromProtoFormat(p.getPhysicalResource());
return this.physicalResource;
}
@Override
public synchronized void setPhysicalResource(Resource pPhysicalResource) {
maybeInitBuilder();
if (pPhysicalResource == null) {
builder.clearPhysicalResource();
}
this.physicalResource = pPhysicalResource;
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -63,6 +63,7 @@ message RegisterNodeManagerRequestProto {
repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7;
optional NodeLabelsProto nodeLabels = 8;
optional ResourceProto physicalResource = 9;
}
message RegisterNodeManagerResponseProto {

View File

@ -204,6 +204,7 @@ public void testRegisterNodeManagerRequestPBImpl() {
resource.setMemorySize(10000);
resource.setVirtualCores(2);
original.setResource(resource);
original.setPhysicalResource(resource);
RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(
original.getProto());
@ -211,6 +212,8 @@ public void testRegisterNodeManagerRequestPBImpl() {
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(10000, copy.getResource().getMemorySize());
assertEquals(2, copy.getResource().getVirtualCores());
assertEquals(10000, copy.getPhysicalResource().getMemorySize());
assertEquals(2, copy.getPhysicalResource().getVirtualCores());
}

View File

@ -66,12 +66,8 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS);
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin);

View File

@ -91,6 +91,7 @@
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@ -113,6 +114,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private long nextHeartBeatInterval;
private ResourceTracker resourceTracker;
private Resource totalResource;
private Resource physicalResource;
private int httpPort;
private String nodeManagerVersionId;
private String minimumResourceManagerVersion;
@ -187,6 +189,19 @@ protected void serviceInit(Configuration conf) throws Exception {
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);
// Get actual node physical resources
int physicalMemoryMb = memoryMb;
int physicalCores = virtualCores;
ResourceCalculatorPlugin rcp =
ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
if (rcp != null) {
physicalMemoryMb = (int) (rcp.getPhysicalMemorySize() / (1024 * 1024));
physicalCores = rcp.getNumProcessors();
}
this.physicalResource =
Resource.newInstance(physicalMemoryMb, physicalCores);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
@ -343,7 +358,7 @@ protected void registerWithRM()
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels);
nodeLabels, physicalResource);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}

View File

@ -125,15 +125,8 @@ protected void serviceInit(Configuration myConf) throws Exception {
this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
Class<? extends ResourceCalculatorPlugin> clazz =
this.conf.getClass(YarnConfiguration
.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
this.conf.getClass(
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class),
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, this.conf);
ResourceCalculatorPlugin.getContainersMonitorPlugin(this.conf);
LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin);
processTreeClass = this.conf.getClass(

View File

@ -318,6 +318,7 @@ public RegisterNodeManagerResponse registerNodeManager(
int httpPort = request.getHttpPort();
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
Resource physicalResource = request.getPhysicalResource();
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@ -387,7 +388,7 @@ public RegisterNodeManagerResponse registerNodeManager(
.getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion);
resolve(host), capability, nodeManagerVersion, physicalResource);
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {

View File

@ -113,6 +113,12 @@ public interface RMNode {
*/
public ResourceUtilization getNodeUtilization();
/**
* the physical resources in the node.
* @return the physical resources in the node.
*/
Resource getPhysicalResource();
/**
* The rack name for this node manager.
* @return the rack name.

View File

@ -133,6 +133,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */
private ResourceUtilization nodeUtilization;
/** Physical resources in the node. */
private volatile Resource physicalResource;
/* Container Queue Information for the node.. Used by Distributed Scheduler */
private OpportunisticContainersStatus opportunisticContainersStatus;
@ -353,7 +356,15 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
RMNodeEvent> stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion) {
this(nodeId, context, hostName, cmPort, httpPort, node, capability,
nodeManagerVersion, null);
}
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion, Resource physResource) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
@ -367,6 +378,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.physicalResource = physResource;
this.latestNodeHeartBeatResponse.setResponseId(0);
@ -526,6 +538,15 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) {
}
}
@Override
public Resource getPhysicalResource() {
return this.physicalResource;
}
public void setPhysicalResource(Resource physicalResource) {
this.physicalResource = physicalResource;
}
@Override
public NodeState getState() {
this.readLock.lock();

View File

@ -112,12 +112,13 @@ private static class MockRMNodeImpl implements RMNode {
private Set<String> labels;
private ResourceUtilization containersUtilization;
private ResourceUtilization nodeUtilization;
private Resource physicalResource;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
this.httpAddress = httpAddress;
@ -131,6 +132,7 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
this.labels = labels;
this.containersUtilization = containersUtilization;
this.nodeUtilization = nodeUtilization;
this.physicalResource = pPhysicalResource;
}
@Override
@ -277,6 +279,11 @@ public void setUntrackedTimeStamp(long timeStamp) {
public Integer getDecommissioningTimeout() {
return null;
}
@Override
public Resource getPhysicalResource() {
return this.physicalResource;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,
@ -287,19 +294,19 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, Set<String> labels) {
return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123,
labels, null, null);
labels, null, null, null);
}
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port) {
return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
null, null, null);
null, null, null, null);
}
private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port,
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
ResourceUtilization nodeUtilization, Resource physicalResource) {
final String rackName = "rack"+ rack;
final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid;
@ -312,7 +319,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
rackName, healthReport, 0, nid, hostName, state, labels,
containersUtilization, nodeUtilization);
containersUtilization, nodeUtilization, physicalResource);
}
public static RMNode nodeInfo(int rack, final Resource perNode,