YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth)

This commit is contained in:
Haibo Chen 2018-10-16 14:12:02 -07:00
parent 538250db26
commit 84e22a6af4
9 changed files with 1325 additions and 168 deletions

View File

@ -16,6 +16,7 @@
package org.apache.hadoop.yarn.resourcetypes;
import com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -24,6 +25,7 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Contains helper methods to create Resource and ResourceInformation objects.
@ -90,4 +92,24 @@ private static ResourceValueAndUnit getResourceValueAndUnit(String val) {
return new ResourceValueAndUnit(value, matcher.group(2));
}
public static Map<String, Long> extractCustomResources(Resource res) {
Map<String, Long> customResources = Maps.newHashMap();
for (int i = 0; i < res.getResources().length; i++) {
ResourceInformation ri = res.getResourceInformation(i);
if (!ri.getName().equals(ResourceInformation.MEMORY_URI)
&& !ri.getName().equals(ResourceInformation.VCORES_URI)) {
customResources.put(ri.getName(), ri.getValue());
}
}
return customResources;
}
public static Map<String, String> extractCustomResourcesAsStrings(
Resource res) {
Map<String, Long> resValues = extractCustomResources(res);
return resValues.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, e -> String.valueOf(e.getValue())));
}
}

View File

@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@ -45,7 +46,9 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -114,6 +117,7 @@ public class QueueMetrics implements MetricsSource {
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
@ -125,6 +129,11 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
this.queueMetricsForCustomResources =
new QueueMetricsForCustomResources();
}
}
protected QueueMetrics tag(MetricsInfo info, String value) {
@ -350,9 +359,12 @@ public void moveAppTo(AppSchedulingInfo app) {
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
}
}
}
@ -392,7 +404,7 @@ public void setAvailableResourcesToUser(String partition,
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_incrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -408,12 +420,15 @@ private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
}
}
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_decrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -429,6 +444,9 @@ private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res, containers);
}
}
public void incrNodeTypeAggregations(String user, NodeType type) {
@ -452,12 +470,16 @@ public void incrNodeTypeAggregations(String user, NodeType type) {
public void allocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
}
if (decrPending) {
_decrPendingResources(containers, res);
}
@ -479,12 +501,18 @@ public void allocateResources(String partition, String user,
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res);
}
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -498,11 +526,15 @@ public void allocateResources(String partition, String user, Resource res) {
public void releaseResources(String partition,
String user, int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(partition, user, containers, res);
@ -519,9 +551,13 @@ public void releaseResources(String partition,
* @param user
* @param res
*/
public void releaseResources(String user, Resource res) {
private void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, res);
@ -552,8 +588,19 @@ public void updatePreemptedVcoreSeconds(long vcoreSeconds) {
}
}
public void updatePreemptedSecondsForCustomResources(Resource res,
long seconds) {
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources
.increaseAggregatedPreemptedSeconds(res, seconds);
}
if (parent != null) {
parent.updatePreemptedSecondsForCustomResources(res, seconds);
}
}
public void reserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res);
}
}
@ -562,6 +609,9 @@ public void reserveResource(String user, Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseReserved(res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.reserveResource(user, res);
@ -571,10 +621,13 @@ public void reserveResource(String user, Resource res) {
}
}
public void unreserveResource(String user, Resource res) {
private void unreserveResource(String user, Resource res) {
reservedContainers.decr();
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseReserved(res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.unreserveResource(user, res);
@ -585,7 +638,7 @@ public void unreserveResource(String user, Resource res) {
}
public void unreserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res);
}
}
@ -647,10 +700,59 @@ public int getAppsKilled() {
public int getAppsFailed() {
return appsFailed.value();
}
public Resource getAllocatedResources() {
return BuilderUtils.newResource(allocatedMB.value(),
(int) allocatedVCores.value());
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
queueMetricsForCustomResources.getAllocatedValues());
}
return Resource.newInstance(allocatedMB.value(),
allocatedVCores.value());
}
public Resource getAvailableResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(availableMB.value(), availableVCores.value(),
queueMetricsForCustomResources.getAvailableValues());
}
return Resource.newInstance(availableMB.value(), availableVCores.value());
}
public Resource getPendingResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(pendingMB.value(), pendingVCores.value(),
queueMetricsForCustomResources.getPendingValues());
}
return Resource.newInstance(pendingMB.value(), pendingVCores.value());
}
public Resource getReservedResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(reservedMB.value(), reservedVCores.value(),
queueMetricsForCustomResources.getReservedValues());
}
return Resource.newInstance(reservedMB.value(), reservedVCores.value());
}
/**
* Handle this specially as this has a long value and it could be
* truncated when casted into an int parameter of
* Resource.newInstance (vCores).
* @return QueueMetricsCustomResource
*/
@VisibleForTesting
public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() {
return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
}
@VisibleForTesting
public MutableCounterLong getAggregateMemoryMBSecondsPreempted() {
return aggregateMemoryMBSecondsPreempted;
}
@VisibleForTesting
public MutableCounterLong getAggregateVcoreSecondsPreempted() {
return aggregateVcoreSecondsPreempted;
}
public long getAllocatedMB() {

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.Map;
import java.util.function.BiFunction;
/**
* This class is a main entry-point for any kind of metrics for
* custom resources.
* It provides increase and decrease methods for all types of metrics.
*/
public class QueueMetricsForCustomResources {
/**
* Class that holds metrics values for custom resources in a map keyed with
* the name of the custom resource.
* There are different kinds of values like allocated, available and others.
*/
public static class QueueMetricsCustomResource {
private final Map<String, Long> values = Maps.newHashMap();
protected void increase(Resource res) {
update(res, Long::sum);
}
void increaseWithMultiplier(Resource res, long multiplier) {
update(res, (v1, v2) -> v1 + v2 * multiplier);
}
protected void decrease(Resource res) {
update(res, (v1, v2) -> v1 - v2);
}
void decreaseWithMultiplier(Resource res, int containers) {
update(res, (v1, v2) -> v1 - v2 * containers);
}
protected void set(Resource res) {
update(res, (v1, v2) -> v2);
}
private void update(Resource res, BiFunction<Long, Long, Long> operation) {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
ResourceInformation[] resources = res.getResources();
for (int i = 2; i < resources.length; i++) {
ResourceInformation resource = resources[i];
// Map.merge only applies operation if there is
// a value for the key in the map
if (!values.containsKey(resource.getName())) {
values.put(resource.getName(), 0L);
}
values.merge(resource.getName(),
resource.getValue(), operation);
}
}
}
public Map<String, Long> getValues() {
return values;
}
}
private final QueueMetricsCustomResource aggregatePreemptedSeconds =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource allocated =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource available =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource pending =
new QueueMetricsCustomResource();
private final QueueMetricsCustomResource reserved =
new QueueMetricsCustomResource();
public void increaseReserved(Resource res) {
reserved.increase(res);
}
public void decreaseReserved(Resource res) {
reserved.decrease(res);
}
public void setAvailable(Resource res) {
available.set(res);
}
public void increasePending(Resource res, int containers) {
pending.increaseWithMultiplier(res, containers);
}
public void decreasePending(Resource res) {
pending.decrease(res);
}
public void decreasePending(Resource res, int containers) {
pending.decreaseWithMultiplier(res, containers);
}
public void increaseAllocated(Resource res) {
allocated.increase(res);
}
public void increaseAllocated(Resource res, int containers) {
allocated.increaseWithMultiplier(res, containers);
}
public void decreaseAllocated(Resource res) {
allocated.decrease(res);
}
public void decreaseAllocated(Resource res, int containers) {
allocated.decreaseWithMultiplier(res, containers);
}
public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
aggregatePreemptedSeconds.increaseWithMultiplier(res, seconds);
}
Map<String, Long> getAllocatedValues() {
return allocated.getValues();
}
Map<String, Long> getAvailableValues() {
return available.getValues();
}
Map<String, Long> getPendingValues() {
return pending.getValues();
}
Map<String, Long> getReservedValues() {
return reserved.getValues();
}
QueueMetricsCustomResource getAggregatePreemptedSeconds() {
return aggregatePreemptedSeconds;
}
}

