YARN-10702. Add cluster metric for amount of CPU used by RM Event Processor.

Contributed by Jim Brennan.
This commit is contained in:
Eric Badger 2021-04-06 01:16:14 +00:00
parent de05cefbae
commit 26b8f678b2
9 changed files with 186 additions and 9 deletions

View File

@ -2927,6 +2927,18 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
/** Resource manager dispatcher thread monitor sampling rate.
* Units are samples per minute. This controls how often to sample
* the cpu utilization of the resource manager dispatcher thread.
* The cpu utilization is displayed on the RM UI as scheduler busy %.
* Set to zero to disable the dispatcher thread monitor.
*/
public static final String
YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN =
YARN_PREFIX + "dispatcher.cpu-monitor.samples-per-min";
public static final int
DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN = 60;
/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries

View File

@ -154,4 +154,11 @@ public void setMetrics(EventTypeMetrics metrics) {
this.metrics = metrics;
}
protected long getEventProcessorId() {
return this.eventProcessor.getId();
}
protected boolean isStopped() {
return this.stopped;
}
}

View File

@ -120,6 +120,19 @@
<value>5000</value>
</property>
<property>
<description>
Resource manager dispatcher thread cpu monitor sampling rate.
Units are samples per minute. This controls how often to sample
the cpu utilization of the resource manager dispatcher thread.
The cpu utilization is displayed on the RM UI as scheduler busy %.
Set this to zero to disable the dispatcher thread monitor. Defaults
to 60 samples per minute.
</description>
<name>yarn.dispatcher.cpu-monitor.samples-per-min</name>
<value>60</value>
</property>
<property>
<description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name>

View File

