diff --git a/CHANGES.txt b/CHANGES.txt index 042974eab3..ad726d1b37 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -187,6 +187,9 @@ Trunk (unreleased changes) HADOOP-5073. Add annotation mechanism for interface classification. (Jakob Homan via suresh) + HADOOP-4012. Provide splitting support for bzip2 compressed files. (Abdul + Qadeer via cdouglas) + IMPROVEMENTS HADOOP-4565. Added CombineFileInputFormat to use data locality information diff --git a/src/java/org/apache/hadoop/fs/FSInputChecker.java b/src/java/org/apache/hadoop/fs/FSInputChecker.java index a0db285a03..6de45d2c0d 100644 --- a/src/java/org/apache/hadoop/fs/FSInputChecker.java +++ b/src/java/org/apache/hadoop/fs/FSInputChecker.java @@ -296,12 +296,12 @@ static public long checksum2long(byte[] checksum) { @Override public synchronized long getPos() throws IOException { - return chunkPos-(count-pos); + return chunkPos-Math.max(0L, count - pos); } @Override public synchronized int available() throws IOException { - return count-pos; + return Math.max(0, count - pos); } /** diff --git a/src/java/org/apache/hadoop/io/compress/BZip2Codec.java b/src/java/org/apache/hadoop/io/compress/BZip2Codec.java index 84a51410b8..66db3290a4 100644 --- a/src/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/src/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -23,6 +23,9 @@ import java.io.InputStream; import java.io.OutputStream; + +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.compress.bzip2.BZip2Constants; import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor; import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor; import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; @@ -35,17 +38,17 @@ * CompressionCodec which have a Compressor or Decompressor type argument, throw * UnsupportedOperationException. */ -public class BZip2Codec implements - org.apache.hadoop.io.compress.CompressionCodec { +public class BZip2Codec implements SplittableCompressionCodec { private static final String HEADER = "BZ"; private static final int HEADER_LEN = HEADER.length(); + private static final String SUB_HEADER = "h9"; + private static final int SUB_HEADER_LEN = SUB_HEADER.length(); /** * Creates a new instance of BZip2Codec */ - public BZip2Codec() { - } + public BZip2Codec() { } /** * Creates CompressionOutputStream for BZip2 @@ -62,10 +65,10 @@ public CompressionOutputStream createOutputStream(OutputStream out) } /** - * This functionality is currently not supported. + * Creates a CompressionOutputStream for BZip2; the Compressor argument is ignored. * - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return CompressionOutputStream + * @throws java.io.IOException */ public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException { @@ -75,8 +78,7 @@ public CompressionOutputStream createOutputStream(OutputStream out, /** * This functionality is currently not supported. * - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return BZip2DummyCompressor.class */ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() { return BZip2DummyCompressor.class; @@ -85,8 +87,7 @@ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorTy /** * This functionality is currently not supported. 
* - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return Compressor */ public Compressor createCompressor() { return new BZip2DummyCompressor(); } @@ -109,19 +110,72 @@ public CompressionInputStream createInputStream(InputStream in) /** * This functionality is currently not supported. * - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return CompressionInputStream */ public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { return createInputStream(in); } + /** + * Creates a CompressionInputStream to be used to read off uncompressed data + * in one of the two reading modes, i.e. continuous or by-block mode. + * + * @param seekableIn The InputStream + * @param start The start offset into the compressed stream + * @param end The end offset into the compressed stream + * @param readMode Controls whether progress is reported continuously or + * only at block boundaries. + * + * @return CompressionInputStream for BZip2 aligned at block boundaries + */ + public SplitCompressionInputStream createInputStream(InputStream seekableIn, + Decompressor decompressor, long start, long end, READ_MODE readMode) + throws IOException { + + if (!(seekableIn instanceof Seekable)) { + throw new IOException("seekableIn must be an instance of " + + Seekable.class.getName()); + } + + // find the position of the first BZip2 start-up marker + ((Seekable)seekableIn).seek(0); + + // BZip2 start-of-block markers are 6 bytes long. But the very first block + // also has "BZh9", making it 10 bytes. This is the common case. But at + // times the stream might start without a leading BZ. + final long FIRST_BZIP2_BLOCK_MARKER_POSITION = + CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); + long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION); + + ((Seekable)seekableIn).seek(adjStart); + SplitCompressionInputStream in = + new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode); + + + // The following if clause handles the following case: + // Assume the following scenario in a BZip2 compressed stream where + // . represents compressed data. + // .....[48 bit Block].....[48 bit Block].....[48 bit Block]... + // ........................[47 bits][1 bit].....[48 bit Block]... + // ................................^[Assume a Byte alignment here] + // ........................................^^[current position of stream] + // .....................^^[We go back 10 Bytes in the stream and find a Block marker] + // ........................................^^[We align at the wrong position!] + // ...........................................................^^[While this pos is correct] + + if (in.getPos() <= start) { + ((Seekable)seekableIn).seek(start); + in = new BZip2CompressionInputStream(seekableIn, start, end, readMode); + } + + return in; + } + /** * This functionality is currently not supported. * - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return BZip2DummyDecompressor.class */ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() { return BZip2DummyDecompressor.class; @@ -130,8 +184,7 @@ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompress /** * This functionality is currently not supported. 
* - * @throws java.lang.UnsupportedOperationException - * Throws UnsupportedOperationException + * @return Decompressor */ public Decompressor createDecompressor() { return new BZip2DummyDecompressor(); } @@ -146,7 +199,8 @@ public String getDefaultExtension() { return ".bz2"; } - private static class BZip2CompressionOutputStream extends CompressionOutputStream { + private static class BZip2CompressionOutputStream extends + CompressionOutputStream { // class data starts here// private CBZip2OutputStream output; @@ -221,26 +275,79 @@ public void close() throws IOException { }// end of class BZip2CompressionOutputStream - private static class BZip2CompressionInputStream extends CompressionInputStream { + /** + * This class is capable of de-compressing BZip2 data in two modes: + * CONTINUOUS and BYBLOCK. BYBLOCK mode makes it possible to + * start decompression at any arbitrary position in the stream. + * + * This facility can therefore be used to parallelize decompression + * of a large BZip2 file for performance reasons. (This is exactly + * what the Hadoop framework does; see LineRecordReader for an + * example.) So one can break the file (logically) into chunks for + * parallel processing. These "splits" should be like the default + * Hadoop splits (e.g. as produced by FileInputFormat's getSplits + * method). This code is designed and tested for FileInputFormat's + * way of splitting only. + */ + + private static class BZip2CompressionInputStream extends + SplitCompressionInputStream { // class data starts here// private CBZip2InputStream input; boolean needsReset; + private BufferedInputStream bufferedIn; + private boolean isHeaderStripped = false; + private boolean isSubHeaderStripped = false; + private READ_MODE readMode = READ_MODE.CONTINUOUS; + private long startingPos = 0L; + + // The following state machine handles the different states of the + // compressed stream position + // HOLD : Don't advertise the compressed stream position + // ADVERTISE : Read 1 more character and advertise the stream position + // See more comments about it before the updatePos method. + private enum POS_ADVERTISEMENT_STATE_MACHINE { + HOLD, ADVERTISE + }; + + POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; + long compressedStreamPosition = 0; + // class data ends here// public BZip2CompressionInputStream(InputStream in) throws IOException { + this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS); + } - super(in); - needsReset = true; + public BZip2CompressionInputStream(InputStream in, long start, long end, + READ_MODE readMode) throws IOException { + super(in, start, end); + needsReset = false; + bufferedIn = new BufferedInputStream(super.in); + this.startingPos = super.getPos(); + this.readMode = readMode; + if (this.startingPos == 0) { + // We only strip the header if it is the start of the file + bufferedIn = readStreamHeader(); + } + input = new CBZip2InputStream(bufferedIn, readMode); + if (this.isHeaderStripped) { + input.updateReportedByteCount(HEADER_LEN); + } + + if (this.isSubHeaderStripped) { + input.updateReportedByteCount(SUB_HEADER_LEN); + } + + this.updatePos(false); } private BufferedInputStream readStreamHeader() throws IOException { // We are flexible enough to allow the compressed stream not to // start with the header of BZ. So it works fine either we have // the header or not. 
- BufferedInputStream bufferedIn = null; if (super.in != null) { - bufferedIn = new BufferedInputStream(super.in); bufferedIn.mark(HEADER_LEN); byte[] headerBytes = new byte[HEADER_LEN]; int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); @@ -248,6 +355,17 @@ private BufferedInputStream readStreamHeader() throws IOException { String header = new String(headerBytes); if (header.compareTo(HEADER) != 0) { bufferedIn.reset(); + } else { + this.isHeaderStripped = true; + // In case of BYBLOCK mode, we also want to strip off + // the remaining two characters of the header. + if (this.readMode == READ_MODE.BYBLOCK) { + actualRead = bufferedIn.read(headerBytes, 0, + SUB_HEADER_LEN); + if (actualRead != -1) { + this.isSubHeaderStripped = true; + } + } } } } @@ -267,33 +385,96 @@ public void close() throws IOException { } } + /** + * This method updates the compressed stream position exactly when the + * client of this code has read off at least one byte past any BZip2 + * end-of-block marker. + * + * This mechanism is very helpful in dealing with data-level record + * boundaries. Please see the constructor and next methods of + * org.apache.hadoop.mapred.LineRecordReader as an example usage of this + * feature. We elaborate with an example in the following: + * + * Assume two different scenarios of a BZip2 compressed stream, where + * [m] represents an end-of-block marker, \n is the line delimiter and . + * represents compressed data. + * + * ............[m]......\n....... + * + * ..........\n[m]......\n....... + * + * Assume that end is right after [m]. In the first case reading + * will stop at \n and there is no need to read one more line. (The + * reason for reading one more line in the next() method is explained in + * LineRecordReader.) In the second example, however, LineRecordReader + * needs to read one more line (till the second \n). Since BZip2Codec + * only updates the position once at least one byte past a marker has + * been read, it is straightforward to differentiate between the two + * cases mentioned. + * + */ + public int read(byte[] b, int off, int len) throws IOException { if (needsReset) { internalReset(); } - return this.input.read(b, off, len); + int result = 0; + result = this.input.read(b, off, len); + if (result == BZip2Constants.END_OF_BLOCK) { + this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; + } + + if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { + result = this.input.read(b, off, 1); // read exactly one byte past the marker + // This is the precise time to update the compressed stream position + // for the client of this code. + this.updatePos(true); + this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; + } + + return result; + + } + + public int read() throws IOException { + byte b[] = new byte[1]; + int result = this.read(b, 0, 1); + return (result < 0) ? result : b[0]; } private void internalReset() throws IOException { if (needsReset) { needsReset = false; BufferedInputStream bufferedIn = readStreamHeader(); - input = new CBZip2InputStream(bufferedIn); + input = new CBZip2InputStream(bufferedIn, this.readMode); } } public void resetState() throws IOException { - // Cannot read from bufferedIn at this point because bufferedIn might not be ready + // Cannot read from bufferedIn at this point because bufferedIn + // might not be ready // yet, as in SequenceFile.Reader implementation. 
needsReset = true; } - public int read() throws IOException { - if (needsReset) { - internalReset(); + public long getPos() { + return this.compressedStreamPosition; } - return this.input.read(); + + /* + * As the comments before the read method explain, the compressed + * stream position is advertised only once at least one byte past + * an EOB marker has been read off. But there is an exception to + * this rule: when we construct the stream, we advertise the + * position exactly at the EOB. In the following method the + * shouldAddOn boolean captures this exception. + * + */ + private void updatePos(boolean shouldAddOn) { + int addOn = shouldAddOn ? 1 : 0; + this.compressedStreamPosition = this.startingPos + + this.input.getProcessedByteCount() + addOn; } }// end of BZip2CompressionInputStream diff --git a/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java b/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java index 96636e7a4f..bcdeb37ae2 100644 --- a/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java +++ b/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java @@ -38,9 +38,10 @@ public class BlockDecompressorStream extends DecompressorStream { * @param in input stream * @param decompressor decompressor to use * @param bufferSize size of buffer + * @throws IOException */ public BlockDecompressorStream(InputStream in, Decompressor decompressor, - int bufferSize) { + int bufferSize) throws IOException { super(in, decompressor, bufferSize); } @@ -49,12 +50,13 @@ public BlockDecompressorStream(InputStream in, Decompressor decompressor, * * @param in input stream * @param decompressor decompressor to use + * @throws IOException */ - public BlockDecompressorStream(InputStream in, Decompressor decompressor) { + public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException { super(in, decompressor); } - protected BlockDecompressorStream(InputStream in) { + protected BlockDecompressorStream(InputStream in) throws IOException { super(in); } diff --git a/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java index aabdd2b5e4..0b7f9bbce6 100644 --- a/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java +++ b/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; /** * A compression input stream. * @@ -28,19 +30,25 @@ * reposition the underlying input stream then call {@link #resetState()}, * without having to also synchronize client buffers. */ -public abstract class CompressionInputStream extends InputStream { + +public abstract class CompressionInputStream extends InputStream implements Seekable { /** * The input stream to be compressed. */ protected final InputStream in; + protected long maxAvailableData = 0L; /** * Create a compression input stream that reads * the decompressed bytes from the given stream. * * @param in The input stream to be compressed. 
+ * @throws IOException */ - protected CompressionInputStream(InputStream in) { + protected CompressionInputStream(InputStream in) throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + this.maxAvailableData = in.available(); + } this.in = in; } @@ -60,4 +68,40 @@ public void close() throws IOException { */ public abstract void resetState() throws IOException; + /** + * This method returns the current position in the stream. + * + * @return Current position in stream as a long + */ + public long getPos() throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){ + // This way of getting the current position will not work for files + // whose size cannot fit in an int, since it cannot be returned by + // the available method. + return (this.maxAvailableData - this.in.available()); + } + else{ + return ((Seekable)this.in).getPos(); + } + + } + + /** + * This method is currently not supported. + * + * @throws UnsupportedOperationException + */ + + public void seek(long pos) throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + /** + * This method is currently not supported. + * + * @throws UnsupportedOperationException + */ + public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } } diff --git a/src/java/org/apache/hadoop/io/compress/DecompressorStream.java b/src/java/org/apache/hadoop/io/compress/DecompressorStream.java index a84bea443e..6e8a8d5c5a 100644 --- a/src/java/org/apache/hadoop/io/compress/DecompressorStream.java +++ b/src/java/org/apache/hadoop/io/compress/DecompressorStream.java @@ -30,7 +30,7 @@ public class DecompressorStream extends CompressionInputStream { protected boolean eof = false; protected boolean closed = false; - public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) { + public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) throws IOException { super(in); if (in == null || decompressor == null) { @@ -43,7 +43,7 @@ public DecompressorStream(InputStream in, Decompressor decompressor, int bufferS buffer = new byte[bufferSize]; } - public DecompressorStream(InputStream in, Decompressor decompressor) { + public DecompressorStream(InputStream in, Decompressor decompressor) throws IOException { this(in, decompressor, 512); } @@ -51,8 +51,9 @@ public DecompressorStream(InputStream in, Decompressor decompressor) { * Allow derived classes to directly set the underlying stream. * * @param in Underlying input stream. + * @throws IOException */ - protected DecompressorStream(InputStream in) { + protected DecompressorStream(InputStream in) throws IOException { super(in); } diff --git a/src/java/org/apache/hadoop/io/compress/GzipCodec.java b/src/java/org/apache/hadoop/io/compress/GzipCodec.java index 674dce280f..f7bb792488 100644 --- a/src/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/src/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -103,8 +103,9 @@ public GzipInputStream(InputStream in) throws IOException { /** * Allow subclasses to directly set the inflater stream. 
+ * @throws IOException */ - protected GzipInputStream(DecompressorStream in) { + protected GzipInputStream(DecompressorStream in) throws IOException { super(in); } diff --git a/src/java/org/apache/hadoop/io/compress/SplitCompressionInputStream.java b/src/java/org/apache/hadoop/io/compress/SplitCompressionInputStream.java new file mode 100644 index 0000000000..96d7664469 --- /dev/null +++ b/src/java/org/apache/hadoop/io/compress/SplitCompressionInputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An InputStream covering a range of compressed data. The start and end + * offsets requested by a client may be modified by the codec to fit block + * boundaries or other algorithm-dependent requirements. + */ +public abstract class SplitCompressionInputStream + extends CompressionInputStream { + + private long start; + private long end; + + public SplitCompressionInputStream(InputStream in, long start, long end) + throws IOException { + super(in); + this.start = start; + this.end = end; + } + + protected void setStart(long start) { + this.start = start; + } + + protected void setEnd(long end) { + this.end = end; + } + + /** + * After calling createInputStream, the values of start or end + * might change. So this method can be used to get the new value of start. + * @return The changed value of start + */ + public long getAdjustedStart() { + return start; + } + + /** + * After calling createInputStream, the values of start or end + * might change. So this method can be used to get the new value of end. + * @return The changed value of end + */ + public long getAdjustedEnd() { + return end; + } +} diff --git a/src/java/org/apache/hadoop/io/compress/SplittableCompressionCodec.java b/src/java/org/apache/hadoop/io/compress/SplittableCompressionCodec.java new file mode 100644 index 0000000000..a48c3a8313 --- /dev/null +++ b/src/java/org/apache/hadoop/io/compress/SplittableCompressionCodec.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; + + +/** + * This interface is meant to be implemented by those compression codecs + * which are capable of compressing / de-compressing a stream starting at + * any arbitrary position. + * + * Especially the process of de-compressing a stream starting at some arbitrary + * position is challenging. Most codecs are only able to successfully + * de-compress a stream if they start from the very beginning and read till the + * end. One of the reasons is the stored state at the beginning of the stream + * which is crucial for de-compression. + * + * Yet there are a few codecs which do not save the whole state at the beginning + * of the stream and hence can be used to de-compress a stream starting at any + * arbitrary point. This interface is meant to be used by such codecs. Such + * codecs are highly valuable, especially in the context of Hadoop, because + * an input compressed file can be split and hence can be worked on by multiple + * machines in parallel. + */ +public interface SplittableCompressionCodec extends CompressionCodec { + + /** + * During decompression, data can be read off from the decompressor in two + * modes, namely continuous and blocked. A few codecs (e.g. BZip2) are capable + * of compressing data in blocks and then decompressing the blocks. In + * blocked reading mode a codec reports 'end of block' events to its caller, + * while in continuous mode the caller of the codec is unaware of the blocks + * and uncompressed data is spilled out like a continuous stream. + */ + public enum READ_MODE {CONTINUOUS, BYBLOCK}; + + /** + * Create a stream as dictated by the readMode. This method is used when + * the codec wants the ability to work with the underlying stream positions. + * + * @param seekableIn The seekable input stream (seeks in compressed data) + * @param start The start offset into the compressed stream. May be changed + * by the underlying codec. + * @param end The end offset into the compressed stream. May be changed by + * the underlying codec. + * @param readMode Controls whether the stream position is reported + * continuously from the compressed stream or only at block boundaries. + * @return a stream to read uncompressed bytes from + */ + SplitCompressionInputStream createInputStream(InputStream seekableIn, + Decompressor decompressor, long start, long end, READ_MODE readMode) + throws IOException; + +} diff --git a/src/java/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java b/src/java/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java index 99dc28146d..cfefb9903c 100644 --- a/src/java/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java +++ b/src/java/org/apache/hadoop/io/compress/bzip2/BZip2Constants.java @@ -44,6 +44,14 @@ public interface BZip2Constants { int N_ITERS = 4; int MAX_SELECTORS = (2 + (900000 / G_SIZE)); int NUM_OVERSHOOT_BYTES = 20; + /** + * End of a BZip2 block + */ + public static final int END_OF_BLOCK = -2; + /** + * End of BZip2 stream. + */ + public static final int END_OF_STREAM = -1; /** * This array really shouldn't be here. 
Again, for historical purposes it diff --git a/src/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/src/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java index 567cb5efd3..14cc9d5b82 100644 --- a/src/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java +++ b/src/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java @@ -23,9 +23,13 @@ */ package org.apache.hadoop.io.compress.bzip2; +import java.io.BufferedInputStream; import java.io.InputStream; import java.io.IOException; +import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE; + + /** * An input stream that decompresses from the BZip2 format (without the file * header chars) to be read as any other stream. @@ -45,30 +49,43 @@ * * + * This Ant code was enhanced so that it can de-compress blocks of bzip2 data. + * Current position in the stream is an important statistic for Hadoop. For + * example in LineRecordReader, we solely depend on the current position in the + * stream to know about the progress. The notion of position becomes complicated + * for compressed files. The Hadoop splitting is done in terms of the compressed + * file, but a compressed file deflates to a large amount of data. So we have + * handled this problem in the following way. + * + * At object creation time, we find the next block start delimiter. Once such a + * marker is found, the stream stops there (we discard any compressed data read + * in this process) and the position is updated (i.e. the caller of this class + * will find out the stream location). At this point we are ready for the actual + * reading (i.e. decompression) of data. + * + * The subsequent read calls give out data. The position is updated when the + * caller of this class has read off the current block + 1 bytes. In between + * block reads, the position is not updated. (We can only update the position on + * block boundaries.) + * + * * Instances of this class are not threadsafe. *
*/ public class CBZip2InputStream extends InputStream implements BZip2Constants { - private static void reportCRCError() throws IOException { - throw new IOException("BZip2 CRC error"); - - } - - private void makeMaps() { - final boolean[] inUse = this.data.inUse; - final byte[] seqToUnseq = this.data.seqToUnseq; - - int nInUseShadow = 0; - - for (int i = 0; i < 256; i++) { - if (inUse[i]) - seqToUnseq[nInUseShadow++] = (byte) i; - } - - this.nInUse = nInUseShadow; - } + public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block + public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 stream + private static final int DELIMITER_BIT_LENGTH = 48; + READ_MODE readMode = READ_MODE.CONTINUOUS; + // This variable records the current advertised position of the stream. + private long reportedBytesReadFromCompressedStream = 0L; + // The following variable keeps a record of the compressed bytes read. + private long bytesReadFromCompressedStream = 0L; + private boolean lazyInitialization = false; + private byte array[] = new byte[1]; /** * Index of the last char in the block, so the block size == last + 1. @@ -86,32 +103,34 @@ private void makeMaps() { */ private int blockSize100k; - private boolean blockRandomised; + private boolean blockRandomised = false; - private int bsBuff; - private int bsLive; + private long bsBuff; + private long bsLive; private final CRC crc = new CRC(); private int nInUse; - private InputStream in; + private BufferedInputStream in; private int currentChar = -1; - private static final int EOF = 0; - private static final int START_BLOCK_STATE = 1; - private static final int RAND_PART_A_STATE = 2; - private static final int RAND_PART_B_STATE = 3; - private static final int RAND_PART_C_STATE = 4; - private static final int NO_RAND_PART_A_STATE = 5; - private static final int NO_RAND_PART_B_STATE = 6; - private static final int NO_RAND_PART_C_STATE = 7; + /** + * A state machine to keep track of the current state of the decoder. + * + */ + public enum STATE { + EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE + }; - private int currentState = START_BLOCK_STATE; + private STATE currentState = STATE.START_BLOCK_STATE; private int storedBlockCRC, storedCombinedCRC; private int computedBlockCRC, computedCombinedCRC; + private boolean skipResult = false;// used by skipToNextMarker + private static boolean skipDecompression = false; + // Variables used by setup* methods exclusively private int su_count; @@ -129,6 +148,121 @@ private void makeMaps() { */ private CBZip2InputStream.Data data; + /** + * This method reports the processed bytes so far. Please note that this + * statistic is only updated on block boundaries and only when the stream is + * initiated in BYBLOCK mode. + */ + public long getProcessedByteCount() { + return reportedBytesReadFromCompressedStream; + } + + /** + * This method keeps track of raw processed compressed + * bytes. + * + * @param count count is the number of bytes to be + * added to raw processed bytes + */ + + protected void updateProcessedByteCount(int count) { + this.bytesReadFromCompressedStream += count; + } + + /** + * This method is called by the client of this + * class in case there are any corrections in + * the stream position. One common example is + * when the client of this code removes the + * starting BZ characters from the compressed stream. 
+ * + * @param count the number of bytes added to the reported byte count + * + */ + public void updateReportedByteCount(int count) { + this.reportedBytesReadFromCompressedStream += count; + this.updateProcessedByteCount(count); + } + + /** + * This method reads a byte from the compressed stream. Whenever we need to + * read from the underlying compressed stream, this method should be called + * instead of directly calling the read method of the underlying compressed + * stream. This method does the important record keeping of how many bytes + * have been read off the compressed stream. + */ + private int readAByte(InputStream inStream) throws IOException { + int read = inStream.read(); + if (read >= 0) { + this.updateProcessedByteCount(1); + } + return read; + } + + /** + * This method tries to find the marker (passed to it as the first parameter) + * in the stream. It can find bit patterns of length <= 63 bits. Specifically + * this method is used in CBZip2InputStream to find the end of block (EOB) + * delimiter in the stream, starting from the current position of the stream. + * If the marker is found, the stream position will be right after the marker + * at the end of this call. + * + * @param marker The bit pattern to be found in the stream + * @param markerBitLength No of bits in the marker + * + * @throws IOException + * @throws IllegalArgumentException if markerBitLength is greater than 63 + */ + public boolean skipToNextMarker(long marker, int markerBitLength) + throws IOException, IllegalArgumentException { + try { + if (markerBitLength > 63) { + throw new IllegalArgumentException( + "skipToNextMarker can not find patterns greater than 63 bits"); + } + // pick the next markerBitLength bits in the stream + long bytes = 0; + bytes = this.bsR(markerBitLength); + if (bytes == -1) { + return false; + } + while (true) { + if (bytes == marker) { + return true; + + } else { + bytes = bytes << 1; + bytes = bytes & ((1L << markerBitLength) - 1); + int oneBit = (int) this.bsR(1); + if (oneBit != -1) { + bytes = bytes | oneBit; + } else + return false; + } + } + } catch (IOException ex) { + return false; + } + } + + protected void reportCRCError() throws IOException { + throw new IOException("crc error"); + } + + private void makeMaps() { + final boolean[] inUse = this.data.inUse; + final byte[] seqToUnseq = this.data.seqToUnseq; + + int nInUseShadow = 0; + + for (int i = 0; i < 256; i++) { + if (inUse[i]) - seqToUnseq[nInUseShadow++] = (byte) i; + } + + this.nInUse = nInUseShadow; + } + /** * Constructs a new CBZip2InputStream which decompresses bytes read from the * specified stream. 
@@ -145,21 +279,99 @@ private void makeMaps() { * @throws NullPointerException * if in == null */ - public CBZip2InputStream(final InputStream in) throws IOException { - super(); + public CBZip2InputStream(final InputStream in, READ_MODE readMode) + throws IOException { - this.in = in; + super(); + int blockSize = 0X39;// i.e. '9' + this.blockSize100k = blockSize - '0'; + this.in = new BufferedInputStream(in, 1024 * 9);// 9 KB buffer + this.readMode = readMode; + if (readMode == READ_MODE.CONTINUOUS) { + currentState = STATE.START_BLOCK_STATE; + lazyInitialization = (in.available() == 0); + if (!lazyInitialization) { init(); } + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + if (!skipDecompression) { + changeStateToProcessABlock(); + } + } + } + + /** + * Returns the number of bytes between the current stream position + * and the immediate next BZip2 block marker. + * + * @param in + * The InputStream + * + * @return long Number of bytes between the current stream position and the + * next BZip2 block start marker. + * @throws IOException + * + */ + public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException { + CBZip2InputStream.skipDecompression = true; + CBZip2InputStream anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK); + return anObject.getProcessedByteCount(); + } + + public CBZip2InputStream(final InputStream in) throws IOException { + this(in, READ_MODE.CONTINUOUS); + } + + private void changeStateToProcessABlock() throws IOException { + if (skipResult) { + initBlock(); + setupBlock(); + } else { + this.currentState = STATE.EOF; + } + } + public int read() throws IOException { + if (this.in != null) { - return read0(); + int result = this.read(array, 0, 1); + int value = 0XFF & array[0]; + return (result > 0 ? value : result); + } else { throw new IOException("stream closed"); } } + /** + * In CONTINUOUS reading mode, this read method starts at the + * beginning of the compressed stream and ends at the end of the + * file, emitting uncompressed data. In this mode the stream + * position is not announced and should be ignored. + * + * In BYBLOCK reading mode, this read method signals the end of a + * BZip2 block by returning EOB. At this event, the compressed + * stream position is also announced. The announcement tells how + * much of the compressed stream has been de-compressed and read + * out of this class. In between EOB events, the stream position is + * not updated. + * + * + * @throws IOException + * if the stream content is malformed or an I/O error occurs. + * + * @return int Return values greater than 0 are the bytes read. A value + * of -1 means end of stream, while -2 represents end of block + */ + + public int read(final byte[] dest, final int offs, final int len) throws IOException { if (offs < 0) { @@ -176,13 +388,39 @@ public int read(final byte[] dest, final int offs, final int len) throw new IOException("stream closed"); } - final int hi = offs + len; - int destOffs = offs; - for (int b; (destOffs < hi) && ((b = read0()) >= 0);) { - dest[destOffs++] = (byte) b; + if (lazyInitialization) { + this.init(); + this.lazyInitialization = false; + } - return (destOffs == offs) ? 
-1 : (destOffs - offs); + if (skipDecompression) { + changeStateToProcessABlock(); + CBZip2InputStream.skipDecompression = false; + } + + final int hi = offs + len; + int destOffs = offs; + int b = 0; + + for (; (destOffs < hi) && ((b = read0()) >= 0);) { + dest[destOffs++] = (byte) b; + } + + int result = destOffs - offs; + if (result == 0) { + // report 'end of block' or 'end of stream' + result = b; + + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); + // Exactly when we are about to start a new block, we advertise the stream position. + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + + changeStateToProcessABlock(); + } + return result; } private int read0() throws IOException { @@ -190,7 +428,10 @@ private int read0() throws IOException { switch (this.currentState) { case EOF: - return -1; + return END_OF_STREAM;// return -1 + + case NO_PROCESS_STATE: + return END_OF_BLOCK;// return -2 case START_BLOCK_STATE: throw new IllegalStateException(); @@ -225,13 +466,13 @@ } private void init() throws IOException { - int magic2 = this.in.read(); + int magic2 = this.readAByte(in); if (magic2 != 'h') { throw new IOException("Stream is not BZip2 formatted: expected 'h'" + " as first byte but got '" + (char) magic2 + "'"); } - int blockSize = this.in.read(); + int blockSize = this.readAByte(in); if ((blockSize < '1') || (blockSize > '9')) { throw new IOException("Stream is not BZip2 formatted: illegal " + "blocksize " + (char) blockSize); @@ -244,6 +485,27 @@ } private void initBlock() throws IOException { + if (this.readMode == READ_MODE.BYBLOCK) { + // this.checkBlockIntegrity(); + this.storedBlockCRC = bsGetInt(); + this.blockRandomised = bsR(1) == 1; + + /** + * Allocate data here instead of in the constructor, so we do not + * allocate it if the input file is empty. 
+ */ + if (this.data == null) { + this.data = new Data(this.blockSize100k); + } + + // currBlockNo++; + getAndMoveToFrontDecode(); + + this.crc.initialiseCRC(); + this.currentState = STATE.START_BLOCK_STATE; + return; + } + char magic0 = bsGetUByte(); char magic1 = bsGetUByte(); char magic2 = bsGetUByte(); @@ -261,7 +523,7 @@ private void initBlock() throws IOException { magic4 != 0x53 || // 'S' magic5 != 0x59 // 'Y' ) { - this.currentState = EOF; + this.currentState = STATE.EOF; throw new IOException("bad block header"); } else { this.storedBlockCRC = bsGetInt(); @@ -279,7 +541,7 @@ private void initBlock() throws IOException { getAndMoveToFrontDecode(); this.crc.initialiseCRC(); - this.currentState = START_BLOCK_STATE; + this.currentState = STATE.START_BLOCK_STATE; } } @@ -304,7 +566,7 @@ private void endBlock() throws IOException { private void complete() throws IOException { this.storedCombinedCRC = bsGetInt(); - this.currentState = EOF; + this.currentState = STATE.EOF; this.data = null; if (this.storedCombinedCRC != this.computedCombinedCRC) { @@ -326,14 +588,14 @@ public void close() throws IOException { } } - private int bsR(final int n) throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + private long bsR(final long n) throws IOException { + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < n) { final InputStream inShadow = this.in; do { - int thech = inShadow.read(); + int thech = readAByte(inShadow); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -347,15 +609,15 @@ private int bsR(final int n) throws IOException { } this.bsLive = bsLiveShadow - n; - return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1); + return (bsBuffShadow >> (bsLiveShadow - n)) & ((1L << n) - 1); } private boolean bsGetBit() throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < 1) { - int thech = this.in.read(); + int thech = this.readAByte(in); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -375,7 +637,7 @@ private char bsGetUByte() throws IOException { } private int bsGetInt() throws IOException { - return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8); + return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8)); } /** @@ -454,8 +716,8 @@ private void recvDecodingTables() throws IOException { final int alphaSize = this.nInUse + 2; /* Now the selectors */ - final int nGroups = bsR(3); - final int nSelectors = bsR(15); + final int nGroups = (int) bsR(3); + final int nSelectors = (int) bsR(15); for (int i = 0; i < nSelectors; i++) { int j = 0; @@ -486,7 +748,7 @@ private void recvDecodingTables() throws IOException { /* Now the coding tables */ for (int t = 0; t < nGroups; t++) { - int curr = bsR(5); + int curr = (int) bsR(5); final char[] len_t = len[t]; for (int i = 0; i < alphaSize; i++) { while (bsGetBit()) { @@ -532,7 +794,7 @@ private void createHuffmanDecodingTables(final int alphaSize, } private void getAndMoveToFrontDecode() throws IOException { - this.origPtr = bsR(24); + this.origPtr = (int) bsR(24); recvDecodingTables(); final InputStream inShadow = this.in; @@ -562,8 +824,8 @@ private void getAndMoveToFrontDecode() throws IOException { int groupPos = G_SIZE - 1; final int eob = this.nInUse + 1; int nextSym = getAndMoveToFrontDecode0(0); - int bsBuffShadow = this.bsBuff; - int bsLiveShadow = this.bsLive; + int 
bsBuffShadow = (int) this.bsBuff; + int bsLiveShadow = (int) this.bsLive; int lastShadow = -1; int zt = selector[groupNo] & 0xff; int[] base_zt = base[zt]; @@ -597,10 +859,8 @@ private void getAndMoveToFrontDecode() throws IOException { int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -609,14 +869,14 @@ private void getAndMoveToFrontDecode() throws IOException { throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + long zvec = (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -630,7 +890,7 @@ private void getAndMoveToFrontDecode() throws IOException { zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); } - nextSym = perm_zt[zvec - base_zt[zn]]; + nextSym = perm_zt[(int) (zvec - base_zt[zn])]; } final byte ch = seqToUnseq[yy[0]]; @@ -680,10 +940,8 @@ private void getAndMoveToFrontDecode() throws IOException { int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -699,7 +957,7 @@ private void getAndMoveToFrontDecode() throws IOException { while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -709,7 +967,7 @@ private void getAndMoveToFrontDecode() throws IOException { } } bsLiveShadow--; - zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); + zvec = ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1)); } nextSym = perm_zt[zvec - base_zt[zn]]; } @@ -726,14 +984,14 @@ private int getAndMoveToFrontDecode0(final int groupNo) throws IOException { final int zt = dataShadow.selector[groupNo] & 0xff; final int[] limit_zt = dataShadow.limit[zt]; int zn = dataShadow.minLens[zt]; - int zvec = bsR(zn); - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + int zvec = (int) bsR(zn); + int bsLiveShadow = (int) this.bsLive; + int bsBuffShadow = (int) this.bsBuff; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; @@ -807,12 +1065,16 @@ private void setupRandPartA() throws IOException { this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 
1 : 0; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = RAND_PART_B_STATE; + this.currentState = STATE.RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { endBlock(); + if (readMode == READ_MODE.CONTINUOUS) { initBlock(); setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } @@ -824,19 +1086,23 @@ private void setupNoRandPartA() throws IOException { this.su_tPos = this.data.tt[this.su_tPos]; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = NO_RAND_PART_B_STATE; + this.currentState = STATE.NO_RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { - this.currentState = NO_RAND_PART_A_STATE; + this.currentState = STATE.NO_RAND_PART_A_STATE; endBlock(); + if (readMode == READ_MODE.CONTINUOUS) { initBlock(); setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } private void setupRandPartB() throws IOException { if (this.su_ch2 != this.su_chPrev) { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_count = 1; setupRandPartA(); } else if (++this.su_count >= 4) { @@ -851,13 +1117,13 @@ private void setupRandPartB() throws IOException { this.su_rNToGo--; } this.su_j2 = 0; - this.currentState = RAND_PART_C_STATE; + this.currentState = STATE.RAND_PART_C_STATE; if (this.su_rNToGo == 1) { this.su_z ^= 1; } setupRandPartC(); } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; setupRandPartA(); } } @@ -868,7 +1134,7 @@ private void setupRandPartC() throws IOException { this.crc.updateCRC(this.su_ch2); this.su_j2++; } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_i2++; this.su_count = 0; setupRandPartA(); @@ -895,7 +1161,7 @@ private void setupNoRandPartC() throws IOException { this.currentChar = su_ch2Shadow; this.crc.updateCRC(su_ch2Shadow); this.su_j2++; - this.currentState = NO_RAND_PART_C_STATE; + this.currentState = STATE.NO_RAND_PART_C_STATE; } else { this.su_i2++; this.su_count = 0; diff --git a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java index 38e4a35837..8df4cf34e1 100644 --- a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java +++ b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java @@ -19,31 +19,38 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Random; -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RandomDatum; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import 
org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.ReflectionUtils; -public class TestCodec extends TestCase { +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestCodec { private static final Log LOG= LogFactory.getLog(TestCodec.class); @@ -51,17 +58,20 @@ public class TestCodec { private Configuration conf = new Configuration(); private int count = 10000; private int seed = new Random().nextInt(); - + + @Test public void testDefaultCodec() throws IOException { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DefaultCodec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec"); } - + + @Test public void testGzipCodec() throws IOException { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec"); } - + + @Test public void testBZip2Codec() throws IOException { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec"); @@ -133,6 +143,109 @@ private static void codecTest(Configuration conf, int seed, int count, LOG.info("SUCCESS! Completed checking " + count + " records"); } + @Test + public void testSplitableCodecs() throws Exception { + testSplitableCodec(BZip2Codec.class); + } + + private void testSplitableCodec( + Class<? extends SplittableCompressionCodec> codecClass) + throws IOException { + final long DEFLBYTES = 2 * 1024 * 1024; + final Configuration conf = new Configuration(); + final Random rand = new Random(); + final long seed = rand.nextLong(); + LOG.info("seed: " + seed); + rand.setSeed(seed); + SplittableCompressionCodec codec = + ReflectionUtils.newInstance(codecClass, conf); + final FileSystem fs = FileSystem.getLocal(conf); + final FileStatus infile = + fs.getFileStatus(writeSplitTestFile(fs, rand, codec, DEFLBYTES)); + if (infile.getLen() > Integer.MAX_VALUE) { + fail("Unexpected compression: " + DEFLBYTES + " -> " + infile.getLen()); + } + final int flen = (int) infile.getLen(); + final Text line = new Text(); + final Decompressor dcmp = CodecPool.getDecompressor(codec); + try { + for (int pos = 0; pos < infile.getLen(); pos += rand.nextInt(flen / 8)) { + // read from random positions, verifying that there exist two sequential + // lines as written in writeSplitTestFile + final SplitCompressionInputStream in = + codec.createInputStream(fs.open(infile.getPath()), dcmp, + pos, flen, SplittableCompressionCodec.READ_MODE.BYBLOCK); + if (in.getAdjustedStart() >= flen) { + break; + } + LOG.info("SAMPLE " + in.getAdjustedStart() + "," + in.getAdjustedEnd()); + final LineReader lreader = new LineReader(in); + lreader.readLine(line); // ignore; likely partial + if (in.getPos() >= flen) { + break; + } + lreader.readLine(line); + final int seq1 = readLeadingInt(line); + lreader.readLine(line); + if (in.getPos() >= flen) { + break; + } + final int seq2 = readLeadingInt(line); + assertEquals("Mismatched lines", seq1 + 1, seq2); + } + } finally { + CodecPool.returnDecompressor(dcmp); + } + // remove on success + fs.delete(infile.getPath().getParent(), true); + } + + private static int readLeadingInt(Text txt) throws IOException { + DataInputStream in = + new DataInputStream(new ByteArrayInputStream(txt.getBytes())); + return in.readInt(); + } + + /** Write infLen bytes (deflated) to file in test dir 
using codec. + * Records are of the form + * <i><b64 rand><i+i><b64 rand> + */ + private static Path writeSplitTestFile(FileSystem fs, Random rand, + CompressionCodec codec, long infLen) throws IOException { + final int REC_SIZE = 1024; + final Path wd = new Path(new Path( + System.getProperty("test.build.data", "/tmp")).makeQualified(fs), + codec.getClass().getSimpleName()); + final Path file = new Path(wd, "test" + codec.getDefaultExtension()); + final byte[] b = new byte[REC_SIZE]; + final Base64 b64 = new Base64(); + DataOutputStream fout = null; + Compressor cmp = CodecPool.getCompressor(codec); + try { + fout = new DataOutputStream(codec.createOutputStream( + fs.create(file, true), cmp)); + final DataOutputBuffer dob = new DataOutputBuffer(REC_SIZE * 4 / 3 + 4); + int seq = 0; + while (infLen > 0) { + rand.nextBytes(b); + final byte[] b64enc = b64.encode(b); // ensures rand printable, no LF + dob.reset(); + dob.writeInt(seq); + System.arraycopy(dob.getData(), 0, b64enc, 0, dob.getLength()); + fout.write(b64enc); + fout.write('\n'); + ++seq; + infLen -= b64enc.length; + } + LOG.info("Wrote " + seq + " records to " + file); + } finally { + IOUtils.cleanup(LOG, fout); + CodecPool.returnCompressor(cmp); + } + return file; + } + + @Test public void testCodecPoolGzipReuse() throws Exception { Configuration conf = new Configuration(); conf.setBoolean("hadoop.native.lib", true); @@ -149,19 +262,21 @@ public void testCodecPoolGzipReuse() throws Exception { assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc)); } - public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, + @Test + public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100); sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000); } - - public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, + + @Test + public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000); } - + private static void sequenceFileCodecTest(Configuration conf, int lines, String codecClass, int blockSize) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { @@ -242,8 +357,4 @@ public static void main(String[] args) { } - public TestCodec(String name) { - super(name); - } - }
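
Beyond TestCodec.testSplitableCodecs() above, the patch contains no end-to-end usage example, so the following is a minimal sketch (not part of the patch) of how a record reader might consume a single split of a .bz2 file through the new SplittableCompressionCodec interface. The fs, path, start and end values are assumptions here, supplied by the caller (e.g. taken from a FileSplit); everything else uses only classes introduced or touched by this change.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.util.LineReader;

public class BZip2SplitReadSketch {
  /** Reads all line records belonging to the split [start, end) of a bzip2 file. */
  public static void readSplit(FileSystem fs, Path path, long start, long end)
      throws IOException {
    BZip2Codec codec = new BZip2Codec();
    Decompressor dcmp = CodecPool.getDecompressor(codec); // dummy; ignored by bzip2
    try {
      // The codec may slide the requested offsets to the enclosing block
      // markers; always work with the adjusted values afterwards.
      SplitCompressionInputStream in = codec.createInputStream(
          fs.open(path), dcmp, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      long adjEnd = in.getAdjustedEnd();
      LineReader reader = new LineReader(in);
      Text line = new Text();
      // Every split except the first drops its (possibly partial) first
      // line; the previous split compensates by reading one line past end.
      if (in.getAdjustedStart() != 0) {
        reader.readLine(line);
      }
      // getPos() only advances once a byte past an end-of-block marker has
      // been consumed, so this loop naturally reads one record past the
      // split end, mirroring LineRecordReader.
      while (in.getPos() <= adjEnd && reader.readLine(line) > 0) {
        // process 'line' here
      }
      reader.close();
    } finally {
      CodecPool.returnDecompressor(dcmp);
    }
  }
}

The design choice to advertise the compressed position only one byte past an end-of-block marker (rather than exactly at it) is what lets the two delimiter scenarios described in BZip2CompressionInputStream be told apart without any lookahead by the caller.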