YARN-10771. Add cluster metric for size of SchedulerEventQueue and RMEventQueue. Contributed by chaosju.
This commit is contained in:
parent
5f400032b6
commit
59172ada90
@ -409,4 +409,8 @@ public void addMetrics(EventTypeMetrics metrics,
|
|||||||
Class<? extends Enum> eventClass) {
|
Class<? extends Enum> eventClass) {
|
||||||
eventTypeMetricsMap.put(eventClass, metrics);
|
eventTypeMetricsMap.put(eventClass, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getEventQueueSize() {
|
||||||
|
return eventQueue.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -161,4 +161,8 @@ protected long getEventProcessorId() {
|
|||||||
protected boolean isStopped() {
|
protected boolean isStopped() {
|
||||||
return this.stopped;
|
return this.stopped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getEventQueueSize() {
|
||||||
|
return eventQueue.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,10 @@ public class ClusterMetrics {
|
|||||||
rmEventProcCPUMax;
|
rmEventProcCPUMax;
|
||||||
@Metric("# of Containers assigned in the last second") MutableGaugeInt
|
@Metric("# of Containers assigned in the last second") MutableGaugeInt
|
||||||
containerAssignedPerSecond;
|
containerAssignedPerSecond;
|
||||||
|
@Metric("# of rm dispatcher event queue size")
|
||||||
|
MutableGaugeInt rmDispatcherEventQueueSize;
|
||||||
|
@Metric("# of scheduler dispatcher event queue size")
|
||||||
|
MutableGaugeInt schedulerDispatcherEventQueueSize;
|
||||||
|
|
||||||
private boolean rmEventProcMonitorEnable = false;
|
private boolean rmEventProcMonitorEnable = false;
|
||||||
|
|
||||||
@ -356,4 +360,20 @@ public void incrNumContainerAssigned() {
|
|||||||
private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
|
private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
|
||||||
return assignCounterExecutor;
|
return assignCounterExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getRmEventQueueSize() {
|
||||||
|
return rmDispatcherEventQueueSize.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRmEventQueueSize(int rmEventQueueSize) {
|
||||||
|
this.rmDispatcherEventQueueSize.set(rmEventQueueSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSchedulerEventQueueSize() {
|
||||||
|
return schedulerDispatcherEventQueueSize.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSchedulerEventQueueSize(int schedulerEventQueueSize) {
|
||||||
|
this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import com.sun.jersey.spi.container.servlet.ServletContainer;
|
import com.sun.jersey.spi.container.servlet.ServletContainer;
|
||||||
|
|
||||||
@ -152,6 +153,8 @@
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -715,6 +718,7 @@ public class RMActiveServices extends CompositeService {
|
|||||||
private boolean fromActive = false;
|
private boolean fromActive = false;
|
||||||
private StandByTransitionRunnable standByTransitionRunnable;
|
private StandByTransitionRunnable standByTransitionRunnable;
|
||||||
private RMNMInfo rmnmInfo;
|
private RMNMInfo rmnmInfo;
|
||||||
|
private ScheduledThreadPoolExecutor eventQueueMetricExecutor;
|
||||||
|
|
||||||
RMActiveServices(ResourceManager rm) {
|
RMActiveServices(ResourceManager rm) {
|
||||||
super("RMActiveServices");
|
super("RMActiveServices");
|
||||||
@ -937,6 +941,23 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
addIfService(volumeManager);
|
addIfService(volumeManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventQueueMetricExecutor = new ScheduledThreadPoolExecutor(1,
|
||||||
|
new ThreadFactoryBuilder().
|
||||||
|
setDaemon(true).setNameFormat("EventQueueSizeMetricThread").
|
||||||
|
build());
|
||||||
|
eventQueueMetricExecutor.scheduleAtFixedRate(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
int rmEventQueueSize = ((AsyncDispatcher)getRMContext().
|
||||||
|
getDispatcher()).getEventQueueSize();
|
||||||
|
ClusterMetrics.getMetrics().setRmEventQueueSize(rmEventQueueSize);
|
||||||
|
int schedulerEventQueueSize = ((EventDispatcher)schedulerDispatcher).
|
||||||
|
getEventQueueSize();
|
||||||
|
ClusterMetrics.getMetrics().
|
||||||
|
setSchedulerEventQueueSize(schedulerEventQueueSize);
|
||||||
|
}
|
||||||
|
}, 1, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1012,6 +1033,9 @@ protected void serviceStop() throws Exception {
|
|||||||
LOG.error("Error closing store.", e);
|
LOG.error("Error closing store.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (eventQueueMetricExecutor != null) {
|
||||||
|
eventQueueMetricExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,6 +205,8 @@ protected void render(Block html) {
|
|||||||
|
|
||||||
SchedulerInfo schedulerInfo = new SchedulerInfo(this.rm);
|
SchedulerInfo schedulerInfo = new SchedulerInfo(this.rm);
|
||||||
int schedBusy = clusterMetrics.getRmSchedulerBusyPercent();
|
int schedBusy = clusterMetrics.getRmSchedulerBusyPercent();
|
||||||
|
int rmEventQueueSize = clusterMetrics.getRmEventQueueSize();
|
||||||
|
int schedulerEventQueueSize = clusterMetrics.getSchedulerEventQueueSize();
|
||||||
|
|
||||||
div.h3("Scheduler Metrics").
|
div.h3("Scheduler Metrics").
|
||||||
table("#schedulermetricsoverview").
|
table("#schedulermetricsoverview").
|
||||||
@ -217,6 +219,10 @@ protected void render(Block html) {
|
|||||||
th().$class("ui-state-default")
|
th().$class("ui-state-default")
|
||||||
.__("Maximum Cluster Application Priority").__().
|
.__("Maximum Cluster Application Priority").__().
|
||||||
th().$class("ui-state-default").__("Scheduler Busy %").__().
|
th().$class("ui-state-default").__("Scheduler Busy %").__().
|
||||||
|
th().$class("ui-state-default")
|
||||||
|
.__("RM Dispatcher EventQueue Size").__().
|
||||||
|
th().$class("ui-state-default")
|
||||||
|
.__("Scheduler Dispatcher EventQueue Size").__().
|
||||||
__().
|
__().
|
||||||
__().
|
__().
|
||||||
tbody().$class("ui-widget-content").
|
tbody().$class("ui-widget-content").
|
||||||
@ -228,6 +234,8 @@ protected void render(Block html) {
|
|||||||
td(schedulerInfo.getMaxAllocation().toString()).
|
td(schedulerInfo.getMaxAllocation().toString()).
|
||||||
td(String.valueOf(schedulerInfo.getMaxClusterLevelAppPriority())).
|
td(String.valueOf(schedulerInfo.getMaxClusterLevelAppPriority())).
|
||||||
td(schedBusy == -1 ? UNAVAILABLE : String.valueOf(schedBusy)).
|
td(schedBusy == -1 ? UNAVAILABLE : String.valueOf(schedBusy)).
|
||||||
|
td(String.valueOf(rmEventQueueSize)).
|
||||||
|
td(String.valueOf(schedulerEventQueueSize)).
|
||||||
__().
|
__().
|
||||||
__().__();
|
__().__();
|
||||||
|
|
||||||
|
@ -83,6 +83,9 @@ public class ClusterMetricsInfo {
|
|||||||
|
|
||||||
private boolean crossPartitionMetricsAvailable = false;
|
private boolean crossPartitionMetricsAvailable = false;
|
||||||
|
|
||||||
|
private int rmEventQueueSize;
|
||||||
|
private int schedulerEventQueueSize;
|
||||||
|
|
||||||
public ClusterMetricsInfo() {
|
public ClusterMetricsInfo() {
|
||||||
} // JAXB needs this
|
} // JAXB needs this
|
||||||
|
|
||||||
@ -162,6 +165,8 @@ public ClusterMetricsInfo(final ResourceScheduler rs) {
|
|||||||
+ rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
|
+ rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
|
||||||
this.containerAssignedPerSecond = clusterMetrics
|
this.containerAssignedPerSecond = clusterMetrics
|
||||||
.getContainerAssignedPerSecond();
|
.getContainerAssignedPerSecond();
|
||||||
|
this.rmEventQueueSize = clusterMetrics.getRmEventQueueSize();
|
||||||
|
this.schedulerEventQueueSize = clusterMetrics.getSchedulerEventQueueSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getAppsSubmitted() {
|
public int getAppsSubmitted() {
|
||||||
@ -419,4 +424,12 @@ public boolean getCrossPartitionMetricsAvailable() {
|
|||||||
public int getContainerAssignedPerSecond() {
|
public int getContainerAssignedPerSecond() {
|
||||||
return this.containerAssignedPerSecond;
|
return this.containerAssignedPerSecond;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getRmEventQueueSize() {
|
||||||
|
return rmEventQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSchedulerEventQueueSize() {
|
||||||
|
return schedulerEventQueueSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ public class TestNodesPage {
|
|||||||
|
|
||||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||||
// future. In that case this value should be adjusted to the new value.
|
// future. In that case this value should be adjusted to the new value.
|
||||||
private final int numberOfThInMetricsTable = 23;
|
private final int numberOfThInMetricsTable = 25;
|
||||||
private final int numberOfActualTableHeaders = 16;
|
private final int numberOfActualTableHeaders = 16;
|
||||||
private final int numberOfThForOpportunisticContainers = 4;
|
private final int numberOfThForOpportunisticContainers = 4;
|
||||||
|
|
||||||
|
@ -474,7 +474,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException,
|
|||||||
Exception {
|
Exception {
|
||||||
assertEquals("incorrect number of elements", 1, json.length());
|
assertEquals("incorrect number of elements", 1, json.length());
|
||||||
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
|
||||||
assertEquals("incorrect number of elements", 33, clusterinfo.length());
|
assertEquals("incorrect number of elements", 35, clusterinfo.length());
|
||||||
verifyClusterMetrics(
|
verifyClusterMetrics(
|
||||||
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
|
||||||
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
|
||||||
|
Loading…
Reference in New Issue
Block a user