From 86724941c5144a06113090d40c50c289e7ebd290 Mon Sep 17 00:00:00 2001
From: Christopher Douglas
+ * This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
+ * The current position in the stream is an important statistic for Hadoop. For
+ * example, in LineRecordReader we depend solely on the current position in the
+ * stream to know about the progress. The notion of position becomes complicated
+ * for compressed files. Hadoop splits are computed in terms of the compressed
+ * file, but a compressed file inflates to a much larger amount of data. So we
+ * handle this problem in the following way.
+ *
+ * At object creation time, we find the next block start delimiter. Once such a
+ * marker is found, the stream stops there (we discard any compressed data read
+ * in this process) and the position is updated (i.e. the caller of this class
+ * will find out the stream location). At this point we are ready for actual
+ * reading (i.e. decompression) of data.
+ *
+ * The subsequent read calls give out data. The position is updated once the
+ * caller of this class has read the current block plus one byte. In between
+ * block reads, the position is not updated. (We can only update the position
+ * on block boundaries.)
+ *
+ *
+ * Instances of this class are not threadsafe.
+ *
 */
 public class CBZip2InputStream extends InputStream implements BZip2Constants {
-  private static void reportCRCError() throws IOException {
-    throw new IOException("BZip2 CRC error");
-
-  }
-
-  private void makeMaps() {
-    final boolean[] inUse = this.data.inUse;
-    final byte[] seqToUnseq = this.data.seqToUnseq;
-
-    int nInUseShadow = 0;
-
-    for (int i = 0; i < 256; i++) {
-      if (inUse[i])
-        seqToUnseq[nInUseShadow++] = (byte) i;
-    }
-
-    this.nInUse = nInUseShadow;
-  }
+  public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block
+  public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 stream
+  private static final int DELIMITER_BIT_LENGTH = 48;
+  READ_MODE readMode = READ_MODE.CONTINUOUS;
+  // This variable records the current advertised position of the stream.
+  private long reportedBytesReadFromCompressedStream = 0L;
+  // The following variable keeps a record of compressed bytes read.
+  private long bytesReadFromCompressedStream = 0L;
+  private boolean lazyInitialization = false;
+  private byte array[] = new byte[1];
 
   /**
    * Index of the last char in the block, so the block size == last + 1.
@@ -86,32 +103,34 @@ private void makeMaps() {
    */
   private int blockSize100k;
 
-  private boolean blockRandomised;
+  private boolean blockRandomised = false;
 
-  private int bsBuff;
-  private int bsLive;
+  private long bsBuff;
+  private long bsLive;
 
   private final CRC crc = new CRC();
 
   private int nInUse;
 
-  private InputStream in;
+  private BufferedInputStream in;
 
   private int currentChar = -1;
 
-  private static final int EOF = 0;
-  private static final int START_BLOCK_STATE = 1;
-  private static final int RAND_PART_A_STATE = 2;
-  private static final int RAND_PART_B_STATE = 3;
-  private static final int RAND_PART_C_STATE = 4;
-  private static final int NO_RAND_PART_A_STATE = 5;
-  private static final int NO_RAND_PART_B_STATE = 6;
-  private static final int NO_RAND_PART_C_STATE = 7;
+  /**
+   * A state machine to keep track of the current state of the de-coder
+   *
+   */
+  public enum STATE {
+    EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE
+  };
 
-  private int currentState = START_BLOCK_STATE;
+  private STATE currentState = STATE.START_BLOCK_STATE;
 
   private int storedBlockCRC, storedCombinedCRC;
   private int computedBlockCRC, computedCombinedCRC;
 
+  private boolean skipResult = false;// used by skipToNextMarker
+  private static boolean skipDecompression = false;
+
   // Variables used by setup* methods exclusively
 
   private int su_count;
@@ -129,6 +148,121 @@ private void makeMaps() {
    */
   private CBZip2InputStream.Data data;
 
+  /**
+   * This method reports the processed bytes so far. Please note that this
+   * statistic is only updated on block boundaries and only when the stream is
+   * initiated in BYBLOCK mode.
+   */
+  public long getProcessedByteCount() {
+    return reportedBytesReadFromCompressedStream;
+  }
+
+  /**
+   * This method keeps track of raw processed compressed
+   * bytes.
+   *
+   * @param count count is the number of bytes to be
+   *          added to the raw processed bytes
+   */
+
+  protected void updateProcessedByteCount(int count) {
+    this.bytesReadFromCompressedStream += count;
+  }
+
+  /**
+   * This method is called by the client of this
+   * class in case there are any corrections in
+   * the stream position. One common example is
+   * when the client of this code removes the starting BZ
+   * characters from the compressed stream.
+   *
+   * @param count count bytes are added to the reported bytes
+   *
+   */
+  public void updateReportedByteCount(int count) {
+    this.reportedBytesReadFromCompressedStream += count;
+    this.updateProcessedByteCount(count);
+  }
+
+  /**
+   * This method reads a byte from the compressed stream. Whenever we need to
+   * read from the underlying compressed stream, this method should be called
+   * instead of directly calling the read method of the underlying compressed
+   * stream. This method does the important record keeping of
+   * how many bytes have been read off the compressed stream.
+   */
+  private int readAByte(InputStream inStream) throws IOException {
+    int read = inStream.read();
+    if (read >= 0) {
+      this.updateProcessedByteCount(1);
+    }
+    return read;
+  }
+
+  /**
+   * This method tries to find the marker (passed to it as the first parameter)
+   * in the stream. It can find bit patterns of length <= 63 bits. Specifically
+   * this method is used in CBZip2InputStream to find the end of block (EOB)
+   * delimiter in the stream, starting from the current position of the stream.
+   * If the marker is found, the stream position will be just past the marker
+   * at the end of this call.
+   *
+   * @param marker The bit pattern to be found in the stream
+   * @param markerBitLength Number of bits in the marker
+   *
+   * @throws IOException
+   * @throws IllegalArgumentException if markerBitLength is greater than 63
+   */
+  public boolean skipToNextMarker(long marker, int markerBitLength)
+      throws IOException, IllegalArgumentException {
+    try {
+      if (markerBitLength > 63) {
+        throw new IllegalArgumentException(
+            "skipToNextMarker can not find patterns greater than 63 bits");
+      }
+      // pick the next markerBitLength bits in the stream
+      long bytes = 0;
+      bytes = this.bsR(markerBitLength);
+      if (bytes == -1) {
+        return false;
+      }
+      while (true) {
+        if (bytes == marker) {
+          return true;
+
+        } else {
+          bytes = bytes << 1;
+          bytes = bytes & ((1L << markerBitLength) - 1);
+          int oneBit = (int) this.bsR(1);
+          if (oneBit != -1) {
+            bytes = bytes | oneBit;
+          } else
+            return false;
+        }
+      }
+    } catch (IOException ex) {
+      return false;
+    }
+  }
+
+  protected void reportCRCError() throws IOException {
+    throw new IOException("crc error");
+  }
+
+  private void makeMaps() {
+    final boolean[] inUse = this.data.inUse;
+    final byte[] seqToUnseq = this.data.seqToUnseq;
+
+    int nInUseShadow = 0;
+
+    for (int i = 0; i < 256; i++) {
+      if (inUse[i])
+        seqToUnseq[nInUseShadow++] = (byte) i;
+    }
+
+    this.nInUse = nInUseShadow;
+  }
+
   /**
    * Constructs a new CBZip2InputStream which decompresses bytes read from the
    * specified stream.
@@ -145,21 +279,99 @@ private void makeMaps() {
    * @throws NullPointerException
    *           if in == null
    */
-  public CBZip2InputStream(final InputStream in) throws IOException {
-    super();
+  public CBZip2InputStream(final InputStream in, READ_MODE readMode)
+      throws IOException {
 
-    this.in = in;
+    super();
+    int blockSize = 0X39;// i.e. 9
+    this.blockSize100k = blockSize - '0';
+    this.in = new BufferedInputStream(in, 1024 * 9);// 9K buffer
+    this.readMode = readMode;
+    if (readMode == READ_MODE.CONTINUOUS) {
+      currentState = STATE.START_BLOCK_STATE;
+      lazyInitialization = (in.available() == 0) ? true : false;
+      if (!lazyInitialization) {
+        init();
+      }
+    } else if (readMode == READ_MODE.BYBLOCK) {
+      this.currentState = STATE.NO_PROCESS_STATE;
+      skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,
+          DELIMITER_BIT_LENGTH);
+      this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
+      if (!skipDecompression) {
+        changeStateToProcessABlock();
+      }
+    }
+  }
+
+  /**
+   * Returns the number of bytes between the current stream position
+   * and the immediate next BZip2 block marker.
+   *
+   * @param in
+   *          The InputStream
+   *
+   * @return long Number of bytes between the current stream position and the
+   *         next BZip2 block start marker.
+   * @throws IOException
+   *
+   */
+  public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException {
+    CBZip2InputStream.skipDecompression = true;
+    CBZip2InputStream anObject = null;
+
+    anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);
+
+    return anObject.getProcessedByteCount();
+  }
+
+  public CBZip2InputStream(final InputStream in) throws IOException {
+    this(in, READ_MODE.CONTINUOUS);
+  }
+
+  private void changeStateToProcessABlock() throws IOException {
+    if (skipResult == true) {
+      initBlock();
+      setupBlock();
+    } else {
+      this.currentState = STATE.EOF;
+    }
+  }
+
   public int read() throws IOException {
     if (this.in != null) {
-      return read0();
+      int result = this.read(array, 0, 1);
+      int value = 0XFF & array[0];
+      return (result > 0 ? value : result);
+
     } else {
       throw new IOException("stream closed");
     }
   }
 
+  /**
+   * In CONTINUOUS reading mode, this read method starts at the
+   * beginning of the compressed stream and ends at the end of file,
+   * emitting un-compressed data. In this mode stream positioning
+   * is not announced and should be ignored.
+   *
+   * In BYBLOCK reading mode, this read method informs about the end
+   * of a BZip2 block by returning EOB. At this event, the compressed
+   * stream position is also announced. This announcement tells
+   * how much of the compressed stream has been de-compressed and read
+   * out of this class. In between EOB events, the stream position is
+   * not updated.
+   *
+   *
+   * @throws IOException
+   *           if the stream content is malformed or an I/O error occurs.
+   *
+   * @return int Return values greater than 0 are the number of bytes read. A
+   *         value of -1 means end of stream, while -2 represents end of block.
+   */
+
   public int read(final byte[] dest, final int offs, final int len)
       throws IOException {
     if (offs < 0) {
@@ -176,13 +388,39 @@ public int read(final byte[] dest, final int offs, final int len)
       throw new IOException("stream closed");
     }
 
-    final int hi = offs + len;
-    int destOffs = offs;
-    for (int b; (destOffs < hi) && ((b = read0()) >= 0);) {
-      dest[destOffs++] = (byte) b;
+    if (lazyInitialization) {
+      this.init();
+      this.lazyInitialization = false;
     }
 
-    return (destOffs == offs) ?
-1 : (destOffs - offs); + if(skipDecompression){ + changeStateToProcessABlock(); + CBZip2InputStream.skipDecompression = false; + } + + final int hi = offs + len; + int destOffs = offs; + int b = 0; + + + + for (; ((destOffs < hi) && ((b = read0())) >= 0);) { + dest[destOffs++] = (byte) b; + + } + + int result = destOffs - offs; + if (result == 0) { + //report 'end of block' or 'end of stream' + result = b; + + skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); + //Exactly when we are about to start a new block, we advertise the stream position. + this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream; + + changeStateToProcessABlock(); + } + return result; } private int read0() throws IOException { @@ -190,7 +428,10 @@ private int read0() throws IOException { switch (this.currentState) { case EOF: - return -1; + return END_OF_STREAM;// return -1 + + case NO_PROCESS_STATE: + return END_OF_BLOCK;// return -2 case START_BLOCK_STATE: throw new IllegalStateException(); @@ -225,13 +466,13 @@ private int read0() throws IOException { } private void init() throws IOException { - int magic2 = this.in.read(); + int magic2 = this.readAByte(in); if (magic2 != 'h') { throw new IOException("Stream is not BZip2 formatted: expected 'h'" + " as first byte but got '" + (char) magic2 + "'"); } - int blockSize = this.in.read(); + int blockSize = this.readAByte(in); if ((blockSize < '1') || (blockSize > '9')) { throw new IOException("Stream is not BZip2 formatted: illegal " + "blocksize " + (char) blockSize); @@ -244,6 +485,27 @@ private void init() throws IOException { } private void initBlock() throws IOException { + if (this.readMode == READ_MODE.BYBLOCK) { + // this.checkBlockIntegrity(); + this.storedBlockCRC = bsGetInt(); + this.blockRandomised = bsR(1) == 1; + + /** + * Allocate data here instead in constructor, so we do not allocate + * it if the input file is empty. 
+ */ + if (this.data == null) { + this.data = new Data(this.blockSize100k); + } + + // currBlockNo++; + getAndMoveToFrontDecode(); + + this.crc.initialiseCRC(); + this.currentState = STATE.START_BLOCK_STATE; + return; + } + char magic0 = bsGetUByte(); char magic1 = bsGetUByte(); char magic2 = bsGetUByte(); @@ -261,7 +523,7 @@ private void initBlock() throws IOException { magic4 != 0x53 || // 'S' magic5 != 0x59 // 'Y' ) { - this.currentState = EOF; + this.currentState = STATE.EOF; throw new IOException("bad block header"); } else { this.storedBlockCRC = bsGetInt(); @@ -279,7 +541,7 @@ private void initBlock() throws IOException { getAndMoveToFrontDecode(); this.crc.initialiseCRC(); - this.currentState = START_BLOCK_STATE; + this.currentState = STATE.START_BLOCK_STATE; } } @@ -304,7 +566,7 @@ private void endBlock() throws IOException { private void complete() throws IOException { this.storedCombinedCRC = bsGetInt(); - this.currentState = EOF; + this.currentState = STATE.EOF; this.data = null; if (this.storedCombinedCRC != this.computedCombinedCRC) { @@ -326,14 +588,14 @@ public void close() throws IOException { } } - private int bsR(final int n) throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + private long bsR(final long n) throws IOException { + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < n) { final InputStream inShadow = this.in; do { - int thech = inShadow.read(); + int thech = readAByte(inShadow); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -347,15 +609,15 @@ private int bsR(final int n) throws IOException { } this.bsLive = bsLiveShadow - n; - return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1); + return (bsBuffShadow >> (bsLiveShadow - n)) & ((1L << n) - 1); } private boolean bsGetBit() throws IOException { - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + long bsLiveShadow = this.bsLive; + long bsBuffShadow = this.bsBuff; if (bsLiveShadow < 1) { - int thech = this.in.read(); + int thech = this.readAByte(in); if (thech < 0) { throw new IOException("unexpected end of stream"); @@ -375,7 +637,7 @@ private char bsGetUByte() throws IOException { } private int bsGetInt() throws IOException { - return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8); + return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8)); } /** @@ -454,8 +716,8 @@ private void recvDecodingTables() throws IOException { final int alphaSize = this.nInUse + 2; /* Now the selectors */ - final int nGroups = bsR(3); - final int nSelectors = bsR(15); + final int nGroups = (int) bsR(3); + final int nSelectors = (int) bsR(15); for (int i = 0; i < nSelectors; i++) { int j = 0; @@ -486,7 +748,7 @@ private void recvDecodingTables() throws IOException { /* Now the coding tables */ for (int t = 0; t < nGroups; t++) { - int curr = bsR(5); + int curr = (int) bsR(5); final char[] len_t = len[t]; for (int i = 0; i < alphaSize; i++) { while (bsGetBit()) { @@ -532,7 +794,7 @@ private void createHuffmanDecodingTables(final int alphaSize, } private void getAndMoveToFrontDecode() throws IOException { - this.origPtr = bsR(24); + this.origPtr = (int) bsR(24); recvDecodingTables(); final InputStream inShadow = this.in; @@ -562,8 +824,8 @@ private void getAndMoveToFrontDecode() throws IOException { int groupPos = G_SIZE - 1; final int eob = this.nInUse + 1; int nextSym = getAndMoveToFrontDecode0(0); - int bsBuffShadow = this.bsBuff; - int bsLiveShadow = this.bsLive; + int 
bsBuffShadow = (int) this.bsBuff; + int bsLiveShadow = (int) this.bsLive; int lastShadow = -1; int zt = selector[groupNo] & 0xff; int[] base_zt = base[zt]; @@ -597,10 +859,8 @@ private void getAndMoveToFrontDecode() throws IOException { int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -609,14 +869,14 @@ private void getAndMoveToFrontDecode() throws IOException { throw new IOException("unexpected end of stream"); } } - int zvec = (bsBuffShadow >> (bsLiveShadow - zn)) + long zvec = (bsBuffShadow >> (bsLiveShadow - zn)) & ((1 << zn) - 1); bsLiveShadow -= zn; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -630,7 +890,7 @@ private void getAndMoveToFrontDecode() throws IOException { zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); } - nextSym = perm_zt[zvec - base_zt[zn]]; + nextSym = perm_zt[(int) (zvec - base_zt[zn])]; } final byte ch = seqToUnseq[yy[0]]; @@ -680,10 +940,8 @@ private void getAndMoveToFrontDecode() throws IOException { int zn = minLens_zt; - // Inlined: - // int zvec = bsR(zn); while (bsLiveShadow < zn) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -699,7 +957,7 @@ private void getAndMoveToFrontDecode() throws IOException { while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; bsLiveShadow += 8; @@ -709,7 +967,7 @@ private void getAndMoveToFrontDecode() throws IOException { } } bsLiveShadow--; - zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1); + zvec = ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1)); } nextSym = perm_zt[zvec - base_zt[zn]]; } @@ -726,14 +984,14 @@ private int getAndMoveToFrontDecode0(final int groupNo) throws IOException { final int zt = dataShadow.selector[groupNo] & 0xff; final int[] limit_zt = dataShadow.limit[zt]; int zn = dataShadow.minLens[zt]; - int zvec = bsR(zn); - int bsLiveShadow = this.bsLive; - int bsBuffShadow = this.bsBuff; + int zvec = (int) bsR(zn); + int bsLiveShadow = (int) this.bsLive; + int bsBuffShadow = (int) this.bsBuff; while (zvec > limit_zt[zn]) { zn++; while (bsLiveShadow < 1) { - final int thech = inShadow.read(); + final int thech = readAByte(inShadow); if (thech >= 0) { bsBuffShadow = (bsBuffShadow << 8) | thech; @@ -807,12 +1065,16 @@ private void setupRandPartA() throws IOException { this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 
1 : 0; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = RAND_PART_B_STATE; + this.currentState = STATE.RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { endBlock(); + if (readMode == READ_MODE.CONTINUOUS) { initBlock(); setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } @@ -824,19 +1086,23 @@ private void setupNoRandPartA() throws IOException { this.su_tPos = this.data.tt[this.su_tPos]; this.su_i2++; this.currentChar = su_ch2Shadow; - this.currentState = NO_RAND_PART_B_STATE; + this.currentState = STATE.NO_RAND_PART_B_STATE; this.crc.updateCRC(su_ch2Shadow); } else { - this.currentState = NO_RAND_PART_A_STATE; + this.currentState = STATE.NO_RAND_PART_A_STATE; endBlock(); + if (readMode == READ_MODE.CONTINUOUS) { initBlock(); setupBlock(); + } else if (readMode == READ_MODE.BYBLOCK) { + this.currentState = STATE.NO_PROCESS_STATE; + } } } private void setupRandPartB() throws IOException { if (this.su_ch2 != this.su_chPrev) { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_count = 1; setupRandPartA(); } else if (++this.su_count >= 4) { @@ -851,13 +1117,13 @@ private void setupRandPartB() throws IOException { this.su_rNToGo--; } this.su_j2 = 0; - this.currentState = RAND_PART_C_STATE; + this.currentState = STATE.RAND_PART_C_STATE; if (this.su_rNToGo == 1) { this.su_z ^= 1; } setupRandPartC(); } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; setupRandPartA(); } } @@ -868,7 +1134,7 @@ private void setupRandPartC() throws IOException { this.crc.updateCRC(this.su_ch2); this.su_j2++; } else { - this.currentState = RAND_PART_A_STATE; + this.currentState = STATE.RAND_PART_A_STATE; this.su_i2++; this.su_count = 0; setupRandPartA(); @@ -895,7 +1161,7 @@ private void setupNoRandPartC() throws IOException { this.currentChar = su_ch2Shadow; this.crc.updateCRC(su_ch2Shadow); this.su_j2++; - this.currentState = NO_RAND_PART_C_STATE; + this.currentState = STATE.NO_RAND_PART_C_STATE; } else { this.su_i2++; this.su_count = 0; diff --git a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java index 38e4a35837..8df4cf34e1 100644 --- a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java +++ b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java @@ -19,31 +19,38 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.Random; -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RandomDatum; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import 
org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestCodec extends TestCase {
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestCodec {
 
   private static final Log LOG= LogFactory.getLog(TestCodec.class);
 
@@ -51,17 +58,20 @@ public class TestCodec {
   private Configuration conf = new Configuration();
   private int count = 10000;
   private int seed = new Random().nextInt();
-  
+
+  @Test
   public void testDefaultCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DefaultCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
   }
-  
+
+  @Test
   public void testGzipCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
   }
-  
+
+  @Test
   public void testBZip2Codec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
@@ -133,6 +143,109 @@ private static void codecTest(Configuration conf, int seed, int count,
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
 
+  @Test
+  public void testSplitableCodecs() throws Exception {
+    testSplitableCodec(BZip2Codec.class);
+  }
+
+  private void testSplitableCodec(
+      Class<? extends SplittableCompressionCodec> codecClass)
+      throws IOException {
+    final long DEFLBYTES = 2 * 1024 * 1024;
+    final Configuration conf = new Configuration();
+    final Random rand = new Random();
+    final long seed = rand.nextLong();
+    LOG.info("seed: " + seed);
+    rand.setSeed(seed);
+    SplittableCompressionCodec codec =
+      ReflectionUtils.newInstance(codecClass, conf);
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final FileStatus infile =
+      fs.getFileStatus(writeSplitTestFile(fs, rand, codec, DEFLBYTES));
+    if (infile.getLen() > Integer.MAX_VALUE) {
+      fail("Unexpected compression: " + DEFLBYTES + " -> " + infile.getLen());
+    }
+    final int flen = (int) infile.getLen();
+    final Text line = new Text();
+    final Decompressor dcmp = CodecPool.getDecompressor(codec);
+    try {
+      for (int pos = 0; pos < infile.getLen(); pos += rand.nextInt(flen / 8)) {
+        // read from random positions, verifying that there exist two sequential
+        // lines as written in writeSplitTestFile
+        final SplitCompressionInputStream in =
+          codec.createInputStream(fs.open(infile.getPath()), dcmp,
+              pos, flen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        if (in.getAdjustedStart() >= flen) {
+          break;
+        }
+        LOG.info("SAMPLE " + in.getAdjustedStart() + "," + in.getAdjustedEnd());
+        final LineReader lreader = new LineReader(in);
+        lreader.readLine(line); // ignore; likely partial
+        if (in.getPos() >= flen) {
+          break;
+        }
+        lreader.readLine(line);
+        final int seq1 = readLeadingInt(line);
+        lreader.readLine(line);
+        if (in.getPos() >= flen) {
+          break;
+        }
+        final int seq2 = readLeadingInt(line);
+        assertEquals("Mismatched lines", seq1 + 1, seq2);
+      }
+    } finally {
+      CodecPool.returnDecompressor(dcmp);
+    }
+    // remove on success
+    fs.delete(infile.getPath().getParent(), true);
+  }
+
+  private static int readLeadingInt(Text txt) throws IOException {
+    DataInputStream in =
+      new DataInputStream(new ByteArrayInputStream(txt.getBytes()));
+    return in.readInt();
+  }
+
+  /** Write infLen bytes (deflated) to file in test dir 
using codec. + * Records are of the form + * <i><b64 rand><i+i><b64 rand> + */ + private static Path writeSplitTestFile(FileSystem fs, Random rand, + CompressionCodec codec, long infLen) throws IOException { + final int REC_SIZE = 1024; + final Path wd = new Path(new Path( + System.getProperty("test.build.data", "/tmp")).makeQualified(fs), + codec.getClass().getSimpleName()); + final Path file = new Path(wd, "test" + codec.getDefaultExtension()); + final byte[] b = new byte[REC_SIZE]; + final Base64 b64 = new Base64(); + DataOutputStream fout = null; + Compressor cmp = CodecPool.getCompressor(codec); + try { + fout = new DataOutputStream(codec.createOutputStream( + fs.create(file, true), cmp)); + final DataOutputBuffer dob = new DataOutputBuffer(REC_SIZE * 4 / 3 + 4); + int seq = 0; + while (infLen > 0) { + rand.nextBytes(b); + final byte[] b64enc = b64.encode(b); // ensures rand printable, no LF + dob.reset(); + dob.writeInt(seq); + System.arraycopy(dob.getData(), 0, b64enc, 0, dob.getLength()); + fout.write(b64enc); + fout.write('\n'); + ++seq; + infLen -= b64enc.length; + } + LOG.info("Wrote " + seq + " records to " + file); + } finally { + IOUtils.cleanup(LOG, fout); + CodecPool.returnCompressor(cmp); + } + return file; + } + + @Test public void testCodecPoolGzipReuse() throws Exception { Configuration conf = new Configuration(); conf.setBoolean("hadoop.native.lib", true); @@ -149,19 +262,21 @@ public void testCodecPoolGzipReuse() throws Exception { assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc)); } - public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, + @Test + public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100); sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000); } - - public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, + + @Test + public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000); } - + private static void sequenceFileCodecTest(Configuration conf, int lines, String codecClass, int blockSize) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { @@ -242,8 +357,4 @@ public static void main(String[] args) { } - public TestCodec(String name) { - super(name); - } - }
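
The BYBLOCK contract added above -- read(byte[], int, int) returning -2 at a block boundary, with getProcessedByteCount() advertising the compressed offset -- can be exercised directly. The following sketch is illustrative only and not part of the patch; it assumes CBZip2InputStream lives in org.apache.hadoop.io.compress.bzip2 and that READ_MODE is the enum nested in SplittableCompressionCodec, as the test above uses.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; // assumed package

public class ByBlockReadSketch {
  public static void main(String[] args) throws IOException {
    InputStream raw = new FileInputStream(args[0]);
    // The caller strips the leading "BZ" stream header and reports the two
    // consumed bytes back, as described in updateReportedByteCount's javadoc.
    raw.read();
    raw.read();
    CBZip2InputStream in =
        new CBZip2InputStream(raw, SplittableCompressionCodec.READ_MODE.BYBLOCK);
    in.updateReportedByteCount(2);

    byte[] buf = new byte[64 * 1024];
    int n;
    while ((n = in.read(buf, 0, buf.length)) != -1) {
      if (n == -2) {
        // End of a bzip2 block: the advertised position is valid here.
        System.out.println("block ends at compressed offset "
            + in.getProcessedByteCount());
        continue;
      }
      // n > 0: consume buf[0..n) of decompressed data.
    }
    in.close();
  }
}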
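skipToNextMarker() is the core of the block alignment: it slides a window of markerBitLength bits along the stream until the window equals the marker. The same sliding-window idea is shown below over an in-memory byte array, purely as an illustration; this helper is hypothetical and not part of the patch.

public class MarkerScanSketch {
  /**
   * Mirrors the sliding-window search used by skipToNextMarker, but over a
   * byte array. Returns the bit offset at which the marker starts, or -1 if
   * the marker is not present.
   */
  static long findMarkerBitOffset(byte[] data, long marker, int markerBitLength) {
    long window = 0;
    final long mask = (1L << markerBitLength) - 1;
    final long totalBits = (long) data.length * 8;
    for (long bit = 0; bit < totalBits; bit++) {
      // Take the next bit, most significant bit first, and shift it into the window.
      int b = (data[(int) (bit >>> 3)] >> (7 - (int) (bit & 7))) & 1;
      window = ((window << 1) | b) & mask;
      if (bit + 1 >= markerBitLength && window == marker) {
        return bit + 1 - markerBitLength;
      }
    }
    return -1;
  }
}

For example, findMarkerBitOffset(buf, 0X314159265359L, 48) over a buffered chunk of a compressed file would report the bit offset of the first block header in that chunk, which is the same pattern and length the constructor uses via BLOCK_DELIMITER and DELIMITER_BIT_LENGTH.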