View File

@ -2106,7 +2106,8 @@ protected void completedContainerInternal(
private void updateQueuePreemptionMetrics(
CSQueue queue, RMContainer rmc) {
QueueMetrics qMetrics = queue.getMetrics();
long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
Resource containerResource = rmc.getAllocatedResource();
qMetrics.preemptContainer();
long mbSeconds = (containerResource.getMemorySize() * usedMillis)
@ -2115,6 +2116,8 @@ private void updateQueuePreemptionMetrics(
/ DateUtils.MILLIS_PER_SECOND;
qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
qMetrics.updatePreemptedSecondsForCustomResources(containerResource,
usedSeconds);
}
@Lock(Lock.NoLock.class)

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import java.util.function.Consumer;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.TestQueueMetrics.userSource;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* This class holds queue and user metrics for a particular queue,
* used for testing metrics.
* Reference for the parent queue is also stored for every queue,
* except if the queue is root.
*/
public final class QueueInfo {
private final QueueInfo parentQueueInfo;
private final Queue queue;
final QueueMetrics queueMetrics;
final MetricsSource queueSource;
final MetricsSource userSource;
public QueueInfo(QueueInfo parent, String queueName, MetricsSystem ms,
Configuration conf, String user) {
Queue parentQueue = parent == null ? null : parent.queue;
parentQueueInfo = parent;
queueMetrics =
QueueMetrics.forQueue(ms, queueName, parentQueue, true, conf);
queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(queueMetrics);
queueSource = ms.getSource(QueueMetrics.sourceName(queueName).toString());
// need to call getUserMetrics so that a non-null userSource is returned
// with the call to userSource(..)
queueMetrics.getUserMetrics(user);
userSource = userSource(ms, queueName, user);
}
public QueueInfo getRoot() {
QueueInfo root = this;
while (root.parentQueueInfo != null) {
root = root.parentQueueInfo;
}
return root;
}
public void checkAllQueueSources(Consumer<MetricsSource> consumer) {
checkQueueSourcesRecursive(this, consumer);
}
private void checkQueueSourcesRecursive(QueueInfo queueInfo,
Consumer<MetricsSource> consumer) {
consumer.accept(queueInfo.queueSource);
if (queueInfo.parentQueueInfo != null) {
checkQueueSourcesRecursive(queueInfo.parentQueueInfo, consumer);
}
}
public void checkAllQueueMetrics(Consumer<QueueMetrics> consumer) {
checkAllQueueMetricsRecursive(this, consumer);
}
private void checkAllQueueMetricsRecursive(QueueInfo queueInfo, Consumer
<QueueMetrics> consumer) {
consumer.accept(queueInfo.queueMetrics);
if (queueInfo.parentQueueInfo != null) {
checkAllQueueMetricsRecursive(queueInfo.parentQueueInfo, consumer);
}
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Map;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
.extractCustomResources;
/**
* This class is to test standard and custom resource metrics for all types.
* Metrics types can be one of: allocated, pending, reserved
* and other resources.
*/
public final class QueueMetricsTestData {
public static final class Builder {
private int containers;
private Resource resource;
private Resource resourceToDecrease;
private Map<String, Long> customResourceValues;
private int containersToDecrease;
private String user;
private String partition;
private QueueInfo queueInfo;
private Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder withContainers(int containers) {
this.containers = containers;
return this;
}
public Builder withResourceToDecrease(Resource res, int containers) {
this.resourceToDecrease = res;
this.containersToDecrease = containers;
return this;
}
public Builder withResources(Resource res) {
this.resource = res;
return this;
}
public Builder withUser(String user) {
this.user = user;
return this;
}
public Builder withPartition(String partition) {
this.partition = partition;
return this;
}
public Builder withLeafQueue(QueueInfo qInfo) {
this.queueInfo = qInfo;
return this;
}
public QueueMetricsTestData build() {
this.customResourceValues = extractCustomResources(resource);
return new QueueMetricsTestData(this);
}
}
final Map<String, Long> customResourceValues;
final int containers;
final Resource resourceToDecrease;
final int containersToDecrease;
final Resource resource;
final String partition;
final QueueInfo leafQueue;
final String user;
private QueueMetricsTestData(Builder builder) {
this.customResourceValues = builder.customResourceValues;
this.containers = builder.containers;
this.resourceToDecrease = builder.resourceToDecrease;
this.containersToDecrease = builder.containersToDecrease;
this.resource = builder.resource;
this.partition = builder.partition;
this.leafQueue = builder.queueInfo;
this.user = builder.user;
}
}

View File

@ -27,34 +27,31 @@
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
final class ResourceMetricsChecker {
private final static Logger LOG =
LoggerFactory.getLogger(ResourceMetricsChecker.class);
enum ResourceMetricType {
GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG
}
private static final ResourceMetricsChecker INITIAL_CHECKER =
new ResourceMetricsChecker()
.gaugeLong(ALLOCATED_MB, 0)
@ -72,29 +69,41 @@ final class ResourceMetricsChecker {
.gaugeInt(RESERVED_CONTAINERS, 0);
enum ResourceMetricsKey {
ALLOCATED_MB("AllocatedMB"),
ALLOCATED_V_CORES("AllocatedVCores"),
ALLOCATED_CONTAINERS("AllocatedContainers"),
AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
AVAILABLE_MB("AvailableMB"),
AVAILABLE_V_CORES("AvailableVCores"),
PENDING_MB("PendingMB"),
PENDING_V_CORES("PendingVCores"),
PENDING_CONTAINERS("PendingContainers"),
RESERVED_MB("ReservedMB"),
RESERVED_V_CORES("ReservedVCores"),
RESERVED_CONTAINERS("ReservedContainers");
ALLOCATED_MB("AllocatedMB", GAUGE_LONG),
ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT),
ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT),
AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated",
COUNTER_LONG),
AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased",
COUNTER_LONG),
AVAILABLE_MB("AvailableMB", GAUGE_LONG),
AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT),
PENDING_MB("PendingMB", GAUGE_LONG),
PENDING_V_CORES("PendingVCores", GAUGE_INT),
PENDING_CONTAINERS("PendingContainers", GAUGE_INT),
RESERVED_MB("ReservedMB", GAUGE_LONG),
RESERVED_V_CORES("ReservedVCores", GAUGE_INT),
RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT),
AGGREGATE_VCORE_SECONDS_PREEMPTED(
"AggregateVcoreSecondsPreempted", COUNTER_LONG),
AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED(
"AggregateMemoryMBSecondsPreempted", COUNTER_LONG);
private String value;
private ResourceMetricType type;
ResourceMetricsKey(String value) {
ResourceMetricsKey(String value, ResourceMetricType type) {
this.value = value;
this.type = type;
}
public String getValue() {
return value;
}
public ResourceMetricType getType() {
return type;
}
}
private final Map<ResourceMetricsKey, Long> gaugesLong;
@ -123,20 +132,31 @@ public static ResourceMetricsChecker create() {
}
ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
ensureTypeIsCorrect(key, GAUGE_LONG);
gaugesLong.put(key, value);
return this;
}
ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
ensureTypeIsCorrect(key, GAUGE_INT);
gaugesInt.put(key, value);
return this;
}
ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
ensureTypeIsCorrect(key, COUNTER_LONG);
counters.put(key, value);
return this;
}
private void ensureTypeIsCorrect(ResourceMetricsKey
key, ResourceMetricType actualType) {
if (key.type != actualType) {
throw new IllegalStateException("Metrics type should be " + key.type
+ " instead of " + actualType + " for metrics: " + key.value);
}
}
ResourceMetricsChecker checkAgainst(MetricsSource source) {
if (source == null) {
throw new IllegalStateException("MetricsSource should not be null!");

View File

@ -18,15 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AppMetricsChecker.AppMetricsKey.*;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@ -46,8 +37,40 @@
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestQueueMetrics {
private static Queue createMockQueue(QueueMetrics metrics) {
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
return queue;
}
private static final int GB = 1024; // MB
private static final String USER = "alice";
private static final String USER_2 = "dodo";
private static final Configuration conf = new Configuration();
private MetricsSystem ms;
@ -60,19 +83,18 @@ public void setUp() {
@Test
public void testDefaultSingleQueueMetrics() {
String queueName = "single";
String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
conf);
MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
@ -80,7 +102,7 @@ public void testDefaultSingleQueueMetrics() {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
USER, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
@ -91,14 +113,14 @@ public void testDefaultSingleQueueMetrics() {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true);
USER, 3, Resources.createResource(2*GB, 2), true);
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
@ -110,7 +132,7 @@ public void testDefaultSingleQueueMetrics() {
.checkAgainst(queueSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2));
USER, 1, Resources.createResource(2*GB, 2));
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -119,13 +141,13 @@ public void testDefaultSingleQueueMetrics() {
.checkAgainst(queueSource);
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2));
USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2));
USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values
ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
@ -136,7 +158,7 @@ public void testDefaultSingleQueueMetrics() {
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FINISHED);
metrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@ -146,24 +168,23 @@ public void testDefaultSingleQueueMetrics() {
@Test
public void testQueueAppMetricsForMultipleFailures() {
String queueName = "single";
String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
new Configuration());
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -177,12 +198,12 @@ public void testQueueAppMetricsForMultipleFailures() {
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -197,12 +218,12 @@ public void testQueueAppMetricsForMultipleFailures() {
// As the application has failed, framework retries the same application
// based on configuration
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
@ -215,7 +236,7 @@ public void testQueueAppMetricsForMultipleFailures() {
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FAILED);
metrics.finishApp(USER, RMAppState.FAILED);
AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1)
@ -227,15 +248,14 @@ public void testQueueAppMetricsForMultipleFailures() {
@Test
public void testSingleQueueWithUserMetrics() {
String queueName = "single2";
String user = "dodo";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
conf);
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
AppSchedulingInfo app = mockApp(USER_2);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user);
metrics.submitApp(USER_2);
MetricsSource userSource = userSource(ms, queueName, USER_2);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
@ -244,7 +264,7 @@ public void testSingleQueueWithUserMetrics() {
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
metrics.submitAppAttempt(user);
metrics.submitAppAttempt(USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
@ -257,9 +277,9 @@ public void testSingleQueueWithUserMetrics() {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
USER_2, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
USER_2, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
@ -280,7 +300,7 @@ public void testSingleQueueWithUserMetrics() {
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.runAppAttempt(app.getApplicationId(), USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
@ -293,7 +313,7 @@ public void testSingleQueueWithUserMetrics() {
.checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true);
USER_2, 3, Resources.createResource(2*GB, 2), true);
resMetricsQueueSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@ -316,7 +336,7 @@ public void testSingleQueueWithUserMetrics() {
.checkAgainst(userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2));
USER_2, 1, Resources.createResource(2*GB, 2));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -340,7 +360,7 @@ public void testSingleQueueWithUserMetrics() {
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
metrics.finishApp(user, RMAppState.FINISHED);
metrics.finishApp(USER_2, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
@ -353,7 +373,6 @@ public void testSingleQueueWithUserMetrics() {
public void testNodeTypeMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String user = "alice";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
@ -365,29 +384,29 @@ public void testNodeTypeMetrics() {
MetricsSource queueSource = queueSource(ms, leafQueueName);
//AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, leafQueueName, USER);
MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
@ -396,67 +415,60 @@ public void testNodeTypeMetrics() {
@Test
public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String user = "alice";
AppSchedulingInfo app = mockApp(USER);
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
MetricsSource queueSource = queueSource(ms, leafQueueName);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
leaf.queueMetrics.submitApp(USER);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
.checkAgainst(leaf.queueSource, true);
AppMetricsChecker appMetricsParentQueueSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(parentQueueSource, true);
.checkAgainst(root.queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
.checkAgainst(leaf.userSource, true);
AppMetricsChecker appMetricsParentUserSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(parentUserSource, true);
.checkAgainst(root.userSource, true);
metrics.submitAppAttempt(user);
leaf.queueMetrics.submitAppAttempt(USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
.checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentQueueSource, true);
.checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(userSource, true);
.checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentUserSource, true);
.checkAgainst(root.userSource, true);
parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
root.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
leaf.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3));
root.queueMetrics.setAvailableResourcesToUser(
RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(10*GB, 10));
leaf.queueMetrics.setAvailableResourcesToUser(
RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(10*GB, 10));
leaf.queueMetrics.incrPendingResources(
RMNodeLabelsManager.NO_LABEL,
USER, 5, Resources.createResource(3*GB, 3));
ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.create()
@ -465,7 +477,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
.checkAgainst(leaf.queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
@ -473,7 +485,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentQueueSource);
.checkAgainst(root.queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
@ -481,7 +493,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
.checkAgainst(leaf.userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
@ -489,24 +501,24 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentUserSource);
.checkAgainst(root.userSource);
metrics.runAppAttempt(app.getApplicationId(), user);
leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
.checkAgainst(leaf.queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(userSource, true);
.checkAgainst(leaf.userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true);
metrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3));
leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
USER, 3, Resources.createResource(2*GB, 2), true);
leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
resMetricsQueueSourceChecker =
@ -521,7 +533,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(queueSource);
.checkAgainst(leaf.queueSource);
resMetricsParentQueueSourceChecker =
ResourceMetricsChecker
.createFromChecker(resMetricsParentQueueSourceChecker)
@ -535,7 +547,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentQueueSource);
.checkAgainst(root.queueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@ -548,7 +560,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(userSource);
.checkAgainst(leaf.userSource);
resMetricsParentUserSourceChecker = ResourceMetricsChecker
.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
@ -561,12 +573,12 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentUserSource);
.checkAgainst(root.userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3));
leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
USER, 1, Resources.createResource(2*GB, 2));
leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
USER, Resources.createResource(3*GB, 3));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -575,7 +587,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(queueSource);
.checkAgainst(leaf.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -584,7 +596,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentQueueSource);
.checkAgainst(root.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -593,7 +605,7 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(userSource);
.checkAgainst(leaf.userSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
@ -602,46 +614,46 @@ public void testTwoLevelWithUserMetrics() {
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentUserSource);
.checkAgainst(root.userSource);
metrics.finishAppAttempt(
leaf.queueMetrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser());
appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
.checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentQueueSource, true);
.checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
.checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentUserSource, true);
.checkAgainst(root.userSource, true);
metrics.finishApp(user, RMAppState.FINISHED);
leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
.checkAgainst(leaf.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(parentQueueSource, true);
.checkAgainst(root.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(userSource, true);
.checkAgainst(leaf.userSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(parentUserSource, true);
.checkAgainst(root.userSource, true);
}
@Test
@ -719,7 +731,7 @@ private static void checkAggregatedNodeTypes(MetricsSource source,
assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
}
private static AppSchedulingInfo mockApp(String user) {
static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@ -732,7 +744,7 @@ public static MetricsSource queueSource(MetricsSystem ms, String queue) {
return ms.getSource(QueueMetrics.sourceName(queue).toString());
}
private static MetricsSource userSource(MetricsSystem ms, String queue,
public static MetricsSource userSource(MetricsSystem ms, String queue,
String user) {
return ms.getSource(QueueMetrics.sourceName(queue).
append(",user=").append(user).toString());

View File

@ -0,0 +1,645 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
.extractCustomResourcesAsStrings;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestQueueMetricsForCustomResources {
public enum MetricsForCustomResource {
ALLOCATED, AVAILABLE, PENDING, RESERVED, AGGREGATE_PREEMPTED_SECONDS
}
public static final long GB = 1024; // MB
private static final Configuration CONF = new Configuration();
private static final String CUSTOM_RES_1 = "custom_res_1";
private static final String CUSTOM_RES_2 = "custom_res_2";
public static final String USER = "alice";
private Resource defaultResource;
private MetricsSystem ms;
@Before
public void setUp() {
ms = new MetricsSystemImpl();
QueueMetrics.clearQueueMetrics();
initializeResourceTypes();
createDefaultResource();
}
private void createDefaultResource() {
defaultResource = newResource(4 * GB, 4,
ImmutableMap.<String, String> builder()
.put(CUSTOM_RES_1, String.valueOf(15 * GB))
.put(CUSTOM_RES_2, String.valueOf(20 * GB))
.build());
}
private void initializeResourceTypes() {
Map<String, ResourceInformation> riMap = new HashMap<>();
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1,
ResourceInformation.VCORES.getUnits(), 0, 2000);
ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2,
ResourceInformation.VCORES.getUnits(), 0, 2000);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put(CUSTOM_RES_1, res1);
riMap.put(CUSTOM_RES_2, res2);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
private static void assertCustomResourceValue(QueueMetrics metrics,
MetricsForCustomResource metricsType,
Function<QueueMetrics, Resource> func,
String resourceName,
long expectedValue) {
Resource res = func.apply(metrics);
Long value = res.getResourceValue(resourceName);
assertCustomResourceValueInternal(metricsType, resourceName,
expectedValue, value);
}
private static void assertCustomResourceValueInternal(
MetricsForCustomResource metricsType, String resourceName, long
expectedValue, Long value) {
assertNotNull(
"QueueMetrics should have custom resource metrics value " +
"for resource: " + resourceName, value);
assertEquals(String.format(
"QueueMetrics should have custom resource metrics value %d " +
"for resource: %s for metrics type %s",
expectedValue, resourceName, metricsType), expectedValue,
(long) value);
}
private static Map<String, String> getCustomResourcesWithValue(long value) {
return ImmutableMap.<String, String>builder()
.put(CUSTOM_RES_1, String.valueOf(value))
.put(CUSTOM_RES_2, String.valueOf(value))
.build();
}
private QueueInfo createFourLevelQueueHierarchy() {
QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
QueueInfo sub = new QueueInfo(root, "root.subQ", ms, CONF, USER);
QueueInfo sub2 = new QueueInfo(sub, "root.subQ2", ms, CONF, USER);
return new QueueInfo(sub2, "root.subQ2.leafQ", ms, CONF, USER);
}
private QueueInfo createBasicQueueHierarchy() {
QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
return new QueueInfo(root, "root.leaf", ms, CONF, USER);
}
private QueueMetricsTestData.Builder
createQueueMetricsTestDataWithContainers(int containers) {
return createDefaultQueueMetricsTestData()
.withContainers(containers);
}
private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
return QueueMetricsTestData.Builder.create()
.withUser(USER)
.withPartition(RMNodeLabelsManager.NO_LABEL);
}
private void testIncreasePendingResources(QueueMetricsTestData testData) {
testIncreasePendingResourcesInternal(testData.containers, testData);
}
private void testIncreasePendingResourcesWithoutContainer(
QueueMetricsTestData testData) {
testIncreasePendingResourcesInternal(1, testData);
}
private void testIncreasePendingResourcesInternal(int containers,
QueueMetricsTestData testData) {
testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
testData.user, containers, testData.resource);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.gaugeInt(PENDING_CONTAINERS, containers)
.gaugeLong(PENDING_MB, containers *
testData.resource.getMemorySize())
.gaugeInt(PENDING_V_CORES, containers *
testData.resource.getVirtualCores());
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getPendingResources,
MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> v * containers));
}
private void testAllocateResources(boolean decreasePending,
QueueMetricsTestData testData) {
testData.leafQueue.queueMetrics.allocateResources(testData.partition,
testData.user, testData.containers, testData.resource, decreasePending);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.gaugeInt(ALLOCATED_CONTAINERS, testData.containers)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, testData.containers)
.gaugeLong(ALLOCATED_MB, testData.containers *
testData.resource.getMemorySize())
.gaugeInt(ALLOCATED_V_CORES, testData.containers *
testData.resource.getVirtualCores())
.gaugeInt(PENDING_CONTAINERS, 0)
.gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0)
.checkAgainst(testData.leafQueue.queueSource);
if (decreasePending) {
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getPendingResources,
MetricsForCustomResource.PENDING,
computeExpectedCustomResourceValues(testData.customResourceValues,
(k, v) -> 0L));
}
if (!testData.customResourceValues.isEmpty()) {
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getAllocatedResources,
MetricsForCustomResource.ALLOCATED,
computeExpectedCustomResourceValues(testData.customResourceValues,
(k, v) -> v * testData.containers));
}
}
private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
int seconds) {
testData.leafQueue.queueMetrics.updatePreemptedMemoryMBSeconds(
testData.resource.getMemorySize() * seconds);
testData.leafQueue.queueMetrics.updatePreemptedVcoreSeconds(
testData.resource.getVirtualCores() * seconds);
testData.leafQueue.queueMetrics.updatePreemptedSecondsForCustomResources(
testData.resource, seconds);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
testData.resource.getMemorySize() * seconds)
.counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
testData.resource.getVirtualCores() * seconds);
assertQueueMetricsOnly(testData.leafQueue, checker,
this::convertPreemptedSecondsToResource,
MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
computeExpectedCustomResourceValues(testData.customResourceValues,
(k, v) -> v * seconds));
}
private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
QueueMetricsCustomResource customValues = qm
.getAggregatedPreemptedSecondsResources();
MutableCounterLong vcoreSeconds = qm
.getAggregateVcoreSecondsPreempted();
MutableCounterLong memorySeconds = qm
.getAggregateMemoryMBSecondsPreempted();
return Resource.newInstance(
memorySeconds.value(), (int) vcoreSeconds.value(),
customValues.getValues());
}
private void testReserveResources(QueueMetricsTestData testData) {
testData.leafQueue.queueMetrics.reserveResource(testData.partition,
testData.user, testData.resource);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.gaugeInt(RESERVED_CONTAINERS, 1)
.gaugeLong(RESERVED_MB, testData.resource.getMemorySize())
.gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores())
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getReservedResources,
MetricsForCustomResource.RESERVED,
computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> v));
}
private void testGetAllocatedResources(QueueMetricsTestData testData) {
testAllocateResources(false, testData);
Resource res = testData.leafQueue.queueMetrics.getAllocatedResources();
if (testData.customResourceValues.size() > 0) {
assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
CUSTOM_RES_1,
testData.customResourceValues.get(CUSTOM_RES_1) * testData.containers,
res.getResourceValue(CUSTOM_RES_1));
assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
CUSTOM_RES_2,
testData.customResourceValues.get(CUSTOM_RES_2) * testData.containers,
res.getResourceValue(CUSTOM_RES_2));
}
}
private void assertAllMetrics(QueueInfo queueInfo,
ResourceMetricsChecker checker,
Function<QueueMetrics, Resource> func,
MetricsForCustomResource metricsType,
Map<String, Long> expectedCustomResourceValues) {
assertAllQueueMetrics(queueInfo, checker, func, metricsType,
expectedCustomResourceValues);
//assert leaf and root userSources
checker = ResourceMetricsChecker.createFromChecker(checker)
.checkAgainst(queueInfo.userSource);
ResourceMetricsChecker.createFromChecker(checker)
.checkAgainst(queueInfo.getRoot().userSource);
}
private void assertQueueMetricsOnly(QueueInfo queueInfo,
ResourceMetricsChecker checker,
Function<QueueMetrics, Resource> func,
MetricsForCustomResource metricsType,
Map<String, Long> expectedCustomResourceValues) {
assertAllQueueMetrics(queueInfo, checker, func, metricsType,
expectedCustomResourceValues);
}
private void assertAllQueueMetrics(QueueInfo queueInfo,
ResourceMetricsChecker checker,
Function<QueueMetrics, Resource> func,
MetricsForCustomResource metricsType,
Map<String, Long> expectedCustomResourceValues) {
// assert normal resource metrics values
queueInfo.checkAllQueueSources(qs -> ResourceMetricsChecker
.createFromChecker(checker).checkAgainst(qs));
// assert custom resource metrics values
queueInfo.checkAllQueueMetrics(qm -> {
assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_1,
expectedCustomResourceValues.get(CUSTOM_RES_1));
assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_2,
expectedCustomResourceValues.get(CUSTOM_RES_2));
});
}
private Map<String, Long> computeExpectedCustomResourceValues(
Map<String, Long> customResourceValues,
BiFunction<String, Long, Long> func) {
Map<String, Long> values = Maps.newHashMap();
for (Map.Entry<String, Long> res : customResourceValues.entrySet()) {
values.put(res.getKey(), func.apply(res.getKey(), res.getValue()));
}
return values;
}
@Test
public void testSetAvailableResourcesToQueue1() {
String queueName = "single";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
false, CONF);
MetricsSource queueSource = queueSource(ms, queueName);
metrics.setAvailableResourcesToQueue(newResource(
GB, 4,
ImmutableMap.<String, String> builder()
.put(CUSTOM_RES_1, String.valueOf(5 * GB))
.put(CUSTOM_RES_2, String.valueOf(6 * GB))
.build()));
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, GB)
.gaugeInt(AVAILABLE_V_CORES, 4)
.checkAgainst(queueSource);
assertCustomResourceValue(metrics,
MetricsForCustomResource.AVAILABLE,
QueueMetrics::getAvailableResources, CUSTOM_RES_1, 5 * GB);
assertCustomResourceValue(metrics,
MetricsForCustomResource.AVAILABLE,
QueueMetrics::getAvailableResources, CUSTOM_RES_2, 6 * GB);
}
@Test
public void testSetAvailableResourcesToQueue2() {
String queueName = "single";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
false, CONF);
MetricsSource queueSource = queueSource(ms, queueName);
metrics.setAvailableResourcesToQueue(null,
newResource(GB, 4,
ImmutableMap.<String, String> builder()
.put(CUSTOM_RES_1, String.valueOf(15 * GB))
.put(CUSTOM_RES_2, String.valueOf(20 * GB))
.build()));
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, GB)
.gaugeInt(AVAILABLE_V_CORES, 4)
.checkAgainst(queueSource);
assertCustomResourceValue(metrics,
MetricsForCustomResource.AVAILABLE,
QueueMetrics::getAvailableResources, CUSTOM_RES_1, 15 * GB);
assertCustomResourceValue(metrics,
MetricsForCustomResource.AVAILABLE,
QueueMetrics::getAvailableResources, CUSTOM_RES_2, 20 * GB);
}
@Test
public void testIncreasePendingResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResourceToDecrease(
newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
.withResources(defaultResource)
.build();
testIncreasePendingResources(testData);
}
@Test
public void testDecreasePendingResources() {
Resource resourceToDecrease =
newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
int containersToDecrease = 2;
int containers = 5;
QueueMetricsTestData testData =
createQueueMetricsTestDataWithContainers(containers)
.withLeafQueue(createBasicQueueHierarchy())
.withResourceToDecrease(resourceToDecrease, containers)
.withResources(defaultResource)
.build();
//compute expected values
final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
final int containersAfterDecrease = containers - containersToDecrease;
final int vcoresAfterDecrease =
(defaultResource.getVirtualCores() * containers)
- (vCoresToDecrease * containersToDecrease);
final long memoryAfterDecrease =
(defaultResource.getMemorySize() * containers)
- (memoryMBToDecrease * containersToDecrease);
//first, increase resources to be able to decrease some
testIncreasePendingResources(testData);
//decrease resources
testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
testData.user, containersToDecrease,
ResourceTypesTestHelper.newResource(memoryMBToDecrease,
vCoresToDecrease,
extractCustomResourcesAsStrings(resourceToDecrease)));
//check
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.gaugeInt(PENDING_CONTAINERS, containersAfterDecrease)
.gaugeLong(PENDING_MB, memoryAfterDecrease)
.gaugeInt(PENDING_V_CORES, vcoresAfterDecrease)
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getPendingResources,
MetricsForCustomResource.PENDING,
computeExpectedCustomResourceValues(testData.customResourceValues,
(k, v) -> v * containers - (resourceToDecrease.getResourceValue(k)
* containersToDecrease)));
}
@Test
public void testAllocateResourcesWithoutDecreasePending() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResources(defaultResource)
.build();
testAllocateResources(false, testData);
}
@Test
public void testAllocateResourcesWithDecreasePending() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResourceToDecrease(
newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
.withResources(defaultResource)
.build();
//first, increase pending resources to be able to decrease some
testIncreasePendingResources(testData);
//then allocate with decrease pending resources
testAllocateResources(true, testData);
}
@Test
public void testAllocateResourcesWithoutContainer() {
QueueMetricsTestData testData = createDefaultQueueMetricsTestData()
.withLeafQueue(createBasicQueueHierarchy())
.withResources(defaultResource)
.build();
//first, increase pending resources
testIncreasePendingResourcesWithoutContainer(testData);
Resource resource = testData.resource;
testData.leafQueue.queueMetrics.allocateResources(testData.partition,
testData.user, resource);
ResourceMetricsChecker checker = ResourceMetricsChecker.create()
.gaugeLong(ALLOCATED_MB, resource.getMemorySize())
.gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores())
.gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0)
.gaugeInt(PENDING_V_CORES, 0);
checker.checkAgainst(testData.leafQueue.queueSource);
checker.checkAgainst(testData.leafQueue.getRoot().queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getPendingResources,
MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> 0L));
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getAllocatedResources,
MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> v));
}
@Test
public void testReleaseResources() {
int containers = 5;
QueueMetricsTestData testData =
createQueueMetricsTestDataWithContainers(containers)
.withLeafQueue(createBasicQueueHierarchy())
.withResourceToDecrease(defaultResource, containers)
.withResources(defaultResource)
.build();
//first, allocate some resources so that we can release some
testAllocateResources(false, testData);
testData.leafQueue.queueMetrics.releaseResources(testData.partition,
testData.user, containers, defaultResource);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
.counter(AGGREGATE_CONTAINERS_RELEASED, containers)
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getAllocatedResources,
MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> 0L));
}
@Test
public void testUpdatePreemptedSecondsForCustomResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createFourLevelQueueHierarchy())
.withResources(defaultResource)
.build();
final int seconds = 1;
testUpdatePreemptedSeconds(testData, seconds);
}
@Test
public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createFourLevelQueueHierarchy())
.withResources(defaultResource)
.build();
final int seconds = 15;
testUpdatePreemptedSeconds(testData, seconds);
}
@Test
public void testReserveResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResources(defaultResource)
.build();
testReserveResources(testData);
}
@Test
public void testUnreserveResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResources(defaultResource)
.build();
testReserveResources(testData);
testData.leafQueue.queueMetrics.unreserveResource(testData.partition,
testData.user, defaultResource);
ResourceMetricsChecker checker = ResourceMetricsChecker
.create()
.gaugeInt(RESERVED_CONTAINERS, 0)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.checkAgainst(testData.leafQueue.queueSource);
assertAllMetrics(testData.leafQueue, checker,
QueueMetrics::getReservedResources,
MetricsForCustomResource.RESERVED, computeExpectedCustomResourceValues(
testData.customResourceValues, (k, v) -> 0L));
}
@Test
public void testGetAllocatedResourcesWithCustomResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withLeafQueue(createBasicQueueHierarchy())
.withResources(defaultResource)
.build();
testGetAllocatedResources(testData);
}
@Test
public void testGetAllocatedResourcesWithoutCustomResources() {
QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
.withResources(newResource(4 * GB, 4, Collections.emptyMap()))
.withLeafQueue(createBasicQueueHierarchy())
.build();
testGetAllocatedResources(testData);
}
}