HADOOP-4012. Provide splitting support for bzip2 compressed files. Contributed by Abdul Qadeer
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@813581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
efb3e17bed
commit
86724941c5
@ -187,6 +187,9 @@ Trunk (unreleased changes)
|
||||
HADOOP-5073. Add annotation mechanism for interface classification.
|
||||
(Jakob Homan via suresh)
|
||||
|
||||
HADOOP-4012. Provide splitting support for bzip2 compressed files. (Abdul
|
||||
Qadeer via cdouglas)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-4565. Added CombineFileInputFormat to use data locality information
|
||||
|
@ -296,12 +296,12 @@ static public long checksum2long(byte[] checksum) {
|
||||
|
||||
@Override
|
||||
public synchronized long getPos() throws IOException {
|
||||
return chunkPos-(count-pos);
|
||||
return chunkPos-Math.max(0L, count - pos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int available() throws IOException {
|
||||
return count-pos;
|
||||
return Math.max(0, count - pos);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,6 +23,9 @@
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
|
||||
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
|
||||
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
|
||||
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
|
||||
@ -35,17 +38,17 @@
|
||||
* CompressionCodec which have a Compressor or Decompressor type argument, throw
|
||||
* UnsupportedOperationException.
|
||||
*/
|
||||
public class BZip2Codec implements
|
||||
org.apache.hadoop.io.compress.CompressionCodec {
|
||||
public class BZip2Codec implements SplittableCompressionCodec {
|
||||
|
||||
private static final String HEADER = "BZ";
|
||||
private static final int HEADER_LEN = HEADER.length();
|
||||
private static final String SUB_HEADER = "h9";
|
||||
private static final int SUB_HEADER_LEN = SUB_HEADER.length();
|
||||
|
||||
/**
|
||||
* Creates a new instance of BZip2Codec
|
||||
*/
|
||||
public BZip2Codec() {
|
||||
}
|
||||
public BZip2Codec() { }
|
||||
|
||||
/**
|
||||
* Creates CompressionOutputStream for BZip2
|
||||
@ -62,10 +65,10 @@ public CompressionOutputStream createOutputStream(OutputStream out)
|
||||
}
|
||||
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
* Creates a compressor using given OutputStream.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return CompressionOutputStream
|
||||
@throws java.io.IOException
|
||||
*/
|
||||
public CompressionOutputStream createOutputStream(OutputStream out,
|
||||
Compressor compressor) throws IOException {
|
||||
@ -75,8 +78,7 @@ public CompressionOutputStream createOutputStream(OutputStream out,
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return BZip2DummyCompressor.class
|
||||
*/
|
||||
public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
|
||||
return BZip2DummyCompressor.class;
|
||||
@ -85,8 +87,7 @@ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorTy
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return Compressor
|
||||
*/
|
||||
public Compressor createCompressor() {
|
||||
return new BZip2DummyCompressor();
|
||||
@ -109,19 +110,72 @@ public CompressionInputStream createInputStream(InputStream in)
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return CompressionInputStream
|
||||
*/
|
||||
public CompressionInputStream createInputStream(InputStream in,
|
||||
Decompressor decompressor) throws IOException {
|
||||
return createInputStream(in);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates CompressionInputStream to be used to read off uncompressed data
|
||||
* in one of the two reading modes. i.e. Continuous or Blocked reading modes
|
||||
*
|
||||
* @param seekableIn The InputStream
|
||||
* @param start The start offset into the compressed stream
|
||||
* @param end The end offset into the compressed stream
|
||||
* @param readMode Controls whether progress is reported continuously or
|
||||
* only at block boundaries.
|
||||
*
|
||||
* @return CompressionInputStream for BZip2 aligned at block boundaries
|
||||
*/
|
||||
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
|
||||
Decompressor decompressor, long start, long end, READ_MODE readMode)
|
||||
throws IOException {
|
||||
|
||||
if (!(seekableIn instanceof Seekable)) {
|
||||
throw new IOException("seekableIn must be an instance of " +
|
||||
Seekable.class.getName());
|
||||
}
|
||||
|
||||
//find the position of first BZip2 start up marker
|
||||
((Seekable)seekableIn).seek(0);
|
||||
|
||||
// BZip2 start of block markers are of 6 bytes. But the very first block
|
||||
// also has "BZh9", making it 10 bytes. This is the common case. But at
|
||||
// time stream might start without a leading BZ.
|
||||
final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
|
||||
CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
|
||||
long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
|
||||
|
||||
((Seekable)seekableIn).seek(adjStart);
|
||||
SplitCompressionInputStream in =
|
||||
new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
|
||||
|
||||
|
||||
// The following if clause handles the following case:
|
||||
// Assume the following scenario in BZip2 compressed stream where
|
||||
// . represent compressed data.
|
||||
// .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
|
||||
// ........................[47 bits][1 bit].....[48 bit Block]...
|
||||
// ................................^[Assume a Byte alignment here]
|
||||
// ........................................^^[current position of stream]
|
||||
// .....................^^[We go back 10 Bytes in stream and find a Block marker]
|
||||
// ........................................^^[We align at wrong position!]
|
||||
// ...........................................................^^[While this pos is correct]
|
||||
|
||||
if (in.getPos() <= start) {
|
||||
((Seekable)seekableIn).seek(start);
|
||||
in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
|
||||
}
|
||||
|
||||
return in;
|
||||
}
|
||||
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return BZip2DummyDecompressor.class
|
||||
*/
|
||||
public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
|
||||
return BZip2DummyDecompressor.class;
|
||||
@ -130,8 +184,7 @@ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompress
|
||||
/**
|
||||
* This functionality is currently not supported.
|
||||
*
|
||||
* @throws java.lang.UnsupportedOperationException
|
||||
* Throws UnsupportedOperationException
|
||||
* @return Decompressor
|
||||
*/
|
||||
public Decompressor createDecompressor() {
|
||||
return new BZip2DummyDecompressor();
|
||||
@ -146,7 +199,8 @@ public String getDefaultExtension() {
|
||||
return ".bz2";
|
||||
}
|
||||
|
||||
private static class BZip2CompressionOutputStream extends CompressionOutputStream {
|
||||
private static class BZip2CompressionOutputStream extends
|
||||
CompressionOutputStream {
|
||||
|
||||
// class data starts here//
|
||||
private CBZip2OutputStream output;
|
||||
@ -221,26 +275,79 @@ public void close() throws IOException {
|
||||
|
||||
}// end of class BZip2CompressionOutputStream
|
||||
|
||||
private static class BZip2CompressionInputStream extends CompressionInputStream {
|
||||
/**
|
||||
* This class is capable to de-compress BZip2 data in two modes;
|
||||
* CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to
|
||||
* do decompression starting any arbitrary position in the stream.
|
||||
*
|
||||
* So this facility can easily be used to parallelize decompression
|
||||
* of a large BZip2 file for performance reasons. (It is exactly
|
||||
* done so for Hadoop framework. See LineRecordReader for an
|
||||
* example). So one can break the file (of course logically) into
|
||||
* chunks for parallel processing. These "splits" should be like
|
||||
* default Hadoop splits (e.g as in FileInputFormat getSplit metod).
|
||||
* So this code is designed and tested for FileInputFormat's way
|
||||
* of splitting only.
|
||||
*/
|
||||
|
||||
private static class BZip2CompressionInputStream extends
|
||||
SplitCompressionInputStream {
|
||||
|
||||
// class data starts here//
|
||||
private CBZip2InputStream input;
|
||||
boolean needsReset;
|
||||
private BufferedInputStream bufferedIn;
|
||||
private boolean isHeaderStripped = false;
|
||||
private boolean isSubHeaderStripped = false;
|
||||
private READ_MODE readMode = READ_MODE.CONTINUOUS;
|
||||
private long startingPos = 0L;
|
||||
|
||||
// Following state machine handles different states of compressed stream
|
||||
// position
|
||||
// HOLD : Don't advertise compressed stream position
|
||||
// ADVERTISE : Read 1 more character and advertise stream position
|
||||
// See more comments about it before updatePos method.
|
||||
private enum POS_ADVERTISEMENT_STATE_MACHINE {
|
||||
HOLD, ADVERTISE
|
||||
};
|
||||
|
||||
POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
|
||||
long compressedStreamPosition = 0;
|
||||
|
||||
// class data ends here//
|
||||
|
||||
public BZip2CompressionInputStream(InputStream in) throws IOException {
|
||||
this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
|
||||
}
|
||||
|
||||
super(in);
|
||||
needsReset = true;
|
||||
public BZip2CompressionInputStream(InputStream in, long start, long end,
|
||||
READ_MODE readMode) throws IOException {
|
||||
super(in, start, end);
|
||||
needsReset = false;
|
||||
bufferedIn = new BufferedInputStream(super.in);
|
||||
this.startingPos = super.getPos();
|
||||
this.readMode = readMode;
|
||||
if (this.startingPos == 0) {
|
||||
// We only strip header if it is start of file
|
||||
bufferedIn = readStreamHeader();
|
||||
}
|
||||
input = new CBZip2InputStream(bufferedIn, readMode);
|
||||
if (this.isHeaderStripped) {
|
||||
input.updateReportedByteCount(HEADER_LEN);
|
||||
}
|
||||
|
||||
if (this.isSubHeaderStripped) {
|
||||
input.updateReportedByteCount(SUB_HEADER_LEN);
|
||||
}
|
||||
|
||||
this.updatePos(false);
|
||||
}
|
||||
|
||||
private BufferedInputStream readStreamHeader() throws IOException {
|
||||
// We are flexible enough to allow the compressed stream not to
|
||||
// start with the header of BZ. So it works fine either we have
|
||||
// the header or not.
|
||||
BufferedInputStream bufferedIn = null;
|
||||
if (super.in != null) {
|
||||
bufferedIn = new BufferedInputStream(super.in);
|
||||
bufferedIn.mark(HEADER_LEN);
|
||||
byte[] headerBytes = new byte[HEADER_LEN];
|
||||
int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
|
||||
@ -248,6 +355,17 @@ private BufferedInputStream readStreamHeader() throws IOException {
|
||||
String header = new String(headerBytes);
|
||||
if (header.compareTo(HEADER) != 0) {
|
||||
bufferedIn.reset();
|
||||
} else {
|
||||
this.isHeaderStripped = true;
|
||||
// In case of BYBLOCK mode, we also want to strip off
|
||||
// remaining two character of the header.
|
||||
if (this.readMode == READ_MODE.BYBLOCK) {
|
||||
actualRead = bufferedIn.read(headerBytes, 0,
|
||||
SUB_HEADER_LEN);
|
||||
if (actualRead != -1) {
|
||||
this.isSubHeaderStripped = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -267,33 +385,96 @@ public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates compressed stream position exactly when the
|
||||
* client of this code has read off at least one byte passed any BZip2
|
||||
* end of block marker.
|
||||
*
|
||||
* This mechanism is very helpful to deal with data level record
|
||||
* boundaries. Please see constructor and next methods of
|
||||
* org.apache.hadoop.mapred.LineRecordReader as an example usage of this
|
||||
* feature. We elaborate it with an example in the following:
|
||||
*
|
||||
* Assume two different scenarios of the BZip2 compressed stream, where
|
||||
* [m] represent end of block, \n is line delimiter and . represent compressed
|
||||
* data.
|
||||
*
|
||||
* ............[m]......\n.......
|
||||
*
|
||||
* ..........\n[m]......\n.......
|
||||
*
|
||||
* Assume that end is right after [m]. In the first case the reading
|
||||
* will stop at \n and there is no need to read one more line. (To see the
|
||||
* reason of reading one more line in the next() method is explained in LineRecordReader.)
|
||||
* While in the second example LineRecordReader needs to read one more line
|
||||
* (till the second \n). Now since BZip2Codecs only update position
|
||||
* at least one byte passed a maker, so it is straight forward to differentiate
|
||||
* between the two cases mentioned.
|
||||
*
|
||||
*/
|
||||
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (needsReset) {
|
||||
internalReset();
|
||||
}
|
||||
return this.input.read(b, off, len);
|
||||
|
||||
int result = 0;
|
||||
result = this.input.read(b, off, len);
|
||||
if (result == BZip2Constants.END_OF_BLOCK) {
|
||||
this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
|
||||
}
|
||||
|
||||
if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
|
||||
result = this.input.read(b, off, off + 1);
|
||||
// This is the precise time to update compressed stream position
|
||||
// to the client of this code.
|
||||
this.updatePos(true);
|
||||
this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
|
||||
}
|
||||
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
byte b[] = new byte[1];
|
||||
int result = this.read(b, 0, 1);
|
||||
return (result < 0) ? result : b[0];
|
||||
}
|
||||
|
||||
private void internalReset() throws IOException {
|
||||
if (needsReset) {
|
||||
needsReset = false;
|
||||
BufferedInputStream bufferedIn = readStreamHeader();
|
||||
input = new CBZip2InputStream(bufferedIn);
|
||||
input = new CBZip2InputStream(bufferedIn, this.readMode);
|
||||
}
|
||||
}
|
||||
|
||||
public void resetState() throws IOException {
|
||||
// Cannot read from bufferedIn at this point because bufferedIn might not be ready
|
||||
// Cannot read from bufferedIn at this point because bufferedIn
|
||||
// might not be ready
|
||||
// yet, as in SequenceFile.Reader implementation.
|
||||
needsReset = true;
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
if (needsReset) {
|
||||
internalReset();
|
||||
public long getPos() {
|
||||
return this.compressedStreamPosition;
|
||||
}
|
||||
return this.input.read();
|
||||
|
||||
/*
|
||||
* As the comments before read method tell that
|
||||
* compressed stream is advertised when at least
|
||||
* one byte passed EOB have been read off. But
|
||||
* there is an exception to this rule. When we
|
||||
* construct the stream we advertise the position
|
||||
* exactly at EOB. In the following method
|
||||
* shouldAddOn boolean captures this exception.
|
||||
*
|
||||
*/
|
||||
private void updatePos(boolean shouldAddOn) {
|
||||
int addOn = shouldAddOn ? 1 : 0;
|
||||
this.compressedStreamPosition = this.startingPos
|
||||
+ this.input.getProcessedByteCount() + addOn;
|
||||
}
|
||||
|
||||
}// end of BZip2CompressionInputStream
|
||||
|
@ -38,9 +38,10 @@ public class BlockDecompressorStream extends DecompressorStream {
|
||||
* @param in input stream
|
||||
* @param decompressor decompressor to use
|
||||
* @param bufferSize size of buffer
|
||||
* @throws IOException
|
||||
*/
|
||||
public BlockDecompressorStream(InputStream in, Decompressor decompressor,
|
||||
int bufferSize) {
|
||||
int bufferSize) throws IOException {
|
||||
super(in, decompressor, bufferSize);
|
||||
}
|
||||
|
||||
@ -49,12 +50,13 @@ public BlockDecompressorStream(InputStream in, Decompressor decompressor,
|
||||
*
|
||||
* @param in input stream
|
||||
* @param decompressor decompressor to use
|
||||
* @throws IOException
|
||||
*/
|
||||
public BlockDecompressorStream(InputStream in, Decompressor decompressor) {
|
||||
public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
|
||||
super(in, decompressor);
|
||||
}
|
||||
|
||||
protected BlockDecompressorStream(InputStream in) {
|
||||
protected BlockDecompressorStream(InputStream in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,8 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
/**
|
||||
* A compression input stream.
|
||||
*
|
||||
@ -28,19 +30,25 @@
|
||||
* reposition the underlying input stream then call {@link #resetState()},
|
||||
* without having to also synchronize client buffers.
|
||||
*/
|
||||
public abstract class CompressionInputStream extends InputStream {
|
||||
|
||||
public abstract class CompressionInputStream extends InputStream implements Seekable {
|
||||
/**
|
||||
* The input stream to be compressed.
|
||||
*/
|
||||
protected final InputStream in;
|
||||
protected long maxAvailableData = 0L;
|
||||
|
||||
/**
|
||||
* Create a compression input stream that reads
|
||||
* the decompressed bytes from the given stream.
|
||||
*
|
||||
* @param in The input stream to be compressed.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected CompressionInputStream(InputStream in) {
|
||||
protected CompressionInputStream(InputStream in) throws IOException {
|
||||
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
|
||||
this.maxAvailableData = in.available();
|
||||
}
|
||||
this.in = in;
|
||||
}
|
||||
|
||||
@ -60,4 +68,40 @@ public void close() throws IOException {
|
||||
*/
|
||||
public abstract void resetState() throws IOException;
|
||||
|
||||
/**
|
||||
* This method returns the current position in the stream.
|
||||
*
|
||||
* @return Current position in stream as a long
|
||||
*/
|
||||
public long getPos() throws IOException {
|
||||
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
|
||||
//This way of getting the current position will not work for file
|
||||
//size which can be fit in an int and hence can not be returned by
|
||||
//available method.
|
||||
return (this.maxAvailableData - this.in.available());
|
||||
}
|
||||
else{
|
||||
return ((Seekable)this.in).getPos();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is current not supported.
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
*/
|
||||
|
||||
public void seek(long pos) throws UnsupportedOperationException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is current not supported.
|
||||
*
|
||||
* @throws UnsupportedOperationException
|
||||
*/
|
||||
public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ public class DecompressorStream extends CompressionInputStream {
|
||||
protected boolean eof = false;
|
||||
protected boolean closed = false;
|
||||
|
||||
public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) {
|
||||
public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) throws IOException {
|
||||
super(in);
|
||||
|
||||
if (in == null || decompressor == null) {
|
||||
@ -43,7 +43,7 @@ public DecompressorStream(InputStream in, Decompressor decompressor, int bufferS
|
||||
buffer = new byte[bufferSize];
|
||||
}
|
||||
|
||||
public DecompressorStream(InputStream in, Decompressor decompressor) {
|
||||
public DecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
|
||||
this(in, decompressor, 512);
|
||||
}
|
||||
|
||||
@ -51,8 +51,9 @@ public DecompressorStream(InputStream in, Decompressor decompressor) {
|
||||
* Allow derived classes to directly set the underlying stream.
|
||||
*
|
||||
* @param in Underlying input stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected DecompressorStream(InputStream in) {
|
||||
protected DecompressorStream(InputStream in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
|
@ -103,8 +103,9 @@ public GzipInputStream(InputStream in) throws IOException {
|
||||
|
||||
/**
|
||||
* Allow subclasses to directly set the inflater stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected GzipInputStream(DecompressorStream in) {
|
||||
protected GzipInputStream(DecompressorStream in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.io.compress;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* An InputStream covering a range of compressed data. The start and end
|
||||
* offsets requested by a client may be modified by the codec to fit block
|
||||
* boundaries or other algorithm-dependent requirements.
|
||||
*/
|
||||
public abstract class SplitCompressionInputStream
|
||||
extends CompressionInputStream {
|
||||
|
||||
private long start;
|
||||
private long end;
|
||||
|
||||
public SplitCompressionInputStream(InputStream in, long start, long end)
|
||||
throws IOException {
|
||||
super(in);
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
|
||||
protected void setStart(long start) {
|
||||
this.start = start;
|
||||
}
|
||||
|
||||
protected void setEnd(long end) {
|
||||
this.end = end;
|
||||
}
|
||||
|
||||
/**
|
||||
* After calling createInputStream, the values of start or end
|
||||
* might change. So this method can be used to get the new value of start.
|
||||
* @return The changed value of start
|
||||
*/
|
||||
public long getAdjustedStart() {
|
||||
return start;
|
||||
}
|
||||
|
||||
/**
|
||||
* After calling createInputStream, the values of start or end
|
||||
* might change. So this method can be used to get the new value of end.
|
||||
* @return The changed value of end
|
||||
*/
|
||||
public long getAdjustedEnd() {
|
||||
return end;
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.io.compress;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
|
||||
/**
|
||||
* This interface is meant to be implemented by those compression codecs
|
||||
* which are capable to compress / de-compress a stream starting at any
|
||||
* arbitrary position.
|
||||
*
|
||||
* Especially the process of de-compressing a stream starting at some arbitrary
|
||||
* position is challenging. Most of the codecs are only able to successfully
|
||||
* de-compress a stream, if they start from the very beginning till the end.
|
||||
* One of the reasons is the stored state at the beginning of the stream which
|
||||
* is crucial for de-compression.
|
||||
*
|
||||
* Yet there are few codecs which do not save the whole state at the beginning
|
||||
* of the stream and hence can be used to de-compress stream starting at any
|
||||
* arbitrary points. This interface is meant to be used by such codecs. Such
|
||||
* codecs are highly valuable, especially in the context of Hadoop, because
|
||||
* an input compressed file can be split and hence can be worked on by multiple
|
||||
* machines in parallel.
|
||||
*/
|
||||
public interface SplittableCompressionCodec extends CompressionCodec {
|
||||
|
||||
/**
|
||||
* During decompression, data can be read off from the decompressor in two
|
||||
* modes, namely continuous and blocked. Few codecs (e.g. BZip2) are capable
|
||||
* of compressing data in blocks and then decompressing the blocks. In
|
||||
* Blocked reading mode codecs inform 'end of block' events to its caller.
|
||||
* While in continuous mode, the caller of codecs is unaware about the blocks
|
||||
* and uncompressed data is spilled out like a continuous stream.
|
||||
*/
|
||||
public enum READ_MODE {CONTINUOUS, BYBLOCK};
|
||||
|
||||
/**
|
||||
* Create a stream as dictated by the readMode. This method is used when
|
||||
* the codecs wants the ability to work with the underlying stream positions.
|
||||
*
|
||||
* @param seekableIn The seekable input stream (seeks in compressed data)
|
||||
* @param start The start offset into the compressed stream. May be changed
|
||||
* by the underlying codec.
|
||||
* @param end The end offset into the compressed stream. May be changed by
|
||||
* the underlying codec.
|
||||
* @param readMode Controls whether stream position is reported continuously
|
||||
* from the compressed stream only only at block boundaries.
|
||||
* @return a stream to read uncompressed bytes from
|
||||
*/
|
||||
SplitCompressionInputStream createInputStream(InputStream seekableIn,
|
||||
Decompressor decompressor, long start, long end, READ_MODE readMode)
|
||||
throws IOException;
|
||||
|
||||
}
|
@ -44,6 +44,14 @@ public interface BZip2Constants {
|
||||
int N_ITERS = 4;
|
||||
int MAX_SELECTORS = (2 + (900000 / G_SIZE));
|
||||
int NUM_OVERSHOOT_BYTES = 20;
|
||||
/**
|
||||
* End of a BZip2 block
|
||||
*/
|
||||
public static final int END_OF_BLOCK = -2;
|
||||
/**
|
||||
* End of BZip2 stream.
|
||||
*/
|
||||
public static final int END_OF_STREAM = -1;
|
||||
|
||||
/**
|
||||
* This array really shouldn't be here. Again, for historical purposes it
|
||||
|
@ -23,9 +23,13 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.compress.bzip2;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
|
||||
|
||||
|
||||
/**
|
||||
* An input stream that decompresses from the BZip2 format (without the file
|
||||
* header chars) to be read as any other stream.
|
||||
@ -45,30 +49,43 @@
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
|
||||
* Current position in the stream is an important statistic for Hadoop. For
|
||||
* example in LineRecordReader, we solely depend on the current position in the
|
||||
* stream to know about the progess. The notion of position becomes complicated
|
||||
* for compressed files. The Hadoop splitting is done in terms of compressed
|
||||
* file. But a compressed file deflates to a large amount of data. So we have
|
||||
* handled this problem in the following way.
|
||||
*
|
||||
* On object creation time, we find the next block start delimiter. Once such a
|
||||
* marker is found, the stream stops there (we discard any read compressed data
|
||||
* in this process) and the position is updated (i.e. the caller of this class
|
||||
* will find out the stream location). At this point we are ready for actual
|
||||
* reading (i.e. decompression) of data.
|
||||
*
|
||||
* The subsequent read calls give out data. The position is updated when the
|
||||
* caller of this class has read off the current block + 1 bytes. In between the
|
||||
* block reading, position is not updated. (We can only update the postion on
|
||||
* block boundaries).
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Instances of this class are not threadsafe.
|
||||
* </p>
|
||||
*/
|
||||
public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
||||
|
||||
private static void reportCRCError() throws IOException {
|
||||
|
||||
throw new IOException("BZip2 CRC error");
|
||||
|
||||
}
|
||||
|
||||
private void makeMaps() {
|
||||
final boolean[] inUse = this.data.inUse;
|
||||
final byte[] seqToUnseq = this.data.seqToUnseq;
|
||||
|
||||
int nInUseShadow = 0;
|
||||
|
||||
for (int i = 0; i < 256; i++) {
|
||||
if (inUse[i])
|
||||
seqToUnseq[nInUseShadow++] = (byte) i;
|
||||
}
|
||||
|
||||
this.nInUse = nInUseShadow;
|
||||
}
|
||||
public static final long BLOCK_DELIMITER = 0X314159265359L;// start of block
|
||||
public static final long EOS_DELIMITER = 0X177245385090L;// end of bzip2 stream
|
||||
private static final int DELIMITER_BIT_LENGTH = 48;
|
||||
READ_MODE readMode = READ_MODE.CONTINUOUS;
|
||||
// The variable records the current advertised position of the stream.
|
||||
private long reportedBytesReadFromCompressedStream = 0L;
|
||||
// The following variable keep record of compressed bytes read.
|
||||
private long bytesReadFromCompressedStream = 0L;
|
||||
private boolean lazyInitialization = false;
|
||||
private byte array[] = new byte[1];
|
||||
|
||||
/**
|
||||
* Index of the last char in the block, so the block size == last + 1.
|
||||
@ -86,32 +103,34 @@ private void makeMaps() {
|
||||
*/
|
||||
private int blockSize100k;
|
||||
|
||||
private boolean blockRandomised;
|
||||
private boolean blockRandomised = false;
|
||||
|
||||
private int bsBuff;
|
||||
private int bsLive;
|
||||
private long bsBuff;
|
||||
private long bsLive;
|
||||
private final CRC crc = new CRC();
|
||||
|
||||
private int nInUse;
|
||||
|
||||
private InputStream in;
|
||||
private BufferedInputStream in;
|
||||
|
||||
private int currentChar = -1;
|
||||
|
||||
private static final int EOF = 0;
|
||||
private static final int START_BLOCK_STATE = 1;
|
||||
private static final int RAND_PART_A_STATE = 2;
|
||||
private static final int RAND_PART_B_STATE = 3;
|
||||
private static final int RAND_PART_C_STATE = 4;
|
||||
private static final int NO_RAND_PART_A_STATE = 5;
|
||||
private static final int NO_RAND_PART_B_STATE = 6;
|
||||
private static final int NO_RAND_PART_C_STATE = 7;
|
||||
/**
|
||||
* A state machine to keep track of current state of the de-coder
|
||||
*
|
||||
*/
|
||||
public enum STATE {
|
||||
EOF, START_BLOCK_STATE, RAND_PART_A_STATE, RAND_PART_B_STATE, RAND_PART_C_STATE, NO_RAND_PART_A_STATE, NO_RAND_PART_B_STATE, NO_RAND_PART_C_STATE, NO_PROCESS_STATE
|
||||
};
|
||||
|
||||
private int currentState = START_BLOCK_STATE;
|
||||
private STATE currentState = STATE.START_BLOCK_STATE;
|
||||
|
||||
private int storedBlockCRC, storedCombinedCRC;
|
||||
private int computedBlockCRC, computedCombinedCRC;
|
||||
|
||||
private boolean skipResult = false;// used by skipToNextMarker
|
||||
private static boolean skipDecompression = false;
|
||||
|
||||
// Variables used by setup* methods exclusively
|
||||
|
||||
private int su_count;
|
||||
@ -129,6 +148,121 @@ private void makeMaps() {
|
||||
*/
|
||||
private CBZip2InputStream.Data data;
|
||||
|
||||
/**
|
||||
* This method reports the processed bytes so far. Please note that this
|
||||
* statistic is only updated on block boundaries and only when the stream is
|
||||
* initiated in BYBLOCK mode.
|
||||
*/
|
||||
public long getProcessedByteCount() {
|
||||
return reportedBytesReadFromCompressedStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method keeps track of raw processed compressed
|
||||
* bytes.
|
||||
*
|
||||
* @param count count is the number of bytes to be
|
||||
* added to raw processed bytes
|
||||
*/
|
||||
|
||||
protected void updateProcessedByteCount(int count) {
|
||||
this.bytesReadFromCompressedStream += count;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is called by the client of this
|
||||
* class in case there are any corrections in
|
||||
* the stream position. One common example is
|
||||
* when client of this code removes starting BZ
|
||||
* characters from the compressed stream.
|
||||
*
|
||||
* @param count count bytes are added to the reported bytes
|
||||
*
|
||||
*/
|
||||
public void updateReportedByteCount(int count) {
|
||||
this.reportedBytesReadFromCompressedStream += count;
|
||||
this.updateProcessedByteCount(count);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method reads a Byte from the compressed stream. Whenever we need to
|
||||
* read from the underlying compressed stream, this method should be called
|
||||
* instead of directly calling the read method of the underlying compressed
|
||||
* stream. This method does important record keeping to have the statistic
|
||||
* that how many bytes have been read off the compressed stream.
|
||||
*/
|
||||
private int readAByte(InputStream inStream) throws IOException {
|
||||
int read = inStream.read();
|
||||
if (read >= 0) {
|
||||
this.updateProcessedByteCount(1);
|
||||
}
|
||||
return read;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method tries to find the marker (passed to it as the first parameter)
|
||||
* in the stream. It can find bit patterns of length <= 63 bits. Specifically
|
||||
* this method is used in CBZip2InputStream to find the end of block (EOB)
|
||||
* delimiter in the stream, starting from the current position of the stream.
|
||||
* If marker is found, the stream position will be right after marker at the
|
||||
* end of this call.
|
||||
*
|
||||
* @param marker The bit pattern to be found in the stream
|
||||
* @param markerBitLength No of bits in the marker
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws IllegalArgumentException if marketBitLength is greater than 63
|
||||
*/
|
||||
public boolean skipToNextMarker(long marker, int markerBitLength)
|
||||
throws IOException, IllegalArgumentException {
|
||||
try {
|
||||
if (markerBitLength > 63) {
|
||||
throw new IllegalArgumentException(
|
||||
"skipToNextMarker can not find patterns greater than 63 bits");
|
||||
}
|
||||
// pick next marketBitLength bits in the stream
|
||||
long bytes = 0;
|
||||
bytes = this.bsR(markerBitLength);
|
||||
if (bytes == -1) {
|
||||
return false;
|
||||
}
|
||||
while (true) {
|
||||
if (bytes == marker) {
|
||||
return true;
|
||||
|
||||
} else {
|
||||
bytes = bytes << 1;
|
||||
bytes = bytes & ((1L << markerBitLength) - 1);
|
||||
int oneBit = (int) this.bsR(1);
|
||||
if (oneBit != -1) {
|
||||
bytes = bytes | oneBit;
|
||||
} else
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected void reportCRCError() throws IOException {
|
||||
throw new IOException("crc error");
|
||||
}
|
||||
|
||||
private void makeMaps() {
|
||||
final boolean[] inUse = this.data.inUse;
|
||||
final byte[] seqToUnseq = this.data.seqToUnseq;
|
||||
|
||||
int nInUseShadow = 0;
|
||||
|
||||
for (int i = 0; i < 256; i++) {
|
||||
if (inUse[i])
|
||||
seqToUnseq[nInUseShadow++] = (byte) i;
|
||||
}
|
||||
|
||||
this.nInUse = nInUseShadow;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new CBZip2InputStream which decompresses bytes read from the
|
||||
* specified stream.
|
||||
@ -145,21 +279,99 @@ private void makeMaps() {
|
||||
* @throws NullPointerException
|
||||
* if <tt>in == null</tt>
|
||||
*/
|
||||
public CBZip2InputStream(final InputStream in) throws IOException {
|
||||
super();
|
||||
public CBZip2InputStream(final InputStream in, READ_MODE readMode)
|
||||
throws IOException {
|
||||
|
||||
this.in = in;
|
||||
super();
|
||||
int blockSize = 0X39;// i.e 9
|
||||
this.blockSize100k = blockSize - '0';
|
||||
this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer
|
||||
this.readMode = readMode;
|
||||
if (readMode == READ_MODE.CONTINUOUS) {
|
||||
currentState = STATE.START_BLOCK_STATE;
|
||||
lazyInitialization = (in.available() == 0)?true:false;
|
||||
if(!lazyInitialization){
|
||||
init();
|
||||
}
|
||||
} else if (readMode == READ_MODE.BYBLOCK) {
|
||||
this.currentState = STATE.NO_PROCESS_STATE;
|
||||
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
|
||||
this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
|
||||
if(!skipDecompression){
|
||||
changeStateToProcessABlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of bytes between the current stream position
|
||||
* and the immediate next BZip2 block marker.
|
||||
*
|
||||
* @param in
|
||||
* The InputStream
|
||||
*
|
||||
* @return long Number of bytes between current stream position and the
|
||||
* next BZip2 block start marker.
|
||||
* @throws IOException
|
||||
*
|
||||
*/
|
||||
public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
|
||||
CBZip2InputStream.skipDecompression = true;
|
||||
CBZip2InputStream anObject = null;
|
||||
|
||||
anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);
|
||||
|
||||
return anObject.getProcessedByteCount();
|
||||
}
|
||||
|
||||
public CBZip2InputStream(final InputStream in) throws IOException {
|
||||
this(in, READ_MODE.CONTINUOUS);
|
||||
}
|
||||
|
||||
private void changeStateToProcessABlock() throws IOException {
|
||||
if (skipResult == true) {
|
||||
initBlock();
|
||||
setupBlock();
|
||||
} else {
|
||||
this.currentState = STATE.EOF;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public int read() throws IOException {
|
||||
|
||||
if (this.in != null) {
|
||||
return read0();
|
||||
int result = this.read(array, 0, 1);
|
||||
int value = 0XFF & array[0];
|
||||
return (result > 0 ? value : result);
|
||||
|
||||
} else {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In CONTINOUS reading mode, this read method starts from the
|
||||
* start of the compressed stream and end at the end of file by
|
||||
* emitting un-compressed data. In this mode stream positioning
|
||||
* is not announced and should be ignored.
|
||||
*
|
||||
* In BYBLOCK reading mode, this read method informs about the end
|
||||
* of a BZip2 block by returning EOB. At this event, the compressed
|
||||
* stream position is also announced. This announcement tells that
|
||||
* how much of the compressed stream has been de-compressed and read
|
||||
* out of this class. In between EOB events, the stream position is
|
||||
* not updated.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
* if the stream content is malformed or an I/O error occurs.
|
||||
*
|
||||
* @return int The return value greater than 0 are the bytes read. A value
|
||||
* of -1 means end of stream while -2 represents end of block
|
||||
*/
|
||||
|
||||
|
||||
public int read(final byte[] dest, final int offs, final int len)
|
||||
throws IOException {
|
||||
if (offs < 0) {
|
||||
@ -176,13 +388,39 @@ public int read(final byte[] dest, final int offs, final int len)
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
|
||||
final int hi = offs + len;
|
||||
int destOffs = offs;
|
||||
for (int b; (destOffs < hi) && ((b = read0()) >= 0);) {
|
||||
dest[destOffs++] = (byte) b;
|
||||
if(lazyInitialization){
|
||||
this.init();
|
||||
this.lazyInitialization = false;
|
||||
}
|
||||
|
||||
return (destOffs == offs) ? -1 : (destOffs - offs);
|
||||
if(skipDecompression){
|
||||
changeStateToProcessABlock();
|
||||
CBZip2InputStream.skipDecompression = false;
|
||||
}
|
||||
|
||||
final int hi = offs + len;
|
||||
int destOffs = offs;
|
||||
int b = 0;
|
||||
|
||||
|
||||
|
||||
for (; ((destOffs < hi) && ((b = read0())) >= 0);) {
|
||||
dest[destOffs++] = (byte) b;
|
||||
|
||||
}
|
||||
|
||||
int result = destOffs - offs;
|
||||
if (result == 0) {
|
||||
//report 'end of block' or 'end of stream'
|
||||
result = b;
|
||||
|
||||
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
|
||||
//Exactly when we are about to start a new block, we advertise the stream position.
|
||||
this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
|
||||
|
||||
changeStateToProcessABlock();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private int read0() throws IOException {
|
||||
@ -190,7 +428,10 @@ private int read0() throws IOException {
|
||||
|
||||
switch (this.currentState) {
|
||||
case EOF:
|
||||
return -1;
|
||||
return END_OF_STREAM;// return -1
|
||||
|
||||
case NO_PROCESS_STATE:
|
||||
return END_OF_BLOCK;// return -2
|
||||
|
||||
case START_BLOCK_STATE:
|
||||
throw new IllegalStateException();
|
||||
@ -225,13 +466,13 @@ private int read0() throws IOException {
|
||||
}
|
||||
|
||||
private void init() throws IOException {
|
||||
int magic2 = this.in.read();
|
||||
int magic2 = this.readAByte(in);
|
||||
if (magic2 != 'h') {
|
||||
throw new IOException("Stream is not BZip2 formatted: expected 'h'"
|
||||
+ " as first byte but got '" + (char) magic2 + "'");
|
||||
}
|
||||
|
||||
int blockSize = this.in.read();
|
||||
int blockSize = this.readAByte(in);
|
||||
if ((blockSize < '1') || (blockSize > '9')) {
|
||||
throw new IOException("Stream is not BZip2 formatted: illegal "
|
||||
+ "blocksize " + (char) blockSize);
|
||||
@ -244,6 +485,27 @@ private void init() throws IOException {
|
||||
}
|
||||
|
||||
private void initBlock() throws IOException {
|
||||
if (this.readMode == READ_MODE.BYBLOCK) {
|
||||
// this.checkBlockIntegrity();
|
||||
this.storedBlockCRC = bsGetInt();
|
||||
this.blockRandomised = bsR(1) == 1;
|
||||
|
||||
/**
|
||||
* Allocate data here instead in constructor, so we do not allocate
|
||||
* it if the input file is empty.
|
||||
*/
|
||||
if (this.data == null) {
|
||||
this.data = new Data(this.blockSize100k);
|
||||
}
|
||||
|
||||
// currBlockNo++;
|
||||
getAndMoveToFrontDecode();
|
||||
|
||||
this.crc.initialiseCRC();
|
||||
this.currentState = STATE.START_BLOCK_STATE;
|
||||
return;
|
||||
}
|
||||
|
||||
char magic0 = bsGetUByte();
|
||||
char magic1 = bsGetUByte();
|
||||
char magic2 = bsGetUByte();
|
||||
@ -261,7 +523,7 @@ private void initBlock() throws IOException {
|
||||
magic4 != 0x53 || // 'S'
|
||||
magic5 != 0x59 // 'Y'
|
||||
) {
|
||||
this.currentState = EOF;
|
||||
this.currentState = STATE.EOF;
|
||||
throw new IOException("bad block header");
|
||||
} else {
|
||||
this.storedBlockCRC = bsGetInt();
|
||||
@ -279,7 +541,7 @@ private void initBlock() throws IOException {
|
||||
getAndMoveToFrontDecode();
|
||||
|
||||
this.crc.initialiseCRC();
|
||||
this.currentState = START_BLOCK_STATE;
|
||||
this.currentState = STATE.START_BLOCK_STATE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -304,7 +566,7 @@ private void endBlock() throws IOException {
|
||||
|
||||
private void complete() throws IOException {
|
||||
this.storedCombinedCRC = bsGetInt();
|
||||
this.currentState = EOF;
|
||||
this.currentState = STATE.EOF;
|
||||
this.data = null;
|
||||
|
||||
if (this.storedCombinedCRC != this.computedCombinedCRC) {
|
||||
@ -326,14 +588,14 @@ public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private int bsR(final int n) throws IOException {
|
||||
int bsLiveShadow = this.bsLive;
|
||||
int bsBuffShadow = this.bsBuff;
|
||||
private long bsR(final long n) throws IOException {
|
||||
long bsLiveShadow = this.bsLive;
|
||||
long bsBuffShadow = this.bsBuff;
|
||||
|
||||
if (bsLiveShadow < n) {
|
||||
final InputStream inShadow = this.in;
|
||||
do {
|
||||
int thech = inShadow.read();
|
||||
int thech = readAByte(inShadow);
|
||||
|
||||
if (thech < 0) {
|
||||
throw new IOException("unexpected end of stream");
|
||||
@ -347,15 +609,15 @@ private int bsR(final int n) throws IOException {
|
||||
}
|
||||
|
||||
this.bsLive = bsLiveShadow - n;
|
||||
return (bsBuffShadow >> (bsLiveShadow - n)) & ((1 << n) - 1);
|
||||
return (bsBuffShadow >> (bsLiveShadow - n)) & ((1L << n) - 1);
|
||||
}
|
||||
|
||||
private boolean bsGetBit() throws IOException {
|
||||
int bsLiveShadow = this.bsLive;
|
||||
int bsBuffShadow = this.bsBuff;
|
||||
long bsLiveShadow = this.bsLive;
|
||||
long bsBuffShadow = this.bsBuff;
|
||||
|
||||
if (bsLiveShadow < 1) {
|
||||
int thech = this.in.read();
|
||||
int thech = this.readAByte(in);
|
||||
|
||||
if (thech < 0) {
|
||||
throw new IOException("unexpected end of stream");
|
||||
@ -375,7 +637,7 @@ private char bsGetUByte() throws IOException {
|
||||
}
|
||||
|
||||
private int bsGetInt() throws IOException {
|
||||
return (((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8);
|
||||
return (int) ((((((bsR(8) << 8) | bsR(8)) << 8) | bsR(8)) << 8) | bsR(8));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -454,8 +716,8 @@ private void recvDecodingTables() throws IOException {
|
||||
final int alphaSize = this.nInUse + 2;
|
||||
|
||||
/* Now the selectors */
|
||||
final int nGroups = bsR(3);
|
||||
final int nSelectors = bsR(15);
|
||||
final int nGroups = (int) bsR(3);
|
||||
final int nSelectors = (int) bsR(15);
|
||||
|
||||
for (int i = 0; i < nSelectors; i++) {
|
||||
int j = 0;
|
||||
@ -486,7 +748,7 @@ private void recvDecodingTables() throws IOException {
|
||||
|
||||
/* Now the coding tables */
|
||||
for (int t = 0; t < nGroups; t++) {
|
||||
int curr = bsR(5);
|
||||
int curr = (int) bsR(5);
|
||||
final char[] len_t = len[t];
|
||||
for (int i = 0; i < alphaSize; i++) {
|
||||
while (bsGetBit()) {
|
||||
@ -532,7 +794,7 @@ private void createHuffmanDecodingTables(final int alphaSize,
|
||||
}
|
||||
|
||||
private void getAndMoveToFrontDecode() throws IOException {
|
||||
this.origPtr = bsR(24);
|
||||
this.origPtr = (int) bsR(24);
|
||||
recvDecodingTables();
|
||||
|
||||
final InputStream inShadow = this.in;
|
||||
@ -562,8 +824,8 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
int groupPos = G_SIZE - 1;
|
||||
final int eob = this.nInUse + 1;
|
||||
int nextSym = getAndMoveToFrontDecode0(0);
|
||||
int bsBuffShadow = this.bsBuff;
|
||||
int bsLiveShadow = this.bsLive;
|
||||
int bsBuffShadow = (int) this.bsBuff;
|
||||
int bsLiveShadow = (int) this.bsLive;
|
||||
int lastShadow = -1;
|
||||
int zt = selector[groupNo] & 0xff;
|
||||
int[] base_zt = base[zt];
|
||||
@ -597,10 +859,8 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
|
||||
int zn = minLens_zt;
|
||||
|
||||
// Inlined:
|
||||
// int zvec = bsR(zn);
|
||||
while (bsLiveShadow < zn) {
|
||||
final int thech = inShadow.read();
|
||||
final int thech = readAByte(inShadow);
|
||||
if (thech >= 0) {
|
||||
bsBuffShadow = (bsBuffShadow << 8) | thech;
|
||||
bsLiveShadow += 8;
|
||||
@ -609,14 +869,14 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
throw new IOException("unexpected end of stream");
|
||||
}
|
||||
}
|
||||
int zvec = (bsBuffShadow >> (bsLiveShadow - zn))
|
||||
long zvec = (bsBuffShadow >> (bsLiveShadow - zn))
|
||||
& ((1 << zn) - 1);
|
||||
bsLiveShadow -= zn;
|
||||
|
||||
while (zvec > limit_zt[zn]) {
|
||||
zn++;
|
||||
while (bsLiveShadow < 1) {
|
||||
final int thech = inShadow.read();
|
||||
final int thech = readAByte(inShadow);
|
||||
if (thech >= 0) {
|
||||
bsBuffShadow = (bsBuffShadow << 8) | thech;
|
||||
bsLiveShadow += 8;
|
||||
@ -630,7 +890,7 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
zvec = (zvec << 1)
|
||||
| ((bsBuffShadow >> bsLiveShadow) & 1);
|
||||
}
|
||||
nextSym = perm_zt[zvec - base_zt[zn]];
|
||||
nextSym = perm_zt[(int) (zvec - base_zt[zn])];
|
||||
}
|
||||
|
||||
final byte ch = seqToUnseq[yy[0]];
|
||||
@ -680,10 +940,8 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
|
||||
int zn = minLens_zt;
|
||||
|
||||
// Inlined:
|
||||
// int zvec = bsR(zn);
|
||||
while (bsLiveShadow < zn) {
|
||||
final int thech = inShadow.read();
|
||||
final int thech = readAByte(inShadow);
|
||||
if (thech >= 0) {
|
||||
bsBuffShadow = (bsBuffShadow << 8) | thech;
|
||||
bsLiveShadow += 8;
|
||||
@ -699,7 +957,7 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
while (zvec > limit_zt[zn]) {
|
||||
zn++;
|
||||
while (bsLiveShadow < 1) {
|
||||
final int thech = inShadow.read();
|
||||
final int thech = readAByte(inShadow);
|
||||
if (thech >= 0) {
|
||||
bsBuffShadow = (bsBuffShadow << 8) | thech;
|
||||
bsLiveShadow += 8;
|
||||
@ -709,7 +967,7 @@ private void getAndMoveToFrontDecode() throws IOException {
|
||||
}
|
||||
}
|
||||
bsLiveShadow--;
|
||||
zvec = (zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1);
|
||||
zvec = ((zvec << 1) | ((bsBuffShadow >> bsLiveShadow) & 1));
|
||||
}
|
||||
nextSym = perm_zt[zvec - base_zt[zn]];
|
||||
}
|
||||
@ -726,14 +984,14 @@ private int getAndMoveToFrontDecode0(final int groupNo) throws IOException {
|
||||
final int zt = dataShadow.selector[groupNo] & 0xff;
|
||||
final int[] limit_zt = dataShadow.limit[zt];
|
||||
int zn = dataShadow.minLens[zt];
|
||||
int zvec = bsR(zn);
|
||||
int bsLiveShadow = this.bsLive;
|
||||
int bsBuffShadow = this.bsBuff;
|
||||
int zvec = (int) bsR(zn);
|
||||
int bsLiveShadow = (int) this.bsLive;
|
||||
int bsBuffShadow = (int) this.bsBuff;
|
||||
|
||||
while (zvec > limit_zt[zn]) {
|
||||
zn++;
|
||||
while (bsLiveShadow < 1) {
|
||||
final int thech = inShadow.read();
|
||||
final int thech = readAByte(inShadow);
|
||||
|
||||
if (thech >= 0) {
|
||||
bsBuffShadow = (bsBuffShadow << 8) | thech;
|
||||
@ -807,12 +1065,16 @@ private void setupRandPartA() throws IOException {
|
||||
this.su_ch2 = su_ch2Shadow ^= (this.su_rNToGo == 1) ? 1 : 0;
|
||||
this.su_i2++;
|
||||
this.currentChar = su_ch2Shadow;
|
||||
this.currentState = RAND_PART_B_STATE;
|
||||
this.currentState = STATE.RAND_PART_B_STATE;
|
||||
this.crc.updateCRC(su_ch2Shadow);
|
||||
} else {
|
||||
endBlock();
|
||||
if (readMode == READ_MODE.CONTINUOUS) {
|
||||
initBlock();
|
||||
setupBlock();
|
||||
} else if (readMode == READ_MODE.BYBLOCK) {
|
||||
this.currentState = STATE.NO_PROCESS_STATE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -824,19 +1086,23 @@ private void setupNoRandPartA() throws IOException {
|
||||
this.su_tPos = this.data.tt[this.su_tPos];
|
||||
this.su_i2++;
|
||||
this.currentChar = su_ch2Shadow;
|
||||
this.currentState = NO_RAND_PART_B_STATE;
|
||||
this.currentState = STATE.NO_RAND_PART_B_STATE;
|
||||
this.crc.updateCRC(su_ch2Shadow);
|
||||
} else {
|
||||
this.currentState = NO_RAND_PART_A_STATE;
|
||||
this.currentState = STATE.NO_RAND_PART_A_STATE;
|
||||
endBlock();
|
||||
if (readMode == READ_MODE.CONTINUOUS) {
|
||||
initBlock();
|
||||
setupBlock();
|
||||
} else if (readMode == READ_MODE.BYBLOCK) {
|
||||
this.currentState = STATE.NO_PROCESS_STATE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setupRandPartB() throws IOException {
|
||||
if (this.su_ch2 != this.su_chPrev) {
|
||||
this.currentState = RAND_PART_A_STATE;
|
||||
this.currentState = STATE.RAND_PART_A_STATE;
|
||||
this.su_count = 1;
|
||||
setupRandPartA();
|
||||
} else if (++this.su_count >= 4) {
|
||||
@ -851,13 +1117,13 @@ private void setupRandPartB() throws IOException {
|
||||
this.su_rNToGo--;
|
||||
}
|
||||
this.su_j2 = 0;
|
||||
this.currentState = RAND_PART_C_STATE;
|
||||
this.currentState = STATE.RAND_PART_C_STATE;
|
||||
if (this.su_rNToGo == 1) {
|
||||
this.su_z ^= 1;
|
||||
}
|
||||
setupRandPartC();
|
||||
} else {
|
||||
this.currentState = RAND_PART_A_STATE;
|
||||
this.currentState = STATE.RAND_PART_A_STATE;
|
||||
setupRandPartA();
|
||||
}
|
||||
}
|
||||
@ -868,7 +1134,7 @@ private void setupRandPartC() throws IOException {
|
||||
this.crc.updateCRC(this.su_ch2);
|
||||
this.su_j2++;
|
||||
} else {
|
||||
this.currentState = RAND_PART_A_STATE;
|
||||
this.currentState = STATE.RAND_PART_A_STATE;
|
||||
this.su_i2++;
|
||||
this.su_count = 0;
|
||||
setupRandPartA();
|
||||
@ -895,7 +1161,7 @@ private void setupNoRandPartC() throws IOException {
|
||||
this.currentChar = su_ch2Shadow;
|
||||
this.crc.updateCRC(su_ch2Shadow);
|
||||
this.su_j2++;
|
||||
this.currentState = NO_RAND_PART_C_STATE;
|
||||
this.currentState = STATE.NO_RAND_PART_C_STATE;
|
||||
} else {
|
||||
this.su_i2++;
|
||||
this.su_count = 0;
|
||||
|
@ -19,31 +19,38 @@
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.RandomDatum;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
|
||||
import org.apache.hadoop.util.LineReader;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
public class TestCodec extends TestCase {
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestCodec {
|
||||
|
||||
private static final Log LOG=
|
||||
LogFactory.getLog(TestCodec.class);
|
||||
@ -51,17 +58,20 @@ public class TestCodec extends TestCase {
|
||||
private Configuration conf = new Configuration();
|
||||
private int count = 10000;
|
||||
private int seed = new Random().nextInt();
|
||||
|
||||
|
||||
@Test
|
||||
public void testDefaultCodec() throws IOException {
|
||||
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DefaultCodec");
|
||||
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGzipCodec() throws IOException {
|
||||
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
|
||||
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testBZip2Codec() throws IOException {
|
||||
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
|
||||
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
|
||||
@ -133,6 +143,109 @@ private static void codecTest(Configuration conf, int seed, int count,
|
||||
LOG.info("SUCCESS! Completed checking " + count + " records");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitableCodecs() throws Exception {
|
||||
testSplitableCodec(BZip2Codec.class);
|
||||
}
|
||||
|
||||
private void testSplitableCodec(
|
||||
Class<? extends SplittableCompressionCodec> codecClass)
|
||||
throws IOException {
|
||||
final long DEFLBYTES = 2 * 1024 * 1024;
|
||||
final Configuration conf = new Configuration();
|
||||
final Random rand = new Random();
|
||||
final long seed = rand.nextLong();
|
||||
LOG.info("seed: " + seed);
|
||||
rand.setSeed(seed);
|
||||
SplittableCompressionCodec codec =
|
||||
ReflectionUtils.newInstance(codecClass, conf);
|
||||
final FileSystem fs = FileSystem.getLocal(conf);
|
||||
final FileStatus infile =
|
||||
fs.getFileStatus(writeSplitTestFile(fs, rand, codec, DEFLBYTES));
|
||||
if (infile.getLen() > Integer.MAX_VALUE) {
|
||||
fail("Unexpected compression: " + DEFLBYTES + " -> " + infile.getLen());
|
||||
}
|
||||
final int flen = (int) infile.getLen();
|
||||
final Text line = new Text();
|
||||
final Decompressor dcmp = CodecPool.getDecompressor(codec);
|
||||
try {
|
||||
for (int pos = 0; pos < infile.getLen(); pos += rand.nextInt(flen / 8)) {
|
||||
// read from random positions, verifying that there exist two sequential
|
||||
// lines as written in writeSplitTestFile
|
||||
final SplitCompressionInputStream in =
|
||||
codec.createInputStream(fs.open(infile.getPath()), dcmp,
|
||||
pos, flen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
|
||||
if (in.getAdjustedStart() >= flen) {
|
||||
break;
|
||||
}
|
||||
LOG.info("SAMPLE " + in.getAdjustedStart() + "," + in.getAdjustedEnd());
|
||||
final LineReader lreader = new LineReader(in);
|
||||
lreader.readLine(line); // ignore; likely partial
|
||||
if (in.getPos() >= flen) {
|
||||
break;
|
||||
}
|
||||
lreader.readLine(line);
|
||||
final int seq1 = readLeadingInt(line);
|
||||
lreader.readLine(line);
|
||||
if (in.getPos() >= flen) {
|
||||
break;
|
||||
}
|
||||
final int seq2 = readLeadingInt(line);
|
||||
assertEquals("Mismatched lines", seq1 + 1, seq2);
|
||||
}
|
||||
} finally {
|
||||
CodecPool.returnDecompressor(dcmp);
|
||||
}
|
||||
// remove on success
|
||||
fs.delete(infile.getPath().getParent(), true);
|
||||
}
|
||||
|
||||
private static int readLeadingInt(Text txt) throws IOException {
|
||||
DataInputStream in =
|
||||
new DataInputStream(new ByteArrayInputStream(txt.getBytes()));
|
||||
return in.readInt();
|
||||
}
|
||||
|
||||
/** Write infLen bytes (deflated) to file in test dir using codec.
|
||||
* Records are of the form
|
||||
* <i><b64 rand><i+i><b64 rand>
|
||||
*/
|
||||
private static Path writeSplitTestFile(FileSystem fs, Random rand,
|
||||
CompressionCodec codec, long infLen) throws IOException {
|
||||
final int REC_SIZE = 1024;
|
||||
final Path wd = new Path(new Path(
|
||||
System.getProperty("test.build.data", "/tmp")).makeQualified(fs),
|
||||
codec.getClass().getSimpleName());
|
||||
final Path file = new Path(wd, "test" + codec.getDefaultExtension());
|
||||
final byte[] b = new byte[REC_SIZE];
|
||||
final Base64 b64 = new Base64();
|
||||
DataOutputStream fout = null;
|
||||
Compressor cmp = CodecPool.getCompressor(codec);
|
||||
try {
|
||||
fout = new DataOutputStream(codec.createOutputStream(
|
||||
fs.create(file, true), cmp));
|
||||
final DataOutputBuffer dob = new DataOutputBuffer(REC_SIZE * 4 / 3 + 4);
|
||||
int seq = 0;
|
||||
while (infLen > 0) {
|
||||
rand.nextBytes(b);
|
||||
final byte[] b64enc = b64.encode(b); // ensures rand printable, no LF
|
||||
dob.reset();
|
||||
dob.writeInt(seq);
|
||||
System.arraycopy(dob.getData(), 0, b64enc, 0, dob.getLength());
|
||||
fout.write(b64enc);
|
||||
fout.write('\n');
|
||||
++seq;
|
||||
infLen -= b64enc.length;
|
||||
}
|
||||
LOG.info("Wrote " + seq + " records to " + file);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, fout);
|
||||
CodecPool.returnCompressor(cmp);
|
||||
}
|
||||
return file;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodecPoolGzipReuse() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("hadoop.native.lib", true);
|
||||
@ -149,19 +262,21 @@ public void testCodecPoolGzipReuse() throws Exception {
|
||||
assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
|
||||
}
|
||||
|
||||
public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
|
||||
@Test
|
||||
public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
|
||||
InstantiationException, IllegalAccessException {
|
||||
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
|
||||
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
|
||||
}
|
||||
|
||||
public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
|
||||
|
||||
@Test
|
||||
public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
|
||||
InstantiationException, IllegalAccessException {
|
||||
sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
|
||||
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
|
||||
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
|
||||
}
|
||||
|
||||
|
||||
private static void sequenceFileCodecTest(Configuration conf, int lines,
|
||||
String codecClass, int blockSize)
|
||||
throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
|
||||
@ -242,8 +357,4 @@ public static void main(String[] args) {
|
||||
|
||||
}
|
||||
|
||||
public TestCodec(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user