HADOOP-14919. BZip2 drops records when reading data in splits. Contributed by Jason Lowe

This commit is contained in:
Jason Lowe 2017-10-31 09:30:13 -05:00
parent c02d2ba50d
commit 2fae63aa60
3 changed files with 98 additions and 49 deletions

View File

@ -204,43 +204,8 @@ public SplitCompressionInputStream createInputStream(InputStream seekableIn,
Seekable.class.getName()); Seekable.class.getName());
} }
//find the position of first BZip2 start up marker
((Seekable)seekableIn).seek(0);
// BZip2 start of block markers are of 6 bytes. But the very first block
// also has "BZh9", making it 10 bytes. This is the common case. But at
// time stream might start without a leading BZ.
final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
long adjStart = 0L;
if (start != 0) {
// Other than the first of file, the marker size is 6 bytes.
adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
- (HEADER_LEN + SUB_HEADER_LEN)));
}
((Seekable)seekableIn).seek(adjStart);
SplitCompressionInputStream in =
new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
// The following if clause handles the following case:
// Assume the following scenario in BZip2 compressed stream where
// . represent compressed data.
// .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
// ........................[47 bits][1 bit].....[48 bit Block]...
// ................................^[Assume a Byte alignment here]
// ........................................^^[current position of stream]
// .....................^^[We go back 10 Bytes in stream and find a Block marker]
// ........................................^^[We align at wrong position!]
// ...........................................................^^[While this pos is correct]
if (in.getPos() < start) {
((Seekable)seekableIn).seek(start); ((Seekable)seekableIn).seek(start);
in = new BZip2CompressionInputStream(seekableIn, start, end, readMode); return new BZip2CompressionInputStream(seekableIn, start, end, readMode);
}
return in;
} }
/** /**

View File

@ -52,20 +52,20 @@
* This Ant code was enhanced so that it can de-compress blocks of bzip2 data. * This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
* Current position in the stream is an important statistic for Hadoop. For * Current position in the stream is an important statistic for Hadoop. For
* example in LineRecordReader, we solely depend on the current position in the * example in LineRecordReader, we solely depend on the current position in the
* stream to know about the progess. The notion of position becomes complicated * stream to know about the progress. The notion of position becomes complicated
* for compressed files. The Hadoop splitting is done in terms of compressed * for compressed files. The Hadoop splitting is done in terms of compressed
* file. But a compressed file deflates to a large amount of data. So we have * file. But a compressed file deflates to a large amount of data. So we have
* handled this problem in the following way. * handled this problem in the following way.
* *
* On object creation time, we find the next block start delimiter. Once such a * On object creation time, we find the next block start delimiter. Once such a
* marker is found, the stream stops there (we discard any read compressed data * marker is found, the stream stops there (we discard any read compressed data
* in this process) and the position is updated (i.e. the caller of this class * in this process) and the position is reported as the beginning of the block
* will find out the stream location). At this point we are ready for actual * start delimiter. At this point we are ready for actual reading
* reading (i.e. decompression) of data. * (i.e. decompression) of data.
* *
* The subsequent read calls give out data. The position is updated when the * The subsequent read calls give out data. The position is updated when the
* caller of this class has read off the current block + 1 bytes. In between the * caller of this class has read off the current block + 1 bytes. In between the
* block reading, position is not updated. (We can only update the postion on * block reading, position is not updated. (We can only update the position on
* block boundaries). * block boundaries).
* </p> * </p>
* *
@ -204,11 +204,12 @@ private int readAByte(InputStream inStream) throws IOException {
* in the stream. It can find bit patterns of length <= 63 bits. Specifically * in the stream. It can find bit patterns of length <= 63 bits. Specifically
* this method is used in CBZip2InputStream to find the end of block (EOB) * this method is used in CBZip2InputStream to find the end of block (EOB)
* delimiter in the stream, starting from the current position of the stream. * delimiter in the stream, starting from the current position of the stream.
* If marker is found, the stream position will be right after marker at the * If marker is found, the stream position will be at the byte containing
* end of this call. * the starting bit of the marker.
* *
* @param marker The bit pattern to be found in the stream * @param marker The bit pattern to be found in the stream
* @param markerBitLength No of bits in the marker * @param markerBitLength No of bits in the marker
* @return true if the marker was found otherwise false
* *
* @throws IOException * @throws IOException
* @throws IllegalArgumentException if marketBitLength is greater than 63 * @throws IllegalArgumentException if marketBitLength is greater than 63
@ -224,23 +225,33 @@ public boolean skipToNextMarker(long marker, int markerBitLength)
long bytes = 0; long bytes = 0;
bytes = this.bsR(markerBitLength); bytes = this.bsR(markerBitLength);
if (bytes == -1) { if (bytes == -1) {
this.reportedBytesReadFromCompressedStream =
this.bytesReadFromCompressedStream;
return false; return false;
} }
while (true) { while (true) {
if (bytes == marker) { if (bytes == marker) {
// Report the byte position where the marker starts
long markerBytesRead = (markerBitLength + this.bsLive + 7) / 8;
this.reportedBytesReadFromCompressedStream =
this.bytesReadFromCompressedStream - markerBytesRead;
return true; return true;
} else { } else {
bytes = bytes << 1; bytes = bytes << 1;
bytes = bytes & ((1L << markerBitLength) - 1); bytes = bytes & ((1L << markerBitLength) - 1);
int oneBit = (int) this.bsR(1); int oneBit = (int) this.bsR(1);
if (oneBit != -1) { if (oneBit != -1) {
bytes = bytes | oneBit; bytes = bytes | oneBit;
} else } else {
this.reportedBytesReadFromCompressedStream =
this.bytesReadFromCompressedStream;
return false; return false;
} }
} }
}
} catch (IOException ex) { } catch (IOException ex) {
this.reportedBytesReadFromCompressedStream =
this.bytesReadFromCompressedStream;
return false; return false;
} }
} }
@ -302,7 +313,6 @@ private CBZip2InputStream(final InputStream in, READ_MODE readMode, boolean skip
} else if (readMode == READ_MODE.BYBLOCK) { } else if (readMode == READ_MODE.BYBLOCK) {
this.currentState = STATE.NO_PROCESS_STATE; this.currentState = STATE.NO_PROCESS_STATE;
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH); skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
if(!skipDecompression){ if(!skipDecompression){
changeStateToProcessABlock(); changeStateToProcessABlock();
} }
@ -419,8 +429,6 @@ public int read(final byte[] dest, final int offs, final int len)
result = b; result = b;
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
//Exactly when we are about to start a new block, we advertise the stream position.
this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
changeStateToProcessABlock(); changeStateToProcessABlock();
} }

View File

@ -186,6 +186,82 @@ public void testSplitableCodecs() throws IOException {
verifyPartitions(473608, 110, file, codec, conf); verifyPartitions(473608, 110, file, codec, conf);
} }
// Test a corner case when position of stream is right after BZip2 marker
@Test (timeout=900000)
public void testSplitableCodecs2() throws IOException {
JobConf conf = new JobConf(defaultConf);
// Create the codec
CompressionCodec codec = null;
try {
codec = (CompressionCodec)
ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Illegal codec!");
}
Path file = new Path(workDir, "test"+codec.getDefaultExtension());
FileSystem localFs = FileSystem.getLocal(conf);
localFs.delete(workDir, true);
FileInputFormat.setInputPaths(conf, workDir);
int length = 250000;
LOG.info("creating; entries = " + length);
// create a file with length entries
Writer writer =
new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
try {
for (int i = 0; i < length; i++) {
writer.write(Integer.toString(i));
writer.write("\n");
}
} finally {
writer.close();
}
// Test split positions around a block boundary where the block does
// not start on a byte boundary.
for (long splitpos = 203418; splitpos < 203430; ++splitpos) {
TextInputFormat format = new TextInputFormat();
format.configure(conf);
LOG.info("setting block size of the input file to " + splitpos);
conf.setLong("mapreduce.input.fileinputformat.split.minsize", splitpos);
LongWritable key = new LongWritable();
Text value = new Text();
InputSplit[] splits = format.getSplits(conf, 2);
LOG.info("splitting: got = " + splits.length);
// check each split
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
LOG.debug("split[" + j + "]= " + splits[j]);
RecordReader<LongWritable, Text> reader =
format.getRecordReader(splits[j], conf, Reporter.NULL);
try {
int counter = 0;
while (reader.next(key, value)) {
int v = Integer.parseInt(value.toString());
LOG.debug("read " + v);
if (bits.get(v)) {
LOG.warn("conflict with " + v + " in split " + j +
" at position " + reader.getPos());
}
assertFalse("Key in multiple partitions.", bits.get(v));
bits.set(v);
counter++;
}
if (counter > 0) {
LOG.info("splits[" + j + "]=" + splits[j] + " count=" + counter);
} else {
LOG.debug("splits[" + j + "]=" + splits[j] + " count=" + counter);
}
} finally {
reader.close();
}
}
assertEquals("Some keys in no partition.", length, bits.cardinality());
}
}
private void verifyPartitions(int length, int numSplits, Path file, private void verifyPartitions(int length, int numSplits, Path file,
CompressionCodec codec, JobConf conf) throws IOException { CompressionCodec codec, JobConf conf) throws IOException {