From 59172ada9014f8c056f2bd37b25a26572ca643af Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Mon, 24 May 2021 23:12:07 +0800 Subject: [PATCH] YARN-10771. Add cluster metric for size of SchedulerEventQueue and RMEventQueue. Contributed by chaosju. --- .../hadoop/yarn/event/AsyncDispatcher.java | 4 ++++ .../hadoop/yarn/event/EventDispatcher.java | 4 ++++ .../resourcemanager/ClusterMetrics.java | 20 ++++++++++++++++ .../resourcemanager/ResourceManager.java | 24 +++++++++++++++++++ .../webapp/MetricsOverviewTable.java | 8 +++++++ .../webapp/dao/ClusterMetricsInfo.java | 13 ++++++++++ .../resourcemanager/webapp/TestNodesPage.java | 2 +- .../webapp/TestRMWebServices.java | 2 +- 8 files changed, 75 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index ba6bb435ec..0915eb4a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -409,4 +409,8 @@ public void addMetrics(EventTypeMetrics metrics, Class eventClass) { eventTypeMetricsMap.put(eventClass, metrics); } + + public int getEventQueueSize() { + return eventQueue.size(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java index 11cdf150dd..6731bdacac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java @@ -161,4 +161,8 @@ protected long getEventProcessorId() { protected boolean isStopped() { return this.stopped; } + + public int getEventQueueSize() { + return eventQueue.size(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index 67a3a62087..fa3c5c9094 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -72,6 +72,10 @@ public class ClusterMetrics { rmEventProcCPUMax; @Metric("# of Containers assigned in the last second") MutableGaugeInt containerAssignedPerSecond; + @Metric("# of rm dispatcher event queue size") + MutableGaugeInt rmDispatcherEventQueueSize; + @Metric("# of scheduler dispatcher event queue size") + MutableGaugeInt schedulerDispatcherEventQueueSize; private boolean rmEventProcMonitorEnable = false; @@ -356,4 +360,20 @@ public void incrNumContainerAssigned() { private ScheduledThreadPoolExecutor getAssignCounterExecutor(){ return assignCounterExecutor; } + + public int getRmEventQueueSize() { + return rmDispatcherEventQueueSize.value(); + } + + public void setRmEventQueueSize(int rmEventQueueSize) { + this.rmDispatcherEventQueueSize.set(rmEventQueueSize); + } + + public int getSchedulerEventQueueSize() { + return schedulerDispatcherEventQueueSize.value(); + } + + public void setSchedulerEventQueueSize(int schedulerEventQueueSize) { + this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b0dc218f3d..a813a8524e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import com.sun.jersey.spi.container.servlet.ServletContainer; @@ -152,6 +153,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -715,6 +718,7 @@ public class RMActiveServices extends CompositeService { private boolean fromActive = false; private StandByTransitionRunnable standByTransitionRunnable; private RMNMInfo rmnmInfo; + private ScheduledThreadPoolExecutor eventQueueMetricExecutor; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -937,6 +941,23 @@ protected void serviceInit(Configuration configuration) throws Exception { addIfService(volumeManager); } + eventQueueMetricExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder(). + setDaemon(true).setNameFormat("EventQueueSizeMetricThread"). + build()); + eventQueueMetricExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + int rmEventQueueSize = ((AsyncDispatcher)getRMContext(). + getDispatcher()).getEventQueueSize(); + ClusterMetrics.getMetrics().setRmEventQueueSize(rmEventQueueSize); + int schedulerEventQueueSize = ((EventDispatcher)schedulerDispatcher). + getEventQueueSize(); + ClusterMetrics.getMetrics(). + setSchedulerEventQueueSize(schedulerEventQueueSize); + } + }, 1, 1, TimeUnit.SECONDS); + super.serviceInit(conf); } @@ -1012,6 +1033,9 @@ protected void serviceStop() throws Exception { LOG.error("Error closing store.", e); } } + if (eventQueueMetricExecutor != null) { + eventQueueMetricExecutor.shutdownNow(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java index 3ce4f2b518..c9922964ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/MetricsOverviewTable.java @@ -205,6 +205,8 @@ protected void render(Block html) { SchedulerInfo schedulerInfo = new SchedulerInfo(this.rm); int schedBusy = clusterMetrics.getRmSchedulerBusyPercent(); + int rmEventQueueSize = clusterMetrics.getRmEventQueueSize(); + int schedulerEventQueueSize = clusterMetrics.getSchedulerEventQueueSize(); div.h3("Scheduler Metrics"). table("#schedulermetricsoverview"). @@ -217,6 +219,10 @@ protected void render(Block html) { th().$class("ui-state-default") .__("Maximum Cluster Application Priority").__(). th().$class("ui-state-default").__("Scheduler Busy %").__(). + th().$class("ui-state-default") + .__("RM Dispatcher EventQueue Size").__(). + th().$class("ui-state-default") + .__("Scheduler Dispatcher EventQueue Size").__(). __(). __(). tbody().$class("ui-widget-content"). @@ -228,6 +234,8 @@ protected void render(Block html) { td(schedulerInfo.getMaxAllocation().toString()). td(String.valueOf(schedulerInfo.getMaxClusterLevelAppPriority())). td(schedBusy == -1 ? UNAVAILABLE : String.valueOf(schedBusy)). + td(String.valueOf(rmEventQueueSize)). + td(String.valueOf(schedulerEventQueueSize)). __(). __().__(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java index 7dc2d8ac1e..e188fa0526 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java @@ -83,6 +83,9 @@ public class ClusterMetricsInfo { private boolean crossPartitionMetricsAvailable = false; + private int rmEventQueueSize; + private int schedulerEventQueueSize; + public ClusterMetricsInfo() { } // JAXB needs this @@ -162,6 +165,8 @@ public ClusterMetricsInfo(final ResourceScheduler rs) { + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes; this.containerAssignedPerSecond = clusterMetrics .getContainerAssignedPerSecond(); + this.rmEventQueueSize = clusterMetrics.getRmEventQueueSize(); + this.schedulerEventQueueSize = clusterMetrics.getSchedulerEventQueueSize(); } public int getAppsSubmitted() { @@ -419,4 +424,12 @@ public boolean getCrossPartitionMetricsAvailable() { public int getContainerAssignedPerSecond() { return this.containerAssignedPerSecond; } + + public int getRmEventQueueSize() { + return rmEventQueueSize; + } + + public int getSchedulerEventQueueSize() { + return schedulerEventQueueSize; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java index dd271fd34d..891c8d6c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java @@ -52,7 +52,7 @@ public class TestNodesPage { // Number of Actual Table Headers for NodesPage.NodesBlock might change in // future. In that case this value should be adjusted to the new value. - private final int numberOfThInMetricsTable = 23; + private final int numberOfThInMetricsTable = 25; private final int numberOfActualTableHeaders = 16; private final int numberOfThForOpportunisticContainers = 4; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index b651c7959a..673fbbe2ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -474,7 +474,7 @@ public void verifyClusterMetricsJSON(JSONObject json) throws JSONException, Exception { assertEquals("incorrect number of elements", 1, json.length()); JSONObject clusterinfo = json.getJSONObject("clusterMetrics"); - assertEquals("incorrect number of elements", 33, clusterinfo.length()); + assertEquals("incorrect number of elements", 35, clusterinfo.length()); verifyClusterMetrics( clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"), clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),