YARN-6195. Export UsedCapacity and AbsoluteUsedCapacity to JMX. Contributed by Benson Qiu
This commit is contained in:
parent
2fd568fdd4
commit
0e065f2ede
@ -143,6 +143,7 @@ MutableCounterLong newCounter(MetricsInfo info, long iVal) {
|
||||
public MutableGaugeInt newGauge(String name, String desc, int iVal) {
|
||||
return newGauge(Interns.info(name, desc), iVal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable integer gauge
|
||||
* @param info metadata of the metric
|
||||
@ -180,6 +181,30 @@ public synchronized MutableGaugeLong newGauge(MetricsInfo info, long iVal) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable float gauge
|
||||
* @param name of the metric
|
||||
* @param desc metric description
|
||||
* @param iVal initial value
|
||||
* @return a new gauge object
|
||||
*/
|
||||
public MutableGaugeFloat newGauge(String name, String desc, float iVal) {
|
||||
return newGauge(Interns.info(name, desc), iVal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable float gauge
|
||||
* @param info metadata of the metric
|
||||
* @param iVal initial value
|
||||
* @return a new gauge object
|
||||
*/
|
||||
public synchronized MutableGaugeFloat newGauge(MetricsInfo info, float iVal) {
|
||||
checkMetricName(info.name());
|
||||
MutableGaugeFloat ret = new MutableGaugeFloat(info, iVal);
|
||||
metricsMap.put(info.name(), ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable metric that estimates quantiles of a stream of values
|
||||
* @param name of the metric
|
||||
@ -420,4 +445,5 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||
.add("info", metricsInfo).add("tags", tags()).add("metrics", metrics())
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,80 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.lib;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
|
||||
/**
|
||||
* A mutable float gauge.
|
||||
*/
|
||||
public class MutableGaugeFloat extends MutableGauge {
|
||||
|
||||
private AtomicInteger value = new AtomicInteger();
|
||||
|
||||
MutableGaugeFloat(MetricsInfo info, float initValue) {
|
||||
super(info);
|
||||
this.value.set(Float.floatToIntBits(initValue));
|
||||
}
|
||||
|
||||
public float value() {
|
||||
return Float.intBitsToFloat(value.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incr() {
|
||||
incr(1.0f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decr() {
|
||||
incr(-1.0f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||
if (all || changed()) {
|
||||
builder.addGauge(info(), value());
|
||||
clearChanged();
|
||||
}
|
||||
}
|
||||
|
||||
public void set(float value) {
|
||||
this.value.set(Float.floatToIntBits(value));
|
||||
setChanged();
|
||||
}
|
||||
|
||||
private final boolean compareAndSet(float expect, float update) {
|
||||
return value.compareAndSet(Float.floatToIntBits(expect),
|
||||
Float.floatToIntBits(update));
|
||||
}
|
||||
|
||||
private void incr(float delta) {
|
||||
while (true) {
|
||||
float current = value.get();
|
||||
float next = current + delta;
|
||||
if (compareAndSet(current, next)) {
|
||||
setChanged();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -60,6 +60,9 @@ MutableMetric newForField(Field field, Metric annotation,
|
||||
if (cls == MutableGaugeLong.class) {
|
||||
return registry.newGauge(info, 0L);
|
||||
}
|
||||
if (cls == MutableGaugeFloat.class) {
|
||||
return registry.newGauge(info, 0f);
|
||||
}
|
||||
if (cls == MutableRate.class) {
|
||||
return registry.newRate(info.name(), info.description(),
|
||||
annotation.always());
|
||||
|
@ -39,6 +39,7 @@ static class MyMetrics {
|
||||
@Metric({"Counter2", "Counter2 desc"}) MutableCounterLong c2;
|
||||
@Metric MutableGaugeInt g1, g2;
|
||||
@Metric("g3 desc") MutableGaugeLong g3;
|
||||
@Metric("g4 desc") MutableGaugeFloat g4;
|
||||
@Metric MutableRate r1;
|
||||
@Metric MutableStat s1;
|
||||
@Metric MutableRates rs1;
|
||||
@ -53,6 +54,7 @@ static class MyMetrics {
|
||||
metrics.g1.incr();
|
||||
metrics.g2.incr();
|
||||
metrics.g3.incr();
|
||||
metrics.g4.incr();
|
||||
metrics.r1.add(1);
|
||||
metrics.s1.add(1);
|
||||
metrics.rs1.add("rs1", 1);
|
||||
@ -64,6 +66,7 @@ static class MyMetrics {
|
||||
verify(rb).addGauge(info("G1", "G1"), 1);
|
||||
verify(rb).addGauge(info("G2", "G2"), 1);
|
||||
verify(rb).addGauge(info("G3", "g3 desc"), 1L);
|
||||
verify(rb).addGauge(info("G4", "g4 desc"), 1f);
|
||||
verify(rb).addCounter(info("R1NumOps", "Number of ops for r1"), 1L);
|
||||
verify(rb).addGauge(info("R1AvgTime", "Average time for r1"), 1.0);
|
||||
verify(rb).addCounter(info("S1NumOps", "Number of ops for s1"), 1L);
|
||||
|
@ -42,13 +42,15 @@ public class TestMetricsRegistry {
|
||||
r.newCounter("c2", "c2 desc", 2L);
|
||||
r.newGauge("g1", "g1 desc", 3);
|
||||
r.newGauge("g2", "g2 desc", 4L);
|
||||
r.newGauge("g3", "g3 desc", 5f);
|
||||
r.newStat("s1", "s1 desc", "ops", "time");
|
||||
|
||||
assertEquals("num metrics in registry", 5, r.metrics().size());
|
||||
assertEquals("num metrics in registry", 6, r.metrics().size());
|
||||
assertTrue("c1 found", r.get("c1") instanceof MutableCounterInt);
|
||||
assertTrue("c2 found", r.get("c2") instanceof MutableCounterLong);
|
||||
assertTrue("g1 found", r.get("g1") instanceof MutableGaugeInt);
|
||||
assertTrue("g2 found", r.get("g2") instanceof MutableGaugeLong);
|
||||
assertTrue("g3 found", r.get("g3") instanceof MutableGaugeFloat);
|
||||
assertTrue("s1 found", r.get("s1") instanceof MutableStat);
|
||||
|
||||
expectMetricsException("Metric name c1 already exists", new Runnable() {
|
||||
|
@ -59,6 +59,7 @@ public class TestMutableMetrics {
|
||||
registry.newCounter("c2", "long counter", 2L);
|
||||
registry.newGauge("g1", "int gauge", 3);
|
||||
registry.newGauge("g2", "long gauge", 4L);
|
||||
registry.newGauge("g3", "float gauge", 5f);
|
||||
registry.newStat("s1", "stat", "Ops", "Time", true).add(0);
|
||||
registry.newRate("s2", "stat", false).add(0);
|
||||
|
||||
@ -74,6 +75,7 @@ public class TestMutableMetrics {
|
||||
verify(mb).addCounter(info("c2", "long counter"), 2L);
|
||||
verify(mb).addGauge(info("g1", "int gauge"), 3);
|
||||
verify(mb).addGauge(info("g2", "long gauge"), 4L);
|
||||
verify(mb).addGauge(info("g3", "float gauge"), 5f);
|
||||
verify(mb).addCounter(info("S1NumOps", "Number of ops for stat"), 1L);
|
||||
verify(mb).addGauge(eq(info("S1AvgTime", "Average time for stat")),
|
||||
eq(0.0, EPSILON));
|
||||
|
@ -228,16 +228,6 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||
null, null, Server.getRemoteAddress(), null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUsedCapacity(float usedCapacity) {
|
||||
queueCapacities.setUsedCapacity(usedCapacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAbsoluteUsedCapacity(float absUsedCapacity) {
|
||||
queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set maximum capacity - used only for testing.
|
||||
* @param maximumCapacity new max capacity
|
||||
@ -309,7 +299,7 @@ void setupQueueConfigs(Resource clusterResource)
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
@ -461,7 +451,7 @@ void allocateResource(Resource clusterResource,
|
||||
++numContainers;
|
||||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
this, labelManager, nodePartition);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
@ -474,7 +464,7 @@ protected void releaseResource(Resource clusterResource,
|
||||
queueUsage.decUsed(nodePartition, resource);
|
||||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
this, labelManager, nodePartition);
|
||||
|
||||
--numContainers;
|
||||
} finally {
|
||||
@ -735,7 +725,7 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
||||
queueUsage.incUsed(nodeLabel, resourceToInc);
|
||||
CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
||||
minimumAllocation, queueUsage, queueCapacities, nodeLabel);
|
||||
nodeLabel, this);
|
||||
if (null != parent) {
|
||||
parent.incUsedResource(nodeLabel, resourceToInc, null);
|
||||
}
|
||||
@ -751,7 +741,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec,
|
||||
queueUsage.decUsed(nodeLabel, resourceToDec);
|
||||
CSQueueUtils.updateUsedCapacity(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
|
||||
minimumAllocation, queueUsage, queueCapacities, nodeLabel);
|
||||
nodeLabel, this);
|
||||
if (null != parent) {
|
||||
parent.decUsedResource(nodeLabel, resourceToDec, null);
|
||||
}
|
||||
|
@ -120,20 +120,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
|
||||
*/
|
||||
public float getAbsoluteUsedCapacity();
|
||||
|
||||
/**
|
||||
* Set used capacity of the queue.
|
||||
* @param usedCapacity
|
||||
* used capacity of the queue
|
||||
*/
|
||||
public void setUsedCapacity(float usedCapacity);
|
||||
|
||||
/**
|
||||
* Set absolute used capacity of the queue.
|
||||
* @param absUsedCapacity
|
||||
* absolute used capacity of the queue
|
||||
*/
|
||||
public void setAbsoluteUsedCapacity(float absUsedCapacity);
|
||||
|
||||
/**
|
||||
* Get the current used capacity of nodes without label(s) of the queue
|
||||
* and it's children (if any).
|
||||
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
@ -39,6 +40,10 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||
MutableGaugeLong usedAMResourceMB;
|
||||
@Metric("Used AM CPU limit in virtual cores")
|
||||
MutableGaugeLong usedAMResourceVCores;
|
||||
@Metric("Percent of Capacity Used")
|
||||
MutableGaugeFloat usedCapacity;
|
||||
@Metric("Percent of Absolute Capacity Used")
|
||||
MutableGaugeFloat absoluteUsedCapacity;
|
||||
|
||||
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
@ -91,6 +96,22 @@ public void decAMUsed(String user, Resource res) {
|
||||
}
|
||||
}
|
||||
|
||||
public float getUsedCapacity() {
|
||||
return usedCapacity.value();
|
||||
}
|
||||
|
||||
public void setUsedCapacity(float usedCapacity) {
|
||||
this.usedCapacity.set(usedCapacity);
|
||||
}
|
||||
|
||||
public float getAbsoluteUsedCapacity() {
|
||||
return absoluteUsedCapacity.value();
|
||||
}
|
||||
|
||||
public void setAbsoluteUsedCapacity(Float absoluteUsedCapacity) {
|
||||
this.absoluteUsedCapacity.set(absoluteUsedCapacity);
|
||||
}
|
||||
|
||||
public synchronized static CSQueueMetrics forQueue(String queueName,
|
||||
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -181,9 +180,12 @@ private static void updateAbsoluteCapacitiesByNodeLabels(
|
||||
* used resource for all partitions of this queue.
|
||||
*/
|
||||
public static void updateUsedCapacity(final ResourceCalculator rc,
|
||||
final Resource totalPartitionResource, final Resource minimumAllocation,
|
||||
ResourceUsage queueResourceUsage, QueueCapacities queueCapacities,
|
||||
String nodePartition) {
|
||||
final Resource totalPartitionResource, String nodePartition,
|
||||
AbstractCSQueue childQueue) {
|
||||
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
|
||||
CSQueueMetrics queueMetrics = childQueue.getMetrics();
|
||||
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
|
||||
Resource minimumAllocation = childQueue.getMinimumAllocation();
|
||||
float absoluteUsedCapacity = 0.0f;
|
||||
float usedCapacity = 0.0f;
|
||||
float reservedCapacity = 0.0f;
|
||||
@ -225,8 +227,18 @@ public static void updateUsedCapacity(final ResourceCalculator rc,
|
||||
queueCapacities.setReservedCapacity(nodePartition, reservedCapacity);
|
||||
queueCapacities
|
||||
.setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);
|
||||
|
||||
// QueueMetrics does not support per-label capacities,
|
||||
// so we report values only for the default partition.
|
||||
if (nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
|
||||
queueMetrics.setUsedCapacity(
|
||||
queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL));
|
||||
queueMetrics.setAbsoluteUsedCapacity(
|
||||
queueCapacities.getAbsoluteUsedCapacity(
|
||||
RMNodeLabelsManager.NO_LABEL));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static Resource getMaxAvailableResourceToQueue(
|
||||
final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
|
||||
Resource cluster) {
|
||||
@ -270,22 +282,22 @@ private static Resource getMaxAvailableResourceToQueue(
|
||||
*/
|
||||
@Lock(CSQueue.class)
|
||||
public static void updateQueueStatistics(
|
||||
final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation,
|
||||
final CSQueue childQueue, final RMNodeLabelsManager nlm,
|
||||
final ResourceCalculator rc, final Resource cluster,
|
||||
final AbstractCSQueue childQueue, final RMNodeLabelsManager nlm,
|
||||
final String nodePartition) {
|
||||
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
|
||||
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
|
||||
|
||||
|
||||
if (nodePartition == null) {
|
||||
for (String partition : Sets.union(
|
||||
queueCapacities.getNodePartitionsSet(),
|
||||
queueResourceUsage.getNodePartitionsSet())) {
|
||||
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
|
||||
minimumAllocation, queueResourceUsage, queueCapacities, partition);
|
||||
partition, childQueue);
|
||||
}
|
||||
} else {
|
||||
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
|
||||
minimumAllocation, queueResourceUsage, queueCapacities, nodePartition);
|
||||
nodePartition, childQueue);
|
||||
}
|
||||
|
||||
// Update queue metrics w.r.t node labels. In a generic way, we can
|
||||
|
@ -1676,7 +1676,7 @@ public void updateClusterResource(Resource clusterResource,
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
this, labelManager, null);
|
||||
|
||||
// queue metrics are updated, more resource may be available
|
||||
// activate the pending applications if possible
|
||||
|
@ -840,7 +840,7 @@ public void updateClusterResource(Resource clusterResource,
|
||||
}
|
||||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
this, labelManager, null);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
|
||||
}
|
||||
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
this, labelManager, null);
|
||||
|
||||
updateQuotas(parent.getUserLimitForReservation(),
|
||||
parent.getUserLimitFactor(),
|
||||
|
Loading…
Reference in New Issue
Block a user