YARN-9773: Add QueueMetrics for Custom Resources. Contributed by Manikandan R.

This commit is contained in:
Eric E Payne 2019-10-16 21:10:08 +00:00
parent 9a8edb0aed
commit a5034c7988
4 changed files with 240 additions and 56 deletions

View File

@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
@ -43,6 +44,7 @@
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -121,6 +123,31 @@ public class QueueMetrics implements MetricsSource {
protected final Configuration conf;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
"AllocatedResource.";
private static final String ALLOCATED_RESOURCE_METRIC_DESC =
"Allocated NAME";
private static final String AVAILABLE_RESOURCE_METRIC_PREFIX =
"AvailableResource.";
private static final String AVAILABLE_RESOURCE_METRIC_DESC =
"Available NAME";
private static final String PENDING_RESOURCE_METRIC_PREFIX =
"PendingResource.";
private static final String PENDING_RESOURCE_METRIC_DESC =
"Pending NAME";
private static final String RESERVED_RESOURCE_METRIC_PREFIX =
"ReservedResource.";
private static final String RESERVED_RESOURCE_METRIC_DESC =
"Reserved NAME";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX =
"AggregatePreemptedSeconds.";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
@ -135,6 +162,7 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
this.queueMetricsForCustomResources =
new QueueMetricsForCustomResources();
registerCustomResources();
}
}
@ -366,6 +394,9 @@ public void setAvailableResourcesToQueue(String partition, Resource limit) {
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
registerCustomResources(
queueMetricsForCustomResources.getAvailableValues(),
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
}
}
}
@ -418,16 +449,67 @@ public void incrPendingResources(String partition, String user,
}
}
/**
* Register all custom resources metrics as part of initialization. As and
* when this metric object construction happens for any queue, all custom
* resource metrics value would be initialized with '0' like any other
* mandatory resources metrics
*/
private void registerCustomResources() {
Map<String, Long> customResources =
new HashMap<String, Long>();
ResourceInformation[] resources =
ResourceUtils.getResourceTypesArray();
for (int i =
2; i < resources.length; i++) {
ResourceInformation resource =
resources[i];
customResources.put(resource.getName(), new Long(0));
}
registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX,
ALLOCATED_RESOURCE_METRIC_DESC);
registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX,
AVAILABLE_RESOURCE_METRIC_DESC);
registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX,
PENDING_RESOURCE_METRIC_DESC);
registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX,
RESERVED_RESOURCE_METRIC_DESC);
registerCustomResources(customResources,
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
}
private void registerCustomResources(Map<String, Long> customResources,
String metricPrefix, String metricDesc) {
for (Entry<String, Long> entry : customResources.entrySet()) {
String resourceName = entry.getKey();
Long resourceValue = entry.getValue();
MutableGaugeLong resourceMetric =
(MutableGaugeLong) this.registry.get(metricPrefix + resourceName);
if (resourceMetric == null) {
resourceMetric =
this.registry.newGauge(metricPrefix + resourceName,
metricDesc.replace("NAME", resourceName), 0L);
}
resourceMetric.set(resourceValue);
}
}
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
@ -448,6 +530,8 @@ private void _decrPendingResources(int containers, Resource res) {
pendingVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
@ -480,6 +564,9 @@ public void allocateResources(String partition, String user,
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
if (decrPending) {
@ -508,12 +595,18 @@ public void allocateResources(String partition, String user, Resource res) {
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
registerCustomResources(
queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
@ -535,6 +628,9 @@ public void releaseResources(String partition,
allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
@ -558,6 +654,9 @@ private void releaseResources(String user, Resource res) {
allocatedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
@ -612,6 +711,11 @@ public void updatePreemptedSecondsForCustomResources(Resource res,
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources
.increaseAggregatedPreemptedSeconds(res, seconds);
registerCustomResources(
queueMetricsForCustomResources.getAggregatePreemptedSeconds()
.getValues(),
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
}
if (parent != null) {
parent.updatePreemptedSecondsForCustomResources(res, seconds);
@ -630,6 +734,9 @@ public void reserveResource(String user, Resource res) {
reservedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseReserved(res);
registerCustomResources(
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -646,6 +753,9 @@ private void unreserveResource(String user, Resource res) {
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseReserved(res);
registerCustomResources(
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {

View File

@ -43,6 +43,16 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2;
final class ResourceMetricsChecker {
private final static Logger LOG =
@ -52,22 +62,34 @@ enum ResourceMetricType {
GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG
}
private static final ResourceMetricsChecker INITIAL_CHECKER =
new ResourceMetricsChecker()
.gaugeLong(ALLOCATED_MB, 0)
.gaugeInt(ALLOCATED_V_CORES, 0)
.gaugeInt(ALLOCATED_CONTAINERS, 0)
private static final ResourceMetricsChecker INITIAL_MANDATORY_RES_CHECKER =
new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0)
.gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 0)
.counter(AGGREGATE_CONTAINERS_RELEASED, 0)
.gaugeLong(AVAILABLE_MB, 0)
.gaugeInt(AVAILABLE_V_CORES, 0)
.gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0)
.gaugeInt(PENDING_CONTAINERS, 0)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0)
.gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0)
.gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0);
private static final ResourceMetricsChecker INITIAL_CHECKER =
new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0)
.gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 0)
.counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0)
.gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0)
.gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0).gaugeLong(ALLOCATED_CUSTOM_RES1, 0)
.gaugeLong(ALLOCATED_CUSTOM_RES2, 0).gaugeLong(AVAILABLE_CUSTOM_RES1, 0)
.gaugeLong(AVAILABLE_CUSTOM_RES2, 0).gaugeLong(PENDING_CUSTOM_RES1, 0)
.gaugeLong(PENDING_CUSTOM_RES2, 0).gaugeLong(RESERVED_CUSTOM_RES1, 0)
.gaugeLong(RESERVED_CUSTOM_RES2, 0)
.gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1, 0)
.gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2, 0);
enum ResourceMetricsKey {
ALLOCATED_MB("AllocatedMB", GAUGE_LONG),
ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT),
@ -87,7 +109,18 @@ enum ResourceMetricsKey {
AGGREGATE_VCORE_SECONDS_PREEMPTED(
"AggregateVcoreSecondsPreempted", COUNTER_LONG),
AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED(
"AggregateMemoryMBSecondsPreempted", COUNTER_LONG);
"AggregateMemoryMBSecondsPreempted", COUNTER_LONG),
ALLOCATED_CUSTOM_RES1("AllocatedResource.custom_res_1", GAUGE_LONG),
ALLOCATED_CUSTOM_RES2("AllocatedResource.custom_res_2", GAUGE_LONG),
AVAILABLE_CUSTOM_RES1("AvailableResource.custom_res_1", GAUGE_LONG),
AVAILABLE_CUSTOM_RES2("AvailableResource.custom_res_2", GAUGE_LONG),
PENDING_CUSTOM_RES1("PendingResource.custom_res_1",GAUGE_LONG),
PENDING_CUSTOM_RES2("PendingResource.custom_res_2",GAUGE_LONG),
RESERVED_CUSTOM_RES1("ReservedResource.custom_res_1",GAUGE_LONG),
RESERVED_CUSTOM_RES2("ReservedResource.custom_res_2", GAUGE_LONG),
AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1("AggregatePreemptedSeconds.custom_res_1", GAUGE_LONG),
AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2("AggregatePreemptedSeconds.custom_res_2", GAUGE_LONG);
private String value;
private ResourceMetricType type;
@ -131,6 +164,10 @@ public static ResourceMetricsChecker create() {
return new ResourceMetricsChecker(INITIAL_CHECKER);
}
public static ResourceMetricsChecker createMandatoryResourceChecker() {
return new ResourceMetricsChecker(INITIAL_MANDATORY_RES_CHECKER);
}
ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
ensureTypeIsCorrect(key, GAUGE_LONG);
gaugesLong.put(key, value);

View File

@ -105,13 +105,11 @@ public void testDefaultSingleQueueMetrics() {
USER, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
ResourceMetricsChecker rmChecker =
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
@ -284,7 +282,7 @@ public void testSingleQueueWithUserMetrics() {
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.create()
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
@ -292,7 +290,7 @@ public void testSingleQueueWithUserMetrics() {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
@ -471,37 +469,25 @@ public void testTwoLevelWithUserMetrics() {
USER, 5, Resources.createResource(3*GB, 3));
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(leaf.queueSource);
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(root.queueSource);
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(leaf.userSource);
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(root.userSource);
ResourceMetricsChecker.createMandatoryResourceChecker()
.gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.userSource);
leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsQueueSourceChecker =

View File

@ -71,6 +71,17 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -204,7 +215,11 @@ private void testIncreasePendingResourcesInternal(int containers,
.gaugeLong(PENDING_MB, containers *
testData.resource.getMemorySize())
.gaugeInt(PENDING_V_CORES, containers *
testData.resource.getVirtualCores());
testData.resource.getVirtualCores())
.gaugeLong(PENDING_CUSTOM_RES1,
containers * testData.customResourceValues.get(CUSTOM_RES_1))
.gaugeLong(PENDING_CUSTOM_RES2,
containers * testData.customResourceValues.get(CUSTOM_RES_2));
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getPendingResources,
MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
@ -227,6 +242,12 @@ private void testAllocateResources(boolean decreasePending,
.gaugeInt(PENDING_CONTAINERS, 0)
.gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0)
.gaugeLong(ALLOCATED_CUSTOM_RES1,
testData.containers
* testData.customResourceValues.get(CUSTOM_RES_1))
.gaugeLong(ALLOCATED_CUSTOM_RES2,
testData.containers
* testData.customResourceValues.get(CUSTOM_RES_2))
.checkAgainst(testData.leafQueue.queueSource);
if (decreasePending) {
assertAllMetrics(testData.leafQueue, checker,
@ -258,7 +279,11 @@ private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
.counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
testData.resource.getMemorySize() * seconds)
.counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
testData.resource.getVirtualCores() * seconds);
testData.resource.getVirtualCores() * seconds)
.gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1,
testData.customResourceValues.get(CUSTOM_RES_1) * seconds)
.gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2,
testData.customResourceValues.get(CUSTOM_RES_2) * seconds);
assertQueueMetricsOnly(testData.leafQueue, checker,
this::convertPreemptedSecondsToResource,
@ -288,6 +313,10 @@ private void testReserveResources(QueueMetricsTestData testData) {
.gaugeInt(RESERVED_CONTAINERS, 1)
.gaugeLong(RESERVED_MB, testData.resource.getMemorySize())
.gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores())
.gaugeLong(RESERVED_CUSTOM_RES1,
testData.customResourceValues.get(CUSTOM_RES_1))
.gaugeLong(RESERVED_CUSTOM_RES2,
testData.customResourceValues.get(CUSTOM_RES_2))
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getReservedResources,
@ -380,6 +409,8 @@ ImmutableMap.<String, String> builder()
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, GB)
.gaugeInt(AVAILABLE_V_CORES, 4)
.gaugeLong(AVAILABLE_CUSTOM_RES1, 5 * GB)
.gaugeLong(AVAILABLE_CUSTOM_RES2, 6 * GB)
.checkAgainst(queueSource);
assertCustomResourceValue(metrics,
@ -406,6 +437,8 @@ ImmutableMap.<String, String> builder()
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, GB)
.gaugeInt(AVAILABLE_V_CORES, 4)
.gaugeLong(AVAILABLE_CUSTOM_RES1, 15 * GB)
.gaugeLong(AVAILABLE_CUSTOM_RES2, 20 * GB)
.checkAgainst(queueSource);
assertCustomResourceValue(metrics,
@ -445,12 +478,23 @@ public void testDecreasePendingResources() {
final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
final int containersAfterDecrease = containers - containersToDecrease;
final long customRes1ToDecrease =
resourceToDecrease.getResourceValue(CUSTOM_RES_1);
final long customRes2ToDecrease =
resourceToDecrease.getResourceValue(CUSTOM_RES_2);
final int vcoresAfterDecrease =
(defaultResource.getVirtualCores() * containers)
- (vCoresToDecrease * containersToDecrease);
final long memoryAfterDecrease =
(defaultResource.getMemorySize() * containers)
- (memoryMBToDecrease * containersToDecrease);
final long customResource1AfterDecrease =
(testData.customResourceValues.get(CUSTOM_RES_1) * containers)
- (customRes1ToDecrease * containersToDecrease);
final long customResource2AfterDecrease =
(testData.customResourceValues.get(CUSTOM_RES_2) * containers)
- (customRes2ToDecrease * containersToDecrease);
//first, increase resources to be able to decrease some
testIncreasePendingResources(testData);
@ -468,6 +512,8 @@ public void testDecreasePendingResources() {
.gaugeInt(PENDING_CONTAINERS, containersAfterDecrease)
.gaugeLong(PENDING_MB, memoryAfterDecrease)
.gaugeInt(PENDING_V_CORES, vcoresAfterDecrease)
.gaugeLong(PENDING_CUSTOM_RES1, customResource1AfterDecrease)
.gaugeLong(PENDING_CUSTOM_RES2, customResource2AfterDecrease)
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
@ -522,7 +568,11 @@ public void testAllocateResourcesWithoutContainer() {
.gaugeLong(ALLOCATED_MB, resource.getMemorySize())
.gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores())
.gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0);
.gaugeInt(PENDING_V_CORES, 0)
.gaugeLong(ALLOCATED_CUSTOM_RES1,
testData.customResourceValues.get(CUSTOM_RES_1))
.gaugeLong(ALLOCATED_CUSTOM_RES2,
testData.customResourceValues.get(CUSTOM_RES_2));
checker.checkAgainst(testData.leafQueue.queueSource);
checker.checkAgainst(testData.leafQueue.getRoot().queueSource);
@ -613,6 +663,7 @@ public void testUnreserveResources() {
.gaugeInt(RESERVED_CONTAINERS, 0)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeLong(RESERVED_CUSTOM_RES1, 0).gaugeLong(RESERVED_CUSTOM_RES2, 0)
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getReservedResources,