YARN-10702. Add cluster metric for amount of CPU used by RM Event Processor.
Contributed by Jim Brennan.
This commit is contained in:
parent
d7b31acbba
commit
fb5809984e
@ -2903,6 +2903,18 @@ public static boolean isAclEnabled(Configuration conf) {
|
||||
public static final int
|
||||
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
|
||||
|
||||
/** Resource manager dispatcher thread monitor sampling rate.
|
||||
* Units are samples per minute. This controls how often to sample
|
||||
* the cpu utilization of the resource manager dispatcher thread.
|
||||
* The cpu utilization is displayed on the RM UI as scheduler busy %.
|
||||
* Set to zero to disable the dispatcher thread monitor.
|
||||
*/
|
||||
public static final String
|
||||
YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN =
|
||||
YARN_PREFIX + "dispatcher.cpu-monitor.samples-per-min";
|
||||
public static final int
|
||||
DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN = 60;
|
||||
|
||||
/**
|
||||
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
|
||||
* entries
|
||||
|
@ -136,4 +136,11 @@ public void handle(T event) {
|
||||
public void disableExitOnError() {
|
||||
shouldExitOnError = false;
|
||||
}
|
||||
protected long getEventProcessorId() {
|
||||
return this.eventProcessor.getId();
|
||||
}
|
||||
|
||||
protected boolean isStopped() {
|
||||
return this.stopped;
|
||||
}
|
||||
}
|
||||
|
@ -120,6 +120,19 @@
|
||||
<value>5000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Resource manager dispatcher thread cpu monitor sampling rate.
|
||||
Units are samples per minute. This controls how often to sample
|
||||
the cpu utilization of the resource manager dispatcher thread.
|
||||
The cpu utilization is displayed on the RM UI as scheduler busy %.
|
||||
Set this to zero to disable the dispatcher thread monitor. Defaults
|
||||
to 60 samples per minute.
|
||||
</description>
|
||||
<name>yarn.dispatcher.cpu-monitor.samples-per-min</name>
|
||||
<value>60</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The expiry interval for application master reporting.</description>
|
||||
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>
|
||||
|
@ -62,6 +62,12 @@ public class ClusterMetrics {
|
||||
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
|
||||
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
|
||||
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
|
||||
@Metric("RM Event Processor CPU Usage 60 second Avg") MutableGaugeLong
|
||||
rmEventProcCPUAvg;
|
||||
@Metric("RM Event Processor CPU Usage 60 second Max") MutableGaugeLong
|
||||
rmEventProcCPUMax;
|
||||
|
||||
private boolean rmEventProcMonitorEnable = false;
|
||||
|
||||
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
|
||||
"Metrics for the Yarn Cluster");
|
||||
@ -118,6 +124,27 @@ public synchronized static void destroy() {
|
||||
INSTANCE = null;
|
||||
}
|
||||
|
||||
// Indicate whether RM Event Thread CPU Monitor is enabled
|
||||
public void setRmEventProcMonitorEnable(boolean value) {
|
||||
rmEventProcMonitorEnable = value;
|
||||
}
|
||||
public boolean getRmEventProcMonitorEnable() {
|
||||
return rmEventProcMonitorEnable;
|
||||
}
|
||||
// RM Event Processor CPU Usage
|
||||
public long getRmEventProcCPUAvg() {
|
||||
return rmEventProcCPUAvg.value();
|
||||
}
|
||||
public void setRmEventProcCPUAvg(long value) {
|
||||
rmEventProcCPUAvg.set(value);
|
||||
}
|
||||
public long getRmEventProcCPUMax() {
|
||||
return rmEventProcCPUMax.value();
|
||||
}
|
||||
public void setRmEventProcCPUMax(long value) {
|
||||
rmEventProcCPUMax.set(value);
|
||||
}
|
||||
|
||||
//Active Nodemanagers
|
||||
public int getNumActiveNMs() {
|
||||
return numActiveNMs.value();
|
||||
@ -292,4 +319,4 @@ public long getUtilizedVirtualCores() {
|
||||
public void incrUtilizedVirtualCores(long delta) {
|
||||
utilizedVirtualCores.incr(delta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,8 +48,9 @@
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -136,6 +137,8 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
@ -449,7 +452,21 @@ protected void setRMStateStore(RMStateStore rmStore) {
|
||||
}
|
||||
|
||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||
return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
|
||||
String dispatcherName = "SchedulerEventDispatcher";
|
||||
EventDispatcher dispatcher;
|
||||
int threadMonitorRate = conf.getInt(
|
||||
YarnConfiguration.YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN,
|
||||
YarnConfiguration.DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN);
|
||||
|
||||
if (threadMonitorRate > 0) {
|
||||
dispatcher = new SchedulerEventDispatcher(dispatcherName,
|
||||
threadMonitorRate);
|
||||
ClusterMetrics.getMetrics().setRmEventProcMonitorEnable(true);
|
||||
} else {
|
||||
dispatcher = new EventDispatcher(this.scheduler, dispatcherName);
|
||||
}
|
||||
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
protected Dispatcher createDispatcher() {
|
||||
@ -1004,7 +1021,95 @@ public void handle(RMFatalEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@Private
|
||||
private class SchedulerEventDispatcher extends
|
||||
EventDispatcher<SchedulerEvent> {
|
||||
|
||||
private final Thread eventProcessorMonitor;
|
||||
|
||||
SchedulerEventDispatcher(String name, int samplesPerMin) {
|
||||
super(scheduler, name);
|
||||
this.eventProcessorMonitor =
|
||||
new Thread(new EventProcessorMonitor(getEventProcessorId(),
|
||||
samplesPerMin));
|
||||
this.eventProcessorMonitor
|
||||
.setName("ResourceManager Event Processor Monitor");
|
||||
}
|
||||
// EventProcessorMonitor keeps track of how much CPU the EventProcessor
|
||||
// thread is using. It takes a configurable number of samples per minute,
|
||||
// and then reports the Avg and Max of previous 60 seconds as cluster
|
||||
// metrics. Units are usecs per second of CPU used.
|
||||
// Avg is not accurate until one minute of samples have been received.
|
||||
private final class EventProcessorMonitor implements Runnable {
|
||||
private final long tid;
|
||||
private final boolean run;
|
||||
private final ThreadMXBean tmxb;
|
||||
private final ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
||||
private final int samples;
|
||||
EventProcessorMonitor(long id, int samplesPerMin) {
|
||||
assert samplesPerMin > 0;
|
||||
this.tid = id;
|
||||
this.samples = samplesPerMin;
|
||||
this.tmxb = ManagementFactory.getThreadMXBean();
|
||||
if (clusterMetrics != null &&
|
||||
tmxb != null && tmxb.isThreadCpuTimeSupported()) {
|
||||
this.run = true;
|
||||
clusterMetrics.setRmEventProcMonitorEnable(true);
|
||||
} else {
|
||||
this.run = false;
|
||||
}
|
||||
}
|
||||
public void run() {
|
||||
int index = 0;
|
||||
long[] values = new long[samples];
|
||||
int sleepMs = (60 * 1000) / samples;
|
||||
|
||||
while (run && !isStopped() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
long cpuBefore = tmxb.getThreadCpuTime(tid);
|
||||
long wallClockBefore = Time.monotonicNow();
|
||||
Thread.sleep(sleepMs);
|
||||
long wallClockDelta = Time.monotonicNow() - wallClockBefore;
|
||||
long cpuDelta = tmxb.getThreadCpuTime(tid) - cpuBefore;
|
||||
|
||||
// Nanoseconds / Milliseconds = usec per second
|
||||
values[index] = cpuDelta / wallClockDelta;
|
||||
|
||||
index = (index + 1) % samples;
|
||||
long max = 0;
|
||||
long sum = 0;
|
||||
for (int i = 0; i < samples; i++) {
|
||||
sum += values[i];
|
||||
max = Math.max(max, values[i]);
|
||||
}
|
||||
clusterMetrics.setRmEventProcCPUAvg(sum / samples);
|
||||
clusterMetrics.setRmEventProcCPUMax(max);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Returning, interrupted : " + e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
this.eventProcessorMonitor.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
this.eventProcessorMonitor.interrupt();
|
||||
try {
|
||||
this.eventProcessorMonitor.join();
|
||||
} catch (InterruptedException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition to standby state in a new thread. The transition operation is
|
||||
* asynchronous to avoid deadlock caused by cyclic dependency.
|
||||
*/
|
||||
|
@ -204,7 +204,8 @@ protected void render(Block html) {
|
||||
}
|
||||
|
||||
SchedulerInfo schedulerInfo = new SchedulerInfo(this.rm);
|
||||
|
||||
int schedBusy = clusterMetrics.getRmSchedulerBusyPercent();
|
||||
|
||||
div.h3("Scheduler Metrics").
|
||||
table("#schedulermetricsoverview").
|
||||
thead().$class("ui-widget-header").
|
||||
@ -215,6 +216,7 @@ protected void render(Block html) {
|
||||
th().$class("ui-state-default").__("Maximum Allocation").__().
|
||||
th().$class("ui-state-default")
|
||||
.__("Maximum Cluster Application Priority").__().
|
||||
th().$class("ui-state-default").__("Scheduler Busy %").__().
|
||||
__().
|
||||
__().
|
||||
tbody().$class("ui-widget-content").
|
||||
@ -225,6 +227,7 @@ protected void render(Block html) {
|
||||
td(schedulerInfo.getMinAllocation().toString()).
|
||||
td(schedulerInfo.getMaxAllocation().toString()).
|
||||
td(String.valueOf(schedulerInfo.getMaxClusterLevelAppPriority())).
|
||||
td(schedBusy == -1 ? UNAVAILABLE : String.valueOf(schedBusy)).
|
||||
__().
|
||||
__().__();
|
||||
|
||||
|
@ -57,6 +57,7 @@ public class ClusterMetricsInfo {
|
||||
private long totalVirtualCores;
|
||||
private int utilizedMBPercent;
|
||||
private int utilizedVirtualCoresPercent;
|
||||
private int rmSchedulerBusyPercent;
|
||||
private int totalNodes;
|
||||
private int lostNodes;
|
||||
private int unhealthyNodes;
|
||||
@ -143,7 +144,11 @@ public ClusterMetricsInfo(final ResourceScheduler rs) {
|
||||
this.utilizedVirtualCoresPercent = baseCores <= 0 ? 0 :
|
||||
(int) (clusterMetrics.getUtilizedVirtualCores() * 100 /
|
||||
baseCores);
|
||||
|
||||
// Scheduler Busy is in usec per sec, so to get percent divide by 10^4
|
||||
// Set to -1 if disabled.
|
||||
this.rmSchedulerBusyPercent =
|
||||
clusterMetrics.getRmEventProcMonitorEnable() ?
|
||||
(int)(clusterMetrics.getRmEventProcCPUAvg() / 10000L) : -1;
|
||||
this.activeNodes = clusterMetrics.getNumActiveNMs();
|
||||
this.lostNodes = clusterMetrics.getNumLostNMs();
|
||||
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
|
||||
@ -271,6 +276,10 @@ public int getUtilizedVirtualCoresPercent() {
|
||||
return utilizedVirtualCoresPercent;
|
||||
}
|
||||
|
||||
public int getRmSchedulerBusyPercent() {
|
||||
return rmSchedulerBusyPercent;
|
||||
}
|
||||
|
||||
public void setContainersReserved(int containersReserved) {
|
||||
this.containersReserved = containersReserved;
|
||||
}
|
||||
@ -383,6 +392,10 @@ public void setUtilizedVirtualCoresPercent(int utilizedVirtualCoresPercent) {
|
||||
this.utilizedVirtualCoresPercent = utilizedVirtualCoresPercent;
|
||||
}
|
||||
|
||||
public void setRmSchedulerBusyPercent(int rmSchedulerBusyPercent) {
|
||||
this.rmSchedulerBusyPercent = rmSchedulerBusyPercent;
|
||||
}
|
||||
|
||||
public ResourceInfo getTotalClusterResourcesAcrossPartition() {
|
||||
return totalClusterResourcesAcrossPartition;
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ public class TestNodesPage {
|
||||
|
||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||
// future. In that case this value should be adjusted to the new value.
|
||||
private final int numberOfThInMetricsTable = 22;
|
||||
private final int numberOfThInMetricsTable = 23;
|
||||
private final int numberOfActualTableHeaders = 18;
|
||||
private final int numberOfThForOpportunisticContainers = 4;
|
||||
|
||||
|
@ -474,7 +474,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
|
||||
Exception {
|
||||
assertEquals("incorrect number of elements", 1, json.length());
|
||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||
assertEquals("incorrect number of elements", 31, clusterinfo.length());
|
||||
assertEquals("incorrect number of elements", 32, clusterinfo.length());
|
||||
verifyClusterMetrics(
|
||||
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||
|
Loading…
Reference in New Issue
Block a user