MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead counter is wrong when compressed input is used.(ravigummadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297052 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ravi Gummadi 2012-03-05 13:44:01 +00:00
parent 7819483b9d
commit 231e39462d
4 changed files with 60 additions and 7 deletions

View File

@ -52,8 +52,12 @@ Trunk (unreleased changes)
BUG FIXES
MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
counter is wrong when compressed input is used.(ravigummadi)
MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
sortFinished times when needed.
sortFinished times when needed.(ravigummadi)
MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
(Jason Lowe via bobby)

View File

@ -571,4 +571,25 @@ static void configureCompressionEmulation(Configuration source,
}
setInputCompressionEmulationEnabled(target, needsCompressedInput);
}
/**
* Get the uncompressed input bytes count from the given possibly compressed
* input bytes count.
* @param possiblyCompressedInputBytes input bytes count. This is compressed
* input size if compression emulation is on.
* @param conf configuration of the Gridmix simulated job
* @return uncompressed input bytes count. Compute this in case if compressed
* input was used
*/
static long getUncompressedInputBytes(long possiblyCompressedInputBytes,
Configuration conf) {
long uncompressedInputBytes = possiblyCompressedInputBytes;
if (CompressionEmulationUtil.isInputCompressionEmulationEnabled(conf)) {
float inputCompressionRatio =
CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf);
uncompressedInputBytes /= inputCompressionRatio;
}
return uncompressedInputBytes;
}
}

View File

@ -627,9 +627,14 @@ void buildSplits(FilePool inputDir) throws IOException {
}
}
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
long possiblyCompressedInputBytes = info.getInputBytes();
Configuration conf = job.getConfiguration();
long uncompressedInputBytes =
CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
splits.add(
new LoadSplit(striper.splitFor(inputDir, info.getInputBytes(), 3),
maps, i, info.getInputBytes(), info.getInputRecords(),
new LoadSplit(striper.splitFor(inputDir, uncompressedInputBytes, 3),
maps, i, uncompressedInputBytes, info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(),
reduceByteRatio, reduceRecordRatio, specBytes,
specRecords, info.getResourceUsageMetrics(),

View File

@ -19,7 +19,6 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -31,13 +30,11 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Utils;
@ -561,4 +558,30 @@ public void testFileQueueDecompression() throws IOException {
String readLine = new String(bytes);
assertEquals("Compression/Decompression error", inputLine, readLine);
}
/**
* Tests the computation logic of uncompressed input bytes by
* {@link LoadJob#getUncompressedInputBytes(long, Configuration)}
*/
@Test
public void testComputeUncompressedInputBytes() {
long possiblyCompressedInputBytes = 100000;
float compressionRatio = 0.45F;
Configuration conf = new Configuration();
CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf,
compressionRatio);
// By default, input compression emulation is diabled. Verify the
// computation of uncompressed input bytes.
long result = CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
assertEquals(possiblyCompressedInputBytes, result);
// Enable input compression emulation and verify uncompressed
// input bytes computation logic
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
result = CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
assertEquals((long)(possiblyCompressedInputBytes/compressionRatio), result);
}
}