MAPREDUCE-3059. QueueMetrics do not have metrics for aggregate containers-allocated and aggregate containers-released. (Devaraj K via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1183540 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5f9eab403c
commit
b3284c4607
@ -385,6 +385,10 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-3133. Running a set of methods in a Single Test Class.
|
||||
(Jonathan Eagles via mahadev)
|
||||
|
||||
MAPREDUCE-3059. QueueMetrics do not have metrics for aggregate
|
||||
containers-allocated and aggregate containers-released.
|
||||
(Devaraj K via mahadev)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||
|
@ -18,9 +18,11 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import java.util.Map;
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.multiply;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
@ -28,16 +30,16 @@
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
|
||||
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(context="yarn")
|
||||
@ -51,6 +53,8 @@ public class QueueMetrics {
|
||||
|
||||
@Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB;
|
||||
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
|
||||
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
|
||||
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
|
||||
@Metric("Available memory in GiB") MutableGaugeInt availableGB;
|
||||
@Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB;
|
||||
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
|
||||
@ -234,6 +238,7 @@ private void _decrPendingResources(int containers, Resource res) {
|
||||
|
||||
public void allocateResources(String user, int containers, Resource res) {
|
||||
allocatedContainers.incr(containers);
|
||||
aggregateContainersAllocated.incr(containers);
|
||||
allocatedGB.incr(res.getMemory()/GB * containers);
|
||||
_decrPendingResources(containers, multiply(res, containers));
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
@ -247,6 +252,7 @@ public void allocateResources(String user, int containers, Resource res) {
|
||||
|
||||
public void releaseResources(String user, int containers, Resource res) {
|
||||
allocatedContainers.decr(containers);
|
||||
aggregateContainersReleased.incr(containers);
|
||||
allocatedGB.decr(res.getMemory()/GB * containers);
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
if (userMetrics != null) {
|
||||
|
@ -18,22 +18,23 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.apache.hadoop.test.MockitoMaker.make;
|
||||
import static org.apache.hadoop.test.MockitoMaker.stub;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.*;
|
||||
import static org.apache.hadoop.test.MockitoMaker.*;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestQueueMetrics {
|
||||
static final int GB = 1024; // MB
|
||||
@ -56,16 +57,16 @@ public class TestQueueMetrics {
|
||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
|
||||
// Available resources is set externally, as it depends on dynamic
|
||||
// configurable cluster/queue resources
|
||||
checkResources(queueSource, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
|
||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB));
|
||||
checkResources(queueSource, 6, 3, 100, 9, 2, 0, 0);
|
||||
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 0, 0);
|
||||
|
||||
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
|
||||
checkResources(queueSource, 4, 2, 100, 9, 2, 0, 0);
|
||||
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
|
||||
|
||||
metrics.finishApp(app, RMAppAttemptState.FINISHED);
|
||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
|
||||
@ -91,20 +92,20 @@ public class TestQueueMetrics {
|
||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
|
||||
// Available resources is set externally, as it depends on dynamic
|
||||
// configurable cluster/queue resources
|
||||
checkResources(queueSource, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(userSource, 0, 0, 10, 15, 5, 0, 0);
|
||||
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(userSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
checkApps(userSource, 1, 0, 1, 0, 0, 0);
|
||||
|
||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB));
|
||||
checkResources(queueSource, 6, 3, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 6, 3, 10, 9, 2, 0, 0);
|
||||
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 6, 3, 3, 0, 10, 9, 2, 0, 0);
|
||||
|
||||
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
|
||||
checkResources(queueSource, 4, 2, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 4, 2, 10, 9, 2, 0, 0);
|
||||
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
|
||||
|
||||
metrics.finishApp(app, RMAppAttemptState.FINISHED);
|
||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
|
||||
@ -140,10 +141,10 @@ public class TestQueueMetrics {
|
||||
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
|
||||
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
|
||||
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
|
||||
checkResources(queueSource, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(parentQueueSource, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(userSource, 0, 0, 10, 15, 5, 0, 0);
|
||||
checkResources(parentUserSource, 0, 0, 10, 15, 5, 0, 0);
|
||||
checkResources(queueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(parentQueueSource, 0, 0, 0, 0, 100, 15, 5, 0, 0);
|
||||
checkResources(userSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
|
||||
checkResources(parentUserSource, 0, 0, 0, 0, 10, 15, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
@ -153,17 +154,17 @@ public class TestQueueMetrics {
|
||||
metrics.reserveResource(user, Resources.createResource(3*GB));
|
||||
// Available resources is set externally, as it depends on dynamic
|
||||
// configurable cluster/queue resources
|
||||
checkResources(queueSource, 6, 3, 100, 9, 2, 3, 1);
|
||||
checkResources(parentQueueSource, 6, 3, 100, 9, 2, 3, 1);
|
||||
checkResources(userSource, 6, 3, 10, 9, 2, 3, 1);
|
||||
checkResources(parentUserSource, 6, 3, 10, 9, 2, 3, 1);
|
||||
checkResources(queueSource, 6, 3, 3, 0, 100, 9, 2, 3, 1);
|
||||
checkResources(parentQueueSource, 6, 3, 3, 0, 100, 9, 2, 3, 1);
|
||||
checkResources(userSource, 6, 3, 3, 0, 10, 9, 2, 3, 1);
|
||||
checkResources(parentUserSource, 6, 3, 3, 0, 10, 9, 2, 3, 1);
|
||||
|
||||
metrics.releaseResources(user, 1, Resources.createResource(2*GB));
|
||||
metrics.unreserveResource(user, Resources.createResource(3*GB));
|
||||
checkResources(queueSource, 4, 2, 100, 9, 2, 0, 0);
|
||||
checkResources(parentQueueSource, 4, 2, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 4, 2, 10, 9, 2, 0, 0);
|
||||
checkResources(parentUserSource, 4, 2, 10, 9, 2, 0, 0);
|
||||
checkResources(queueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
|
||||
checkResources(parentQueueSource, 4, 2, 3, 1, 100, 9, 2, 0, 0);
|
||||
checkResources(userSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
|
||||
checkResources(parentUserSource, 4, 2, 3, 1, 10, 9, 2, 0, 0);
|
||||
|
||||
metrics.finishApp(app, RMAppAttemptState.FINISHED);
|
||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
|
||||
@ -184,11 +185,13 @@ public static void checkApps(MetricsSource source, int submitted, int pending,
|
||||
}
|
||||
|
||||
public static void checkResources(MetricsSource source, int allocGB,
|
||||
int allocCtnrs, int availGB, int pendingGB, int pendingCtnrs,
|
||||
int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs, int availGB, int pendingGB, int pendingCtnrs,
|
||||
int reservedGB, int reservedCtnrs) {
|
||||
MetricsRecordBuilder rb = getMetrics(source);
|
||||
assertGauge("AllocatedGB", allocGB, rb);
|
||||
assertGauge("AllocatedContainers", allocCtnrs, rb);
|
||||
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
|
||||
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
|
||||
assertGauge("AvailableGB", availGB, rb);
|
||||
assertGauge("PendingGB", pendingGB, rb);
|
||||
assertGauge("PendingContainers", pendingCtnrs, rb);
|
||||
|
Loading…
Reference in New Issue
Block a user