@ -62,6 +62,12 @@ public class ClusterMetrics {
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
@Metric("RM Event Processor CPU Usage 60 second Avg") MutableGaugeLong
rmEventProcCPUAvg;
@Metric("RM Event Processor CPU Usage 60 second Max") MutableGaugeLong
rmEventProcCPUMax;
private boolean rmEventProcMonitorEnable = false;
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
"Metrics for the Yarn Cluster");
@ -118,6 +124,27 @@ public synchronized static void destroy() {
INSTANCE = null;
}
// Indicate whether RM Event Thread CPU Monitor is enabled
public void setRmEventProcMonitorEnable(boolean value) {
rmEventProcMonitorEnable = value;
}
public boolean getRmEventProcMonitorEnable() {
return rmEventProcMonitorEnable;
}
// RM Event Processor CPU Usage
public long getRmEventProcCPUAvg() {
return rmEventProcCPUAvg.value();
}
public void setRmEventProcCPUAvg(long value) {
rmEventProcCPUAvg.set(value);
}
public long getRmEventProcCPUMax() {
return rmEventProcCPUMax.value();
}
public void setRmEventProcCPUMax(long value) {
rmEventProcCPUMax.set(value);
}
//Active Nodemanagers
public int getNumActiveNMs() {
return numActiveNMs.value();

View File

@ -49,8 +49,9 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -138,6 +139,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
@ -451,8 +454,19 @@ protected void setRMStateStore(RMStateStore rmStore) {
}
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
EventDispatcher dispatcher = new
EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
String dispatcherName = "SchedulerEventDispatcher";
EventDispatcher dispatcher;
int threadMonitorRate = conf.getInt(
YarnConfiguration.YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN,
YarnConfiguration.DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN);
if (threadMonitorRate > 0) {
dispatcher = new SchedulerEventDispatcher(dispatcherName,
threadMonitorRate);
ClusterMetrics.getMetrics().setRmEventProcMonitorEnable(true);
} else {
dispatcher = new EventDispatcher(this.scheduler, dispatcherName);
}
dispatcher.
setMetrics(GenericEventTypeMetricsManager.
create(dispatcher.getName(), SchedulerEventType.class));
@ -1018,6 +1032,94 @@ public void handle(RMFatalEvent event) {
}
}
@Private
private class SchedulerEventDispatcher extends
EventDispatcher<SchedulerEvent> {
private final Thread eventProcessorMonitor;
SchedulerEventDispatcher(String name, int samplesPerMin) {
super(scheduler, name);
this.eventProcessorMonitor =
new Thread(new EventProcessorMonitor(getEventProcessorId(),
samplesPerMin));
this.eventProcessorMonitor
.setName("ResourceManager Event Processor Monitor");
}
// EventProcessorMonitor keeps track of how much CPU the EventProcessor
// thread is using. It takes a configurable number of samples per minute,
// and then reports the Avg and Max of previous 60 seconds as cluster
// metrics. Units are usecs per second of CPU used.
// Avg is not accurate until one minute of samples have been received.
private final class EventProcessorMonitor implements Runnable {
private final long tid;
private final boolean run;
private final ThreadMXBean tmxb;
private final ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
private final int samples;
EventProcessorMonitor(long id, int samplesPerMin) {
assert samplesPerMin > 0;
this.tid = id;
this.samples = samplesPerMin;
this.tmxb = ManagementFactory.getThreadMXBean();
if (clusterMetrics != null &&
tmxb != null && tmxb.isThreadCpuTimeSupported()) {
this.run = true;
clusterMetrics.setRmEventProcMonitorEnable(true);
} else {
this.run = false;
}
}
public void run() {
int index = 0;
long[] values = new long[samples];
int sleepMs = (60 * 1000) / samples;
while (run && !isStopped() && !Thread.currentThread().isInterrupted()) {
try {
long cpuBefore = tmxb.getThreadCpuTime(tid);
long wallClockBefore = Time.monotonicNow();
Thread.sleep(sleepMs);
long wallClockDelta = Time.monotonicNow() - wallClockBefore;
long cpuDelta = tmxb.getThreadCpuTime(tid) - cpuBefore;
// Nanoseconds / Milliseconds = usec per second
values[index] = cpuDelta / wallClockDelta;
index = (index + 1) % samples;
long max = 0;
long sum = 0;
for (int i = 0; i < samples; i++) {
sum += values[i];
max = Math.max(max, values[i]);
}
clusterMetrics.setRmEventProcCPUAvg(sum / samples);
clusterMetrics.setRmEventProcCPUMax(max);
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return;
}
}
}
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
this.eventProcessorMonitor.start();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
this.eventProcessorMonitor.interrupt();
try {
this.eventProcessorMonitor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
}
/**
* Transition to standby state in a new thread. The transition operation is
* asynchronous to avoid deadlock caused by cyclic dependency.

View File

@ -204,6 +204,7 @@ protected void render(Block html) {
}
SchedulerInfo schedulerInfo = new SchedulerInfo(this.rm);
int schedBusy = clusterMetrics.getRmSchedulerBusyPercent();
div.h3("Scheduler Metrics").
table("#schedulermetricsoverview").
@ -215,6 +216,7 @@ protected void render(Block html) {
th().$class("ui-state-default").__("Maximum Allocation").__().
th().$class("ui-state-default")
.__("Maximum Cluster Application Priority").__().
th().$class("ui-state-default").__("Scheduler Busy %").__().
__().
__().
tbody().$class("ui-widget-content").
@ -225,6 +227,7 @@ protected void render(Block html) {
td(schedulerInfo.getMinAllocation().toString()).
td(schedulerInfo.getMaxAllocation().toString()).
td(String.valueOf(schedulerInfo.getMaxClusterLevelAppPriority())).
td(schedBusy == -1 ? UNAVAILABLE : String.valueOf(schedBusy)).
__().
__().__();

View File

@ -57,6 +57,7 @@ public class ClusterMetricsInfo {
private long totalVirtualCores;
private int utilizedMBPercent;
private int utilizedVirtualCoresPercent;
private int rmSchedulerBusyPercent;
private int totalNodes;
private int lostNodes;
private int unhealthyNodes;
@ -143,7 +144,11 @@ public ClusterMetricsInfo(final ResourceScheduler rs) {
this.utilizedVirtualCoresPercent = baseCores <= 0 ? 0 :
(int) (clusterMetrics.getUtilizedVirtualCores() * 100 /
baseCores);
// Scheduler Busy is in usec per sec, so to get percent divide by 10^4
// Set to -1 if disabled.
this.rmSchedulerBusyPercent =
clusterMetrics.getRmEventProcMonitorEnable() ?
(int)(clusterMetrics.getRmEventProcCPUAvg() / 10000L) : -1;
this.activeNodes = clusterMetrics.getNumActiveNMs();
this.lostNodes = clusterMetrics.getNumLostNMs();
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
@ -271,6 +276,10 @@ public int getUtilizedVirtualCoresPercent() {
return utilizedVirtualCoresPercent;
}
public int getRmSchedulerBusyPercent() {
return rmSchedulerBusyPercent;
}
public void setContainersReserved(int containersReserved) {
this.containersReserved = containersReserved;
}
@ -383,6 +392,10 @@ public void setUtilizedVirtualCoresPercent(int utilizedVirtualCoresPercent) {
this.utilizedVirtualCoresPercent = utilizedVirtualCoresPercent;
}
public void setRmSchedulerBusyPercent(int rmSchedulerBusyPercent) {
this.rmSchedulerBusyPercent = rmSchedulerBusyPercent;
}
public ResourceInfo getTotalClusterResourcesAcrossPartition() {
return totalClusterResourcesAcrossPartition;
}

View File

@ -52,7 +52,7 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.
private final int numberOfThInMetricsTable = 22;
private final int numberOfThInMetricsTable = 23;
private final int numberOfActualTableHeaders = 18;
private final int numberOfThForOpportunisticContainers = 4;

View File

@ -474,7 +474,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
Exception {
assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 31, clusterinfo.length());
assertEquals("incorrect number of elements", 32, clusterinfo.length());
verifyClusterMetrics(
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),