MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for map only jobs. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Amar Kamat 2012-04-19 04:26:53 +00:00
parent ba4ec5f0aa
commit 3edc40e377
4 changed files with 49 additions and 29 deletions

View File

@ -52,6 +52,9 @@ Trunk (unreleased changes)
BUG FIXES BUG FIXES
MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for
map only jobs. (amarrk)
MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter
strings. (ravigummadi) strings. (ravigummadi)

View File

@ -85,10 +85,10 @@ class CompressionEmulationUtil {
"gridmix.compression-emulation.map-output.compression-ratio"; "gridmix.compression-emulation.map-output.compression-ratio";
/** /**
* Configuration property for setting the compression ratio of reduce output. * Configuration property for setting the compression ratio of job output.
*/ */
private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO = private static final String GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO =
"gridmix.compression-emulation.reduce-output.compression-ratio"; "gridmix.compression-emulation.job-output.compression-ratio";
/** /**
* Default compression ratio. * Default compression ratio.
@ -434,20 +434,20 @@ static float getMapOutputCompressionEmulationRatio(Configuration conf) {
} }
/** /**
* Set the reduce output data compression ratio in the given configuration. * Set the job output data compression ratio in the given configuration.
*/ */
static void setReduceOutputCompressionEmulationRatio(Configuration conf, static void setJobOutputCompressionEmulationRatio(Configuration conf,
float ratio) { float ratio) {
conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio); conf.setFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO, ratio);
} }
/** /**
* Get the reduce output data compression ratio using the given configuration. * Get the job output data compression ratio using the given configuration.
* If the compression ratio is not set in the configuration then use the * If the compression ratio is not set in the configuration then use the
* default value i.e {@value #DEFAULT_COMPRESSION_RATIO}. * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
*/ */
static float getReduceOutputCompressionEmulationRatio(Configuration conf) { static float getJobOutputCompressionEmulationRatio(Configuration conf) {
return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, return conf.getFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO,
DEFAULT_COMPRESSION_RATIO); DEFAULT_COMPRESSION_RATIO);
} }

View File

@ -288,23 +288,23 @@ protected void setup(Context ctxt)
final long[] reduceBytes = split.getOutputBytes(); final long[] reduceBytes = split.getOutputBytes();
final long[] reduceRecords = split.getOutputRecords(); final long[] reduceRecords = split.getOutputRecords();
// enable gridmix map output record for compression
final boolean emulateMapOutputCompression =
CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
&& conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
float compressionRatio = 1.0f;
if (emulateMapOutputCompression) {
compressionRatio =
CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
LOG.info("GridMix is configured to use a compression ratio of "
+ compressionRatio + " for the map output data.");
key.setCompressibility(true, compressionRatio);
val.setCompressibility(true, compressionRatio);
}
long totalRecords = 0L; long totalRecords = 0L;
final int nReduces = ctxt.getNumReduceTasks(); final int nReduces = ctxt.getNumReduceTasks();
if (nReduces > 0) { if (nReduces > 0) {
// enable gridmix map output record for compression
boolean emulateMapOutputCompression =
CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
&& conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
float compressionRatio = 1.0f;
if (emulateMapOutputCompression) {
compressionRatio =
CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
LOG.info("GridMix is configured to use a compression ratio of "
+ compressionRatio + " for the map output data.");
key.setCompressibility(true, compressionRatio);
val.setCompressibility(true, compressionRatio);
}
int idx = 0; int idx = 0;
int id = split.getId(); int id = split.getId();
for (int i = 0; i < nReduces; ++i) { for (int i = 0; i < nReduces; ++i) {
@ -332,7 +332,21 @@ protected void setup(Context ctxt)
} }
} else { } else {
long mapOutputBytes = reduceBytes[0]; long mapOutputBytes = reduceBytes[0];
if (emulateMapOutputCompression) {
// enable gridmix job output compression
boolean emulateJobOutputCompression =
CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
&& conf.getBoolean(FileOutputFormat.COMPRESS, false);
if (emulateJobOutputCompression) {
float compressionRatio =
CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf);
LOG.info("GridMix is configured to use a compression ratio of "
+ compressionRatio + " for the job output data.");
key.setCompressibility(true, compressionRatio);
val.setCompressibility(true, compressionRatio);
// set the output size accordingly
mapOutputBytes /= compressionRatio; mapOutputBytes /= compressionRatio;
} }
reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0], reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0],
@ -387,9 +401,13 @@ public void map(NullWritable ignored, GridmixRecord rec,
@Override @Override
public void cleanup(Context context) public void cleanup(Context context)
throws IOException, InterruptedException { throws IOException, InterruptedException {
LOG.info("Starting the cleanup phase.");
for (RecordFactory factory : reduces) { for (RecordFactory factory : reduces) {
key.setSeed(r.nextLong()); key.setSeed(r.nextLong());
while (factory.next(key, val)) { while (factory.next(key, val)) {
// send the progress update (maybe make this a thread)
context.progress();
context.write(key, val); context.write(key, val);
key.setSeed(r.nextLong()); key.setSeed(r.nextLong());
@ -462,7 +480,7 @@ protected void setup(Context context)
&& FileOutputFormat.getCompressOutput(context)) { && FileOutputFormat.getCompressOutput(context)) {
float compressionRatio = float compressionRatio =
CompressionEmulationUtil CompressionEmulationUtil
.getReduceOutputCompressionEmulationRatio(conf); .getJobOutputCompressionEmulationRatio(conf);
LOG.info("GridMix is configured to use a compression ratio of " LOG.info("GridMix is configured to use a compression ratio of "
+ compressionRatio + " for the reduce output data."); + compressionRatio + " for the reduce output data.");
val.setCompressibility(true, compressionRatio); val.setCompressibility(true, compressionRatio);

View File

@ -322,10 +322,9 @@ public void testIntermediateCompressionRatioConfiguration()
public void testOutputCompressionRatioConfiguration() throws Exception { public void testOutputCompressionRatioConfiguration() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
float ratio = 0.567F; float ratio = 0.567F;
CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, CompressionEmulationUtil.setJobOutputCompressionEmulationRatio(conf, ratio);
ratio);
assertEquals(ratio, assertEquals(ratio,
CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf), CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf),
0.0D); 0.0D);
} }