YARN-10692. Add Node GPU Utilization and apply to NodeMetrics. Contributed by Qi Zhu.
This commit is contained in:
parent
a5745711dd
commit
38495af325
@ -20,8 +20,11 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuNodeResourceUpdateHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -46,6 +49,10 @@ public class NodeResourceMonitorImpl extends AbstractService implements
|
|||||||
/** Resource calculator. */
|
/** Resource calculator. */
|
||||||
private ResourceCalculatorPlugin resourceCalculatorPlugin;
|
private ResourceCalculatorPlugin resourceCalculatorPlugin;
|
||||||
|
|
||||||
|
/** Gpu related plugin. */
|
||||||
|
private GpuResourcePlugin gpuResourcePlugin;
|
||||||
|
private GpuNodeResourceUpdateHandler gpuNodeResourceUpdateHandler;
|
||||||
|
|
||||||
/** Current <em>resource utilization</em> of the node. */
|
/** Current <em>resource utilization</em> of the node. */
|
||||||
private ResourceUtilization nodeUtilization =
|
private ResourceUtilization nodeUtilization =
|
||||||
ResourceUtilization.newInstance(0, 0, 0f);
|
ResourceUtilization.newInstance(0, 0, 0f);
|
||||||
@ -72,6 +79,18 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
this.resourceCalculatorPlugin =
|
this.resourceCalculatorPlugin =
|
||||||
ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
|
ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);
|
||||||
|
|
||||||
|
if (nmContext.getResourcePluginManager() != null) {
|
||||||
|
this.gpuResourcePlugin =
|
||||||
|
(GpuResourcePlugin)nmContext.getResourcePluginManager().
|
||||||
|
getNameToPlugins().get(ResourceInformation.GPU_URI);
|
||||||
|
|
||||||
|
if (gpuResourcePlugin != null) {
|
||||||
|
this.gpuNodeResourceUpdateHandler =
|
||||||
|
(GpuNodeResourceUpdateHandler)gpuResourcePlugin.
|
||||||
|
getNodeResourceHandlerInstance();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info(" Using ResourceCalculatorPlugin : "
|
LOG.info(" Using ResourceCalculatorPlugin : "
|
||||||
+ this.resourceCalculatorPlugin);
|
+ this.resourceCalculatorPlugin);
|
||||||
}
|
}
|
||||||
@ -152,6 +171,14 @@ public void run() {
|
|||||||
(int) (vmem >> 20), // B -> MB
|
(int) (vmem >> 20), // B -> MB
|
||||||
vcores); // Used Virtual Cores
|
vcores); // Used Virtual Cores
|
||||||
|
|
||||||
|
float nodeGpuUtilization = 0F;
|
||||||
|
try {
|
||||||
|
nodeGpuUtilization =
|
||||||
|
gpuNodeResourceUpdateHandler.getNodeGpuUtilization();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Get Node GPU Utilization error: " + e);
|
||||||
|
}
|
||||||
|
|
||||||
// Publish the node utilization metrics to node manager
|
// Publish the node utilization metrics to node manager
|
||||||
// metrics system.
|
// metrics system.
|
||||||
NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
|
NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
|
||||||
@ -159,6 +186,7 @@ public void run() {
|
|||||||
nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
|
nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
|
||||||
nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
|
nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
|
||||||
nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
|
nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
|
||||||
|
nmMetrics.setNodeGpuUtilization(nodeGpuUtilization);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -26,12 +26,14 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.PerGpuDeviceInformation;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
||||||
|
|
||||||
@ -76,4 +78,20 @@ public void updateConfiguredResource(Resource res) throws YarnException {
|
|||||||
|
|
||||||
res.setResourceValue(GPU_URI, nUsableGpus);
|
res.setResourceValue(GPU_URI, nUsableGpus);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public float getNodeGpuUtilization() throws Exception{
|
||||||
|
List<PerGpuDeviceInformation> gpuList =
|
||||||
|
gpuDiscoverer.getGpuDeviceInformation().getGpus();
|
||||||
|
Float totalGpuUtilization = 0F;
|
||||||
|
if (gpuList != null &&
|
||||||
|
gpuList.size() != 0) {
|
||||||
|
|
||||||
|
totalGpuUtilization = gpuList
|
||||||
|
.stream()
|
||||||
|
.map(g -> g.getGpuUtilizations().getOverallGpuUtilization())
|
||||||
|
.collect(Collectors.summingDouble(Float::floatValue))
|
||||||
|
.floatValue() / gpuList.size();
|
||||||
|
}
|
||||||
|
return totalGpuUtilization;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,8 @@ public class NodeManagerMetrics {
|
|||||||
MutableGaugeInt nodeUsedVMemGB;
|
MutableGaugeInt nodeUsedVMemGB;
|
||||||
@Metric("Current CPU utilization")
|
@Metric("Current CPU utilization")
|
||||||
MutableGaugeFloat nodeCpuUtilization;
|
MutableGaugeFloat nodeCpuUtilization;
|
||||||
|
@Metric("Current GPU utilization")
|
||||||
|
MutableGaugeFloat nodeGpuUtilization;
|
||||||
|
|
||||||
@Metric("Missed localization requests in bytes")
|
@Metric("Missed localization requests in bytes")
|
||||||
MutableCounterLong localizedCacheMissBytes;
|
MutableCounterLong localizedCacheMissBytes;
|
||||||
@ -428,6 +430,14 @@ public void setNodeCpuUtilization(float cpuUtilization) {
|
|||||||
this.nodeCpuUtilization.set(cpuUtilization);
|
this.nodeCpuUtilization.set(cpuUtilization);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setNodeGpuUtilization(float nodeGpuUtilization) {
|
||||||
|
this.nodeGpuUtilization.set(nodeGpuUtilization);
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getNodeGpuUtilization() {
|
||||||
|
return nodeGpuUtilization.value();
|
||||||
|
}
|
||||||
|
|
||||||
private void updateLocalizationHitRatios() {
|
private void updateLocalizationHitRatios() {
|
||||||
updateLocalizationHitRatio(localizedCacheHitBytes, localizedCacheMissBytes,
|
updateLocalizationHitRatio(localizedCacheHitBytes, localizedCacheMissBytes,
|
||||||
localizedCacheHitBytesRatio);
|
localizedCacheHitBytesRatio);
|
||||||
|
@ -437,14 +437,16 @@ public void testNodeManagerMetricsRecovery() throws Exception {
|
|||||||
waitForNMContainerState(cm, cid,
|
waitForNMContainerState(cm, cid,
|
||||||
org.apache.hadoop.yarn.server.nodemanager
|
org.apache.hadoop.yarn.server.nodemanager
|
||||||
.containermanager.container.ContainerState.RUNNING);
|
.containermanager.container.ContainerState.RUNNING);
|
||||||
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0,
|
||||||
|
1, 1, 1, 9, 1, 7, 0F);
|
||||||
|
|
||||||
// restart and verify metrics could be recovered
|
// restart and verify metrics could be recovered
|
||||||
cm.stop();
|
cm.stop();
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
metrics = NodeManagerMetrics.create();
|
metrics = NodeManagerMetrics.create();
|
||||||
metrics.addResource(Resource.newInstance(10240, 8));
|
metrics.addResource(Resource.newInstance(10240, 8));
|
||||||
TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8);
|
TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0,
|
||||||
|
0, 0, 10, 0, 8, 0F);
|
||||||
context = createContext(conf, stateStore);
|
context = createContext(conf, stateStore);
|
||||||
cm = createContainerManager(context, delSrvc);
|
cm = createContainerManager(context, delSrvc);
|
||||||
cm.init(conf);
|
cm.init(conf);
|
||||||
@ -452,7 +454,8 @@ public void testNodeManagerMetricsRecovery() throws Exception {
|
|||||||
assertEquals(1, context.getApplications().size());
|
assertEquals(1, context.getApplications().size());
|
||||||
app = context.getApplications().get(appId);
|
app = context.getApplications().get(appId);
|
||||||
assertNotNull(app);
|
assertNotNull(app);
|
||||||
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
|
TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0,
|
||||||
|
1, 1, 1, 9, 1, 7, 0F);
|
||||||
cm.stop();
|
cm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,11 +21,13 @@
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.GpuDeviceInformation;
|
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.GpuDeviceInformation;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.NMGpuResourceInfo;
|
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.NMGpuResourceInfo;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.PerGpuDeviceInformation;
|
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.PerGpuDeviceInformation;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.PerGpuUtilizations;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -122,4 +124,45 @@ public void testGetNMResourceInfoAutoDiscoveryDisabled()
|
|||||||
(NMGpuResourceInfo) target.getNMResourceInfo();
|
(NMGpuResourceInfo) target.getNMResourceInfo();
|
||||||
Assert.assertNull(resourceInfo.getGpuDeviceInformation());
|
Assert.assertNull(resourceInfo.getGpuDeviceInformation());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeGPUUtilization()
|
||||||
|
throws Exception {
|
||||||
|
GpuDiscoverer gpuDiscoverer = createNodeGPUUtilizationDiscoverer();
|
||||||
|
|
||||||
|
GpuNodeResourceUpdateHandler gpuNodeResourceUpdateHandler =
|
||||||
|
new GpuNodeResourceUpdateHandler(gpuDiscoverer, new Configuration());
|
||||||
|
|
||||||
|
Assert.assertEquals(0.5F,
|
||||||
|
gpuNodeResourceUpdateHandler.getNodeGpuUtilization(), 1e-6);
|
||||||
|
}
|
||||||
|
|
||||||
|
private GpuDiscoverer createNodeGPUUtilizationDiscoverer()
|
||||||
|
throws YarnException {
|
||||||
|
GpuDiscoverer gpuDiscoverer = mock(GpuDiscoverer.class);
|
||||||
|
|
||||||
|
PerGpuDeviceInformation gpu1 =
|
||||||
|
new PerGpuDeviceInformation();
|
||||||
|
PerGpuUtilizations perGpuUtilizations1 =
|
||||||
|
new PerGpuUtilizations();
|
||||||
|
perGpuUtilizations1.setOverallGpuUtilization(0.4F);
|
||||||
|
|
||||||
|
gpu1.setGpuUtilizations(perGpuUtilizations1);
|
||||||
|
|
||||||
|
PerGpuDeviceInformation gpu2 =
|
||||||
|
new PerGpuDeviceInformation();
|
||||||
|
PerGpuUtilizations perGpuUtilizations2 =
|
||||||
|
new PerGpuUtilizations();
|
||||||
|
perGpuUtilizations2.setOverallGpuUtilization(0.6F);
|
||||||
|
gpu2.setGpuUtilizations(perGpuUtilizations2);
|
||||||
|
|
||||||
|
List<PerGpuDeviceInformation> gpus = Lists.newArrayList();
|
||||||
|
gpus.add(gpu1);
|
||||||
|
gpus.add(gpu2);
|
||||||
|
|
||||||
|
GpuDeviceInformation gpuDeviceInfo = new GpuDeviceInformation();
|
||||||
|
gpuDeviceInfo.setGpus(gpus);
|
||||||
|
when(gpuDiscoverer.getGpuDeviceInformation()).thenReturn(gpuDeviceInfo);
|
||||||
|
return gpuDiscoverer;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -100,11 +100,15 @@ public void testReferenceOfSingletonJvmMetrics() {
|
|||||||
metrics.addContainerLaunchDuration(1);
|
metrics.addContainerLaunchDuration(1);
|
||||||
Assert.assertTrue(metrics.containerLaunchDuration.changed());
|
Assert.assertTrue(metrics.containerLaunchDuration.changed());
|
||||||
|
|
||||||
|
// Set node gpu utilization
|
||||||
|
metrics.setNodeGpuUtilization(35.5F);
|
||||||
|
|
||||||
// availableGB is expected to be floored,
|
// availableGB is expected to be floored,
|
||||||
// while allocatedGB is expected to be ceiled.
|
// while allocatedGB is expected to be ceiled.
|
||||||
// allocatedGB: 3.75GB allocated memory is shown as 4GB
|
// allocatedGB: 3.75GB allocated memory is shown as 4GB
|
||||||
// availableGB: 4.25GB available memory is shown as 4GB
|
// availableGB: 4.25GB available memory is shown as 4GB
|
||||||
checkMetrics(10, 1, 1, 1, 1, 1, 4, 7, 4, 13, 3);
|
checkMetrics(10, 1, 1, 1, 1,
|
||||||
|
1, 4, 7, 4, 13, 3, 35.5F);
|
||||||
|
|
||||||
// Update resource and check available resource again
|
// Update resource and check available resource again
|
||||||
metrics.addResource(total);
|
metrics.addResource(total);
|
||||||
@ -116,7 +120,7 @@ public void testReferenceOfSingletonJvmMetrics() {
|
|||||||
public static void checkMetrics(int launched, int completed, int failed,
|
public static void checkMetrics(int launched, int completed, int failed,
|
||||||
int killed, int initing, int running, int allocatedGB,
|
int killed, int initing, int running, int allocatedGB,
|
||||||
int allocatedContainers, int availableGB, int allocatedVCores,
|
int allocatedContainers, int availableGB, int allocatedVCores,
|
||||||
int availableVCores) {
|
int availableVCores, Float nodeGpuUtilization) {
|
||||||
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
|
||||||
assertCounter("ContainersLaunched", launched, rb);
|
assertCounter("ContainersLaunched", launched, rb);
|
||||||
assertCounter("ContainersCompleted", completed, rb);
|
assertCounter("ContainersCompleted", completed, rb);
|
||||||
@ -129,6 +133,7 @@ public static void checkMetrics(int launched, int completed, int failed,
|
|||||||
assertGauge("AllocatedContainers", allocatedContainers, rb);
|
assertGauge("AllocatedContainers", allocatedContainers, rb);
|
||||||
assertGauge("AvailableGB", availableGB, rb);
|
assertGauge("AvailableGB", availableGB, rb);
|
||||||
assertGauge("AvailableVCores",availableVCores, rb);
|
assertGauge("AvailableVCores",availableVCores, rb);
|
||||||
|
assertGauge("NodeGpuUtilization", nodeGpuUtilization, rb);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user