diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 4cf8aef18f..32567db666 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -246,6 +246,13 @@ public boolean isUpdatedCapability() {
@Override
public void resetUpdatedCapability() {
}
+
+ @Override
+ public long calculateHeartBeatInterval(
+ long defaultInterval, long minInterval, long maxInterval,
+ float speedupFactor, float slowdownFactor) {
+ return defaultInterval;
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 750b708f89..b5ae4f5b3c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -231,4 +231,11 @@ public boolean isUpdatedCapability() {
@Override
public void resetUpdatedCapability() {
}
+
+ @Override
+ public long calculateHeartBeatInterval(
+ long defaultInterval, long minInterval, long maxInterval,
+ float speedupFactor, float slowdownFactor) {
+ return defaultInterval;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 763b3e0317..cf0c5e97d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -688,6 +688,30 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
+ /** Enable Heartbeat Interval Scaling based on cpu utilization. */
+ public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
+ RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
+ public static final boolean
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
+
+ public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
+ RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
+ public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
+
+ public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
+ RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
+ public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
+
+ public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
+ RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
+ public static final float
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
+
+ public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
+ RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
+ public static final float
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
+
/** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 815ddd3073..4349d56731 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -860,6 +860,56 @@
1000
+
+ Enables heart-beat interval scaling. The NodeManager
+ heart-beat interval will scale based on the difference between the CPU
+ utilization on the node and the cluster-wide average CPU utilization.
+
+
+ yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
+
+ false
+
+
+
+ If heart-beat interval scaling is enabled, this is the
+ minimum heart-beat interval in milliseconds
+
+ yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms
+ 1000
+
+
+
+ If heart-beat interval scaling is enabled, this is the
+ maximum heart-beat interval in milliseconds
+ yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms
+ 1000
+
+
+
+ If heart-beat interval scaling is enabled, this controls
+ the degree of adjustment when speeding up heartbeat intervals.
+ At 1.0, 20% less than average CPU utilization will result in a 20%
+ decrease in heartbeat interval.
+
+
+ yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
+
+ 1.0
+
+
+
+ If heart-beat interval scaling is enabled, this controls
+ the degree of adjustment when slowing down heartbeat intervals.
+ At 1.0, 20% greater than average CPU utilization will result in a 20%
+ increase in heartbeat interval.
+
+
+ yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
+
+ 1.0
+
+
The minimum allowed version of a connecting nodemanager. The valid values are
NONE (no version checking), EqualToRM (the nodemanager's version is equal to
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 2790155b85..62ff1fe2cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -730,6 +730,14 @@ public RefreshNodesResourcesResponse refreshNodesResources(
// refresh dynamic resource in ResourceTrackerService
this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
+
+ // Update our heartbeat configuration as well
+ Configuration ysconf =
+ getConfiguration(new Configuration(false),
+ YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+ this.rm.getRMContext().getResourceTrackerService()
+ .updateHeartBeatConfiguration(ysconf);
+
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
return response;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
index 752d6a877b..37f4ec436d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@@ -53,6 +54,8 @@ public class ClusterMetrics {
private MutableRate aMContainerAllocationDelay;
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
+ @Metric("Memory Capability") MutableGaugeLong capabilityMB;
+ @Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
"Metrics for the Yarn Cluster");
@@ -83,7 +86,7 @@ private static void registerMetrics() {
}
@VisibleForTesting
- synchronized static void destroy() {
+ public synchronized static void destroy() {
isInitialized.set(false);
INSTANCE = null;
}
@@ -195,6 +198,28 @@ public void addAMRegisterDelay(long delay) {
aMRegisterDelay.add(delay);
}
+ public long getCapabilityMB() {
+ return capabilityMB.value();
+ }
+
+ public long getCapabilityVirtualCores() {
+ return capabilityVirtualCores.value();
+ }
+
+ public void incrCapability(Resource res) {
+ if (res != null) {
+ capabilityMB.incr(res.getMemorySize());
+ capabilityVirtualCores.incr(res.getVirtualCores());
+ }
+ }
+
+ public void decrCapability(Resource res) {
+ if (res != null) {
+ capabilityMB.decr(res.getMemorySize());
+ capabilityVirtualCores.decr(res.getVirtualCores());
+ }
+ }
+
public void addAMContainerAllocationDelay(long delay) {
aMContainerAllocationDelay.add(delay);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 2a09751665..1f79e2b7d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -114,6 +113,13 @@ public class ResourceTrackerService extends AbstractService implements
private final WriteLock writeLock;
private long nextHeartBeatInterval;
+ private boolean heartBeatIntervalScalingEnable;
+ private long heartBeatIntervalMin;
+ private long heartBeatIntervalMax;
+ private float heartBeatIntervalSpeedupFactor;
+ private float heartBeatIntervalSlowdownFactor;
+
+
private Server server;
private InetSocketAddress resourceTrackerAddress;
private String minimumNodeManagerVersion;
@@ -157,14 +163,6 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
RackResolver.init(conf);
- nextHeartBeatInterval =
- conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
- if (nextHeartBeatInterval <= 0) {
- throw new YarnRuntimeException("Invalid Configuration. "
- + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
- + " should be larger than 0.");
- }
checkIpHostnameInRegistration = conf.getBoolean(
YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
@@ -188,7 +186,7 @@ protected void serviceInit(Configuration conf) throws Exception {
isDelegatedCentralizedNodeLabelsConf =
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
}
-
+ updateHeartBeatConfiguration(conf);
loadDynamicResourceConfiguration(conf);
decommissioningWatcher.init(conf);
super.serviceInit(conf);
@@ -233,6 +231,84 @@ public void updateDynamicResourceConfiguration(
}
}
+ /**
+ * Update HearBeatConfiguration with new configuration.
+ * @param conf Yarn Configuration
+ */
+ public void updateHeartBeatConfiguration(Configuration conf) {
+ this.writeLock.lock();
+ try {
+ nextHeartBeatInterval =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+ heartBeatIntervalScalingEnable =
+ conf.getBoolean(
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
+ YarnConfiguration.
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
+ heartBeatIntervalMin =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
+ heartBeatIntervalMax =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
+ heartBeatIntervalSpeedupFactor =
+ conf.getFloat(
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
+ YarnConfiguration.
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
+ heartBeatIntervalSlowdownFactor =
+ conf.getFloat(
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
+ YarnConfiguration.
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
+
+ if (nextHeartBeatInterval <= 0) {
+ LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
+ + " must be greater than 0, using default.");
+ nextHeartBeatInterval =
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
+ }
+
+ if (heartBeatIntervalScalingEnable) {
+ if (heartBeatIntervalMin <= 0
+ || heartBeatIntervalMin > heartBeatIntervalMax
+ || nextHeartBeatInterval < heartBeatIntervalMin
+ || nextHeartBeatInterval > heartBeatIntervalMax) {
+ LOG.warn("Invalid NM Heartbeat Configuration. "
+ + "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
+ + heartBeatIntervalMin + " <= "
+ + nextHeartBeatInterval + " <= "
+ + heartBeatIntervalMax
+ + " Setting min and max to configured interval.");
+ heartBeatIntervalMin = nextHeartBeatInterval;
+ heartBeatIntervalMax = nextHeartBeatInterval;
+ }
+ if (heartBeatIntervalSpeedupFactor < 0
+ || heartBeatIntervalSlowdownFactor < 0) {
+ LOG.warn(
+ "Heartbeat scaling factors must be >= 0 "
+ + " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
+ + " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
+ + ". Using Defaults");
+ heartBeatIntervalSlowdownFactor =
+ YarnConfiguration.
+ DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
+ heartBeatIntervalSpeedupFactor =
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
+ }
+ LOG.info("Heartbeat Scaling Configuration: "
+ + " defaultInterval:" + nextHeartBeatInterval
+ + " minimumInterval:" + heartBeatIntervalMin
+ + " maximumInterval:" + heartBeatIntervalMax
+ + " speedupFactor:" + heartBeatIntervalSpeedupFactor
+ + " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
@@ -629,10 +705,17 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
// Heartbeat response
+ long newInterval = nextHeartBeatInterval;
+ if (heartBeatIntervalScalingEnable) {
+ newInterval = rmNode.calculateHeartBeatInterval(
+ nextHeartBeatInterval, heartBeatIntervalMin,
+ heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
+ heartBeatIntervalSlowdownFactor);
+ }
NodeHeartbeatResponse nodeHeartBeatResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
- NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
+ NodeAction.NORMAL, null, null, null, null, newInterval);
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d3b515e824..e6205d2dac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -212,4 +212,8 @@ public interface RMNode {
* @return all node attributes as a Set.
*/
Set getAllNodeAttributes();
+
+ long calculateHeartBeatInterval(long defaultInterval,
+ long minInterval, long maxInterval, float speedupFactor,
+ float slowdownFactor);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index af6f914f8d..fc7e88ba12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -716,6 +716,48 @@ public void resetLastNodeHeartBeatResponse() {
}
}
+ @Override
+ public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
+ long maxInterval, float speedupFactor, float slowdownFactor) {
+
+ long newInterval = defaultInterval;
+
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ float clusterUtil = metrics.getUtilizedVirtualCores()
+ / Math.max(1.0f, metrics.getCapabilityVirtualCores());
+
+ if (this.nodeUtilization != null && this.getPhysicalResource() != null) {
+ // getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
+ // a physicalResource returns number of physical cores. So,
+ // nodeUtil will be CPU utilization of entire node.
+ float nodeUtil = this.nodeUtilization.getCPU()
+ / Math.max(1.0f, this.getPhysicalResource().getVirtualCores());
+
+ // sanitize
+ nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
+ clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
+
+ if (nodeUtil > clusterUtil) {
+ // Slow down - 20% more CPU utilization means slow down by 20% * factor
+ newInterval = (long) (defaultInterval
+ * (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
+ } else {
+ // Speed up - 20% less CPU utilization means speed up by 20% * factor
+ newInterval = (long) (defaultInterval
+ * (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
+ }
+ newInterval =
+ Math.min(maxInterval, Math.max(minInterval, newInterval));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting heartbeatinterval to: " + newInterval
+ + " node:" + this.nodeId + " nodeUtil: " + nodeUtil
+ + " clusterUtil: " + clusterUtil);
+ }
+ }
+ return newInterval;
+ }
+
public void handle(RMNodeEvent event) {
LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType());
writeLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 4ad3b37a5d..c39d57d3b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -106,6 +107,7 @@ public void addNode(N node) {
// Update cluster capacity
Resources.addTo(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity);
+ ClusterMetrics.getMetrics().incrCapability(node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, true);
@@ -201,6 +203,7 @@ public N removeNode(NodeId nodeId) {
// Update cluster capacity
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity);
+ ClusterMetrics.getMetrics().decrCapability(node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 4ed5f3155e..0de6c572a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -335,6 +335,13 @@ public RMContext getRMContext() {
public Resource getPhysicalResource() {
return this.physicalResource;
}
+
+ @Override
+ public long calculateHeartBeatInterval(
+ long defaultInterval, long minInterval, long maxInterval,
+ float speedupFactor, float slowdownFactor) {
+ return defaultInterval;
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 931a2e7758..b21bf394ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -1167,4 +1168,112 @@ public void testFinishedContainersPulledByAMOnNewNode() {
Assert.assertEquals(1, rmNode.getContainersToBeRemovedFromNM().size());
}
+
+ private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil,
+ long hbDefault, long hbMin, long hbMax, float speedup, float slowdown,
+ float cpuUtil, long expectedHb) {
+ nodeUtil.setCPU(cpuUtil);
+ rmNode.setNodeUtilization(nodeUtil);
+ long hbInterval = rmNode.calculateHeartBeatInterval(hbDefault, hbMin, hbMax,
+ speedup, slowdown);
+ assertEquals("heartbeat interval incorrect", expectedHb, hbInterval);
+ }
+
+ @Test
+ public void testCalculateHeartBeatInterval() {
+ RMNodeImpl rmNode = getRunningNode();
+ Resource nodeCapability = rmNode.getTotalCapability();
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ // Set cluster capability to 10 * nodeCapability
+ int vcoreUnit = nodeCapability.getVirtualCores();
+ rmNode.setPhysicalResource(nodeCapability);
+ int clusterVcores = vcoreUnit * 10;
+ metrics.incrCapability(
+ Resource.newInstance(10 * nodeCapability.getMemorySize(),
+ clusterVcores));
+
+ long hbDefault = 2000;
+ long hbMin = 1500;
+ long hbMax = 2500;
+ float speedup = 1.0F;
+ float slowdown = 1.0F;
+ metrics.incrUtilizedVirtualCores(vcoreUnit * 5); // 50 % cluster util
+ ResourceUtilization nodeUtil = ResourceUtilization.newInstance(
+ 1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.10F, hbMin); // 10%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.20F, hbMin); // 20%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.30F, 1600); // 30%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.40F, 1800); // 40%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.60F, 2200); // 60%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.70F, 2400); // 70%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.80F, hbMax); // 80%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.90F, hbMax); // 90%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
+
+ // Try with 50% speedup/slowdown factors
+ speedup = 0.5F;
+ slowdown = 0.5F;
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.10F, 1600); // 10%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.20F, 1700); // 20%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.30F, 1800); // 30%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.40F, 1900); // 40%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.60F, 2100); // 60%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.70F, 2200); // 70%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.80F, 2300); // 80%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.90F, 2400); // 90%
+
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%
+
+ // With Physical Resource null, it should always return default
+ rmNode.setPhysicalResource(null);
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 0.1F, hbDefault); // 10%
+ calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+ speedup, slowdown, vcoreUnit * 1.0F, hbDefault); // 100%
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
index c1703bc52e..14eca5ae5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -24,11 +24,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,12 +42,19 @@
*/
public class TestClusterNodeTracker {
private ClusterNodeTracker nodeTracker;
+ private ClusterMetrics metrics;
@Before
public void setup() {
+ metrics = ClusterMetrics.getMetrics();
nodeTracker = new ClusterNodeTracker<>();
}
+ @After
+ public void teardown() {
+ ClusterMetrics.destroy();
+ }
+
private void addEight4x4Nodes() {
MockNodes.resetHostIds();
List rmNodes =
@@ -65,6 +74,15 @@ public void testGetNodeCount() {
4, nodeTracker.nodeCount("rack0"));
}
+ @Test
+ public void testIncrCapability() {
+ addEight4x4Nodes();
+ assertEquals("Cluster Capability Memory incorrect",
+ metrics.getCapabilityMB(), (4096 * 8));
+ assertEquals("Cluster Capability Vcores incorrect",
+ metrics.getCapabilityVirtualCores(), 4 * 8);
+ }
+
@Test
public void testGetNodesForResourceName() throws Exception {
addEight4x4Nodes();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
index d8c368cc8b..596a47e4e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
@@ -219,3 +219,21 @@ The following parameters can be used to configure the container log dir sizes.
| `yarn.nodemanager.container-log-monitor.interval-ms` | Positive integer | How often to check the usage of a container's log directories in milliseconds. Default is 60000 ms. |
| `yarn.nodemanager.container-log-monitor.dir-size-limit-bytes` | Long | The disk space limit, in bytes, for a single container log directory. Default is 1000000000. |
| `yarn.nodemanager.container-log-monitor.total-size-limit-bytes` | Long | The disk space limit, in bytes, for all of a container's logs. The default is 10000000000. |
+
+Scale Heart-beat Interval Based on CPU Utilization
+-------------------------------------------------
+
+This allows a cluster admin to configure a cluster to allow the heart-beat between the Resource Manager and each NodeManager to be scaled based on the CPU utilization of the node compared to the overall CPU utilization of the cluster.
+
+### Configuration
+
+The following parameters can be used to configure the heart-beat interval and whether and how it scales.
+
+| Configuration Name | Allowed Values | Description |
+|:---- |:---- |:---- |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-ms` | Long | Specifies the default heart-beat interval in milliseconds for every NodeManager in the cluster. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable` | true, false | Enables heart-beat interval scaling. If true, The NodeManager heart-beat interval will scale based on the difference between the CPU utilization on the node and the cluster-wide average CPU utilization. Default is false. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the minimum heart-beat interval in milliseconds. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the maximum heart-beat interval in milliseconds. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when speeding up heartbeat intervals. At 1.0, 20% less than the average cluster-wide CPU utilization will result in a 20% decrease in the heartbeat interval. Default is 1.0. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when slowing down heartbeat intervals. At 1.0, 20% greater than the average cluster-wide CPU utilization will result in a 20% increase in the heartbeat interval. Default is 1.0. |