diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index f6c5a69ead..ff3cec32b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -22,6 +22,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; +import java.util.HashMap; +import java.util.Map; + /** *

* ResourceUtilization models the utilization of a set of computer @@ -33,14 +36,26 @@ public abstract class ResourceUtilization implements Comparable { + private Map customResources + = new HashMap<>(); + @Public @Unstable - public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { + public static ResourceUtilization newInstance(int pmem, int vmem, + float cpu) { + return newInstance(pmem, vmem, cpu, null); + } + + @Public + @Unstable + public static ResourceUtilization newInstance(int pmem, int vmem, + float cpu, Map customResources) { ResourceUtilization utilization = Records.newRecord(ResourceUtilization.class); utilization.setPhysicalMemory(pmem); utilization.setVirtualMemory(vmem); utilization.setCPU(cpu); + utilization.setCustomResources(customResources); return utilization; } @@ -49,7 +64,9 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { public static ResourceUtilization newInstance( ResourceUtilization resourceUtil) { return newInstance(resourceUtil.getPhysicalMemory(), - resourceUtil.getVirtualMemory(), resourceUtil.getCPU()); + resourceUtil.getVirtualMemory(), + resourceUtil.getCPU(), + resourceUtil.getCustomResources()); } /** @@ -106,6 +123,51 @@ public static ResourceUtilization newInstance( @Unstable public abstract void setCPU(float cpu); + /** + * Get custom resource utilization + * (The amount of custom resource used). + * + * @param resourceName resourceName of custom resource + * @return resourceName utilization + */ + @Public + @Unstable + public float getCustomResource(String resourceName) { + if (customResources != null && resourceName != null) { + return customResources.get(resourceName); + } + return 0f; + } + + @Public + @Unstable + public Map getCustomResources() { + return customResources; + } + + @Public + @Unstable + public void setCustomResources(Map customResources) { + if (customResources != null) { + this.customResources = customResources; + } + } + + /** + * Set custom resource utilization + * (The amount of custom resource used). + * @param resourceName resourceName + * @param utilization utilization of custom resource + * + */ + @Public + @Unstable + public void setCustomResource(String resourceName, float utilization) { + if (resourceName != null && !resourceName.isEmpty()) { + customResources.put(resourceName, utilization); + } + } + @Override public int hashCode() { final int prime = 263167; @@ -113,6 +175,12 @@ public int hashCode() { result = prime * result + getVirtualMemory(); result = prime * result + getPhysicalMemory(); result = 31 * result + Float.valueOf(getCPU()).hashCode(); + if (customResources != null && !customResources.isEmpty()) { + for (Map.Entry entry : customResources.entrySet()) { + result = 31 * result + + customResources.get(entry.getKey()).hashCode(); + } + } return result; } @@ -130,7 +198,8 @@ public boolean equals(Object obj) { ResourceUtilization other = (ResourceUtilization) obj; if (getVirtualMemory() != other.getVirtualMemory() || getPhysicalMemory() != other.getPhysicalMemory() - || getCPU() != other.getCPU()) { + || getCPU() != other.getCPU() + || !customResources.equals(other.customResources)) { return false; } return true; @@ -138,8 +207,19 @@ public boolean equals(Object obj) { @Override public String toString() { - return ""; + StringBuilder utilizationString = new StringBuilder(); + utilizationString.append( + " entry : getCustomResources().entrySet()) { + utilizationString.append(", " + + entry.getKey() + ":" + entry.getValue()); + } + } + + utilizationString.append(">"); + return utilizationString.toString(); } /** @@ -151,9 +231,28 @@ public String toString() { @Public @Unstable public void addTo(int pmem, int vmem, float cpu) { + addTo(pmem, vmem, cpu, null, 0f); + } + + /** + * Add utilization to the current one. + * @param pmem Physical memory used to add. + * @param vmem Virtual memory used to add. + * @param cpu CPU utilization to add. + * @param resourceName of custom resource to add. + * @param utilization of custom resource to add. + */ + @Public + @Unstable + public void addTo(int pmem, int vmem, float cpu, + String resourceName, float utilization) { this.setPhysicalMemory(this.getPhysicalMemory() + pmem); this.setVirtualMemory(this.getVirtualMemory() + vmem); this.setCPU(this.getCPU() + cpu); + if (resourceName != null) { + this.setCustomResource(resourceName, + getCustomResource(resourceName) + utilization); + } } /** @@ -165,8 +264,27 @@ public void addTo(int pmem, int vmem, float cpu) { @Public @Unstable public void subtractFrom(int pmem, int vmem, float cpu) { + subtractFrom(pmem, vmem, cpu, null, 0f); + } + + /** + * Subtract utilization from the current one. + * @param pmem Physical memory to be subtracted. + * @param vmem Virtual memory to be subtracted. + * @param cpu CPU utilization to be subtracted. + * @param resourceName of custom resource to be subtracted. + * @param utilization of custom resource to be subtracted. + */ + @Public + @Unstable + public void subtractFrom(int pmem, int vmem, float cpu, + String resourceName, float utilization) { this.setPhysicalMemory(this.getPhysicalMemory() - pmem); this.setVirtualMemory(this.getVirtualMemory() - vmem); this.setCPU(this.getCPU() - cpu); + if (resourceName != null) { + this.setCustomResource(resourceName, + getCustomResource(resourceName) - utilization); + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c9cc2a7fbe..ac92dfaeb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -83,6 +83,7 @@ message ResourceUtilizationProto { optional int32 pmem = 1; optional int32 vmem = 2; optional float cpu = 3; + repeated StringFloatMapProto customResources = 4; } message ResourceOptionProto { @@ -243,6 +244,11 @@ message StringLongMapProto { required int64 value = 2; } +message StringFloatMapProto { + required string key = 1; + required float value = 2; +} + message ApplicationResourceUsageReportProto { optional int32 num_used_containers = 1; optional int32 num_reserved_containers = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index cdeb417243..64bf8cf5d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -587,6 +587,22 @@ public static List convertMapToStringLongMapProto return ret; } + public static List + convertMapToStringFloatMapProtoList( + Map map) { + List ret = new ArrayList<>(); + if (map != null) { + for (Map.Entry entry : map.entrySet()) { + YarnProtos.StringFloatMapProto.Builder tmp = + YarnProtos.StringFloatMapProto.newBuilder(); + tmp.setKey(entry.getKey()); + tmp.setValue(entry.getValue()); + ret.add(tmp.build()); + } + } + return ret; + } + public static Map convertStringStringMapProtoListToMap( List pList) { Map ret = new HashMap<>(); @@ -600,6 +616,19 @@ public static Map convertStringStringMapProtoListToMap( return ret; } + public static Map convertStringFloatMapProtoListToMap( + List pList) { + Map ret = new HashMap<>(); + if (pList != null) { + for (YarnProtos.StringFloatMapProto p : pList) { + if (p.hasKey()) { + ret.put(p.getKey(), p.getValue()); + } + } + } + return ret; + } + public static List convertToProtoFormat( Map stringMap) { List pList = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java index e37adbe63f..023d1e9ff6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProtoOrBuilder; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import java.util.Map; + @Private @Unstable public class ResourceUtilizationPBImpl extends ResourceUtilization { @@ -69,7 +71,7 @@ public void setPhysicalMemory(int pmem) { @Override public int getVirtualMemory() { ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; - return (p.getVmem()); + return p.getVmem(); } @Override @@ -90,6 +92,28 @@ public void setCPU(float cpu) { builder.setCpu(cpu); } + @Override + public float getCustomResource(String resourceName) { + return getCustomResources().get(resourceName); + } + + @Override + public Map getCustomResources() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return ProtoUtils. + convertStringFloatMapProtoListToMap(p. + getCustomResourcesList()); + } + + @Override + public void setCustomResources(Map customResources) { + if (customResources != null) { + maybeInitBuilder(); + builder.addAllCustomResources(ProtoUtils. + convertMapToStringFloatMapProtoList(customResources)); + } + } + @Override public int compareTo(ResourceUtilization other) { int diff = this.getPhysicalMemory() - other.getPhysicalMemory(); @@ -97,6 +121,11 @@ public int compareTo(ResourceUtilization other) { diff = this.getVirtualMemory() - other.getVirtualMemory(); if (diff == 0) { diff = Float.compare(this.getCPU(), other.getCPU()); + if (diff == 0) { + diff = this.getCustomResources().size() - + other.getCustomResources().size(); + // todo how to compare custom resource in same size + } } } return diff; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java index affa08f082..22b687c159 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java @@ -76,6 +76,8 @@ private static Object genTypeValue(Type type) { 'a' + rand.nextInt(26), 'a' + rand.nextInt(26), 'a' + rand.nextInt(26)); + } else if (type.equals(Float.class)) { + return rand.nextFloat(); } else if (type instanceof Class) { Class clazz = (Class)type; if (clazz.isArray()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java index 5934846e2f..a2b0570832 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestResourceUtilization.java @@ -21,6 +21,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + public class TestResourceUtilization { @Test @@ -60,4 +63,50 @@ public void testResourceUtilization() { u1.subtractFrom(10, 0, 0.0f); Assert.assertEquals(u1, u3); } + + @Test + public void testResourceUtilizationWithCustomResource() { + Map customResources = new HashMap<>(); + customResources.put(ResourceInformation.GPU_URI, 5.0f); + ResourceUtilization u1 = ResourceUtilization. + newInstance(10, 20, 0.5f, customResources); + ResourceUtilization u2 = ResourceUtilization.newInstance(u1); + ResourceUtilization u3 = ResourceUtilization. + newInstance(10, 20, 0.5f, customResources); + ResourceUtilization u4 = ResourceUtilization. + newInstance(20, 20, 0.5f, customResources); + ResourceUtilization u5 = ResourceUtilization. + newInstance(30, 40, 0.8f, customResources); + + Assert.assertEquals(u1, u2); + Assert.assertEquals(u1, u3); + Assert.assertNotEquals(u1, u4); + Assert.assertNotEquals(u2, u5); + Assert.assertNotEquals(u4, u5); + + Assert.assertTrue(u1.hashCode() == u2.hashCode()); + Assert.assertTrue(u1.hashCode() == u3.hashCode()); + Assert.assertFalse(u1.hashCode() == u4.hashCode()); + Assert.assertFalse(u2.hashCode() == u5.hashCode()); + Assert.assertFalse(u4.hashCode() == u5.hashCode()); + + Assert.assertTrue(u1.getPhysicalMemory() == 10); + Assert.assertFalse(u1.getVirtualMemory() == 10); + Assert.assertTrue(u1.getCPU() == 0.5f); + Assert.assertTrue(u1. + getCustomResource(ResourceInformation.GPU_URI) == 5.0f); + + Assert.assertEquals("", u1.toString()); + + u1.addTo(10, 0, 0.0f); + Assert.assertNotEquals(u1, u2); + Assert.assertEquals(u1, u4); + u1.addTo(10, 20, 0.3f); + Assert.assertEquals(u1, u5); + u1.subtractFrom(10, 20, 0.3f); + Assert.assertEquals(u1, u4); + u1.subtractFrom(10, 0, 0.0f); + Assert.assertEquals(u1, u3); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java index 7577b55899..37fa33e14f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java @@ -30,6 +30,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + /** * Implementation of the node resource monitor. It periodically tracks the * resource utilization of the node and reports it to the NM. @@ -54,8 +57,11 @@ public class NodeResourceMonitorImpl extends AbstractService implements private GpuNodeResourceUpdateHandler gpuNodeResourceUpdateHandler; /** Current resource utilization of the node. */ + + private Map customResources = new HashMap<>(); + private ResourceUtilization nodeUtilization = - ResourceUtilization.newInstance(0, 0, 0f); + ResourceUtilization.newInstance(0, 0, 0f, customResources); private Context nmContext; /** @@ -165,22 +171,26 @@ public void run() { resourceCalculatorPlugin.getVirtualMemorySize() - resourceCalculatorPlugin.getAvailableVirtualMemorySize(); float vcores = resourceCalculatorPlugin.getNumVCoresUsed(); - nodeUtilization = - ResourceUtilization.newInstance( - (int) (pmem >> 20), // B -> MB - (int) (vmem >> 20), // B -> MB - vcores); // Used Virtual Cores - float nodeGpuUtilization = 0F; + float totalNodeGpuUtilization = 0F; try { if (gpuNodeResourceUpdateHandler != null) { - nodeGpuUtilization = - gpuNodeResourceUpdateHandler.getNodeGpuUtilization(); + totalNodeGpuUtilization = + gpuNodeResourceUpdateHandler.getTotalNodeGpuUtilization(); } } catch (Exception e) { LOG.error("Get Node GPU Utilization error: " + e); } + customResources. + put(ResourceInformation.GPU_URI, totalNodeGpuUtilization); + nodeUtilization = + ResourceUtilization.newInstance( + (int) (pmem >> 20), // B -> MB + (int) (vmem >> 20), // B -> MB + vcores, // Used Virtual Cores + customResources); // Used GPUs + // Publish the node utilization metrics to node manager // metrics system. NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics(); @@ -188,7 +198,7 @@ public void run() { nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory()); nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory()); nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU()); - nmMetrics.setNodeGpuUtilization(nodeGpuUtilization); + nmMetrics.setNodeGpuUtilization(totalNodeGpuUtilization); } try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuNodeResourceUpdateHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuNodeResourceUpdateHandler.java index af81709566..c31555ed96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuNodeResourceUpdateHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/GpuNodeResourceUpdateHandler.java @@ -79,19 +79,48 @@ public void updateConfiguredResource(Resource res) throws YarnException { res.setResourceValue(GPU_URI, nUsableGpus); } - public float getNodeGpuUtilization() throws Exception{ + /** + * + * @return The average physical GPUs used in this node. + * + * For example: + * Node with total 4 GPUs + * Physical used 2.4 GPUs + * Will return 2.4/4 = 0.6f + * + * @throws Exception when any error happens + */ + public float getAvgNodeGpuUtilization() throws Exception{ List gpuList = gpuDiscoverer.getGpuDeviceInformation().getGpus(); - Float totalGpuUtilization = 0F; + Float avgGpuUtilization = 0F; if (gpuList != null && gpuList.size() != 0) { - totalGpuUtilization = gpuList - .stream() - .map(g -> g.getGpuUtilizations().getOverallGpuUtilization()) - .collect(Collectors.summingDouble(Float::floatValue)) - .floatValue() / gpuList.size(); + avgGpuUtilization = getTotalNodeGpuUtilization() / gpuList.size(); } + return avgGpuUtilization; + } + + /** + * + * @return The total physical GPUs used in this node. + * + * For example: + * Node with total 4 GPUs + * Physical used 2.4 GPUs + * Will return 2.4f + * + * @throws Exception when any error happens + */ + public float getTotalNodeGpuUtilization() throws Exception{ + List gpuList = + gpuDiscoverer.getGpuDeviceInformation().getGpus(); + Float totalGpuUtilization = gpuList + .stream() + .map(g -> g.getGpuUtilizations().getOverallGpuUtilization()) + .collect(Collectors.summingDouble(Float::floatValue)) + .floatValue(); return totalGpuUtilization; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/TestGpuResourcePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/TestGpuResourcePlugin.java index 749e0cc14d..da1a57ec00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/TestGpuResourcePlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/gpu/TestGpuResourcePlugin.java @@ -126,7 +126,7 @@ public void testGetNMResourceInfoAutoDiscoveryDisabled() } @Test - public void testNodeGPUUtilization() + public void testAvgNodeGpuUtilization() throws Exception { GpuDiscoverer gpuDiscoverer = createNodeGPUUtilizationDiscoverer(); @@ -134,7 +134,7 @@ public void testNodeGPUUtilization() new GpuNodeResourceUpdateHandler(gpuDiscoverer, new Configuration()); Assert.assertEquals(0.5F, - gpuNodeResourceUpdateHandler.getNodeGpuUtilization(), 1e-6); + gpuNodeResourceUpdateHandler.getAvgNodeGpuUtilization(), 1e-6); } private GpuDiscoverer createNodeGPUUtilizationDiscoverer()