YARN-10763. Add the number of containers assigned per second metrics to ClusterMetrics. Contributed by chaosju.
This commit is contained in:
parent
d92a25b790
commit
8891e5c028
@ -21,8 +21,12 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
@ -66,6 +70,8 @@ public class ClusterMetrics {
|
|||||||
rmEventProcCPUAvg;
|
rmEventProcCPUAvg;
|
||||||
@Metric("RM Event Processor CPU Usage 60 second Max") MutableGaugeLong
|
@Metric("RM Event Processor CPU Usage 60 second Max") MutableGaugeLong
|
||||||
rmEventProcCPUMax;
|
rmEventProcCPUMax;
|
||||||
|
@Metric("# of Containers assigned in the last second") MutableGaugeInt
|
||||||
|
containerAssignedPerSecond;
|
||||||
|
|
||||||
private boolean rmEventProcMonitorEnable = false;
|
private boolean rmEventProcMonitorEnable = false;
|
||||||
|
|
||||||
@ -85,6 +91,22 @@ public class ClusterMetrics {
|
|||||||
private static volatile ClusterMetrics INSTANCE = null;
|
private static volatile ClusterMetrics INSTANCE = null;
|
||||||
private static MetricsRegistry registry;
|
private static MetricsRegistry registry;
|
||||||
|
|
||||||
|
private AtomicInteger numContainersAssigned = new AtomicInteger(0);
|
||||||
|
private ScheduledThreadPoolExecutor assignCounterExecutor;
|
||||||
|
|
||||||
|
/**
 * Creates the metrics holder and starts the background task that publishes
 * the per-second container assignment count.
 *
 * A single daemon thread named "ContainerAssignmentCounterThread" runs once
 * per second (after an initial one second delay). Each run atomically drains
 * {@code numContainersAssigned} and stores the drained value in the
 * {@code containerAssignedPerSecond} gauge.
 */
ClusterMetrics() {
  assignCounterExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("ContainerAssignmentCounterThread")
          .build());
  assignCounterExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      // Always drain, so that a skipped publication does not inflate the
      // value reported for the following second.
      int assignedInLastSecond = numContainersAssigned.getAndSet(0);
      // The gauge field has no initializer in this class; presumably it is
      // populated when the metrics source is registered (TODO confirm).
      // An exception escaping a fixed-rate task suppresses all of its later
      // executions, so a tick that arrives before the gauge exists must not
      // throw a NullPointerException.
      MutableGaugeInt gauge = containerAssignedPerSecond;
      if (gauge != null) {
        gauge.set(assignedInLastSecond);
      }
    }
  }, 1, 1, TimeUnit.SECONDS);
}
|
||||||
|
|
||||||
public static ClusterMetrics getMetrics() {
|
public static ClusterMetrics getMetrics() {
|
||||||
if(!isInitialized.get()){
|
if(!isInitialized.get()){
|
||||||
synchronized (ClusterMetrics.class) {
|
synchronized (ClusterMetrics.class) {
|
||||||
@ -120,6 +142,9 @@ public class ClusterMetrics {
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized static void destroy() {
|
public synchronized static void destroy() {
|
||||||
|
if (INSTANCE != null && INSTANCE.getAssignCounterExecutor() != null) {
|
||||||
|
INSTANCE.getAssignCounterExecutor().shutdownNow();
|
||||||
|
}
|
||||||
isInitialized.set(false);
|
isInitialized.set(false);
|
||||||
INSTANCE = null;
|
INSTANCE = null;
|
||||||
}
|
}
|
||||||
@ -319,4 +344,16 @@ public class ClusterMetrics {
|
|||||||
public void incrUtilizedVirtualCores(long delta) {
|
public void incrUtilizedVirtualCores(long delta) {
|
||||||
utilizedVirtualCores.incr(delta);
|
utilizedVirtualCores.incr(delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getContainerAssignedPerSecond() {
|
||||||
|
return containerAssignedPerSecond.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrNumContainerAssigned() {
|
||||||
|
numContainersAssigned.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
|
||||||
|
return assignCounterExecutor;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
@ -773,6 +774,7 @@ public class AppSchedulingInfo {
|
|||||||
containerAllocated.getContainer().getResource());
|
containerAllocated.getContainer().getResource());
|
||||||
}
|
}
|
||||||
queue.getMetrics().incrNodeTypeAggregations(user, type);
|
queue.getMetrics().incrNodeTypeAggregations(user, type);
|
||||||
|
ClusterMetrics.getMetrics().incrNumContainerAssigned();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get AppPlacementAllocator by specified schedulerKey
|
// Get AppPlacementAllocator by specified schedulerKey
|
||||||
|
@ -67,6 +67,8 @@ public class ClusterMetricsInfo {
|
|||||||
private int activeNodes;
|
private int activeNodes;
|
||||||
private int shutdownNodes;
|
private int shutdownNodes;
|
||||||
|
|
||||||
|
private int containerAssignedPerSecond;
|
||||||
|
|
||||||
// Total used resource of the cluster, including all partitions
|
// Total used resource of the cluster, including all partitions
|
||||||
private ResourceInfo totalUsedResourcesAcrossPartition;
|
private ResourceInfo totalUsedResourcesAcrossPartition;
|
||||||
|
|
||||||
@ -158,6 +160,8 @@ public class ClusterMetricsInfo {
|
|||||||
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
|
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
|
||||||
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
|
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
|
||||||
+ rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
|
+ rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
|
||||||
|
this.containerAssignedPerSecond = clusterMetrics
|
||||||
|
.getContainerAssignedPerSecond();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getAppsSubmitted() {
|
public int getAppsSubmitted() {
|
||||||
@ -411,4 +415,8 @@ public class ClusterMetricsInfo {
|
|||||||
public boolean getCrossPartitionMetricsAvailable() {
|
public boolean getCrossPartitionMetricsAvailable() {
|
||||||
return crossPartitionMetricsAvailable;
|
return crossPartitionMetricsAvailable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getContainerAssignedPerSecond() {
|
||||||
|
return this.containerAssignedPerSecond;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||||||
|
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class TestClusterMetrics {
|
public class TestClusterMetrics {
|
||||||
|
|
||||||
private ClusterMetrics metrics;
|
private ClusterMetrics metrics;
|
||||||
@ -63,4 +66,18 @@ public class TestClusterMetrics {
|
|||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClusterMetrics() throws Exception {
|
||||||
|
Assert.assertTrue(!metrics.containerAssignedPerSecond.changed());
|
||||||
|
metrics.incrNumContainerAssigned();
|
||||||
|
metrics.incrNumContainerAssigned();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return metrics.getContainerAssignedPerSecond() == 2;
|
||||||
|
}
|
||||||
|
}, 500, 5000);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -474,7 +474,7 @@ public class TestRMWebServices extends JerseyTestBase {
|
|||||||
Exception {
|
Exception {
|
||||||
assertEquals("incorrect number of elements", 1, json.length());
|
assertEquals("incorrect number of elements", 1, json.length());
|
||||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||||
assertEquals("incorrect number of elements", 32, clusterinfo.length());
|
assertEquals("incorrect number of elements", 33, clusterinfo.length());
|
||||||
verifyClusterMetrics(
|
verifyClusterMetrics(
|
||||||
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
||||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user