diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 1564ae9085..7508def9a7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -335,6 +335,7 @@ private static class BZip2CompressionInputStream extends private boolean isSubHeaderStripped = false; private READ_MODE readMode = READ_MODE.CONTINUOUS; private long startingPos = 0L; + private boolean didInitialRead; // Following state machine handles different states of compressed stream // position @@ -480,24 +481,42 @@ public void close() throws IOException { */ public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } if (needsReset) { internalReset(); } - - int result = 0; - result = this.input.read(b, off, len); + // When startingPos > 0, the stream should be initialized at the end of + // one block (which would correspond to be the start of another block). + // Thus, the initial read would technically be reading one byte passed a + // BZip2 end of block marker. To be consistent, we should also be + // updating the position to be one byte after the end of an block on the + // initial read. + boolean initializedAtEndOfBlock = + !didInitialRead && startingPos > 0 && readMode == READ_MODE.BYBLOCK; + int result = initializedAtEndOfBlock + ? BZip2Constants.END_OF_BLOCK + : this.input.read(b, off, len); if (result == BZip2Constants.END_OF_BLOCK) { this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; } if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { - result = this.input.read(b, off, off + 1); + result = this.input.read(b, off, 1); // This is the precise time to update compressed stream position // to the client of this code. this.updatePos(true); this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; } + didInitialRead = true; return result; } @@ -513,6 +532,7 @@ private void internalReset() throws IOException { needsReset = false; BufferedInputStream bufferedIn = readStreamHeader(); input = new CBZip2InputStream(bufferedIn, this.readMode); + didInitialRead = false; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBZip2Codec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBZip2Codec.java new file mode 100644 index 0000000000..9dd3215f90 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBZip2Codec.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.hadoop.thirdparty.com.google.common.primitives.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE; +import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter; +import org.apache.hadoop.io.compress.bzip2.BZip2Utils; + +import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK; +import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.CONTINUOUS; +import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE; +import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public final class TestBZip2Codec { + + private static final long HEADER_LEN = 2; + + private Configuration conf; + private FileSystem fs; + private BZip2Codec codec; + private Decompressor decompressor; + private Path tempFile; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + + Path workDir = new Path(System.getProperty("test.build.data", "target"), + "data/" + getClass().getSimpleName()); + + Path inputDir = new Path(workDir, "input"); + tempFile = new Path(inputDir, "test.txt.bz2"); + + fs = workDir.getFileSystem(conf); + + codec = new BZip2Codec(); + codec.setConf(new Configuration(/* loadDefaults */ false)); + decompressor = CodecPool.getDecompressor(codec); + } + + @After + public void tearDown() throws Exception { + CodecPool.returnDecompressor(decompressor); + fs.delete(tempFile, /* recursive */ false); + } + + @Test + public void createInputStreamWithStartAndEnd() throws Exception { + byte[] data1 = newAlternatingByteArray(BLOCK_SIZE, 'a', 'b'); + byte[] data2 = newAlternatingByteArray(BLOCK_SIZE, 'c', 'd'); + byte[] data3 = newAlternatingByteArray(BLOCK_SIZE, 'e', 'f'); + + try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) { + writer.write(data1); + writer.write(data2); + writer.write(data3); + } + long fileSize = fs.getFileStatus(tempFile).getLen(); + + List nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(tempFile, conf); + long block2Start = nextBlockOffsets.get(0); + long block3Start = nextBlockOffsets.get(1); + + try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 0, fileSize, + BYBLOCK)) { + assertEquals(0, stream.getPos()); + assertCasesWhereReadDoesNotAdvanceStream(stream); + assertReadingAtPositionZero(stream, data1); + assertCasesWhereReadDoesNotAdvanceStream(stream); + assertReadingPastEndOfBlock(stream, block2Start, data2); + assertReadingPastEndOfBlock(stream, block3Start, data3); + assertEquals(-1, stream.read()); + } + + try (SplitCompressionInputStream stream = newCompressionStream(tempFile, 1, fileSize - 1, + BYBLOCK)) { + assertEquals(block2Start, stream.getPos()); + assertCasesWhereReadDoesNotAdvanceStream(stream); + assertReadingPastEndOfBlock(stream, block2Start, data2); + assertCasesWhereReadDoesNotAdvanceStream(stream); + assertReadingPastEndOfBlock(stream, block3Start, data3); + assertEquals(-1, stream.read()); + } + + // With continuous mode, only starting at or after the stream header is + // supported. + byte[] allData = Bytes.concat(data1, data2, data3); + assertReadingWithContinuousMode(tempFile, 0, fileSize, allData); + assertReadingWithContinuousMode(tempFile, HEADER_LEN, fileSize - HEADER_LEN, allData); + } + + private void assertReadingWithContinuousMode(Path file, long start, long length, + byte[] expectedData) throws IOException { + try (SplitCompressionInputStream stream = newCompressionStream(file, start, length, + CONTINUOUS)) { + assertEquals(HEADER_LEN, stream.getPos()); + + assertRead(stream, expectedData); + assertEquals(-1, stream.read()); + + // When specifying CONTINUOUS read mode, the position ends up not being + // updated at all. + assertEquals(HEADER_LEN, stream.getPos()); + } + } + + private SplitCompressionInputStream newCompressionStream(Path file, long start, long length, + READ_MODE readMode) throws IOException { + FSDataInputStream rawIn = fs.open(file); + rawIn.seek(start); + long end = start + length; + return codec.createInputStream(rawIn, decompressor, start, end, readMode); + } + + private static byte[] newAlternatingByteArray(int size, int... choices) { + checkArgument(choices.length > 1); + byte[] result = new byte[size]; + for (int i = 0; i < size; i++) { + result[i] = (byte) choices[i % choices.length]; + } + return result; + } + + private static void assertCasesWhereReadDoesNotAdvanceStream(SplitCompressionInputStream in) + throws IOException { + long initialPos = in.getPos(); + + assertEquals(0, in.read(new byte[0])); + + assertThatNullPointerException().isThrownBy(() -> in.read(null, 0, 1)); + assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy( + () -> in.read(new byte[5], -1, 2)); + assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy( + () -> in.read(new byte[5], 0, -1)); + assertThatExceptionOfType(IndexOutOfBoundsException.class).isThrownBy( + () -> in.read(new byte[5], 1, 5)); + + assertEquals(initialPos, in.getPos()); + } + + private static void assertReadingAtPositionZero(SplitCompressionInputStream in, + byte[] expectedData) throws IOException { + byte[] buffer = new byte[expectedData.length]; + assertEquals(1, in.read(buffer, 0, 1)); + assertEquals(expectedData[0], buffer[0]); + assertEquals(0, in.getPos()); + + IOUtils.readFully(in, buffer, 1, expectedData.length - 1); + assertArrayEquals(expectedData, buffer); + assertEquals(0, in.getPos()); + } + + private static void assertReadingPastEndOfBlock(SplitCompressionInputStream in, + long endOfBlockPos, byte[] expectedData) throws IOException { + byte[] buffer = new byte[expectedData.length]; + assertEquals(1, in.read(buffer)); + assertEquals(expectedData[0], buffer[0]); + assertEquals(endOfBlockPos + 1, in.getPos()); + + IOUtils.readFully(in, buffer, 1, expectedData.length - 1); + assertArrayEquals(expectedData, buffer); + assertEquals(endOfBlockPos + 1, in.getPos()); + } + + private static void assertRead(InputStream in, byte[] expectedData) throws IOException { + byte[] buffer = new byte[expectedData.length]; + IOUtils.readFully(in, buffer); + assertArrayEquals(expectedData, buffer); + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java index da6bedde13..1e06f12656 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java @@ -19,8 +19,10 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.StringJoiner; import org.junit.After; import org.junit.Before; @@ -34,6 +36,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE; +import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.junit.Assert.assertEquals; public abstract class BaseTestLineRecordReaderBZip2 { @@ -306,6 +309,8 @@ private void assertRecordCountsPerSplit( countAssert.assertSingleSplit(); countAssert.assertSplittingAtBlocks(); countAssert.assertSplittingJustAfterSecondBlockStarts(); + countAssert.assertSplittingEachBlockRangeInThreeParts(); + countAssert.assertSplitsAroundBlockStartOffsets(); } private class RecordCountAssert { @@ -334,16 +339,7 @@ private void assertSingleSplit() throws IOException { } private void assertSplittingAtBlocks() throws IOException { - for (int i = 0; i < numBlocks; i++) { - long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1); - long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i); - long length = end - start; - - String message = "At i=" + i; - long expectedCount = countsIfSplitAtBlocks[i]; - assertEquals( - message, expectedCount, reader.countRecords(start, length)); - } + assertSplits(getSplitsAtBlocks()); } private void assertSplittingJustAfterSecondBlockStarts() @@ -363,6 +359,123 @@ private void assertSplittingJustAfterSecondBlockStarts() remainingRecords, reader.countRecords(firstSplitSize, fileSize - firstSplitSize)); } + + private void assertSplittingEachBlockRangeInThreeParts() + throws IOException { + for (SplitRange splitRange : getSplitsAtBlocks()) { + long[] expectedNumRecordsPerPart = new long[] { + splitRange.expectedNumRecords, 0, 0 + }; + List parts = splitRange.divide(expectedNumRecordsPerPart); + assertSplits(parts); + } + } + + private void assertSplitsAroundBlockStartOffsets() + throws IOException { + for (SplitRange split : getSplitsAtBlocks()) { + assertSplit(split.withLength(1)); + if (split.start > 0) { + assertSplit(split.moveBy(-2).withLength(3)); + assertSplit(split.moveBy(-2).withLength(2).withExpectedNumRecords(0)); + assertSplit(split.moveBy(-1).withLength(2)); + assertSplit(split.moveBy(-1).withLength(1).withExpectedNumRecords(0)); + } + assertSplit(split.moveBy(1).withLength(1).withExpectedNumRecords(0)); + assertSplit(split.moveBy(2).withLength(1).withExpectedNumRecords(0)); + } + } + + private List getSplitsAtBlocks() { + List splits = new ArrayList<>(); + for (int i = 0; i < numBlocks; i++) { + String name = "Block" + i; + long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1); + long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i); + long length = end - start; + long expectedNumRecords = countsIfSplitAtBlocks[i]; + splits.add(new SplitRange(name, start, length, expectedNumRecords)); + } + return splits; + } + + private void assertSplits(Iterable splitRanges) + throws IOException { + for (SplitRange splitRange : splitRanges) { + assertSplit(splitRange); + } + } + + private void assertSplit(SplitRange splitRange) throws IOException { + String message = splitRange.toString(); + long actual = reader.countRecords(splitRange.start, splitRange.length); + assertEquals(message, splitRange.expectedNumRecords, actual); + } + } + + private static class SplitRange { + final private String name; + final private long start; + final private long length; + final private long expectedNumRecords; + + SplitRange( + String name, + long start, + long length, + long expectedNumRecords) { + this.name = name; + this.start = start; + this.length = length; + this.expectedNumRecords = expectedNumRecords; + } + + @Override + public String toString() { + return new StringJoiner(", ", SplitRange.class.getSimpleName() + "[", "]") + .add("name='" + name + "'") + .add("start=" + start) + .add("length=" + length) + .add("expectedNumRecords=" + expectedNumRecords) + .toString(); + } + + List divide(long[] expectedNumRecordsPerPart) { + int numParts = expectedNumRecordsPerPart.length; + checkArgument(numParts > 0); + + long minPartSize = length / numParts; + checkArgument(minPartSize > 0); + long lastPartExtraSize = length % numParts; + + List partRanges = new ArrayList<>(); + long partStart = start; + for (int i = 0; i < numParts; i++) { + String partName = name + "_Part" + i; + + long extraSize = i == numParts - 1 ? lastPartExtraSize : 0; + long partSize = minPartSize + extraSize; + + long partExpectedNumRecords = expectedNumRecordsPerPart[i]; + + partRanges.add(new SplitRange( + partName, partStart, partSize, partExpectedNumRecords)); + partStart += partSize; + } + return partRanges; + } + + SplitRange withLength(long newLength) { + return new SplitRange(name, start, newLength, expectedNumRecords); + } + + SplitRange withExpectedNumRecords(long newExpectedNumRecords) { + return new SplitRange(name, start, length, newExpectedNumRecords); + } + + SplitRange moveBy(long delta) { + return new SplitRange(name, start + delta, length, expectedNumRecords); + } } private long getFileSize(Path path) throws IOException {