YARN-10519. Refactor QueueMetricsForCustomResources class to move to yarn-common package. Contributed by Minni Mittal
(cherry picked from commit 8bc2dfbf36
)
This commit is contained in:
parent
763157dd12
commit
1520b84b36
@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
package org.apache.hadoop.yarn.metrics;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -29,26 +29,26 @@
|
||||
* the name of the custom resource.
|
||||
* There are different kinds of values like allocated, available and others.
|
||||
*/
|
||||
public class QueueMetricsCustomResource {
|
||||
public class CustomResourceMetricValue {
|
||||
private final Map<String, Long> values = Maps.newHashMap();
|
||||
|
||||
protected void increase(Resource res) {
|
||||
public void increase(Resource res) {
|
||||
update(res, Long::sum);
|
||||
}
|
||||
|
||||
void increaseWithMultiplier(Resource res, long multiplier) {
|
||||
public void increaseWithMultiplier(Resource res, long multiplier) {
|
||||
update(res, (v1, v2) -> v1 + v2 * multiplier);
|
||||
}
|
||||
|
||||
protected void decrease(Resource res) {
|
||||
public void decrease(Resource res) {
|
||||
update(res, (v1, v2) -> v1 - v2);
|
||||
}
|
||||
|
||||
void decreaseWithMultiplier(Resource res, int containers) {
|
||||
public void decreaseWithMultiplier(Resource res, int containers) {
|
||||
update(res, (v1, v2) -> v1 - v2 * containers);
|
||||
}
|
||||
|
||||
protected void set(Resource res) {
|
||||
public void set(Resource res) {
|
||||
update(res, (v1, v2) -> v2);
|
||||
}
|
||||
|
||||
@ -64,8 +64,7 @@ private void update(Resource res, BiFunction<Long, Long, Long> operation) {
|
||||
if (!values.containsKey(resource.getName())) {
|
||||
values.put(resource.getName(), 0L);
|
||||
}
|
||||
values.merge(resource.getName(),
|
||||
resource.getValue(), operation);
|
||||
values.merge(resource.getName(), resource.getValue(), operation);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,133 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.metrics;
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This is base class for allocated and available metrics for
|
||||
* custom resources.
|
||||
*/
|
||||
public class CustomResourceMetrics {
|
||||
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
|
||||
"AllocatedResource.";
|
||||
private static final String ALLOCATED_RESOURCE_METRIC_DESC = "Allocated NAME";
|
||||
|
||||
private static final String AVAILABLE_RESOURCE_METRIC_PREFIX =
|
||||
"AvailableResource.";
|
||||
private static final String AVAILABLE_RESOURCE_METRIC_DESC = "Available NAME";
|
||||
|
||||
private final CustomResourceMetricValue allocated =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue available =
|
||||
new CustomResourceMetricValue();
|
||||
|
||||
/**
|
||||
* Register all custom resources metrics as part of initialization.
|
||||
* @param customResources Map containing all custom resource types
|
||||
* @param registry of the metric type
|
||||
*/
|
||||
public void registerCustomResources(Map<String, Long> customResources,
|
||||
MetricsRegistry registry) {
|
||||
registerCustomResources(customResources, registry,
|
||||
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
||||
registerCustomResources(customResources, registry,
|
||||
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a map of all custom resource metric.
|
||||
* @return map of custom resource
|
||||
*/
|
||||
public Map<String, Long> initAndGetCustomResources() {
|
||||
Map<String, Long> customResources = new HashMap<String, Long>();
|
||||
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
|
||||
|
||||
for (int i = 2; i < resources.length; i++) {
|
||||
ResourceInformation resource = resources[i];
|
||||
customResources.put(resource.getName(), Long.valueOf(0));
|
||||
}
|
||||
return customResources;
|
||||
}
|
||||
|
||||
/**
|
||||
* As and when this metric object construction happens for any queue, all
|
||||
* custom resource metrics value would be initialized with '0' like any other
|
||||
* mandatory resources metrics.
|
||||
* @param customResources Map containing all custom resource types
|
||||
* @param registry of the metric type
|
||||
* @param metricPrefix prefix in metric name
|
||||
* @param metricDesc suffix for metric name
|
||||
*/
|
||||
public void registerCustomResources(Map<String, Long> customResources,
|
||||
MetricsRegistry registry, String metricPrefix, String metricDesc) {
|
||||
for (Map.Entry<String, Long> entry : customResources.entrySet()) {
|
||||
String resourceName = entry.getKey();
|
||||
Long resourceValue = entry.getValue();
|
||||
|
||||
MutableGaugeLong resourceMetric =
|
||||
(MutableGaugeLong) registry.get(metricPrefix + resourceName);
|
||||
|
||||
if (resourceMetric == null) {
|
||||
resourceMetric = registry.newGauge(metricPrefix + resourceName,
|
||||
metricDesc.replace("NAME", resourceName), 0L);
|
||||
}
|
||||
resourceMetric.set(resourceValue);
|
||||
}
|
||||
}
|
||||
|
||||
public void setAvailable(Resource res) {
|
||||
available.set(res);
|
||||
}
|
||||
|
||||
public void increaseAllocated(Resource res) {
|
||||
allocated.increase(res);
|
||||
}
|
||||
|
||||
public void increaseAllocated(Resource res, int containers) {
|
||||
allocated.increaseWithMultiplier(res, containers);
|
||||
}
|
||||
|
||||
public void decreaseAllocated(Resource res) {
|
||||
allocated.decrease(res);
|
||||
}
|
||||
|
||||
public void decreaseAllocated(Resource res, int containers) {
|
||||
allocated.decreaseWithMultiplier(res, containers);
|
||||
}
|
||||
|
||||
public Map<String, Long> getAllocatedValues() {
|
||||
return allocated.getValues();
|
||||
}
|
||||
|
||||
public Map<String, Long> getAvailableValues() {
|
||||
return available.getValues();
|
||||
}
|
||||
|
||||
public CustomResourceMetricValue getAvailable() {
|
||||
return available;
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Provides common metrics (available, allocated) for custom resources.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
package org.apache.hadoop.yarn.metrics;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
@ -17,6 +17,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -27,10 +28,10 @@
|
||||
*/
|
||||
public class CSQueueMetricsForCustomResources
|
||||
extends QueueMetricsForCustomResources {
|
||||
private final QueueMetricsCustomResource guaranteedCapacity =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource maxCapacity =
|
||||
new QueueMetricsCustomResource();
|
||||
private final CustomResourceMetricValue guaranteedCapacity =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue maxCapacity =
|
||||
new CustomResourceMetricValue();
|
||||
|
||||
public void setGuaranteedCapacity(Resource res) {
|
||||
guaranteedCapacity.set(res);
|
||||
|
@ -17,6 +17,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -26,20 +27,20 @@
|
||||
* It provides increase and decrease methods for all types of metrics.
|
||||
*/
|
||||
public class FSQueueMetricsForCustomResources {
|
||||
private final QueueMetricsCustomResource fairShare =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource steadyFairShare =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource minShare =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource maxShare =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource maxAMShare =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource amResourceUsage =
|
||||
new QueueMetricsCustomResource();
|
||||
private final CustomResourceMetricValue
|
||||
fairShare = new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue steadyFairShare =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue
|
||||
minShare = new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue
|
||||
maxShare = new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue
|
||||
maxAMShare = new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue amResourceUsage =
|
||||
new CustomResourceMetricValue();
|
||||
|
||||
public QueueMetricsCustomResource getFairShare() {
|
||||
public CustomResourceMetricValue getFairShare() {
|
||||
return fairShare;
|
||||
}
|
||||
|
||||
@ -51,7 +52,7 @@ public Map<String, Long> getFairShareValues() {
|
||||
return fairShare.getValues();
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getSteadyFairShare() {
|
||||
public CustomResourceMetricValue getSteadyFairShare() {
|
||||
return steadyFairShare;
|
||||
}
|
||||
|
||||
@ -63,7 +64,7 @@ public Map<String, Long> getSteadyFairShareValues() {
|
||||
return steadyFairShare.getValues();
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getMinShare() {
|
||||
public CustomResourceMetricValue getMinShare() {
|
||||
return minShare;
|
||||
}
|
||||
|
||||
@ -75,7 +76,7 @@ public Map<String, Long> getMinShareValues() {
|
||||
return minShare.getValues();
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getMaxShare() {
|
||||
public CustomResourceMetricValue getMaxShare() {
|
||||
return maxShare;
|
||||
}
|
||||
|
||||
@ -87,7 +88,7 @@ public Map<String, Long> getMaxShareValues() {
|
||||
return maxShare.getValues();
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getMaxAMShare() {
|
||||
public CustomResourceMetricValue getMaxAMShare() {
|
||||
return maxAMShare;
|
||||
}
|
||||
|
||||
@ -99,7 +100,7 @@ public Map<String, Long> getMaxAMShareValues() {
|
||||
return maxAMShare.getValues();
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getAMResourceUsage() {
|
||||
public CustomResourceMetricValue getAMResourceUsage() {
|
||||
return amResourceUsage;
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,6 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -43,8 +42,8 @@
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
@ -533,8 +532,8 @@ public void setAvailableResources(Resource limit) {
|
||||
availableVCores.set(limit.getVirtualCores());
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.setAvailable(limit);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getAvailableValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getAvailableValues(), registry,
|
||||
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -616,16 +615,6 @@ public void internalIncrPendingResources(String partition, String user,
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, Long> initAndGetCustomResources() {
|
||||
Map<String, Long> customResources = new HashMap<String, Long>();
|
||||
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
|
||||
|
||||
for (int i = 2; i < resources.length; i++) {
|
||||
ResourceInformation resource = resources[i];
|
||||
customResources.put(resource.getName(), Long.valueOf(0));
|
||||
}
|
||||
return customResources;
|
||||
}
|
||||
|
||||
protected void createQueueMetricsForCustomResources() {
|
||||
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||
@ -635,43 +624,21 @@ protected void createQueueMetricsForCustomResources() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register all custom resources metrics as part of initialization. As and
|
||||
* when this metric object construction happens for any queue, all custom
|
||||
* resource metrics value would be initialized with '0' like any other
|
||||
* mandatory resources metrics
|
||||
*/
|
||||
protected void registerCustomResources() {
|
||||
Map<String, Long> customResources = initAndGetCustomResources();
|
||||
registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX,
|
||||
ALLOCATED_RESOURCE_METRIC_DESC);
|
||||
registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX,
|
||||
AVAILABLE_RESOURCE_METRIC_DESC);
|
||||
registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX,
|
||||
PENDING_RESOURCE_METRIC_DESC);
|
||||
registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX,
|
||||
RESERVED_RESOURCE_METRIC_DESC);
|
||||
registerCustomResources(customResources,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
||||
}
|
||||
|
||||
protected void registerCustomResources(Map<String, Long> customResources,
|
||||
String metricPrefix, String metricDesc) {
|
||||
for (Entry<String, Long> entry : customResources.entrySet()) {
|
||||
String resourceName = entry.getKey();
|
||||
Long resourceValue = entry.getValue();
|
||||
|
||||
MutableGaugeLong resourceMetric =
|
||||
(MutableGaugeLong) this.registry.get(metricPrefix + resourceName);
|
||||
|
||||
if (resourceMetric == null) {
|
||||
resourceMetric =
|
||||
this.registry.newGauge(metricPrefix + resourceName,
|
||||
metricDesc.replace("NAME", resourceName), 0L);
|
||||
}
|
||||
resourceMetric.set(resourceValue);
|
||||
}
|
||||
Map<String, Long> customResources =
|
||||
queueMetricsForCustomResources.initAndGetCustomResources();
|
||||
queueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry);
|
||||
queueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry,
|
||||
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
||||
queueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry,
|
||||
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
|
||||
queueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
||||
}
|
||||
|
||||
private void incrementPendingResources(int containers, Resource res) {
|
||||
@ -680,7 +647,8 @@ private void incrementPendingResources(int containers, Resource res) {
|
||||
pendingVCores.incr(res.getVirtualCores() * containers);
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.increasePending(res, containers);
|
||||
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getPendingValues(), this.registry,
|
||||
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -722,7 +690,8 @@ private void decrementPendingResources(int containers, Resource res) {
|
||||
pendingVCores.decr(res.getVirtualCores() * containers);
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.decreasePending(res, containers);
|
||||
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getPendingValues(), this.registry,
|
||||
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -793,8 +762,8 @@ private void computeAllocateResources(int containers, Resource res,
|
||||
allocatedVCores.incr(res.getVirtualCores() * containers);
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.increaseAllocated(res, containers);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
|
||||
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
if (decrPending) {
|
||||
@ -813,8 +782,8 @@ public void allocateResources(String partition, String user, Resource res) {
|
||||
allocatedVCores.incr(res.getVirtualCores());
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.increaseAllocated(res);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
|
||||
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
|
||||
@ -822,7 +791,8 @@ public void allocateResources(String partition, String user, Resource res) {
|
||||
pendingVCores.decr(res.getVirtualCores());
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.decreasePending(res);
|
||||
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getPendingValues(), this.registry,
|
||||
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
|
||||
@ -879,8 +849,8 @@ private void computeReleaseResources(int containers, Resource res) {
|
||||
allocatedVCores.decr(res.getVirtualCores() * containers);
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.decreaseAllocated(res, containers);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
|
||||
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -928,9 +898,9 @@ public void updatePreemptedSecondsForCustomResources(Resource res,
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources
|
||||
.increaseAggregatedPreemptedSeconds(res, seconds);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getAggregatePreemptedSeconds()
|
||||
.getValues(),
|
||||
.getValues(), this.registry,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
|
||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
||||
}
|
||||
@ -971,8 +941,8 @@ public void incrReserveResources(Resource res) {
|
||||
reservedVCores.incr(res.getVirtualCores());
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.increaseReserved(res);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getReservedValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getReservedValues(), this.registry,
|
||||
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -1010,8 +980,8 @@ public void decrReserveResource(Resource res) {
|
||||
reservedVCores.decr(res.getVirtualCores());
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
queueMetricsForCustomResources.decreaseReserved(res);
|
||||
registerCustomResources(
|
||||
queueMetricsForCustomResources.getReservedValues(),
|
||||
queueMetricsForCustomResources.registerCustomResources(
|
||||
queueMetricsForCustomResources.getReservedValues(), this.registry,
|
||||
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -1114,7 +1084,7 @@ public Resource getReservedResources() {
|
||||
* @return QueueMetricsCustomResource
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() {
|
||||
public CustomResourceMetricValue getAggregatedPreemptedSecondsResources() {
|
||||
return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
|
||||
}
|
||||
|
||||
@ -1232,7 +1202,7 @@ public long getAggregatePreemptedContainers() {
|
||||
public void fillInValuesFromAvailableResources(Resource fromResource,
|
||||
Resource targetResource) {
|
||||
if (queueMetricsForCustomResources != null) {
|
||||
QueueMetricsCustomResource availableResources =
|
||||
CustomResourceMetricValue availableResources =
|
||||
queueMetricsForCustomResources.getAvailable();
|
||||
|
||||
// We expect all custom resources contained in availableResources,
|
||||
@ -1257,7 +1227,7 @@ public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
|
||||
return this.queueMetricsForCustomResources;
|
||||
}
|
||||
|
||||
public void setQueueMetricsForCustomResources(
|
||||
protected void setQueueMetricsForCustomResources(
|
||||
QueueMetricsForCustomResources metrics) {
|
||||
this.queueMetricsForCustomResources = metrics;
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -6,7 +6,9 @@
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -17,28 +19,20 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetrics;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class is a main entry-point for any kind of metrics for
|
||||
* custom resources.
|
||||
* It provides increase and decrease methods for all types of metrics.
|
||||
*/
|
||||
public class QueueMetricsForCustomResources {
|
||||
private final QueueMetricsCustomResource aggregatePreemptedSeconds =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource aggregatePreempted =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource allocated =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource available =
|
||||
new QueueMetricsCustomResource();
|
||||
private final QueueMetricsCustomResource pending =
|
||||
new QueueMetricsCustomResource();
|
||||
|
||||
private final QueueMetricsCustomResource reserved =
|
||||
new QueueMetricsCustomResource();
|
||||
public class QueueMetricsForCustomResources extends CustomResourceMetrics {
|
||||
private final CustomResourceMetricValue aggregatePreemptedSeconds =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue aggregatePreempted =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue pending =
|
||||
new CustomResourceMetricValue();
|
||||
private final CustomResourceMetricValue reserved =
|
||||
new CustomResourceMetricValue();
|
||||
|
||||
public void increaseReserved(Resource res) {
|
||||
reserved.increase(res);
|
||||
@ -48,10 +42,6 @@ public void decreaseReserved(Resource res) {
|
||||
reserved.decrease(res);
|
||||
}
|
||||
|
||||
public void setAvailable(Resource res) {
|
||||
available.set(res);
|
||||
}
|
||||
|
||||
public void increasePending(Resource res, int containers) {
|
||||
pending.increaseWithMultiplier(res, containers);
|
||||
}
|
||||
@ -64,20 +54,12 @@ public void decreasePending(Resource res, int containers) {
|
||||
pending.decreaseWithMultiplier(res, containers);
|
||||
}
|
||||
|
||||
public void increaseAllocated(Resource res) {
|
||||
allocated.increase(res);
|
||||
public Map<String, Long> getPendingValues() {
|
||||
return pending.getValues();
|
||||
}
|
||||
|
||||
public void increaseAllocated(Resource res, int containers) {
|
||||
allocated.increaseWithMultiplier(res, containers);
|
||||
}
|
||||
|
||||
public void decreaseAllocated(Resource res) {
|
||||
allocated.decrease(res);
|
||||
}
|
||||
|
||||
public void decreaseAllocated(Resource res, int containers) {
|
||||
allocated.decreaseWithMultiplier(res, containers);
|
||||
public Map<String, Long> getReservedValues() {
|
||||
return reserved.getValues();
|
||||
}
|
||||
|
||||
public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
|
||||
@ -88,27 +70,7 @@ public void increaseAggregatedPreempted(Resource res) {
|
||||
aggregatePreempted.increase(res);
|
||||
}
|
||||
|
||||
Map<String, Long> getAllocatedValues() {
|
||||
return allocated.getValues();
|
||||
}
|
||||
|
||||
Map<String, Long> getAvailableValues() {
|
||||
return available.getValues();
|
||||
}
|
||||
|
||||
Map<String, Long> getPendingValues() {
|
||||
return pending.getValues();
|
||||
}
|
||||
|
||||
Map<String, Long> getReservedValues() {
|
||||
return reserved.getValues();
|
||||
}
|
||||
|
||||
QueueMetricsCustomResource getAggregatePreemptedSeconds() {
|
||||
CustomResourceMetricValue getAggregatePreemptedSeconds() {
|
||||
return aggregatePreemptedSeconds;
|
||||
}
|
||||
|
||||
public QueueMetricsCustomResource getAvailable() {
|
||||
return available;
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +78,8 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||
private static final String MAX_CAPACITY_METRIC_DESC =
|
||||
"MaxCapacity of NAME";
|
||||
|
||||
private CSQueueMetricsForCustomResources csQueueMetricsForCustomResources;
|
||||
|
||||
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
super(ms, queueName, parent, enableUserMetrics, conf);
|
||||
@ -90,11 +92,14 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||
* mandatory resources metrics
|
||||
*/
|
||||
protected void registerCustomResources() {
|
||||
Map<String, Long> customResources = initAndGetCustomResources();
|
||||
registerCustomResources(customResources, GUARANTEED_CAPACITY_METRIC_PREFIX,
|
||||
GUARANTEED_CAPACITY_METRIC_DESC);
|
||||
registerCustomResources(customResources, MAX_CAPACITY_METRIC_PREFIX,
|
||||
MAX_CAPACITY_METRIC_DESC);
|
||||
Map<String, Long> customResources =
|
||||
csQueueMetricsForCustomResources.initAndGetCustomResources();
|
||||
csQueueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry,
|
||||
GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
|
||||
csQueueMetricsForCustomResources
|
||||
.registerCustomResources(customResources, this.registry,
|
||||
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
|
||||
super.registerCustomResources();
|
||||
}
|
||||
|
||||
@ -184,12 +189,10 @@ public void setGuaranteedResources(String partition, Resource res) {
|
||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
guaranteedMB.set(res.getMemorySize());
|
||||
guaranteedVCores.set(res.getVirtualCores());
|
||||
if (getQueueMetricsForCustomResources() != null) {
|
||||
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources())
|
||||
.setGuaranteedCapacity(res);
|
||||
registerCustomResources(
|
||||
((CSQueueMetricsForCustomResources)
|
||||
getQueueMetricsForCustomResources()).getGuaranteedCapacity(),
|
||||
if (csQueueMetricsForCustomResources != null) {
|
||||
csQueueMetricsForCustomResources.setGuaranteedCapacity(res);
|
||||
csQueueMetricsForCustomResources.registerCustomResources(
|
||||
csQueueMetricsForCustomResources.getGuaranteedCapacity(), registry,
|
||||
GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -207,12 +210,10 @@ public void setMaxCapacityResources(String partition, Resource res) {
|
||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
maxCapacityMB.set(res.getMemorySize());
|
||||
maxCapacityVCores.set(res.getVirtualCores());
|
||||
if (getQueueMetricsForCustomResources() != null) {
|
||||
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources())
|
||||
.setMaxCapacity(res);
|
||||
registerCustomResources(
|
||||
((CSQueueMetricsForCustomResources)
|
||||
getQueueMetricsForCustomResources()).getMaxCapacity(),
|
||||
if (csQueueMetricsForCustomResources != null) {
|
||||
csQueueMetricsForCustomResources.setMaxCapacity(res);
|
||||
csQueueMetricsForCustomResources.registerCustomResources(
|
||||
csQueueMetricsForCustomResources.getMaxCapacity(), registry,
|
||||
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
|
||||
}
|
||||
}
|
||||
@ -221,7 +222,9 @@ public void setMaxCapacityResources(String partition, Resource res) {
|
||||
@Override
|
||||
protected void createQueueMetricsForCustomResources() {
|
||||
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||
setQueueMetricsForCustomResources(new CSQueueMetricsForCustomResources());
|
||||
this.csQueueMetricsForCustomResources =
|
||||
new CSQueueMetricsForCustomResources();
|
||||
setQueueMetricsForCustomResources(csQueueMetricsForCustomResources);
|
||||
registerCustomResources();
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
||||
@ -293,7 +294,7 @@ private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
|
||||
}
|
||||
|
||||
private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
|
||||
QueueMetricsCustomResource customValues = qm
|
||||
CustomResourceMetricValue customValues = qm
|
||||
.getAggregatedPreemptedSecondsResources();
|
||||
MutableCounterLong vcoreSeconds = qm
|
||||
.getAggregateVcoreSecondsPreempted();
|
||||
|
@ -38,11 +38,11 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
@ -344,7 +344,7 @@ private void verifyAMShare(FSLeafQueue schedulable,
|
||||
|
||||
private Map<String, Long> verifyQueueMetricsForCustomResources(
|
||||
FSLeafQueue schedulable) {
|
||||
QueueMetricsCustomResource maxAMShareCustomResources =
|
||||
CustomResourceMetricValue maxAMShareCustomResources =
|
||||
schedulable.getMetrics().getCustomResources().getMaxAMShare();
|
||||
Map<String, Long> customResourceValues = maxAMShareCustomResources
|
||||
.getValues();
|
||||
|
Loading…
Reference in New Issue
Block a user