YARN-7625. Expose NM node/containers resource utilization in JVM metrics. Contributed by Weiwei Yang
This commit is contained in:
parent
8bb83a8f62
commit
06f0eb2dce
@ -35,6 +35,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
|
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
@ -125,4 +126,6 @@ public interface Context {
|
|||||||
ContainerStateTransitionListener getContainerStateTransitionListener();
|
ContainerStateTransitionListener getContainerStateTransitionListener();
|
||||||
|
|
||||||
ResourcePluginManager getResourcePluginManager();
|
ResourcePluginManager getResourcePluginManager();
|
||||||
|
|
||||||
|
NodeManagerMetrics getNodeManagerMetrics();
|
||||||
}
|
}
|
||||||
|
@ -205,7 +205,7 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected NodeResourceMonitor createNodeResourceMonitor() {
|
protected NodeResourceMonitor createNodeResourceMonitor() {
|
||||||
return new NodeResourceMonitorImpl();
|
return new NodeResourceMonitorImpl(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ContainerManagerImpl createContainerManager(Context context,
|
protected ContainerManagerImpl createContainerManager(Context context,
|
||||||
@ -242,6 +242,7 @@ protected NMContext createNMContext(
|
|||||||
NMContext nmContext = new NMContext(containerTokenSecretManager,
|
NMContext nmContext = new NMContext(containerTokenSecretManager,
|
||||||
nmTokenSecretManager, dirsHandler, aclsManager, stateStore,
|
nmTokenSecretManager, dirsHandler, aclsManager, stateStore,
|
||||||
isDistSchedulerEnabled, conf);
|
isDistSchedulerEnabled, conf);
|
||||||
|
nmContext.setNodeManagerMetrics(metrics);
|
||||||
DefaultContainerStateListener defaultListener =
|
DefaultContainerStateListener defaultListener =
|
||||||
new DefaultContainerStateListener();
|
new DefaultContainerStateListener();
|
||||||
nmContext.setContainerStateTransitionListener(defaultListener);
|
nmContext.setContainerStateTransitionListener(defaultListener);
|
||||||
@ -574,6 +575,8 @@ public static class NMContext implements Context {
|
|||||||
|
|
||||||
private Configuration conf = null;
|
private Configuration conf = null;
|
||||||
|
|
||||||
|
private NodeManagerMetrics metrics = null;
|
||||||
|
|
||||||
protected final ConcurrentMap<ApplicationId, Application> applications =
|
protected final ConcurrentMap<ApplicationId, Application> applications =
|
||||||
new ConcurrentHashMap<ApplicationId, Application>();
|
new ConcurrentHashMap<ApplicationId, Application>();
|
||||||
|
|
||||||
@ -823,6 +826,20 @@ public ResourcePluginManager getResourcePluginManager() {
|
|||||||
return resourcePluginManager;
|
return resourcePluginManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link NodeManagerMetrics} instance of this node.
|
||||||
|
* This may return null if the instance was not set on the context.
|
||||||
|
* @return node manager metrics.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public NodeManagerMetrics getNodeManagerMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeManagerMetrics(NodeManagerMetrics nmMetrics) {
|
||||||
|
this.metrics = nmMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
public void setResourcePluginManager(
|
public void setResourcePluginManager(
|
||||||
ResourcePluginManager resourcePluginManager) {
|
ResourcePluginManager resourcePluginManager) {
|
||||||
this.resourcePluginManager = resourcePluginManager;
|
this.resourcePluginManager = resourcePluginManager;
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -48,12 +49,14 @@ public class NodeResourceMonitorImpl extends AbstractService implements
|
|||||||
/** Current <em>resource utilization</em> of the node. */
|
/** Current <em>resource utilization</em> of the node. */
|
||||||
private ResourceUtilization nodeUtilization;
|
private ResourceUtilization nodeUtilization;
|
||||||
|
|
||||||
|
private Context nmContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the node resource monitor.
|
* Initialize the node resource monitor.
|
||||||
*/
|
*/
|
||||||
public NodeResourceMonitorImpl() {
|
public NodeResourceMonitorImpl(Context context) {
|
||||||
super(NodeResourceMonitorImpl.class.getName());
|
super(NodeResourceMonitorImpl.class.getName());
|
||||||
|
this.nmContext = context;
|
||||||
this.monitoringThread = new MonitoringThread();
|
this.monitoringThread = new MonitoringThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,6 +152,15 @@ public void run() {
|
|||||||
(int) (vmem >> 20), // B -> MB
|
(int) (vmem >> 20), // B -> MB
|
||||||
vcores); // Used Virtual Cores
|
vcores); // Used Virtual Cores
|
||||||
|
|
||||||
|
// Publish the node utilization metrics to node manager
|
||||||
|
// metrics system.
|
||||||
|
NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
|
||||||
|
if (nmMetrics != null) {
|
||||||
|
nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
|
||||||
|
nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
|
||||||
|
nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(monitoringInterval);
|
Thread.sleep(monitoringInterval);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
@ -488,6 +489,18 @@ public void run() {
|
|||||||
// Save the aggregated utilization of the containers
|
// Save the aggregated utilization of the containers
|
||||||
setContainersUtilization(trackedContainersUtilization);
|
setContainersUtilization(trackedContainersUtilization);
|
||||||
|
|
||||||
|
// Publish the container utilization metrics to node manager
|
||||||
|
// metrics system.
|
||||||
|
NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
|
||||||
|
if (nmMetrics != null) {
|
||||||
|
nmMetrics.setContainerUsedMemGB(
|
||||||
|
trackedContainersUtilization.getPhysicalMemory());
|
||||||
|
nmMetrics.setContainerUsedVMemGB(
|
||||||
|
trackedContainersUtilization.getVirtualMemory());
|
||||||
|
nmMetrics.setContainerCpuUtilization(
|
||||||
|
trackedContainersUtilization.getCPU());
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(monitoringInterval);
|
Thread.sleep(monitoringInterval);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
@ -77,6 +78,18 @@ public class NodeManagerMetrics {
|
|||||||
MutableGaugeLong publicBytesDeleted;
|
MutableGaugeLong publicBytesDeleted;
|
||||||
@Metric("# of bytes deleted from the private local cache")
|
@Metric("# of bytes deleted from the private local cache")
|
||||||
MutableGaugeLong privateBytesDeleted;
|
MutableGaugeLong privateBytesDeleted;
|
||||||
|
@Metric("Current used physical memory by all containers in GB")
|
||||||
|
MutableGaugeInt containerUsedMemGB;
|
||||||
|
@Metric("Current used virtual memory by all containers in GB")
|
||||||
|
MutableGaugeInt containerUsedVMemGB;
|
||||||
|
@Metric("Aggregated CPU utilization of all containers")
|
||||||
|
MutableGaugeFloat containerCpuUtilization;
|
||||||
|
@Metric("Current used memory by this node in GB")
|
||||||
|
MutableGaugeInt nodeUsedMemGB;
|
||||||
|
@Metric("Current used virtual memory by this node in GB")
|
||||||
|
MutableGaugeInt nodeUsedVMemGB;
|
||||||
|
@Metric("Current CPU utilization")
|
||||||
|
MutableGaugeFloat nodeCpuUtilization;
|
||||||
|
|
||||||
// CHECKSTYLE:ON:VisibilityModifier
|
// CHECKSTYLE:ON:VisibilityModifier
|
||||||
|
|
||||||
@ -316,4 +329,52 @@ public long getPublicBytesDeleted() {
|
|||||||
public long getPrivateBytesDeleted() {
|
public long getPrivateBytesDeleted() {
|
||||||
return this.privateBytesDeleted.value();
|
return this.privateBytesDeleted.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setContainerUsedMemGB(long usedMem) {
|
||||||
|
this.containerUsedMemGB.set((int)Math.floor(usedMem/1024d));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getContainerUsedMemGB() {
|
||||||
|
return this.containerUsedMemGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerUsedVMemGB(long usedVMem) {
|
||||||
|
this.containerUsedVMemGB.set((int)Math.floor(usedVMem/1024d));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getContainerUsedVMemGB() {
|
||||||
|
return this.containerUsedVMemGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerCpuUtilization(float cpuUtilization) {
|
||||||
|
this.containerCpuUtilization.set(cpuUtilization);
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getContainerCpuUtilization() {
|
||||||
|
return this.containerCpuUtilization.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeUsedMemGB(long totalUsedMemGB) {
|
||||||
|
this.nodeUsedMemGB.set((int)Math.floor(totalUsedMemGB/1024d));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNodeUsedMemGB() {
|
||||||
|
return nodeUsedMemGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeUsedVMemGB(long totalUsedVMemGB) {
|
||||||
|
this.nodeUsedVMemGB.set((int)Math.floor(totalUsedVMemGB/1024d));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNodeUsedVMemGB() {
|
||||||
|
return nodeUsedVMemGB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getNodeCpuUtilization() {
|
||||||
|
return nodeCpuUtilization.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNodeCpuUtilization(float cpuUtilization) {
|
||||||
|
this.nodeCpuUtilization.set(cpuUtilization);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,18 +18,46 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager
|
||||||
|
.BaseContainerManagerTest;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager
|
||||||
|
.monitor.MockResourceCalculatorPlugin;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
|
||||||
public class TestNodeResourceMonitor extends BaseContainerManagerTest {
|
public class TestNodeResourceMonitor extends BaseContainerManagerTest {
|
||||||
public TestNodeResourceMonitor() throws UnsupportedFileSystemException {
|
public TestNodeResourceMonitor() throws UnsupportedFileSystemException {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
// Enable node resource monitor with a mocked resource calculator.
|
||||||
|
conf.set(
|
||||||
|
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR,
|
||||||
|
MockResourceCalculatorPlugin.class.getCanonicalName());
|
||||||
|
super.setup();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeResourceMonitor() {
|
public void testMetricsUpdate() throws Exception {
|
||||||
NodeResourceMonitor nrm = new NodeResourceMonitorImpl();
|
// This test doesn't verify the correctness of those metrics
|
||||||
|
// updated by the monitor, it only verifies that the monitor
|
||||||
|
// does publish this info to the node manager metrics system in
|
||||||
|
// each monitor interval.
|
||||||
|
Context spyContext = spy(context);
|
||||||
|
NodeResourceMonitor nrm = new NodeResourceMonitorImpl(spyContext);
|
||||||
|
nrm.init(conf);
|
||||||
|
nrm.start();
|
||||||
|
Mockito.verify(spyContext, timeout(500).atLeastOnce())
|
||||||
|
.getNodeManagerMetrics();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
|
||||||
@ -802,5 +803,10 @@ public ContainerExecutor getContainerExecutor() {
|
|||||||
public ResourcePluginManager getResourcePluginManager() {
|
public ResourcePluginManager getResourcePluginManager() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeManagerMetrics getNodeManagerMetrics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,4 +70,9 @@ public long getCumulativeCpuTime() {
|
|||||||
public float getCpuUsagePercentage() {
|
public float getCpuUsagePercentage() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getNumVCoresUsed() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -67,7 +69,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
||||||
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
|
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||||
@ -75,6 +76,7 @@
|
|||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class TestContainersMonitor extends BaseContainerManagerTest {
|
public class TestContainersMonitor extends BaseContainerManagerTest {
|
||||||
@ -95,6 +97,22 @@ public void setup() throws IOException {
|
|||||||
super.setup();
|
super.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricsUpdate() throws Exception {
|
||||||
|
// This test doesn't verify the correctness of those metrics
|
||||||
|
// updated by the monitor, it only verifies that the monitor
|
||||||
|
// does publish this info to the node manager metrics system in
|
||||||
|
// each monitor interval.
|
||||||
|
Context spyContext = spy(context);
|
||||||
|
ContainersMonitorImpl cm =
|
||||||
|
new ContainersMonitorImpl(mock(ContainerExecutor.class),
|
||||||
|
mock(AsyncDispatcher.class), spyContext);
|
||||||
|
cm.init(getConfForCM(false, true, 1024, 2.1f));
|
||||||
|
cm.start();
|
||||||
|
Mockito.verify(spyContext, timeout(500).atLeastOnce())
|
||||||
|
.getNodeManagerMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to verify the check for whether a process tree is over limit or not.
|
* Test to verify the check for whether a process tree is over limit or not.
|
||||||
*
|
*
|
||||||
|
Loading…
Reference in New Issue
Block a user