diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bb7242f29c..cdd954d670 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -10,6 +10,8 @@ Trunk (unreleased changes) (Plamen Jeliazkov via shv) IMPROVEMENTS + MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running + tasks in Gridmix. (amarrk) MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java index e48106890f..d0d2f9183f 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -72,7 +73,7 @@ public Job run() throws IOException, ClassNotFoundException, job.setNumReduceTasks(jobdesc.getNumberReduces()); job.setMapOutputKeyClass(GridmixKey.class); job.setMapOutputValueClass(GridmixRecord.class); - job.setSortComparatorClass(GridmixKey.Comparator.class); + job.setSortComparatorClass(LoadSortComparator.class); job.setGroupingComparatorClass(SpecGroupingComparator.class); job.setInputFormatClass(LoadInputFormat.class); job.setOutputFormatClass(RawBytesOutputFormat.class); @@ -93,18 +94,85 @@ protected boolean canEmulateCompression() { return true; } + /** + * This is a load matching key comparator which will make sure that the + * resource usage load is matched even when the framework is in control. + */ + public static class LoadSortComparator extends GridmixKey.Comparator { + private ResourceUsageMatcherRunner matcher = null; + private boolean isConfigured = false; + + public LoadSortComparator() { + super(); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + configure(); + int ret = super.compare(b1, s1, l1, b2, s2, l2); + if (matcher != null) { + try { + matcher.match(); // match the resource usage now + } catch (Exception e) {} + } + return ret; + } + + //TODO Note that the sorter will be instantiated 2 times as follows + // 1. During the sort/spill in the map phase + // 2. During the merge in the sort phase + // We need the handle to the matcher thread only in (2). + // This logic can be relaxed to run only in (2). + private void configure() { + if (!isConfigured) { + ThreadGroup group = Thread.currentThread().getThreadGroup(); + Thread[] threads = new Thread[group.activeCount() * 2]; + group.enumerate(threads, true); + for (Thread t : threads) { + if (t != null && (t instanceof ResourceUsageMatcherRunner)) { + this.matcher = (ResourceUsageMatcherRunner) t; + isConfigured = true; + break; + } + } + } + } + } + /** * This is a progress based resource usage matcher. */ @SuppressWarnings("unchecked") - static class ResourceUsageMatcherRunner extends Thread { + static class ResourceUsageMatcherRunner extends Thread + implements Progressive { private final ResourceUsageMatcher matcher; - private final Progressive progress; + private final BoostingProgress progress; private final long sleepTime; private static final String SLEEP_CONFIG = "gridmix.emulators.resource-usage.sleep-duration"; private static final long DEFAULT_SLEEP_TIME = 100; // 100ms + /** + * This is a progress bar that can be boosted for weaker use-cases. + */ + private static class BoostingProgress implements Progressive { + private float boostValue = 0f; + TaskInputOutputContext context; + + BoostingProgress(TaskInputOutputContext context) { + this.context = context; + } + + void setBoostValue(float boostValue) { + this.boostValue = boostValue; + } + + @Override + public float getProgress() { + return Math.min(1f, context.getProgress() + boostValue); + } + } + ResourceUsageMatcherRunner(final TaskInputOutputContext context, ResourceUsageMetrics metrics) { Configuration conf = context.getConfiguration(); @@ -118,19 +186,14 @@ static class ResourceUsageMatcherRunner extends Thread { // set the other parameters this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME); - progress = new Progressive() { - @Override - public float getProgress() { - return context.getProgress(); - } - }; + progress = new BoostingProgress(context); // instantiate a resource-usage-matcher matcher = new ResourceUsageMatcher(); matcher.configure(conf, plugin, metrics, progress); } - protected void match() throws Exception { + protected void match() throws IOException, InterruptedException { // match the resource usage matcher.matchResourceUsage(); } @@ -157,21 +220,34 @@ public void run() { + " thread! Exiting.", e); } } + + @Override + public float getProgress() { + return matcher.getProgress(); + } + + // boost the progress bar as fasten up the emulation cycles. + void boost(float value) { + progress.setBoostValue(value); + } } // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while // they are emulating private static class StatusReporter extends Thread { - private TaskAttemptContext context; - StatusReporter(TaskAttemptContext context) { + private final TaskAttemptContext context; + private final Progressive progress; + + StatusReporter(TaskAttemptContext context, Progressive progress) { this.context = context; + this.progress = progress; } @Override public void run() { LOG.info("Status reporter thread started."); try { - while (context.getProgress() < 1) { + while (!isInterrupted() && progress.getProgress() < 1) { // report progress context.progress(); @@ -277,7 +353,7 @@ protected void setup(Context ctxt) split.getMapResourceUsageMetrics()); // start the status reporter thread - reporter = new StatusReporter(ctxt); + reporter = new StatusReporter(ctxt, matcher); reporter.start(); } @@ -324,6 +400,17 @@ public void cleanup(Context context) } } + // check if the thread will get a chance to run or not + // check if there will be a sort&spill->merge phase or not + // check if the final sort&spill->merge phase is gonna happen or not + if (context.getNumReduceTasks() > 0 + && context.getCounter(TaskCounter.SPILLED_RECORDS).getValue() == 0) { + LOG.info("Boosting the map phase progress."); + // add the sort phase progress to the map phase and emulate + matcher.boost(0.33f); + matcher.match(); + } + // start the matcher thread since the map phase ends here matcher.start(); } @@ -392,7 +479,7 @@ protected void setup(Context context) matcher = new ResourceUsageMatcherRunner(context, metrics); // start the status reporter thread - reporter = new StatusReporter(context); + reporter = new StatusReporter(context, matcher); reporter.start(); } @Override @@ -528,9 +615,13 @@ void buildSplits(FilePool inputDir) throws IOException { specRecords[j] = info.getOutputRecords(); metrics[j] = info.getResourceUsageMetrics(); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i, + LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(), i, i + j * maps, info.getOutputRecords(), - info.getOutputBytes())); + info.getOutputBytes(), + info.getResourceUsageMetrics().getCumulativeCpuUsage(), + info.getResourceUsageMetrics().getPhysicalMemoryUsage(), + info.getResourceUsageMetrics().getVirtualMemoryUsage(), + info.getResourceUsageMetrics().getHeapUsage())); } } final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java index 8f4af1add0..22acb42728 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java @@ -67,7 +67,7 @@ public class CumulativeCpuUsageEmulatorPlugin private float emulationInterval; // emulation interval private long targetCpuUsage = 0; private float lastSeenProgress = 0; - private long lastSeenCpuUsageCpuUsage = 0; + private long lastSeenCpuUsage = 0; // Configuration parameters public static final String CPU_EMULATION_PROGRESS_INTERVAL = @@ -229,6 +229,15 @@ private float getWeightForProgressInterval(float progress) { return progress * progress * progress * progress; } + private synchronized long getCurrentCPUUsage() { + return monitor.getProcResourceValues().getCumulativeCpuTime(); + } + + @Override + public float getProgress() { + return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage); + } + @Override //TODO Multi-threading for speedup? public void emulate() throws IOException, InterruptedException { @@ -249,10 +258,9 @@ public void emulate() throws IOException, InterruptedException { // Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following // section - long currentCpuUsage = - monitor.getProcResourceValues().getCumulativeCpuTime(); + long currentCpuUsage = getCurrentCPUUsage(); // estimate the cpu usage rate - float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage) + float rate = (currentCpuUsage - lastSeenCpuUsage) / (currentProgress - lastSeenProgress); long projectedUsage = currentCpuUsage + (long)((1 - currentProgress) * rate); @@ -264,8 +272,7 @@ public void emulate() throws IOException, InterruptedException { (long)(targetCpuUsage * getWeightForProgressInterval(currentProgress)); - while (monitor.getProcResourceValues().getCumulativeCpuTime() - < currentWeighedTarget) { + while (getCurrentCPUUsage() < currentWeighedTarget) { emulatorCore.compute(); // sleep for 100ms try { @@ -281,8 +288,7 @@ public void emulate() throws IOException, InterruptedException { // set the last seen progress lastSeenProgress = progress.getProgress(); // set the last seen usage - lastSeenCpuUsageCpuUsage = - monitor.getProcResourceValues().getCumulativeCpuTime(); + lastSeenCpuUsage = getCurrentCPUUsage(); } } } @@ -310,6 +316,6 @@ public void initialize(Configuration conf, ResourceUsageMetrics metrics, // initialize the states lastSeenProgress = 0; - lastSeenCpuUsageCpuUsage = 0; + lastSeenCpuUsage = 0; } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java index 7d40cfd5e7..bff45fc545 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java @@ -42,7 +42,7 @@ * For configuring GridMix to load and and use a resource usage emulator, * see {@link ResourceUsageMatcher}. */ -public interface ResourceUsageEmulatorPlugin { +public interface ResourceUsageEmulatorPlugin extends Progressive { /** * Initialize the plugin. This might involve * - initializing the variables diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java index 917cd09372..b10ad43460 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred.gridmix.emulators.resourceusage; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -35,7 +36,7 @@ *
Note that the order in which the emulators are invoked is same as the * order in which they are configured. */ -public class ResourceUsageMatcher { +public class ResourceUsageMatcher implements Progressive { /** * Configuration key to set resource usage emulators. */ @@ -80,10 +81,31 @@ public void configure(Configuration conf, ResourceCalculatorPlugin monitor, } } - public void matchResourceUsage() throws Exception { + public void matchResourceUsage() throws IOException, InterruptedException { for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { // match the resource usage emulator.emulate(); } } + + /** + * Returns the average progress. + */ + @Override + public float getProgress() { + if (emulationPlugins.size() > 0) { + // return the average progress + float progress = 0f; + for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { + // consider weighted progress of each emulator + progress += emulator.getProgress(); + } + + return progress / emulationPlugins.size(); + } + + // if no emulators are configured then return 1 + return 1f; + + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java index a50358a41a..3af1f3558f 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java @@ -186,6 +186,11 @@ protected long getMaxHeapUsageInMB() { return Runtime.getRuntime().maxMemory() / ONE_MB; } + @Override + public float getProgress() { + return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB); + } + @Override public void emulate() throws IOException, InterruptedException { if (enabled) { diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java index 35db026807..f55e8ac9db 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java @@ -135,6 +135,14 @@ static long testEmulation(String id, Configuration conf) ? fs.getFileStatus(testPath).getModificationTime() : 0; } + + @Override + public float getProgress() { + try { + return fs.exists(touchPath) ? 1.0f : 0f; + } catch (IOException ioe) {} + return 0f; + } } /**