MAPREDUCE-2323. Add metrics to the fair scheduler. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1143252 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-07-06 05:11:42 +00:00
parent d782c40d58
commit 99f0b7d8cd
11 changed files with 388 additions and 8 deletions

View File

@@ -31,6 +31,8 @@ Trunk (unreleased changes)
deployment layout to be consistent across the binary tgz, rpm, and deb.
(Eric Yang via omalley)
MAPREDUCE-2323. Add metrics to the fair scheduler. (todd)
IMPROVEMENTS
MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to

View File

@@ -307,6 +307,7 @@
errorProperty="tests.failed" failureProperty="tests.failed"
timeout="${test.timeout}">
<assertions><enable/></assertions>
<sysproperty key="test.build.data" value="${build.test}/data"/>
<sysproperty key="build.test" value="${build.test}"/>
<sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />

View File

@@ -38,6 +38,9 @@
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -71,6 +74,7 @@ public class FairScheduler extends TaskScheduler {
protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
new HashMap<JobInProgress, JobInfo>();
protected long lastUpdateTime; // Time when we last updated infos
protected long lastPreemptionUpdateTime; // Time when we last updated preemption vars
protected boolean initialized; // Are we initialized?
protected volatile boolean running; // Are we running?
protected boolean assignMultiple; // Simultaneously assign map and reduce?
@@ -210,6 +214,9 @@ public void start() {
infoServer.addServlet("scheduler", "/scheduler",
FairSchedulerServlet.class);
}
initMetrics();
eventLog.log("INITIALIZED");
} catch (Exception e) {
// Can't load one of the managers - crash the JobTracker now while it is
@@ -219,6 +226,8 @@ public void start() {
LOG.info("Successfully configured FairScheduler");
}
private MetricsUpdater metricsUpdater; // responsible for pushing hadoop metrics
/**
* Returns the LoadManager object used by the Fair Share scheduler
*/
@@ -226,6 +235,16 @@ LoadManager getLoadManager() {
return loadMgr;
}
/**
* Register metrics for the fair scheduler, and start a thread
* to update them periodically.
*/
private void initMetrics() {
MetricsContext context = MetricsUtil.getContext("fairscheduler");
metricsUpdater = new MetricsUpdater();
context.registerUpdater(metricsUpdater);
}
@Override
public void terminate() throws IOException {
if (eventLog != null)
@@ -236,6 +255,11 @@ public void terminate() throws IOException {
taskTrackerManager.removeJobInProgressListener(jobListener);
if (eventLog != null)
eventLog.shutdown();
if (metricsUpdater != null) {
MetricsContext context = MetricsUtil.getContext("fairscheduler");
context.unregisterUpdater(metricsUpdater);
metricsUpdater = null;
}
}
@@ -298,8 +322,7 @@ public void jobAdded(JobInProgress job) {
public void jobRemoved(JobInProgress job) {
synchronized (FairScheduler.this) {
eventLog.log("JOB_REMOVED", job.getJobID());
poolMgr.removeJob(job);
infos.remove(job);
jobNoLongerRunning(job);
}
}
@@ -331,6 +354,20 @@ public void run() {
}
}
}
/**
* Responsible for updating metrics when the metrics context requests it.
*/
private class MetricsUpdater implements Updater {
@Override
public void doUpdates(MetricsContext context) {
updateMetrics();
}
}
synchronized void updateMetrics() {
poolMgr.updateMetrics();
}
@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
@@ -617,8 +654,7 @@ protected void update() {
}
}
for (JobInProgress job: toRemove) {
infos.remove(job);
poolMgr.removeJob(job);
jobNoLongerRunning(job);
}
updateRunnability(); // Set job runnability based on user/pool limits
@@ -647,6 +683,16 @@ protected void update() {
updatePreemptionVariables();
}
}
private void jobNoLongerRunning(JobInProgress job) {
assert Thread.holdsLock(this);
JobInfo info = infos.remove(job);
if (info != null) {
info.mapSchedulable.cleanupMetrics();
info.reduceSchedulable.cleanupMetrics();
}
poolMgr.removeJob(job);
}
public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
@@ -744,6 +790,7 @@ private int getTotalSlots(TaskType type, ClusterStatus clusterStatus) {
*/
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
for (TaskType type: MAP_AND_REDUCE) {
for (PoolSchedulable sched: getPoolSchedulables(type)) {
if (!isStarvedForMinShare(sched)) {
@@ -1044,4 +1091,11 @@ public FairSchedulerEventLog getEventLog() {
public JobInfo getJobInfo(JobInProgress job) {
return infos.get(job);
}
boolean isPreemptionEnabled() {
return preemptionEnabled;
}
long getLastPreemptionUpdateTime() {
return lastPreemptionUpdateTime;
}
}
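[Editor's note: the metrics wiring above follows the push model of the old (v1) Hadoop metrics framework. start() registers a single MetricsUpdater with the "fairscheduler" context, the framework calls doUpdates() once per configured period, and updateMetrics() fans out through the PoolManager to every pool and job schedulable; terminate() unregisters the updater so a stopped scheduler emits nothing. A minimal standalone sketch of that register/push/unregister cycle, using only calls that appear in this patch — the record name "demo" and the tag/gauge values are illustrative, not part of the commit:

import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;

public class UpdaterLifecycleSketch {
  public static void main(String[] args) {
    MetricsContext context = MetricsUtil.getContext("fairscheduler");
    // One record; its tags identify it, its metrics carry the gauge values.
    final MetricsRecord record = MetricsUtil.createRecord(context, "demo");
    record.setTag("name", "example");
    Updater updater = new Updater() {
      public void doUpdates(MetricsContext unused) {
        // Invoked by the framework once per period.
        record.setMetric("runningTasks", 0); // push the current value
        record.update();                     // stage it for the next emit
      }
    };
    context.registerUpdater(updater);
    // ... and on shutdown, mirroring FairScheduler.terminate():
    context.unregisterUpdater(updater);
  }
}]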

View File

@@ -35,6 +35,13 @@ public JobSchedulable(FairScheduler scheduler, JobInProgress job,
this.scheduler = scheduler;
this.job = job;
this.taskType = taskType;
initMetrics();
}
@Override
public TaskType getTaskType() {
return taskType;
}
@Override
@@ -151,4 +158,18 @@ public Task assignTask(TaskTrackerStatus tts, long currentTime,
return null;
}
}
@Override
protected String getMetricsContextName() {
return "jobs";
}
@Override
void updateMetrics() {
assert metrics != null;
super.setMetricValues(metrics);
metrics.update();
}
}
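[Editor's note: despite its name, getMetricsContextName() supplies the record name within the single "fairscheduler" context — "jobs" here, "pools" in PoolSchedulable — and that name is the key under which MetricsContext.getAllRecords() later groups records. A read-back sketch, assuming the in-memory NoEmitMetricsContext that the tests below configure:

import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;

public class ReadBackSketch {
  public static void main(String[] args) throws Exception {
    // Route the fairscheduler context to an in-memory implementation.
    ContextFactory.getFactory().setAttribute("fairscheduler.class",
        NoEmitMetricsContext.class.getName());
    MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
    Map<String, Collection<OutputRecord>> all = ctx.getAllRecords();
    Collection<OutputRecord> jobs = all.get("jobs"); // null if nothing published yet
    if (jobs != null) {
      for (OutputRecord rec : jobs) {
        System.out.println(rec.getTag("name") + "/" + rec.getTag("taskType")
            + " demand=" + rec.getMetric("demand"));
      }
    }
  }
}]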

View File

@@ -22,6 +22,7 @@
import java.util.Collection;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
/**
* A schedulable pool of jobs.
@@ -91,4 +92,9 @@ public PoolSchedulable getReduceSchedulable() {
public PoolSchedulable getSchedulable(TaskType type) {
return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
}
public void updateMetrics() {
mapSchedulable.updateMetrics();
reduceSchedulable.updateMetrics();
}
}

View File

@@ -37,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -536,4 +537,10 @@ public long getMinSharePreemptionTimeout(String pool) {
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
synchronized void updateMetrics() {
for (Pool pool : pools.values()) {
pool.updateMetrics();
}
}
}

View File

@@ -54,6 +54,8 @@ public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) {
long currentTime = scheduler.getClock().getTime();
this.lastTimeAtMinShare = currentTime;
this.lastTimeAtHalfFairShare = currentTime;
initMetrics();
}
public void addJob(JobInProgress job) {
@@ -171,6 +173,7 @@ Pool getPool() {
return pool;
}
@Override
public TaskType getTaskType() {
return taskType;
}
@@ -194,4 +197,25 @@ public long getLastTimeAtHalfFairShare() {
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
protected String getMetricsContextName() {
return "pools";
}
@Override
public void updateMetrics() {
super.setMetricValues(metrics);
if (scheduler.isPreemptionEnabled()) {
// These won't be set if preemption is off
long lastCheck = scheduler.getLastPreemptionUpdateTime();
metrics.setMetric("millisSinceAtMinShare", lastCheck - lastTimeAtMinShare);
metrics.setMetric("millisSinceAtHalfFairShare", lastCheck - lastTimeAtHalfFairShare);
}
metrics.update();
for (JobSchedulable job : jobScheds) {
job.updateMetrics();
}
}
}
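[Editor's note: the two pool-only gauges are staleness measures relative to the scheduler's most recent preemption scan, not to the current wall-clock time. Illustrative arithmetic under assumed timestamps:

// Assumed values, for illustration only:
long lastCheck = 100000L;          // scheduler.getLastPreemptionUpdateTime()
long lastTimeAtMinShare = 40000L;  // last moment the pool had at least its min share
long millisSinceAtMinShare = lastCheck - lastTimeAtMinShare; // published as 60000

These are the same timestamps the preemption code compares against each pool's preemption timeouts.]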

View File

@@ -21,6 +21,11 @@
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
/**
* A Schedulable represents an entity that can launch tasks, such as a job
* or a pool. It provides a common interface so that algorithms such as fair
@@ -53,13 +58,19 @@
abstract class Schedulable {
/** Fair share assigned to this Schedulable */
private double fairShare = 0;
protected MetricsRecord metrics;
/**
* Name of job/pool, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
public abstract String getName();
/**
* @return the type of tasks that this pool schedules
*/
public abstract TaskType getTaskType();
/**
* Maximum number of tasks required by this Schedulable. This is defined as
* number of currently running tasks + number of unlaunched tasks (tasks that
@@ -122,6 +133,35 @@ public double getFairShare() {
return fairShare;
}
/** Return the name of the metrics context for this schedulable */
protected abstract String getMetricsContextName();
/**
* Set up metrics context
*/
protected void initMetrics() {
MetricsContext metricsContext = MetricsUtil.getContext("fairscheduler");
this.metrics = MetricsUtil.createRecord(metricsContext,
getMetricsContextName());
metrics.setTag("name", getName());
metrics.setTag("taskType", getTaskType().toString());
}
void cleanupMetrics() {
metrics.remove();
metrics = null;
}
protected void setMetricValues(MetricsRecord metrics) {
metrics.setMetric("fairShare", (float)getFairShare());
metrics.setMetric("minShare", getMinShare());
metrics.setMetric("demand", getDemand());
metrics.setMetric("weight", (float)getWeight());
metrics.setMetric("runningTasks", getRunningTasks());
}
abstract void updateMetrics();
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
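[Editor's note: each Schedulable therefore owns exactly one MetricsRecord — the two tags set in initMetrics() identify it, and setMetricValues() refreshes the five shared gauges wholesale on every update. A write-side sketch of the record a hypothetical map-side pool "poolA" might publish, with all values illustrative:

import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;

public class PoolRecordSketch {
  public static void main(String[] args) {
    MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
    MetricsRecord rec = MetricsUtil.createRecord(ctx, "pools");
    rec.setTag("name", "poolA");       // identifies the schedulable
    rec.setTag("taskType", "MAP");     // MAP or REDUCE
    rec.setMetric("fairShare", 2.0f);  // the five gauges set by setMetricValues()
    rec.setMetric("minShare", 1);
    rec.setMetric("demand", 10);
    rec.setMetric("weight", 1.0f);
    rec.setMetric("runningTasks", 2);
    rec.update(); // rows with identical tag values collapse into one output record
  }
}]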

View File

@@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.mapreduce.TaskType;
/**
* Dummy implementation of Schedulable for unit testing.
*/
@@ -105,4 +107,18 @@ public void redistributeShare() {}
@Override
public void updateDemand() {}
@Override
public TaskType getTaskType() {
return TaskType.MAP;
}
@Override
protected String getMetricsContextName() {
return "fake";
}
@Override
void updateMetrics() {
}
}

View File

@@ -47,6 +47,11 @@
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.net.Node;
import org.mortbay.log.Log;
@@ -516,7 +521,10 @@ public String getRack(String hostname) {
}
private void setUpCluster(int numRacks, int numNodesPerRack,
boolean assignMultiple) {
boolean assignMultiple) throws IOException {
resetMetrics();
conf = new JobConf();
conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
@@ -534,6 +542,20 @@ private void setUpCluster(int numRacks, int numNodesPerRack,
scheduler.start();
}
/**
* Set up a metrics context that doesn't emit anywhere but stores the data
* so we can verify it. Also clears it of any data so that different test
* cases don't pollute each other.
*/
private void resetMetrics() throws IOException {
ContextFactory factory = ContextFactory.getFactory();
factory.setAttribute("fairscheduler.class",
NoEmitMetricsContext.class.getName());
MetricsUtil.getContext("fairscheduler").createRecord("jobs").remove();
MetricsUtil.getContext("fairscheduler").createRecord("pools").remove();
}
@Override
protected void tearDown() throws Exception {
if (scheduler != null) {
@@ -689,7 +711,8 @@ public void testSmallJobs() throws IOException {
assertEquals(1, info1.reduceSchedulable.getDemand());
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(1.0, info1.reduceSchedulable.getFairShare());
verifyMetrics();
// Advance time before submitting another job j2, to make j1 run before j2
// deterministically.
advanceTime(100);
@@ -709,6 +732,7 @@ public void testSmallJobs() throws IOException {
assertEquals(2, info2.reduceSchedulable.getDemand());
assertEquals(1.0, info2.mapSchedulable.getFairShare());
assertEquals(2.0, info2.reduceSchedulable.getFairShare());
verifyMetrics();
// Assign tasks and check that jobs alternate in filling slots
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
@@ -729,8 +753,8 @@ public void testSmallJobs() throws IOException {
assertEquals(2, info2.reduceSchedulable.getRunningTasks());
assertEquals(1, info2.mapSchedulable.getDemand());
assertEquals(2, info2.reduceSchedulable.getDemand());
verifyMetrics();
}
/**
* This test is identical to testSmallJobs but sets assignMultiple to
* true so that multiple tasks can be assigned per heartbeat.
@@ -748,6 +772,7 @@ public void testSmallJobsWithAssignMultiple() throws IOException {
assertEquals(1, info1.reduceSchedulable.getDemand());
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(1.0, info1.reduceSchedulable.getFairShare());
verifyMetrics();
// Advance time before submitting another job j2, to make j1 run before j2
// deterministically.
@@ -768,6 +793,7 @@ public void testSmallJobsWithAssignMultiple() throws IOException {
assertEquals(2, info2.reduceSchedulable.getDemand());
assertEquals(1.0, info2.mapSchedulable.getFairShare());
assertEquals(2.0, info2.reduceSchedulable.getFairShare());
verifyMetrics();
// Assign tasks and check that jobs alternate in filling slots
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
@@ -788,6 +814,7 @@ public void testSmallJobsWithAssignMultiple() throws IOException {
assertEquals(2, info2.reduceSchedulable.getRunningTasks());
assertEquals(1, info2.mapSchedulable.getDemand());
assertEquals(2, info2.reduceSchedulable.getDemand());
verifyMetrics();
}
/**
@@ -1632,6 +1659,7 @@ public void testPoolWeights() throws Exception {
assertEquals(0.28, info3.reduceSchedulable.getFairShare(), 0.01);
assertEquals(0.28, info4.mapSchedulable.getFairShare(), 0.01);
assertEquals(0.28, info4.reduceSchedulable.getFairShare(), 0.01);
verifyMetrics();
}
/**
@@ -2730,6 +2758,65 @@ public void testPoolAssignment() throws Exception {
assertEquals(33, poolA.getMapSchedulable().getDemand());
assertEquals(39, poolA.getReduceSchedulable().getDemand());
}
/**
* Test switching a job from one pool to another, then back to the original
* one. This is a regression test for a bug seen during development of
* MAPREDUCE-2323 (fair scheduler metrics).
*/
public void testSetPoolTwice() throws Exception {
// Set up pools file
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<pool name=\"default\">");
out.println("<schedulingMode>fair</schedulingMode>");
out.println("</pool>");
out.println("<pool name=\"poolA\">");
out.println("<schedulingMode>fair</schedulingMode>");
out.println("</pool>");
out.println("</allocations>");
out.close();
scheduler.getPoolManager().reloadAllocs();
Pool defaultPool = scheduler.getPoolManager().getPool("default");
Pool poolA = scheduler.getPoolManager().getPool("poolA");
// Submit a job to the default pool. All specifications take default values.
JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
assertEquals(1, defaultPool.getMapSchedulable().getDemand());
assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
assertEquals(0, poolA.getMapSchedulable().getDemand());
assertEquals(0, poolA.getReduceSchedulable().getDemand());
// Move job to poolA and make sure demand moves with it
scheduler.getPoolManager().setPool(job1, "poolA");
assertEquals("poolA", scheduler.getPoolManager().getPoolName(job1));
defaultPool.getMapSchedulable().updateDemand();
defaultPool.getReduceSchedulable().updateDemand();
poolA.getMapSchedulable().updateDemand();
poolA.getReduceSchedulable().updateDemand();
assertEquals(0, defaultPool.getMapSchedulable().getDemand());
assertEquals(0, defaultPool.getReduceSchedulable().getDemand());
assertEquals(1, poolA.getMapSchedulable().getDemand());
assertEquals(3, poolA.getReduceSchedulable().getDemand());
// Move back to default pool and make sure demand goes back
scheduler.getPoolManager().setPool(job1, "default");
assertEquals("default", scheduler.getPoolManager().getPoolName(job1));
defaultPool.getMapSchedulable().updateDemand();
defaultPool.getReduceSchedulable().updateDemand();
poolA.getMapSchedulable().updateDemand();
poolA.getReduceSchedulable().updateDemand();
assertEquals(1, defaultPool.getMapSchedulable().getDemand());
assertEquals(3, defaultPool.getReduceSchedulable().getDemand());
assertEquals(0, poolA.getMapSchedulable().getDemand());
assertEquals(0, poolA.getReduceSchedulable().getDemand());
}
private void advanceTime(long time) {
clock.advance(time);
@@ -2828,4 +2915,108 @@ public void testFairSharePreemptionWithShortTimeout() throws Exception {
assertNull(scheduler.assignTasks(tracker("tt1")));
assertNull(scheduler.assignTasks(tracker("tt2")));
}
/**
* Ask scheduler to update metrics and then verify that they're all
* correctly published to the metrics context
*/
private void verifyMetrics() {
scheduler.updateMetrics();
verifyPoolMetrics();
verifyJobMetrics();
}
/**
* Verify that pool-level metrics match internal data
*/
private void verifyPoolMetrics() {
MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
Collection<OutputRecord> records = ctx.getAllRecords().get("pools");
try {
assertEquals(scheduler.getPoolSchedulables(TaskType.MAP).size() * 2,
records.size());
} catch (Error e) {
for (OutputRecord rec : records) {
System.err.println("record:");
System.err.println(" name: " + rec.getTag("name"));
System.err.println(" type: " + rec.getTag("type"));
}
throw e;
}
Map<String, OutputRecord> byPoolAndType =
new HashMap<String, OutputRecord>();
for (OutputRecord rec : records) {
String pool = (String)rec.getTag("name");
String type = (String)rec.getTag("taskType");
assertNotNull(pool);
assertNotNull(type);
byPoolAndType.put(pool + "_" + type, rec);
}
List<PoolSchedulable> poolScheds = new ArrayList<PoolSchedulable>();
poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.MAP));
poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.REDUCE));
for (PoolSchedulable pool : poolScheds) {
String poolName = pool.getName();
OutputRecord metrics = byPoolAndType.get(
poolName + "_" + pool.getTaskType().toString());
assertNotNull("Need metrics for " + pool, metrics);
verifySchedulableMetrics(pool, metrics);
}
}
/**
* Verify that the job-level metrics match internal data
*/
private void verifyJobMetrics() {
MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
Collection<OutputRecord> records = ctx.getAllRecords().get("jobs");
System.out.println("Checking job metrics...");
Map<String, OutputRecord> byJobIdAndType =
new HashMap<String, OutputRecord>();
for (OutputRecord rec : records) {
String jobId = (String)rec.getTag("name");
String type = (String)rec.getTag("taskType");
assertNotNull(jobId);
assertNotNull(type);
byJobIdAndType.put(jobId + "_" + type, rec);
System.out.println("Got " + type + " metrics for job: " + jobId);
}
assertEquals(scheduler.infos.size() * 2, byJobIdAndType.size());
for (Map.Entry<JobInProgress, JobInfo> entry :
scheduler.infos.entrySet()) {
JobInfo info = entry.getValue();
String jobId = entry.getKey().getJobID().toString();
OutputRecord mapMetrics = byJobIdAndType.get(jobId + "_MAP");
assertNotNull("Job " + jobId + " should have map metrics", mapMetrics);
verifySchedulableMetrics(info.mapSchedulable, mapMetrics);
OutputRecord reduceMetrics = byJobIdAndType.get(jobId + "_REDUCE");
assertNotNull("Job " + jobId + " should have reduce metrics", reduceMetrics);
verifySchedulableMetrics(info.reduceSchedulable, reduceMetrics);
}
}
/**
* Verify that the metrics for a given Schedulable are correct
*/
private void verifySchedulableMetrics(
Schedulable sched, OutputRecord metrics) {
assertEquals(sched.getRunningTasks(), metrics.getMetric("runningTasks"));
assertEquals(sched.getDemand(), metrics.getMetric("demand"));
assertEquals(sched.getFairShare(),
metrics.getMetric("fairShare").doubleValue(), .001);
assertEquals(sched.getWeight(),
metrics.getMetric("weight").doubleValue(), .001);
}
}

View File

@@ -517,6 +517,24 @@
<em>NewJobWeightBooster</em> are enabled.</li>
</ul>
</section>
<section>
<title>Metrics</title>
<p>
The fair scheduler can export metrics through the Hadoop metrics interface.
To enable this, add an entry to <code>hadoop-metrics.properties</code> that
configures the <code>fairscheduler</code> metrics context. For example, to
simply retain the metrics in memory so they can be viewed in the
<code>/metrics</code> servlet:
</p>
<p>
<code>fairscheduler.class=org.apache.hadoop.metrics.spi.NoEmitMetricsContext</code>
</p>
<p>
Metrics are generated for each pool and job, and contain the same information that
is visible on the <code>/scheduler</code> web page.
</p>
</section>
<!--
<section>
<title>Implementation</title>
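[Editor's note: for a fuller sketch of hadoop-metrics.properties than the one-liner in the Metrics section above — assuming the stock metrics-v1 FileContext; the period and file name are illustrative choices, not mandated by this patch:

fairscheduler.class=org.apache.hadoop.metrics.file.FileContext
fairscheduler.period=10
fairscheduler.fileName=/tmp/fairscheduler-metrics.log

This polls the registered updater every 10 seconds and appends the "pools" and "jobs" records to the named file.]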