YARN-10707. Support custom resources in ResourceUtilization, and update Node GPU Utilization to use them. Contributed by Qi Zhu
(cherry picked from commit 803ac4b1a0
)
This commit is contained in:
parent
0e9042fed5
commit
da2ebfa8a9
@ -22,6 +22,9 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* <code>ResourceUtilization</code> models the utilization of a set of computer
|
* <code>ResourceUtilization</code> models the utilization of a set of computer
|
||||||
@ -33,14 +36,26 @@
|
|||||||
public abstract class ResourceUtilization implements
|
public abstract class ResourceUtilization implements
|
||||||
Comparable<ResourceUtilization> {
|
Comparable<ResourceUtilization> {
|
||||||
|
|
||||||
|
private Map<String, Float> customResources
|
||||||
|
= new HashMap<>();
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) {
|
public static ResourceUtilization newInstance(int pmem, int vmem,
|
||||||
|
float cpu) {
|
||||||
|
return newInstance(pmem, vmem, cpu, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static ResourceUtilization newInstance(int pmem, int vmem,
|
||||||
|
float cpu, Map<String, Float> customResources) {
|
||||||
ResourceUtilization utilization =
|
ResourceUtilization utilization =
|
||||||
Records.newRecord(ResourceUtilization.class);
|
Records.newRecord(ResourceUtilization.class);
|
||||||
utilization.setPhysicalMemory(pmem);
|
utilization.setPhysicalMemory(pmem);
|
||||||
utilization.setVirtualMemory(vmem);
|
utilization.setVirtualMemory(vmem);
|
||||||
utilization.setCPU(cpu);
|
utilization.setCPU(cpu);
|
||||||
|
utilization.setCustomResources(customResources);
|
||||||
return utilization;
|
return utilization;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +64,9 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) {
|
|||||||
public static ResourceUtilization newInstance(
|
public static ResourceUtilization newInstance(
|
||||||
ResourceUtilization resourceUtil) {
|
ResourceUtilization resourceUtil) {
|
||||||
return newInstance(resourceUtil.getPhysicalMemory(),
|
return newInstance(resourceUtil.getPhysicalMemory(),
|
||||||
resourceUtil.getVirtualMemory(), resourceUtil.getCPU());
|
resourceUtil.getVirtualMemory(),
|
||||||
|
resourceUtil.getCPU(),
|
||||||
|
resourceUtil.getCustomResources());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -106,6 +123,51 @@ public static ResourceUtilization newInstance(
|
|||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setCPU(float cpu);
|
public abstract void setCPU(float cpu);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get <em>custom resource</em> utilization
|
||||||
|
* (The amount of custom resource used).
|
||||||
|
*
|
||||||
|
* @param resourceName <em>resourceName of custom resource</em>
|
||||||
|
* @return <em>resourceName utilization</em>
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public float getCustomResource(String resourceName) {
|
||||||
|
if (customResources != null && resourceName != null) {
|
||||||
|
return customResources.get(resourceName);
|
||||||
|
}
|
||||||
|
return 0f;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public Map<String, Float> getCustomResources() {
|
||||||
|
return customResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void setCustomResources(Map<String, Float> customResources) {
|
||||||
|
if (customResources != null) {
|
||||||
|
this.customResources = customResources;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set <em>custom resource</em> utilization
|
||||||
|
* (The amount of custom resource used).
|
||||||
|
* @param resourceName <em>resourceName</em>
|
||||||
|
* @param utilization <em>utilization of custom resource</em>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void setCustomResource(String resourceName, float utilization) {
|
||||||
|
if (resourceName != null && !resourceName.isEmpty()) {
|
||||||
|
customResources.put(resourceName, utilization);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
final int prime = 263167;
|
final int prime = 263167;
|
||||||
@ -113,6 +175,12 @@ public int hashCode() {
|
|||||||
result = prime * result + getVirtualMemory();
|
result = prime * result + getVirtualMemory();
|
||||||
result = prime * result + getPhysicalMemory();
|
result = prime * result + getPhysicalMemory();
|
||||||
result = 31 * result + Float.valueOf(getCPU()).hashCode();
|
result = 31 * result + Float.valueOf(getCPU()).hashCode();
|
||||||
|
if (customResources != null && !customResources.isEmpty()) {
|
||||||
|
for (Map.Entry<String, Float> entry : customResources.entrySet()) {
|
||||||
|
result = 31 * result +
|
||||||
|
customResources.get(entry.getKey()).hashCode();
|
||||||
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,7 +198,8 @@ public boolean equals(Object obj) {
|
|||||||
ResourceUtilization other = (ResourceUtilization) obj;
|
ResourceUtilization other = (ResourceUtilization) obj;
|
||||||
if (getVirtualMemory() != other.getVirtualMemory()
|
if (getVirtualMemory() != other.getVirtualMemory()
|
||||||
|| getPhysicalMemory() != other.getPhysicalMemory()
|
|| getPhysicalMemory() != other.getPhysicalMemory()
|
||||||
|| getCPU() != other.getCPU()) {
|
|| getCPU() != other.getCPU()
|
||||||
|
|| !customResources.equals(other.customResources)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -138,8 +207,19 @@ public boolean equals(Object obj) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "<pmem:" + getPhysicalMemory() + ", vmem:" + getVirtualMemory()
|
StringBuilder utilizationString = new StringBuilder();
|
||||||
+ ", vCores:" + getCPU() + ">";
|
utilizationString.append(
|
||||||
|
"<pmem:" + getPhysicalMemory() + ", vmem:" + getVirtualMemory()
|
||||||
|
+ ", vCores:" + getCPU());
|
||||||
|
if (getCustomResources() != null && !getCustomResources().isEmpty()) {
|
||||||
|
for (Map.Entry<String, Float> entry : getCustomResources().entrySet()) {
|
||||||
|
utilizationString.append(", "
|
||||||
|
+ entry.getKey() + ":" + entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
utilizationString.append(">");
|
||||||
|
return utilizationString.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -151,9 +231,28 @@ public String toString() {
|
|||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public void addTo(int pmem, int vmem, float cpu) {
|
public void addTo(int pmem, int vmem, float cpu) {
|
||||||
|
addTo(pmem, vmem, cpu, null, 0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add utilization to the current one.
|
||||||
|
* @param pmem Physical memory used to add.
|
||||||
|
* @param vmem Virtual memory used to add.
|
||||||
|
* @param cpu CPU utilization to add.
|
||||||
|
* @param resourceName of custom resource to add.
|
||||||
|
* @param utilization of custom resource to add.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void addTo(int pmem, int vmem, float cpu,
|
||||||
|
String resourceName, float utilization) {
|
||||||
this.setPhysicalMemory(this.getPhysicalMemory() + pmem);
|
this.setPhysicalMemory(this.getPhysicalMemory() + pmem);
|
||||||
this.setVirtualMemory(this.getVirtualMemory() + vmem);
|
this.setVirtualMemory(this.getVirtualMemory() + vmem);
|
||||||
this.setCPU(this.getCPU() + cpu);
|
this.setCPU(this.getCPU() + cpu);
|
||||||
|
if (resourceName != null) {
|
||||||
|
this.setCustomResource(resourceName,
|
||||||
|
getCustomResource(resourceName) + utilization);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -165,8 +264,27 @@ public void addTo(int pmem, int vmem, float cpu) {
|
|||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public void subtractFrom(int pmem, int vmem, float cpu) {
|
public void subtractFrom(int pmem, int vmem, float cpu) {
|
||||||
|
subtractFrom(pmem, vmem, cpu, null, 0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subtract utilization from the current one.
|
||||||
|
* @param pmem Physical memory to be subtracted.
|
||||||
|
* @param vmem Virtual memory to be subtracted.
|
||||||
|
* @param cpu CPU utilization to be subtracted.
|
||||||
|
* @param resourceName of custom resource to be subtracted.
|
||||||
|
* @param utilization of custom resource to be subtracted.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public void subtractFrom(int pmem, int vmem, float cpu,
|
||||||
|
String resourceName, float utilization) {
|
||||||
this.setPhysicalMemory(this.getPhysicalMemory() - pmem);
|
this.setPhysicalMemory(this.getPhysicalMemory() - pmem);
|
||||||
this.setVirtualMemory(this.getVirtualMemory() - vmem);
|
this.setVirtualMemory(this.getVirtualMemory() - vmem);
|
||||||
this.setCPU(this.getCPU() - cpu);
|
this.setCPU(this.getCPU() - cpu);
|
||||||
|
if (resourceName != null) {
|
||||||
|
this.setCustomResource(resourceName,
|
||||||
|
getCustomResource(resourceName) - utilization);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -83,6 +83,7 @@ message ResourceUtilizationProto {
|
|||||||
optional int32 pmem = 1;
|
optional int32 pmem = 1;
|
||||||
optional int32 vmem = 2;
|
optional int32 vmem = 2;
|
||||||
optional float cpu = 3;
|
optional float cpu = 3;
|
||||||
|
repeated StringFloatMapProto customResources = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ResourceOptionProto {
|
message ResourceOptionProto {
|
||||||
@ -243,6 +244,11 @@ message StringLongMapProto {
|
|||||||
required int64 value = 2;
|
required int64 value = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// One string-keyed float entry. ResourceUtilizationProto carries a repeated
// list of these as its customResources field.
message StringFloatMapProto {
|
||||||
|
required string key = 1;
|
||||||
|
required float value = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message ApplicationResourceUsageReportProto {
|
message ApplicationResourceUsageReportProto {
|
||||||
optional int32 num_used_containers = 1;
|
optional int32 num_used_containers = 1;
|
||||||
optional int32 num_reserved_containers = 2;
|
optional int32 num_reserved_containers = 2;
|
||||||
|
@ -587,6 +587,22 @@ public static List<YarnProtos.StringLongMapProto> convertMapToStringLongMapProto
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<YarnProtos.StringFloatMapProto>
|
||||||
|
convertMapToStringFloatMapProtoList(
|
||||||
|
Map<String, Float> map) {
|
||||||
|
List<YarnProtos.StringFloatMapProto> ret = new ArrayList<>();
|
||||||
|
if (map != null) {
|
||||||
|
for (Map.Entry<String, Float> entry : map.entrySet()) {
|
||||||
|
YarnProtos.StringFloatMapProto.Builder tmp =
|
||||||
|
YarnProtos.StringFloatMapProto.newBuilder();
|
||||||
|
tmp.setKey(entry.getKey());
|
||||||
|
tmp.setValue(entry.getValue());
|
||||||
|
ret.add(tmp.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, String> convertStringStringMapProtoListToMap(
|
public static Map<String, String> convertStringStringMapProtoListToMap(
|
||||||
List<StringStringMapProto> pList) {
|
List<StringStringMapProto> pList) {
|
||||||
Map<String, String> ret = new HashMap<>();
|
Map<String, String> ret = new HashMap<>();
|
||||||
@ -600,6 +616,19 @@ public static Map<String, String> convertStringStringMapProtoListToMap(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, Float> convertStringFloatMapProtoListToMap(
|
||||||
|
List<YarnProtos.StringFloatMapProto> pList) {
|
||||||
|
Map<String, Float> ret = new HashMap<>();
|
||||||
|
if (pList != null) {
|
||||||
|
for (YarnProtos.StringFloatMapProto p : pList) {
|
||||||
|
if (p.hasKey()) {
|
||||||
|
ret.put(p.getKey(), p.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
public static List<YarnProtos.StringStringMapProto> convertToProtoFormat(
|
public static List<YarnProtos.StringStringMapProto> convertToProtoFormat(
|
||||||
Map<String, String> stringMap) {
|
Map<String, String> stringMap) {
|
||||||
List<YarnProtos.StringStringMapProto> pList = new ArrayList<>();
|
List<YarnProtos.StringStringMapProto> pList = new ArrayList<>();
|
||||||
|
@ -24,6 +24,8 @@
|
|||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class ResourceUtilizationPBImpl extends ResourceUtilization {
|
public class ResourceUtilizationPBImpl extends ResourceUtilization {
|
||||||
@ -69,7 +71,7 @@ public void setPhysicalMemory(int pmem) {
|
|||||||
@Override
|
@Override
|
||||||
public int getVirtualMemory() {
|
public int getVirtualMemory() {
|
||||||
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
|
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return (p.getVmem());
|
return p.getVmem();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -90,6 +92,28 @@ public void setCPU(float cpu) {
|
|||||||
builder.setCpu(cpu);
|
builder.setCpu(cpu);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getCustomResource(String resourceName) {
|
||||||
|
return getCustomResources().get(resourceName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Float> getCustomResources() {
|
||||||
|
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return ProtoUtils.
|
||||||
|
convertStringFloatMapProtoListToMap(p.
|
||||||
|
getCustomResourcesList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCustomResources(Map<String, Float> customResources) {
|
||||||
|
if (customResources != null) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.addAllCustomResources(ProtoUtils.
|
||||||
|
convertMapToStringFloatMapProtoList(customResources));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(ResourceUtilization other) {
|
public int compareTo(ResourceUtilization other) {
|
||||||
int diff = this.getPhysicalMemory() - other.getPhysicalMemory();
|
int diff = this.getPhysicalMemory() - other.getPhysicalMemory();
|
||||||
@ -97,6 +121,11 @@ public int compareTo(ResourceUtilization other) {
|
|||||||
diff = this.getVirtualMemory() - other.getVirtualMemory();
|
diff = this.getVirtualMemory() - other.getVirtualMemory();
|
||||||
if (diff == 0) {
|
if (diff == 0) {
|
||||||
diff = Float.compare(this.getCPU(), other.getCPU());
|
diff = Float.compare(this.getCPU(), other.getCPU());
|
||||||
|
if (diff == 0) {
|
||||||
|
diff = this.getCustomResources().size() -
|
||||||
|
other.getCustomResources().size();
|
||||||
|
// todo how to compare custom resource in same size
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return diff;
|
return diff;
|
||||||
|
@ -76,6 +76,8 @@ private static Object genTypeValue(Type type) {
|
|||||||
'a' + rand.nextInt(26),
|
'a' + rand.nextInt(26),
|
||||||
'a' + rand.nextInt(26),
|
'a' + rand.nextInt(26),
|
||||||
'a' + rand.nextInt(26));
|
'a' + rand.nextInt(26));
|
||||||
|
} else if (type.equals(Float.class)) {
|
||||||
|
return rand.nextFloat();
|
||||||
} else if (type instanceof Class) {
|
} else if (type instanceof Class) {
|
||||||
Class clazz = (Class)type;
|
Class clazz = (Class)type;
|
||||||
if (clazz.isArray()) {
|
if (clazz.isArray()) {
|
||||||
|
@ -21,6 +21,9 @@
|
|||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class TestResourceUtilization {
|
public class TestResourceUtilization {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -60,4 +63,50 @@ public void testResourceUtilization() {
|
|||||||
u1.subtractFrom(10, 0, 0.0f);
|
u1.subtractFrom(10, 0, 0.0f);
|
||||||
Assert.assertEquals(u1, u3);
|
Assert.assertEquals(u1, u3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceUtilizationWithCustomResource() {
|
||||||
|
Map<String, Float> customResources = new HashMap<>();
|
||||||
|
customResources.put(ResourceInformation.GPU_URI, 5.0f);
|
||||||
|
ResourceUtilization u1 = ResourceUtilization.
|
||||||
|
newInstance(10, 20, 0.5f, customResources);
|
||||||
|
ResourceUtilization u2 = ResourceUtilization.newInstance(u1);
|
||||||
|
ResourceUtilization u3 = ResourceUtilization.
|
||||||
|
newInstance(10, 20, 0.5f, customResources);
|
||||||
|
ResourceUtilization u4 = ResourceUtilization.
|
||||||
|
newInstance(20, 20, 0.5f, customResources);
|
||||||
|
ResourceUtilization u5 = ResourceUtilization.
|
||||||
|
newInstance(30, 40, 0.8f, customResources);
|
||||||
|
|
||||||
|
Assert.assertEquals(u1, u2);
|
||||||
|
Assert.assertEquals(u1, u3);
|
||||||
|
Assert.assertNotEquals(u1, u4);
|
||||||
|
Assert.assertNotEquals(u2, u5);
|
||||||
|
Assert.assertNotEquals(u4, u5);
|
||||||
|
|
||||||
|
Assert.assertTrue(u1.hashCode() == u2.hashCode());
|
||||||
|
Assert.assertTrue(u1.hashCode() == u3.hashCode());
|
||||||
|
Assert.assertFalse(u1.hashCode() == u4.hashCode());
|
||||||
|
Assert.assertFalse(u2.hashCode() == u5.hashCode());
|
||||||
|
Assert.assertFalse(u4.hashCode() == u5.hashCode());
|
||||||
|
|
||||||
|
Assert.assertTrue(u1.getPhysicalMemory() == 10);
|
||||||
|
Assert.assertFalse(u1.getVirtualMemory() == 10);
|
||||||
|
Assert.assertTrue(u1.getCPU() == 0.5f);
|
||||||
|
Assert.assertTrue(u1.
|
||||||
|
getCustomResource(ResourceInformation.GPU_URI) == 5.0f);
|
||||||
|
|
||||||
|
Assert.assertEquals("<pmem:10, vmem:" + u1.getVirtualMemory()
|
||||||
|
+ ", vCores:0.5, yarn.io/gpu:5.0>", u1.toString());
|
||||||
|
|
||||||
|
u1.addTo(10, 0, 0.0f);
|
||||||
|
Assert.assertNotEquals(u1, u2);
|
||||||
|
Assert.assertEquals(u1, u4);
|
||||||
|
u1.addTo(10, 20, 0.3f);
|
||||||
|
Assert.assertEquals(u1, u5);
|
||||||
|
u1.subtractFrom(10, 20, 0.3f);
|
||||||
|
Assert.assertEquals(u1, u4);
|
||||||
|
u1.subtractFrom(10, 0, 0.0f);
|
||||||
|
Assert.assertEquals(u1, u3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,9 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the node resource monitor. It periodically tracks the
|
* Implementation of the node resource monitor. It periodically tracks the
|
||||||
* resource utilization of the node and reports it to the NM.
|
* resource utilization of the node and reports it to the NM.
|
||||||
@ -54,8 +57,11 @@ public class NodeResourceMonitorImpl extends AbstractService implements
|
|||||||
private GpuNodeResourceUpdateHandler gpuNodeResourceUpdateHandler;
|
private GpuNodeResourceUpdateHandler gpuNodeResourceUpdateHandler;
|
||||||
|
|
||||||
/** Current <em>resource utilization</em> of the node. */
|
/** Current <em>resource utilization</em> of the node. */
|
||||||
|
|
||||||
|
private Map<String, Float> customResources = new HashMap<>();
|
||||||
|
|
||||||
private ResourceUtilization nodeUtilization =
|
private ResourceUtilization nodeUtilization =
|
||||||
ResourceUtilization.newInstance(0, 0, 0f);
|
ResourceUtilization.newInstance(0, 0, 0f, customResources);
|
||||||
private Context nmContext;
|
private Context nmContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -165,22 +171,26 @@ public void run() {
|
|||||||
resourceCalculatorPlugin.getVirtualMemorySize()
|
resourceCalculatorPlugin.getVirtualMemorySize()
|
||||||
- resourceCalculatorPlugin.getAvailableVirtualMemorySize();
|
- resourceCalculatorPlugin.getAvailableVirtualMemorySize();
|
||||||
float vcores = resourceCalculatorPlugin.getNumVCoresUsed();
|
float vcores = resourceCalculatorPlugin.getNumVCoresUsed();
|
||||||
nodeUtilization =
|
|
||||||
ResourceUtilization.newInstance(
|
|
||||||
(int) (pmem >> 20), // B -> MB
|
|
||||||
(int) (vmem >> 20), // B -> MB
|
|
||||||
vcores); // Used Virtual Cores
|
|
||||||
|
|
||||||
float nodeGpuUtilization = 0F;
|
float totalNodeGpuUtilization = 0F;
|
||||||
try {
|
try {
|
||||||
if (gpuNodeResourceUpdateHandler != null) {
|
if (gpuNodeResourceUpdateHandler != null) {
|
||||||
nodeGpuUtilization =
|
totalNodeGpuUtilization =
|
||||||
gpuNodeResourceUpdateHandler.getNodeGpuUtilization();
|
gpuNodeResourceUpdateHandler.getTotalNodeGpuUtilization();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Get Node GPU Utilization error: " + e);
|
LOG.error("Get Node GPU Utilization error: " + e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
customResources.
|
||||||
|
put(ResourceInformation.GPU_URI, totalNodeGpuUtilization);
|
||||||
|
nodeUtilization =
|
||||||
|
ResourceUtilization.newInstance(
|
||||||
|
(int) (pmem >> 20), // B -> MB
|
||||||
|
(int) (vmem >> 20), // B -> MB
|
||||||
|
vcores, // Used Virtual Cores
|
||||||
|
customResources); // Used GPUs
|
||||||
|
|
||||||
// Publish the node utilization metrics to node manager
|
// Publish the node utilization metrics to node manager
|
||||||
// metrics system.
|
// metrics system.
|
||||||
NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
|
NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
|
||||||
@ -188,7 +198,7 @@ public void run() {
|
|||||||
nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
|
nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
|
||||||
nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
|
nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
|
||||||
nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
|
nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
|
||||||
nmMetrics.setNodeGpuUtilization(nodeGpuUtilization);
|
nmMetrics.setNodeGpuUtilization(totalNodeGpuUtilization);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -79,19 +79,48 @@ public void updateConfiguredResource(Resource res) throws YarnException {
|
|||||||
res.setResourceValue(GPU_URI, nUsableGpus);
|
res.setResourceValue(GPU_URI, nUsableGpus);
|
||||||
}
|
}
|
||||||
|
|
||||||
public float getNodeGpuUtilization() throws Exception{
|
/**
|
||||||
|
*
|
||||||
|
* @return The average physical GPUs used in this node.
|
||||||
|
*
|
||||||
|
* For example:
|
||||||
|
* Node with total 4 GPUs
|
||||||
|
* Physical used 2.4 GPUs
|
||||||
|
* Will return 2.4/4 = 0.6f
|
||||||
|
*
|
||||||
|
* @throws Exception when any error happens
|
||||||
|
*/
|
||||||
|
public float getAvgNodeGpuUtilization() throws Exception{
|
||||||
List<PerGpuDeviceInformation> gpuList =
|
List<PerGpuDeviceInformation> gpuList =
|
||||||
gpuDiscoverer.getGpuDeviceInformation().getGpus();
|
gpuDiscoverer.getGpuDeviceInformation().getGpus();
|
||||||
Float totalGpuUtilization = 0F;
|
Float avgGpuUtilization = 0F;
|
||||||
if (gpuList != null &&
|
if (gpuList != null &&
|
||||||
gpuList.size() != 0) {
|
gpuList.size() != 0) {
|
||||||
|
|
||||||
totalGpuUtilization = gpuList
|
avgGpuUtilization = getTotalNodeGpuUtilization() / gpuList.size();
|
||||||
.stream()
|
|
||||||
.map(g -> g.getGpuUtilizations().getOverallGpuUtilization())
|
|
||||||
.collect(Collectors.summingDouble(Float::floatValue))
|
|
||||||
.floatValue() / gpuList.size();
|
|
||||||
}
|
}
|
||||||
|
return avgGpuUtilization;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return The total physical GPUs used in this node.
|
||||||
|
*
|
||||||
|
* For example:
|
||||||
|
* Node with total 4 GPUs
|
||||||
|
* Physical used 2.4 GPUs
|
||||||
|
* Will return 2.4f
|
||||||
|
*
|
||||||
|
* @throws Exception when any error happens
|
||||||
|
*/
|
||||||
|
public float getTotalNodeGpuUtilization() throws Exception{
|
||||||
|
List<PerGpuDeviceInformation> gpuList =
|
||||||
|
gpuDiscoverer.getGpuDeviceInformation().getGpus();
|
||||||
|
Float totalGpuUtilization = gpuList
|
||||||
|
.stream()
|
||||||
|
.map(g -> g.getGpuUtilizations().getOverallGpuUtilization())
|
||||||
|
.collect(Collectors.summingDouble(Float::floatValue))
|
||||||
|
.floatValue();
|
||||||
return totalGpuUtilization;
|
return totalGpuUtilization;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,7 +126,7 @@ public void testGetNMResourceInfoAutoDiscoveryDisabled()
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeGPUUtilization()
|
public void testAvgNodeGpuUtilization()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
GpuDiscoverer gpuDiscoverer = createNodeGPUUtilizationDiscoverer();
|
GpuDiscoverer gpuDiscoverer = createNodeGPUUtilizationDiscoverer();
|
||||||
|
|
||||||
@ -134,7 +134,7 @@ public void testNodeGPUUtilization()
|
|||||||
new GpuNodeResourceUpdateHandler(gpuDiscoverer, new Configuration());
|
new GpuNodeResourceUpdateHandler(gpuDiscoverer, new Configuration());
|
||||||
|
|
||||||
Assert.assertEquals(0.5F,
|
Assert.assertEquals(0.5F,
|
||||||
gpuNodeResourceUpdateHandler.getNodeGpuUtilization(), 1e-6);
|
gpuNodeResourceUpdateHandler.getAvgNodeGpuUtilization(), 1e-6);
|
||||||
}
|
}
|
||||||
|
|
||||||
private GpuDiscoverer createNodeGPUUtilizationDiscoverer()
|
private GpuDiscoverer createNodeGPUUtilizationDiscoverer()
|
||||||
|
Loading…
Reference in New Issue
Block a user