YARN-9085. Add Guaranteed and MaxCapacity to CSQueueMetrics

This commit is contained in:
Jonathan Hung 2018-12-07 10:32:53 -08:00
parent 6c852f2a37
commit cc51607ccd
5 changed files with 108 additions and 0 deletions

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat; import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -46,6 +47,14 @@ public class CSQueueMetrics extends QueueMetrics {
MutableGaugeFloat usedCapacity; MutableGaugeFloat usedCapacity;
@Metric("Percent of Absolute Capacity Used") @Metric("Percent of Absolute Capacity Used")
MutableGaugeFloat absoluteUsedCapacity; MutableGaugeFloat absoluteUsedCapacity;
@Metric("Guaranteed memory in MB")
MutableGaugeLong guaranteedMB;
@Metric("Guaranteed CPU in virtual cores")
MutableGaugeInt guaranteedVCores;
@Metric("Maximum memory in MB")
MutableGaugeLong maxCapacityMB;
@Metric("Maximum CPU in virtual cores")
MutableGaugeInt maxCapacityVCores;
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) { boolean enableUserMetrics, Configuration conf) {
@ -126,6 +135,36 @@ public void setAbsoluteUsedCapacity(String partition,
} }
} }
public long getGuaranteedMB() {
return guaranteedMB.value();
}
public int getGuaranteedVCores() {
return guaranteedVCores.value();
}
public void setGuaranteedResources(String partition, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
guaranteedMB.set(res.getMemorySize());
guaranteedVCores.set(res.getVirtualCores());
}
}
public long getMaxCapacityMB() {
return maxCapacityMB.value();
}
public int getMaxCapacityVCores() {
return maxCapacityVCores.value();
}
public void setMaxCapacityResources(String partition, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
maxCapacityMB.set(res.getMemorySize());
maxCapacityVCores.set(res.getVirtualCores());
}
}
public synchronized static CSQueueMetrics forQueue(String queueName, public synchronized static CSQueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) { Queue parent, boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance(); MetricsSystem ms = DefaultMetricsSystem.instance();

View File

@ -314,4 +314,21 @@ public static void updateQueueStatistics(
childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition, childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster)); getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
} }
/**
* Updated configured capacity/max-capacity for queue.
* @param rc resource calculator
* @param partitionResource total cluster resources for this partition
* @param partition partition being updated
* @param queue queue
*/
public static void updateConfiguredCapacityMetrics(ResourceCalculator rc,
Resource partitionResource, String partition, AbstractCSQueue queue) {
queue.getMetrics().setGuaranteedResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteCapacity(partition),
queue.getMinimumAllocation()));
queue.getMetrics().setMaxCapacityResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition),
queue.getMinimumAllocation()));
}
} }

View File

@ -1823,6 +1823,10 @@ public void updateClusterResource(Resource clusterResource,
// Update metrics // Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null); this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
// queue metrics are updated, more resource may be available // queue metrics are updated, more resource may be available
// activate the pending applications if possible // activate the pending applications if possible

View File

@ -911,6 +911,10 @@ public void updateClusterResource(Resource clusterResource,
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null); this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -5128,4 +5128,48 @@ public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
assertEquals(4, appsInB1.size()); assertEquals(4, appsInB1.size());
rm.close(); rm.close();
} }
@Test
public void testCSQueueMetrics() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.init(conf);
cs.start();
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1");
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2");
cs.handle(new NodeAddedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n2));
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
// Remove a node, metrics should be updated
cs.handle(new NodeRemovedSchedulerEvent(n2));
assertEquals(5120, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
// Add child queue to a, and reinitialize. Metrics should be updated
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} );
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f);
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f);
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f);
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
}
} }