diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp new file mode 100644 index 0000000000..86f019b54d Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/.LineRecordReader.java.swp differ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java index ab63c199f2..08c7025203 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java @@ -302,6 +302,8 @@ public synchronized void close() throws IOException { try { if (in != null) { in.close(); + } else if (fileIn != null) { + fileIn.close(); } } finally { if (decompressor != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java index 089208841f..2177d812bc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @@ -98,48 +98,53 @@ public void initialize(InputSplit genericSplit, MRJobConfig.INPUT_FILE_OPTION_PREFIX, MRJobConfig.INPUT_FILE_MANDATORY_PREFIX); fileIn = FutureIO.awaitFuture(builder.build()); - - CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); - if (null!=codec) { - isCompressedInput = true; - decompressor = CodecPool.getDecompressor(codec); - if (codec instanceof SplittableCompressionCodec) { - final SplitCompressionInputStream cIn = - ((SplittableCompressionCodec)codec).createInputStream( - fileIn, decompressor, start, end, - SplittableCompressionCodec.READ_MODE.BYBLOCK); - in = new CompressedSplitLineReader(cIn, job, - this.recordDelimiterBytes); - start = cIn.getAdjustedStart(); - end = cIn.getAdjustedEnd(); - filePosition = cIn; - } else { - if (start != 0) { - // So we have a split that is only part of a file stored using - // a Compression codec that cannot be split. - throw new IOException("Cannot seek in " + - codec.getClass().getSimpleName() + " compressed stream"); - } - in = new SplitLineReader(codec.createInputStream(fileIn, - decompressor), job, this.recordDelimiterBytes); + try { + CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); + if (null!=codec) { + isCompressedInput = true; + decompressor = CodecPool.getDecompressor(codec); + if (codec instanceof SplittableCompressionCodec) { + final SplitCompressionInputStream cIn = + ((SplittableCompressionCodec)codec).createInputStream( + fileIn, decompressor, start, end, + SplittableCompressionCodec.READ_MODE.BYBLOCK); + in = new CompressedSplitLineReader(cIn, job, + this.recordDelimiterBytes); + start = cIn.getAdjustedStart(); + end = cIn.getAdjustedEnd(); + filePosition = cIn; + } else { + if (start != 0) { + // So we have a split that is only part of a file stored using + // a Compression codec that cannot be split. + throw new IOException("Cannot seek in " + + codec.getClass().getSimpleName() + " compressed stream"); + } + + in = new SplitLineReader(codec.createInputStream(fileIn, + decompressor), job, this.recordDelimiterBytes); + filePosition = fileIn; + } + } else { + fileIn.seek(start); + in = new UncompressedSplitLineReader( + fileIn, job, this.recordDelimiterBytes, split.getLength()); filePosition = fileIn; } - } else { - fileIn.seek(start); - in = new UncompressedSplitLineReader( - fileIn, job, this.recordDelimiterBytes, split.getLength()); - filePosition = fileIn; + // If this is not the first split, we always throw away first record + // because we always (except the last split) read one extra line in + // next() method. + if (start != 0) { + start += in.readLine(new Text(), 0, maxBytesToConsume(start)); + } + this.pos = start; + } catch (Exception e) { + fileIn.close(); + throw e; } - // If this is not the first split, we always throw away first record - // because we always (except the last split) read one extra line in - // next() method. - if (start != 0) { - start += in.readLine(new Text(), 0, maxBytesToConsume(start)); - } - this.pos = start; } - + private int maxBytesToConsume(long pos) { return isCompressedInput