MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running tasks in Gridmix. (amarrk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1179933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
77f79ee92b
commit
c1c0e8c9ea
@ -10,6 +10,8 @@ Trunk (unreleased changes)
|
|||||||
(Plamen Jeliazkov via shv)
|
(Plamen Jeliazkov via shv)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
|
||||||
|
tasks in Gridmix. (amarrk)
|
||||||
|
|
||||||
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols
|
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols
|
||||||
including multuple versions of the same protocol (sanjay Radia)
|
including multuple versions of the same protocol (sanjay Radia)
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.RecordReader;
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
import org.apache.hadoop.mapreduce.Reducer;
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
|
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
@ -72,7 +73,7 @@ public Job run() throws IOException, ClassNotFoundException,
|
|||||||
job.setNumReduceTasks(jobdesc.getNumberReduces());
|
job.setNumReduceTasks(jobdesc.getNumberReduces());
|
||||||
job.setMapOutputKeyClass(GridmixKey.class);
|
job.setMapOutputKeyClass(GridmixKey.class);
|
||||||
job.setMapOutputValueClass(GridmixRecord.class);
|
job.setMapOutputValueClass(GridmixRecord.class);
|
||||||
job.setSortComparatorClass(GridmixKey.Comparator.class);
|
job.setSortComparatorClass(LoadSortComparator.class);
|
||||||
job.setGroupingComparatorClass(SpecGroupingComparator.class);
|
job.setGroupingComparatorClass(SpecGroupingComparator.class);
|
||||||
job.setInputFormatClass(LoadInputFormat.class);
|
job.setInputFormatClass(LoadInputFormat.class);
|
||||||
job.setOutputFormatClass(RawBytesOutputFormat.class);
|
job.setOutputFormatClass(RawBytesOutputFormat.class);
|
||||||
@ -93,18 +94,85 @@ protected boolean canEmulateCompression() {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a load matching key comparator which will make sure that the
|
||||||
|
* resource usage load is matched even when the framework is in control.
|
||||||
|
*/
|
||||||
|
public static class LoadSortComparator extends GridmixKey.Comparator {
|
||||||
|
private ResourceUsageMatcherRunner matcher = null;
|
||||||
|
private boolean isConfigured = false;
|
||||||
|
|
||||||
|
public LoadSortComparator() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
||||||
|
configure();
|
||||||
|
int ret = super.compare(b1, s1, l1, b2, s2, l2);
|
||||||
|
if (matcher != null) {
|
||||||
|
try {
|
||||||
|
matcher.match(); // match the resource usage now
|
||||||
|
} catch (Exception e) {}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO Note that the sorter will be instantiated 2 times as follows
|
||||||
|
// 1. During the sort/spill in the map phase
|
||||||
|
// 2. During the merge in the sort phase
|
||||||
|
// We need the handle to the matcher thread only in (2).
|
||||||
|
// This logic can be relaxed to run only in (2).
|
||||||
|
private void configure() {
|
||||||
|
if (!isConfigured) {
|
||||||
|
ThreadGroup group = Thread.currentThread().getThreadGroup();
|
||||||
|
Thread[] threads = new Thread[group.activeCount() * 2];
|
||||||
|
group.enumerate(threads, true);
|
||||||
|
for (Thread t : threads) {
|
||||||
|
if (t != null && (t instanceof ResourceUsageMatcherRunner)) {
|
||||||
|
this.matcher = (ResourceUsageMatcherRunner) t;
|
||||||
|
isConfigured = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a progress based resource usage matcher.
|
* This is a progress based resource usage matcher.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
static class ResourceUsageMatcherRunner extends Thread {
|
static class ResourceUsageMatcherRunner extends Thread
|
||||||
|
implements Progressive {
|
||||||
private final ResourceUsageMatcher matcher;
|
private final ResourceUsageMatcher matcher;
|
||||||
private final Progressive progress;
|
private final BoostingProgress progress;
|
||||||
private final long sleepTime;
|
private final long sleepTime;
|
||||||
private static final String SLEEP_CONFIG =
|
private static final String SLEEP_CONFIG =
|
||||||
"gridmix.emulators.resource-usage.sleep-duration";
|
"gridmix.emulators.resource-usage.sleep-duration";
|
||||||
private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
|
private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a progress bar that can be boosted for weaker use-cases.
|
||||||
|
*/
|
||||||
|
private static class BoostingProgress implements Progressive {
|
||||||
|
private float boostValue = 0f;
|
||||||
|
TaskInputOutputContext context;
|
||||||
|
|
||||||
|
BoostingProgress(TaskInputOutputContext context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setBoostValue(float boostValue) {
|
||||||
|
this.boostValue = boostValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return Math.min(1f, context.getProgress() + boostValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ResourceUsageMatcherRunner(final TaskInputOutputContext context,
|
ResourceUsageMatcherRunner(final TaskInputOutputContext context,
|
||||||
ResourceUsageMetrics metrics) {
|
ResourceUsageMetrics metrics) {
|
||||||
Configuration conf = context.getConfiguration();
|
Configuration conf = context.getConfiguration();
|
||||||
@ -118,19 +186,14 @@ static class ResourceUsageMatcherRunner extends Thread {
|
|||||||
|
|
||||||
// set the other parameters
|
// set the other parameters
|
||||||
this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
|
this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
|
||||||
progress = new Progressive() {
|
progress = new BoostingProgress(context);
|
||||||
@Override
|
|
||||||
public float getProgress() {
|
|
||||||
return context.getProgress();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// instantiate a resource-usage-matcher
|
// instantiate a resource-usage-matcher
|
||||||
matcher = new ResourceUsageMatcher();
|
matcher = new ResourceUsageMatcher();
|
||||||
matcher.configure(conf, plugin, metrics, progress);
|
matcher.configure(conf, plugin, metrics, progress);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void match() throws Exception {
|
protected void match() throws IOException, InterruptedException {
|
||||||
// match the resource usage
|
// match the resource usage
|
||||||
matcher.matchResourceUsage();
|
matcher.matchResourceUsage();
|
||||||
}
|
}
|
||||||
@ -157,21 +220,34 @@ public void run() {
|
|||||||
+ " thread! Exiting.", e);
|
+ " thread! Exiting.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return matcher.getProgress();
|
||||||
|
}
|
||||||
|
|
||||||
|
// boost the progress bar as fasten up the emulation cycles.
|
||||||
|
void boost(float value) {
|
||||||
|
progress.setBoostValue(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
|
// Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
|
||||||
// they are emulating
|
// they are emulating
|
||||||
private static class StatusReporter extends Thread {
|
private static class StatusReporter extends Thread {
|
||||||
private TaskAttemptContext context;
|
private final TaskAttemptContext context;
|
||||||
StatusReporter(TaskAttemptContext context) {
|
private final Progressive progress;
|
||||||
|
|
||||||
|
StatusReporter(TaskAttemptContext context, Progressive progress) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
this.progress = progress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Status reporter thread started.");
|
LOG.info("Status reporter thread started.");
|
||||||
try {
|
try {
|
||||||
while (context.getProgress() < 1) {
|
while (!isInterrupted() && progress.getProgress() < 1) {
|
||||||
// report progress
|
// report progress
|
||||||
context.progress();
|
context.progress();
|
||||||
|
|
||||||
@ -277,7 +353,7 @@ protected void setup(Context ctxt)
|
|||||||
split.getMapResourceUsageMetrics());
|
split.getMapResourceUsageMetrics());
|
||||||
|
|
||||||
// start the status reporter thread
|
// start the status reporter thread
|
||||||
reporter = new StatusReporter(ctxt);
|
reporter = new StatusReporter(ctxt, matcher);
|
||||||
reporter.start();
|
reporter.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -324,6 +400,17 @@ public void cleanup(Context context)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if the thread will get a chance to run or not
|
||||||
|
// check if there will be a sort&spill->merge phase or not
|
||||||
|
// check if the final sort&spill->merge phase is gonna happen or not
|
||||||
|
if (context.getNumReduceTasks() > 0
|
||||||
|
&& context.getCounter(TaskCounter.SPILLED_RECORDS).getValue() == 0) {
|
||||||
|
LOG.info("Boosting the map phase progress.");
|
||||||
|
// add the sort phase progress to the map phase and emulate
|
||||||
|
matcher.boost(0.33f);
|
||||||
|
matcher.match();
|
||||||
|
}
|
||||||
|
|
||||||
// start the matcher thread since the map phase ends here
|
// start the matcher thread since the map phase ends here
|
||||||
matcher.start();
|
matcher.start();
|
||||||
}
|
}
|
||||||
@ -392,7 +479,7 @@ protected void setup(Context context)
|
|||||||
matcher = new ResourceUsageMatcherRunner(context, metrics);
|
matcher = new ResourceUsageMatcherRunner(context, metrics);
|
||||||
|
|
||||||
// start the status reporter thread
|
// start the status reporter thread
|
||||||
reporter = new StatusReporter(context);
|
reporter = new StatusReporter(context, matcher);
|
||||||
reporter.start();
|
reporter.start();
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
@ -528,9 +615,13 @@ void buildSplits(FilePool inputDir) throws IOException {
|
|||||||
specRecords[j] = info.getOutputRecords();
|
specRecords[j] = info.getOutputRecords();
|
||||||
metrics[j] = info.getResourceUsageMetrics();
|
metrics[j] = info.getResourceUsageMetrics();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
|
LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(), i,
|
||||||
i + j * maps, info.getOutputRecords(),
|
i + j * maps, info.getOutputRecords(),
|
||||||
info.getOutputBytes()));
|
info.getOutputBytes(),
|
||||||
|
info.getResourceUsageMetrics().getCumulativeCpuUsage(),
|
||||||
|
info.getResourceUsageMetrics().getPhysicalMemoryUsage(),
|
||||||
|
info.getResourceUsageMetrics().getVirtualMemoryUsage(),
|
||||||
|
info.getResourceUsageMetrics().getHeapUsage()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
|
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
|
||||||
|
@ -67,7 +67,7 @@ public class CumulativeCpuUsageEmulatorPlugin
|
|||||||
private float emulationInterval; // emulation interval
|
private float emulationInterval; // emulation interval
|
||||||
private long targetCpuUsage = 0;
|
private long targetCpuUsage = 0;
|
||||||
private float lastSeenProgress = 0;
|
private float lastSeenProgress = 0;
|
||||||
private long lastSeenCpuUsageCpuUsage = 0;
|
private long lastSeenCpuUsage = 0;
|
||||||
|
|
||||||
// Configuration parameters
|
// Configuration parameters
|
||||||
public static final String CPU_EMULATION_PROGRESS_INTERVAL =
|
public static final String CPU_EMULATION_PROGRESS_INTERVAL =
|
||||||
@ -229,6 +229,15 @@ private float getWeightForProgressInterval(float progress) {
|
|||||||
return progress * progress * progress * progress;
|
return progress * progress * progress * progress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized long getCurrentCPUUsage() {
|
||||||
|
return monitor.getProcResourceValues().getCumulativeCpuTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
//TODO Multi-threading for speedup?
|
//TODO Multi-threading for speedup?
|
||||||
public void emulate() throws IOException, InterruptedException {
|
public void emulate() throws IOException, InterruptedException {
|
||||||
@ -249,10 +258,9 @@ public void emulate() throws IOException, InterruptedException {
|
|||||||
// Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following
|
// Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following
|
||||||
// section
|
// section
|
||||||
|
|
||||||
long currentCpuUsage =
|
long currentCpuUsage = getCurrentCPUUsage();
|
||||||
monitor.getProcResourceValues().getCumulativeCpuTime();
|
|
||||||
// estimate the cpu usage rate
|
// estimate the cpu usage rate
|
||||||
float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
|
float rate = (currentCpuUsage - lastSeenCpuUsage)
|
||||||
/ (currentProgress - lastSeenProgress);
|
/ (currentProgress - lastSeenProgress);
|
||||||
long projectedUsage =
|
long projectedUsage =
|
||||||
currentCpuUsage + (long)((1 - currentProgress) * rate);
|
currentCpuUsage + (long)((1 - currentProgress) * rate);
|
||||||
@ -264,8 +272,7 @@ public void emulate() throws IOException, InterruptedException {
|
|||||||
(long)(targetCpuUsage
|
(long)(targetCpuUsage
|
||||||
* getWeightForProgressInterval(currentProgress));
|
* getWeightForProgressInterval(currentProgress));
|
||||||
|
|
||||||
while (monitor.getProcResourceValues().getCumulativeCpuTime()
|
while (getCurrentCPUUsage() < currentWeighedTarget) {
|
||||||
< currentWeighedTarget) {
|
|
||||||
emulatorCore.compute();
|
emulatorCore.compute();
|
||||||
// sleep for 100ms
|
// sleep for 100ms
|
||||||
try {
|
try {
|
||||||
@ -281,8 +288,7 @@ public void emulate() throws IOException, InterruptedException {
|
|||||||
// set the last seen progress
|
// set the last seen progress
|
||||||
lastSeenProgress = progress.getProgress();
|
lastSeenProgress = progress.getProgress();
|
||||||
// set the last seen usage
|
// set the last seen usage
|
||||||
lastSeenCpuUsageCpuUsage =
|
lastSeenCpuUsage = getCurrentCPUUsage();
|
||||||
monitor.getProcResourceValues().getCumulativeCpuTime();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -310,6 +316,6 @@ public void initialize(Configuration conf, ResourceUsageMetrics metrics,
|
|||||||
|
|
||||||
// initialize the states
|
// initialize the states
|
||||||
lastSeenProgress = 0;
|
lastSeenProgress = 0;
|
||||||
lastSeenCpuUsageCpuUsage = 0;
|
lastSeenCpuUsage = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -42,7 +42,7 @@
|
|||||||
* For configuring GridMix to load and and use a resource usage emulator,
|
* For configuring GridMix to load and and use a resource usage emulator,
|
||||||
* see {@link ResourceUsageMatcher}.
|
* see {@link ResourceUsageMatcher}.
|
||||||
*/
|
*/
|
||||||
public interface ResourceUsageEmulatorPlugin {
|
public interface ResourceUsageEmulatorPlugin extends Progressive {
|
||||||
/**
|
/**
|
||||||
* Initialize the plugin. This might involve
|
* Initialize the plugin. This might involve
|
||||||
* - initializing the variables
|
* - initializing the variables
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
|
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -35,7 +36,7 @@
|
|||||||
* <p>Note that the order in which the emulators are invoked is same as the
|
* <p>Note that the order in which the emulators are invoked is same as the
|
||||||
* order in which they are configured.
|
* order in which they are configured.
|
||||||
*/
|
*/
|
||||||
public class ResourceUsageMatcher {
|
public class ResourceUsageMatcher implements Progressive {
|
||||||
/**
|
/**
|
||||||
* Configuration key to set resource usage emulators.
|
* Configuration key to set resource usage emulators.
|
||||||
*/
|
*/
|
||||||
@ -80,10 +81,31 @@ public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void matchResourceUsage() throws Exception {
|
public void matchResourceUsage() throws IOException, InterruptedException {
|
||||||
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
|
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
|
||||||
// match the resource usage
|
// match the resource usage
|
||||||
emulator.emulate();
|
emulator.emulate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the average progress.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
if (emulationPlugins.size() > 0) {
|
||||||
|
// return the average progress
|
||||||
|
float progress = 0f;
|
||||||
|
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
|
||||||
|
// consider weighted progress of each emulator
|
||||||
|
progress += emulator.getProgress();
|
||||||
|
}
|
||||||
|
|
||||||
|
return progress / emulationPlugins.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no emulators are configured then return 1
|
||||||
|
return 1f;
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
@ -186,6 +186,11 @@ protected long getMaxHeapUsageInMB() {
|
|||||||
return Runtime.getRuntime().maxMemory() / ONE_MB;
|
return Runtime.getRuntime().maxMemory() / ONE_MB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void emulate() throws IOException, InterruptedException {
|
public void emulate() throws IOException, InterruptedException {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
|
@ -135,6 +135,14 @@ static long testEmulation(String id, Configuration conf)
|
|||||||
? fs.getFileStatus(testPath).getModificationTime()
|
? fs.getFileStatus(testPath).getModificationTime()
|
||||||
: 0;
|
: 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
try {
|
||||||
|
return fs.exists(touchPath) ? 1.0f : 0f;
|
||||||
|
} catch (IOException ioe) {}
|
||||||
|
return 0f;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user