YARN-8152. Add chart in SLS to illustrate the throughput of the scheduler. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2018-04-27 15:36:21 +08:00
parent 71220d218d
commit ba2db39c65
4 changed files with 155 additions and 10 deletions

View File

@ -36,19 +36,19 @@
</div> </div>
<div class="row"> <div class="row">
<div class="divborder span8" style="margin-left:50px" id="area1"></div> <div class="divborder span8" style="margin-left:50px" id="area1"></div>
<div class="divborder span8" id="area2"></div> <div class="divborder span8" style="margin-left:50px" id="area2"></div>
</div> </div>
<div class="row"> <div class="row">
<div class="divborder span8" style="margin-left:50px" id="area3"></div> <div class="divborder span8" style="margin-left:50px" id="area3"></div>
<div class="divborder span8" id="area4"></div> <div class="divborder span8" style="margin-left:50px" id="area4"></div>
</div> </div>
<div class="row"> <div class="row">
<div class="divborder span8" style="margin-left:50px" id="area5"></div> <div class="divborder span8" style="margin-left:50px" id="area5"></div>
<div class="divborder span8" id="area6"></div> <div class="divborder span8" style="margin-left:50px" id="area6"></div>
</div> </div>
<div class="row"> <div class="row">
<div class="divborder span8" style="margin-left:50px" id="area7"></div> <div class="divborder span8" style="margin-left:50px" id="area7"></div>
<div class="span8" id="area8"></div> <div class="divborder span8" style="margin-left:50px" id="area8"></div>
</div><br/><br/> </div><br/><br/>
<script> <script>
@ -82,7 +82,11 @@
''scheduler.handle-NODE_UPDATE.timecost'', ''scheduler.handle-NODE_UPDATE.timecost'',
''scheduler.handle-APP_ADDED.timecost'', ''scheduler.handle-APP_ADDED.timecost'',
''scheduler.handle-APP_REMOVED.timecost'', ''scheduler.handle-APP_REMOVED.timecost'',
''scheduler.handle-CONTAINER_EXPIRED.timecost'']; ''scheduler.handle-CONTAINER_EXPIRED.timecost'',
''scheduler.commit.success.timecost'',
''scheduler.commit.failure.timecost''];
legends[7] = [''scheduler.commit.success.throughput'',
''scheduler.commit.failure.throughput''];
// title // title
titles[0] = ''Cluster running applications & containers''; titles[0] = ''Cluster running applications & containers'';
@ -91,7 +95,8 @@
titles[3] = ''Cluster allocated & available vcores''; titles[3] = ''Cluster allocated & available vcores'';
titles[4] = ''Queue allocated memory''; titles[4] = ''Queue allocated memory'';
titles[5] = ''Queue allocated vcores''; titles[5] = ''Queue allocated vcores'';
titles[6] = ''Scheduler allocate & handle operation timecost''; titles[6] = ''Scheduler allocate & handle & commit operation timecost'';
titles[7] = ''Scheduler commit success/failure operation throughput'';
// ylabels // ylabels
yLabels[0] = ''Number''; yLabels[0] = ''Number'';
@ -101,12 +106,13 @@
yLabels[4] = ''Memory (GB)''; yLabels[4] = ''Memory (GB)'';
yLabels[5] = ''Number''; yLabels[5] = ''Number'';
yLabels[6] = ''Timecost (ms)''; yLabels[6] = ''Timecost (ms)'';
yLabels[7] = ''Number'';
// is area? // is area?
isAreas = [0, 0, 0, 0, 1, 1, 0]; isAreas = [0, 0, 0, 0, 1, 1, 0, 0];
// draw all charts // draw all charts
for (var i = 0; i < 7; i ++) '{' for (var i = 0; i < 8; i ++) '{'
drawEachChart(i); drawEachChart(i);
'}' '}'
@ -174,7 +180,7 @@
data.push(point); data.push(point);
// clear old // clear old
for (var i = 0; i < 7; i ++) '{' for (var i = 0; i < 8; i ++) '{'
svgs[i].selectAll(''g.tick'').remove(); svgs[i].selectAll(''g.tick'').remove();
svgs[i].selectAll(''g'').remove(); svgs[i].selectAll(''g'').remove();
var color = d3.scale.category10(); var color = d3.scale.category10();

