diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 93b5d63e76..58e134f6d3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -200,6 +200,9 @@ Release 2.0.5-beta - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
+    (Gelesh via bobby)
+
   BUG FIXES
 
     MAPREDUCE-4671. AM does not tell the RM about container requests which are
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index e1dcee0b24..8927adf7cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -52,7 +52,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   public static final String MAX_LINE_LENGTH =
     "mapreduce.input.linerecordreader.line.maxlength";
 
-  private CompressionCodecFactory compressionCodecs = null;
   private long start;
   private long pos;
   private long end;
@@ -60,9 +59,9 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private FSDataInputStream fileIn;
   private Seekable filePosition;
   private int maxLineLength;
-  private LongWritable key = null;
-  private Text value = null;
-  private CompressionCodec codec;
+  private LongWritable key;
+  private Text value;
+  private boolean isCompressedInput;
   private Decompressor decompressor;
   private byte[] recordDelimiterBytes;
 
@@ -81,13 +80,14 @@ public void initialize(InputSplit genericSplit,
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
 
-    compressionCodecs = new CompressionCodecFactory(job);
-    codec = compressionCodecs.getCodec(file);
     // open the file and seek to the start of the split
     final FileSystem fs = file.getFileSystem(job);
     fileIn = fs.open(file);
-    if (isCompressedInput()) {
+
+    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+    if (null!=codec) {
+      isCompressedInput = true;
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {
         final SplitCompressionInputStream cIn =
@@ -132,19 +132,16 @@ public void initialize(InputSplit genericSplit,
     this.pos = start;
   }
 
-  private boolean isCompressedInput() {
-    return (codec != null);
-  }
-
   private int maxBytesToConsume(long pos) {
-    return isCompressedInput()
+    return isCompressedInput
         ? Integer.MAX_VALUE
         : (int) Math.min(Integer.MAX_VALUE, end - pos);
   }
 
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput() && null != filePosition) {
+    if (isCompressedInput && null != filePosition) {
       retVal = filePosition.getPos();
     } else {
       retVal = pos;
@@ -166,9 +163,6 @@ public boolean nextKeyValue() throws IOException {
     while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
           Math.max(maxBytesToConsume(pos), maxLineLength));
-      if (newSize == 0) {
-        break;
-      }
       pos += newSize;
       if (newSize < maxLineLength) {
         break;