HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.

This commit is contained in:
Akira Ajisaka 2016-06-14 10:18:17 +09:00
parent 709a814fe0
commit e3ba9ad3f1
2 changed files with 71 additions and 60 deletions

View File

@ -207,7 +207,12 @@ public SplitCompressionInputStream createInputStream(InputStream seekableIn,
// time stream might start without a leading BZ. // time stream might start without a leading BZ.
final long FIRST_BZIP2_BLOCK_MARKER_POSITION = final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION); long adjStart = 0L;
if (start != 0) {
// Anywhere other than the start of the file, the marker is 6 bytes long.
adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
- (HEADER_LEN + SUB_HEADER_LEN)));
}
((Seekable)seekableIn).seek(adjStart); ((Seekable)seekableIn).seek(adjStart);
SplitCompressionInputStream in = SplitCompressionInputStream in =

View File

@ -175,6 +175,20 @@ public void testSplitableCodecs() throws IOException {
for (int length = MAX_LENGTH / 2; length < MAX_LENGTH; for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
length += random.nextInt(MAX_LENGTH / 4)+1) { length += random.nextInt(MAX_LENGTH / 4)+1) {
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
verifyPartitions(length, numSplits, file, codec, conf);
}
}
// corner cases where the byte alignment and the position of the stream are the same
verifyPartitions(471507, 218, file, codec, conf);
verifyPartitions(473608, 110, file, codec, conf);
}
private void verifyPartitions(int length, int numSplits, Path file,
CompressionCodec codec, JobConf conf) throws IOException {
LOG.info("creating; entries = " + length); LOG.info("creating; entries = " + length);
@ -195,26 +209,22 @@ public void testSplitableCodecs() throws IOException {
format.configure(conf); format.configure(conf);
LongWritable key = new LongWritable(); LongWritable key = new LongWritable();
Text value = new Text(); Text value = new Text();
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
LOG.info("splitting: requesting = " + numSplits); LOG.info("splitting: requesting = " + numSplits);
InputSplit[] splits = format.getSplits(conf, numSplits); InputSplit[] splits = format.getSplits(conf, numSplits);
LOG.info("splitting: got = " + splits.length); LOG.info("splitting: got = " + splits.length);
// check each split // check each split
BitSet bits = new BitSet(length); BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) { for (int j = 0; j < splits.length; j++) {
LOG.debug("split["+j+"]= " + splits[j]); LOG.debug("split["+j+"]= " + splits[j]);
RecordReader<LongWritable, Text> reader = RecordReader<LongWritable, Text> reader =
format.getRecordReader(splits[j], conf, reporter); format.getRecordReader(splits[j], conf, Reporter.NULL);
try { try {
int counter = 0; int counter = 0;
while (reader.next(key, value)) { while (reader.next(key, value)) {
int v = Integer.parseInt(value.toString()); int v = Integer.parseInt(value.toString());
LOG.debug("read " + v); LOG.debug("read " + v);
if (bits.get(v)) { if (bits.get(v)) {
LOG.warn("conflict with " + v + LOG.warn("conflict with " + v +
" in split " + j + " in split " + j +
@ -236,10 +246,6 @@ public void testSplitableCodecs() throws IOException {
assertEquals("Some keys in no partition.", length, bits.cardinality()); assertEquals("Some keys in no partition.", length, bits.cardinality());
} }
}
}
private static LineReader makeStream(String str) throws IOException { private static LineReader makeStream(String str) throws IOException {
return new LineReader(new ByteArrayInputStream return new LineReader(new ByteArrayInputStream
(str.getBytes("UTF-8")), (str.getBytes("UTF-8")),