View File

@ -23,6 +23,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -44,6 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -129,6 +131,33 @@ public Allocation allocate(ApplicationAttemptId attemptId,
} }
} }
@Override
public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
boolean updatePending) {
if (metricsON) {
boolean isSuccess = false;
long startTimeNs = System.nanoTime();
try {
isSuccess = super.tryCommit(cluster, r, updatePending);
return isSuccess;
} finally {
long elapsedNs = System.nanoTime() - startTimeNs;
if (isSuccess) {
schedulerMetrics.getSchedulerCommitSuccessTimer()
.update(elapsedNs, TimeUnit.NANOSECONDS);
schedulerMetrics.increaseSchedulerCommitSuccessCounter();
} else {
schedulerMetrics.getSchedulerCommitFailureTimer()
.update(elapsedNs, TimeUnit.NANOSECONDS);
schedulerMetrics.increaseSchedulerCommitFailureCounter();
}
}
} else {
return super.tryCommit(cluster, r, updatePending);
}
}
@Override @Override
public void handle(SchedulerEvent schedulerEvent) { public void handle(SchedulerEvent schedulerEvent) {
if (!metricsON) { if (!metricsON) {

View File

@ -89,11 +89,15 @@ public abstract class SchedulerMetrics {
// counters for scheduler allocate/handle operations // counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter; private Counter schedulerAllocateCounter;
private Counter schedulerCommitSuccessCounter;
private Counter schedulerCommitFailureCounter;
private Counter schedulerHandleCounter; private Counter schedulerHandleCounter;
private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
// Timers for scheduler allocate/handle operations // Timers for scheduler allocate/handle operations
private Timer schedulerAllocateTimer; private Timer schedulerAllocateTimer;
private Timer schedulerCommitSuccessTimer;
private Timer schedulerCommitFailureTimer;
private Timer schedulerHandleTimer; private Timer schedulerHandleTimer;
private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
private List<Histogram> schedulerHistogramList; private List<Histogram> schedulerHistogramList;
@ -387,6 +391,10 @@ private void registerSchedulerMetrics() {
// counters for scheduler operations // counters for scheduler operations
schedulerAllocateCounter = metrics.counter( schedulerAllocateCounter = metrics.counter(
"counter.scheduler.operation.allocate"); "counter.scheduler.operation.allocate");
schedulerCommitSuccessCounter = metrics.counter(
"counter.scheduler.operation.commit.success");
schedulerCommitFailureCounter = metrics.counter(
"counter.scheduler.operation.commit.failure");
schedulerHandleCounter = metrics.counter( schedulerHandleCounter = metrics.counter(
"counter.scheduler.operation.handle"); "counter.scheduler.operation.handle");
schedulerHandleCounterMap = new HashMap<>(); schedulerHandleCounterMap = new HashMap<>();
@ -401,6 +409,10 @@ private void registerSchedulerMetrics() {
SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
schedulerAllocateTimer = new Timer( schedulerAllocateTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize)); new SlidingWindowReservoir(timeWindowSize));
schedulerCommitSuccessTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerCommitFailureTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimer = new Timer( schedulerHandleTimer = new Timer(
new SlidingWindowReservoir(timeWindowSize)); new SlidingWindowReservoir(timeWindowSize));
schedulerHandleTimerMap = new HashMap<>(); schedulerHandleTimerMap = new HashMap<>();
@ -417,6 +429,20 @@ private void registerSchedulerMetrics() {
schedulerAllocateHistogram); schedulerAllocateHistogram);
schedulerHistogramList.add(schedulerAllocateHistogram); schedulerHistogramList.add(schedulerAllocateHistogram);
histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
Histogram schedulerCommitHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.commit.success.timecost",
schedulerCommitHistogram);
schedulerHistogramList.add(schedulerCommitHistogram);
histogramTimerMap
.put(schedulerCommitHistogram, schedulerCommitSuccessTimer);
Histogram schedulerCommitFailureHistogram =
new Histogram(new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.commit.failure.timecost",
schedulerCommitFailureHistogram);
schedulerHistogramList.add(schedulerCommitFailureHistogram);
histogramTimerMap
.put(schedulerCommitFailureHistogram, schedulerCommitFailureTimer);
Histogram schedulerHandleHistogram = new Histogram( Histogram schedulerHandleHistogram = new Histogram(
new SlidingWindowReservoir(SAMPLING_SIZE)); new SlidingWindowReservoir(SAMPLING_SIZE));
metrics.register("sampler.scheduler.operation.handle.timecost", metrics.register("sampler.scheduler.operation.handle.timecost",
@ -534,6 +560,14 @@ void increaseSchedulerAllocationCounter() {
schedulerAllocateCounter.inc(); schedulerAllocateCounter.inc();
} }
void increaseSchedulerCommitSuccessCounter() {
schedulerCommitSuccessCounter.inc();
}
void increaseSchedulerCommitFailureCounter() {
schedulerCommitFailureCounter.inc();
}
void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) { void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) {
schedulerHandleCounter.inc(); schedulerHandleCounter.inc();
schedulerHandleCounterMap.get(schedulerEventType).inc(); schedulerHandleCounterMap.get(schedulerEventType).inc();
@ -543,6 +577,14 @@ Timer getSchedulerAllocateTimer() {
return schedulerAllocateTimer; return schedulerAllocateTimer;
} }
Timer getSchedulerCommitSuccessTimer() {
return schedulerCommitSuccessTimer;
}
Timer getSchedulerCommitFailureTimer() {
return schedulerCommitFailureTimer;
}
Timer getSchedulerHandleTimer() { Timer getSchedulerHandleTimer() {
return schedulerHandleTimer; return schedulerHandleTimer;
} }

View File

@ -69,6 +69,8 @@ public class SLSWebApp extends HttpServlet {
private transient Gauge availableMemoryGauge; private transient Gauge availableMemoryGauge;
private transient Gauge availableVCoresGauge; private transient Gauge availableVCoresGauge;
private transient Histogram allocateTimecostHistogram; private transient Histogram allocateTimecostHistogram;
private transient Histogram commitSuccessTimecostHistogram;
private transient Histogram commitFailureTimecostHistogram;
private transient Histogram handleTimecostHistogram; private transient Histogram handleTimecostHistogram;
private transient Map<SchedulerEventType, Histogram> private transient Map<SchedulerEventType, Histogram>
handleOperTimecostHistogramMap; handleOperTimecostHistogramMap;
@ -81,6 +83,12 @@ public class SLSWebApp extends HttpServlet {
private String simulateTemplate; private String simulateTemplate;
private String trackTemplate; private String trackTemplate;
private transient Counter schedulerCommitSuccessCounter;
private transient Counter schedulerCommitFailureCounter;
private Long lastTrackingTime;
private Long lastSchedulerCommitSuccessCount;
private Long lastSchedulerCommitFailureCount;
{ {
// load templates // load templates
ClassLoader cl = Thread.currentThread().getContextClassLoader(); ClassLoader cl = Thread.currentThread().getContextClassLoader();
@ -386,13 +394,26 @@ public String generateRealTimeTrackingMetrics() {
Double.parseDouble(availableVCoresGauge.getValue().toString()); Double.parseDouble(availableVCoresGauge.getValue().toString());
// scheduler operation // scheduler operation
double allocateTimecost, handleTimecost; double allocateTimecost, commitSuccessTimecost, commitFailureTimecost,
handleTimecost;
if (allocateTimecostHistogram == null && if (allocateTimecostHistogram == null &&
metrics.getHistograms().containsKey( metrics.getHistograms().containsKey(
"sampler.scheduler.operation.allocate.timecost")) { "sampler.scheduler.operation.allocate.timecost")) {
allocateTimecostHistogram = metrics.getHistograms() allocateTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.allocate.timecost"); .get("sampler.scheduler.operation.allocate.timecost");
} }
if (commitSuccessTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.commit.success.timecost")) {
commitSuccessTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.commit.success.timecost");
}
if (commitFailureTimecostHistogram == null &&
metrics.getHistograms().containsKey(
"sampler.scheduler.operation.commit.failure.timecost")) {
commitFailureTimecostHistogram = metrics.getHistograms()
.get("sampler.scheduler.operation.commit.failure.timecost");
}
if (handleTimecostHistogram == null && if (handleTimecostHistogram == null &&
metrics.getHistograms().containsKey( metrics.getHistograms().containsKey(
"sampler.scheduler.operation.handle.timecost")) { "sampler.scheduler.operation.handle.timecost")) {
@ -401,6 +422,10 @@ public String generateRealTimeTrackingMetrics() {
} }
allocateTimecost = allocateTimecostHistogram == null ? 0.0 : allocateTimecost = allocateTimecostHistogram == null ? 0.0 :
allocateTimecostHistogram.getSnapshot().getMean()/1000000; allocateTimecostHistogram.getSnapshot().getMean()/1000000;
commitSuccessTimecost = commitSuccessTimecostHistogram == null ? 0.0 :
commitSuccessTimecostHistogram.getSnapshot().getMean()/1000000;
commitFailureTimecost = commitFailureTimecostHistogram == null ? 0.0 :
commitFailureTimecostHistogram.getSnapshot().getMean()/1000000;
handleTimecost = handleTimecostHistogram == null ? 0.0 : handleTimecost = handleTimecostHistogram == null ? 0.0 :
handleTimecostHistogram.getSnapshot().getMean()/1000000; handleTimecostHistogram.getSnapshot().getMean()/1000000;
// various handle operation // various handle operation
@ -447,6 +472,41 @@ public String generateRealTimeTrackingMetrics() {
queueAllocatedVCoresMap.put(queue, queueAllocatedVCores); queueAllocatedVCoresMap.put(queue, queueAllocatedVCores);
} }
// calculate commit throughput, unit is number/second
if (schedulerCommitSuccessCounter == null && metrics.getCounters()
.containsKey("counter.scheduler.operation.commit.success")) {
schedulerCommitSuccessCounter = metrics.getCounters()
.get("counter.scheduler.operation.commit.success");
}
if (schedulerCommitFailureCounter == null && metrics.getCounters()
.containsKey("counter.scheduler.operation.commit.failure")) {
schedulerCommitFailureCounter = metrics.getCounters()
.get("counter.scheduler.operation.commit.failure");
}
long schedulerCommitSuccessThroughput = 0;
long schedulerCommitFailureThroughput = 0;
if (schedulerCommitSuccessCounter != null
&& schedulerCommitFailureCounter != null) {
long currentTrackingTime = System.currentTimeMillis();
long currentSchedulerCommitSucessCount =
schedulerCommitSuccessCounter.getCount();
long currentSchedulerCommitFailureCount =
schedulerCommitFailureCounter.getCount();
if (lastTrackingTime != null) {
double intervalSeconds =
(double) (currentTrackingTime - lastTrackingTime) / 1000;
schedulerCommitSuccessThroughput = Math.round(
(currentSchedulerCommitSucessCount
- lastSchedulerCommitSuccessCount) / intervalSeconds);
schedulerCommitFailureThroughput = Math.round(
(currentSchedulerCommitFailureCount
- lastSchedulerCommitFailureCount) / intervalSeconds);
}
lastTrackingTime = currentTrackingTime;
lastSchedulerCommitSuccessCount = currentSchedulerCommitSucessCount;
lastSchedulerCommitFailureCount = currentSchedulerCommitFailureCount;
}
// package results // package results
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("{"); sb.append("{");
@ -469,6 +529,14 @@ public String generateRealTimeTrackingMetrics() {
} }
// scheduler allocate & handle // scheduler allocate & handle
sb.append(",\"scheduler.allocate.timecost\":").append(allocateTimecost); sb.append(",\"scheduler.allocate.timecost\":").append(allocateTimecost);
sb.append(",\"scheduler.commit.success.timecost\":")
.append(commitSuccessTimecost);
sb.append(",\"scheduler.commit.failure.timecost\":")
.append(commitFailureTimecost);
sb.append(",\"scheduler.commit.success.throughput\":")
.append(schedulerCommitSuccessThroughput);
sb.append(",\"scheduler.commit.failure.throughput\":")
.append(schedulerCommitFailureThroughput);
sb.append(",\"scheduler.handle.timecost\":").append(handleTimecost); sb.append(",\"scheduler.handle.timecost\":").append(handleTimecost);
for (SchedulerEventType e : SchedulerEventType.values()) { for (SchedulerEventType e : SchedulerEventType.values()) {
sb.append(",\"scheduler.handle-").append(e).append(".timecost\":") sb.append(",\"scheduler.handle-").append(e).append(".timecost\":")