From 231e39462dbfe60f66710e0425dbf16069382dbe Mon Sep 17 00:00:00 2001 From: Ravi Gummadi Date: Mon, 5 Mar 2012 13:44:01 +0000 Subject: [PATCH] MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead counter is wrong when compressed input is used.(ravigummadi) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297052 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 +++- .../gridmix/CompressionEmulationUtil.java | 21 ++++++++++++++ .../apache/hadoop/mapred/gridmix/LoadJob.java | 11 +++++-- .../TestCompressionEmulationUtils.java | 29 +++++++++++++++++-- 4 files changed, 60 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 320b88d056..95678030e3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -52,8 +52,12 @@ Trunk (unreleased changes) BUG FIXES + MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead + counter is wrong when compressed input is used.(ravigummadi) + MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and - sortFinished times when needed. + sortFinished times when needed.(ravigummadi) + MAPREDUCE-3194. 
"mapred mradmin" command is broken in mrv2 (Jason Lowe via bobby) diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java index 1308869195..898ca50b51 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java @@ -571,4 +571,25 @@ static void configureCompressionEmulation(Configuration source, } setInputCompressionEmulationEnabled(target, needsCompressedInput); } + + /** + * Get the uncompressed input bytes count from the given possibly compressed + * input bytes count. + * @param possiblyCompressedInputBytes input bytes count. This is compressed + * input size if compression emulation is on. + * @param conf configuration of the Gridmix simulated job + * @return uncompressed input bytes count. 
This is computed only if compressed + * input was used + */ + static long getUncompressedInputBytes(long possiblyCompressedInputBytes, + Configuration conf) { + long uncompressedInputBytes = possiblyCompressedInputBytes; + + if (CompressionEmulationUtil.isInputCompressionEmulationEnabled(conf)) { + float inputCompressionRatio = + CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf); + uncompressedInputBytes /= inputCompressionRatio; + } + return uncompressedInputBytes; + } } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java index ae2c8814f2..8d69414b6e 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java @@ -627,9 +627,14 @@ void buildSplits(FilePool inputDir) throws IOException { } } final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); + long possiblyCompressedInputBytes = info.getInputBytes(); + Configuration conf = job.getConfiguration(); + long uncompressedInputBytes = + CompressionEmulationUtil.getUncompressedInputBytes( + possiblyCompressedInputBytes, conf); splits.add( - new LoadSplit(striper.splitFor(inputDir, info.getInputBytes(), 3), - maps, i, info.getInputBytes(), info.getInputRecords(), + new LoadSplit(striper.splitFor(inputDir, uncompressedInputBytes, 3), + maps, i, uncompressedInputBytes, info.getInputRecords(), info.getOutputBytes(), info.getOutputRecords(), reduceByteRatio, reduceRecordRatio, specBytes, specRecords, info.getResourceUsageMetrics(), @@ -637,4 +642,4 @@ void buildSplits(FilePool inputDir) throws IOException { } pushDescription(id(), splits); } -} \ No newline at end of file +} diff --git 
a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java index 51071a07a0..1e971f7146 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java @@ -19,7 +19,6 @@ import java.io.BufferedReader; import java.io.BufferedWriter; -import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -31,13 +30,11 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Utils; @@ -561,4 +558,30 @@ public void testFileQueueDecompression() throws IOException { String readLine = new String(bytes); assertEquals("Compression/Decompression error", inputLine, readLine); } + + /** + * Tests the computation logic of uncompressed input bytes by + * {@link CompressionEmulationUtil#getUncompressedInputBytes(long, Configuration)} + */ + @Test + public void testComputeUncompressedInputBytes() { + long possiblyCompressedInputBytes = 100000; + float compressionRatio = 0.45F; + Configuration conf = new Configuration(); + CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, + compressionRatio); + + // By default, input compression emulation is disabled. Verify the + // computation of uncompressed input bytes. 
+ long result = CompressionEmulationUtil.getUncompressedInputBytes( + possiblyCompressedInputBytes, conf); + assertEquals(possiblyCompressedInputBytes, result); + + // Enable input compression emulation and verify uncompressed + // input bytes computation logic + CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true); + result = CompressionEmulationUtil.getUncompressedInputBytes( + possiblyCompressedInputBytes, conf); + assertEquals((long)(possiblyCompressedInputBytes/compressionRatio), result); + } }