diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index db78118731..3c78cfce55 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -362,9 +362,29 @@ public BZip2CompressionInputStream(InputStream in, long start, long end, bufferedIn = new BufferedInputStream(super.in); this.startingPos = super.getPos(); this.readMode = readMode; + long numSkipped = 0; if (this.startingPos == 0) { // We only strip header if it is start of file bufferedIn = readStreamHeader(); + } else if (this.readMode == READ_MODE.BYBLOCK && + this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) { + // When we're in BYBLOCK mode and the start position is >=0 + // and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after + // start of the first bz2 block to avoid duplicated records + numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos; + long skipBytes = numSkipped; + while (skipBytes > 0) { + long s = bufferedIn.skip(skipBytes); + if (s > 0) { + skipBytes -= s; + } else { + if (bufferedIn.read() == -1) { + break; // end of the split + } else { + skipBytes--; + } + } + } } input = new CBZip2InputStream(bufferedIn, readMode); if (this.isHeaderStripped) { @@ -375,7 +395,15 @@ public BZip2CompressionInputStream(InputStream in, long start, long end, input.updateReportedByteCount(SUB_HEADER_LEN); } - this.updatePos(false); + if (numSkipped > 0) { + input.updateReportedByteCount((int) numSkipped); + } + + // To avoid dropped records, not advertising a new byte position + // when we are in BYBLOCK mode and the start position is 0 + if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) { + this.updatePos(false); + } } private BufferedInputStream readStreamHeader() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java index 0ea1d6dbb7..22d9a57b89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java @@ -183,6 +183,14 @@ public void testSplitableCodecs() throws IOException { // corner case when we have byte alignment and position of stream are same verifyPartitions(471507, 218, file, codec, conf); verifyPartitions(473608, 110, file, codec, conf); + + // corner case when split size is small and position of stream is before + // the first BZip2 block + verifyPartitions(100, 20, file, codec, conf); + verifyPartitions(100, 25, file, codec, conf); + verifyPartitions(100, 30, file, codec, conf); + verifyPartitions(100, 50, file, codec, conf); + verifyPartitions(100, 100, file, codec, conf); } // Test a corner case when position of stream is right after BZip2 marker