HADOOP-6852. apparent bug in concatenated-bzip2 support (decoding). Contributed by Zsolt Venczel.

This commit is contained in:
Sean Mackrory 2018-02-21 12:53:18 -07:00
parent 92cbbfe79e
commit 2bc3351eaf
9 changed files with 42 additions and 46 deletions

View File

@ -615,6 +615,7 @@
<excludes> <excludes>
<exclude>testjar/*</exclude> <exclude>testjar/*</exclude>
<exclude>testshell/*</exclude> <exclude>testshell/*</exclude>
<exclude>testdata/*</exclude>
</excludes> </excludes>
</filter> </filter>
<!-- Mockito tries to include its own unrelocated copy of hamcrest. :( --> <!-- Mockito tries to include its own unrelocated copy of hamcrest. :( -->

View File

@ -180,7 +180,8 @@ public CompressionInputStream createInputStream(InputStream in,
new DecompressorStream(in, decompressor, new DecompressorStream(in, decompressor,
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT)) : IO_FILE_BUFFER_SIZE_DEFAULT)) :
new BZip2CompressionInputStream(in); new BZip2CompressionInputStream(
in, 0L, Long.MAX_VALUE, READ_MODE.BYBLOCK);
} }
/** /**

View File

@ -18,18 +18,6 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Inflater;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
@ -42,16 +30,26 @@
import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After; import org.junit.After;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Ignore import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Inflater;
import static org.junit.Assert.*;
/**
* Test class for concatenated {@link CompressionInputStream}.
*/
public class TestConcatenatedCompressedInput { public class TestConcatenatedCompressedInput {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestConcatenatedCompressedInput.class); LoggerFactory.getLogger(TestConcatenatedCompressedInput.class);
private static int MAX_LENGTH = 10000;
private static JobConf defaultConf = new JobConf(); private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null; private static FileSystem localFs = null;
@ -85,13 +83,15 @@ public class TestConcatenatedCompressedInput {
public void after() { public void after() {
ZlibFactory.loadNativeZLib(); ZlibFactory.loadNativeZLib();
} }
private static final String DEFAULT_WORK_DIR = "target/test-classes/testdata";
private static Path workDir = localFs.makeQualified(new Path( private static Path workDir = localFs.makeQualified(new Path(
System.getProperty("test.build.data", "/tmp"), System.getProperty("test.build.data", DEFAULT_WORK_DIR),
"TestConcatenatedCompressedInput")); "TestConcatenatedCompressedInput"));
private static LineReader makeStream(String str) throws IOException { private static LineReader makeStream(String str) throws IOException {
return new LineReader(new ByteArrayInputStream(str.getBytes("UTF-8")), return new LineReader(new ByteArrayInputStream(
defaultConf); str.getBytes("UTF-8")), defaultConf);
} }
private static void writeFile(FileSystem fs, Path name, private static void writeFile(FileSystem fs, Path name,
@ -190,7 +190,8 @@ public void testGzip() throws IOException {
// copy prebuilt (correct!) version of concat.gz to HDFS // copy prebuilt (correct!) version of concat.gz to HDFS
final String fn = "concat" + gzip.getDefaultExtension(); final String fn = "concat" + gzip.getDefaultExtension();
Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn); Path fnLocal = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
Path fnHDFS = new Path(workDir, fn); Path fnHDFS = new Path(workDir, fn);
localFs.copyFromLocalFile(fnLocal, fnHDFS); localFs.copyFromLocalFile(fnLocal, fnHDFS);
@ -227,7 +228,7 @@ public void testGzip() throws IOException {
@Test @Test
public void testPrototypeInflaterGzip() throws IOException { public void testPrototypeInflaterGzip() throws IOException {
CompressionCodec gzip = new GzipCodec(); // used only for file extension CompressionCodec gzip = new GzipCodec(); // used only for file extension
localFs.delete(workDir, true); // localFs = FileSystem instance localFs.delete(workDir, true); // localFs = FileSystem instance
System.out.println(COLOR_BR_BLUE + "testPrototypeInflaterGzip() using " + System.out.println(COLOR_BR_BLUE + "testPrototypeInflaterGzip() using " +
"non-native/Java Inflater and manual gzip header/trailer parsing" + "non-native/Java Inflater and manual gzip header/trailer parsing" +
@ -235,7 +236,8 @@ public void testPrototypeInflaterGzip() throws IOException {
// copy prebuilt (correct!) version of concat.gz to HDFS // copy prebuilt (correct!) version of concat.gz to HDFS
final String fn = "concat" + gzip.getDefaultExtension(); final String fn = "concat" + gzip.getDefaultExtension();
Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn); Path fnLocal = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
Path fnHDFS = new Path(workDir, fn); Path fnHDFS = new Path(workDir, fn);
localFs.copyFromLocalFile(fnLocal, fnHDFS); localFs.copyFromLocalFile(fnLocal, fnHDFS);
@ -326,14 +328,16 @@ public void testBuiltInGzipDecompressor() throws IOException {
// copy single-member test file to HDFS // copy single-member test file to HDFS
String fn1 = "testConcatThenCompress.txt" + gzip.getDefaultExtension(); String fn1 = "testConcatThenCompress.txt" + gzip.getDefaultExtension();
Path fnLocal1 = new Path(System.getProperty("test.concat.data","/tmp"),fn1); Path fnLocal1 = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn1);
Path fnHDFS1 = new Path(workDir, fn1); Path fnHDFS1 = new Path(workDir, fn1);
localFs.copyFromLocalFile(fnLocal1, fnHDFS1); localFs.copyFromLocalFile(fnLocal1, fnHDFS1);
// copy multiple-member test file to HDFS // copy multiple-member test file to HDFS
// (actually in "seekable gzip" format, a la JIRA PIG-42) // (actually in "seekable gzip" format, a la JIRA PIG-42)
String fn2 = "testCompressThenConcat.txt" + gzip.getDefaultExtension(); String fn2 = "testCompressThenConcat.txt" + gzip.getDefaultExtension();
Path fnLocal2 = new Path(System.getProperty("test.concat.data","/tmp"),fn2); Path fnLocal2 = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn2);
Path fnHDFS2 = new Path(workDir, fn2); Path fnHDFS2 = new Path(workDir, fn2);
localFs.copyFromLocalFile(fnLocal2, fnHDFS2); localFs.copyFromLocalFile(fnLocal2, fnHDFS2);
@ -439,7 +443,8 @@ private static void doSingleGzipBufferSize(JobConf jConf) throws IOException {
InputSplit[] splits = format.getSplits(jConf, 100); InputSplit[] splits = format.getSplits(jConf, 100);
assertEquals("compressed splits == 2", 2, splits.length); assertEquals("compressed splits == 2", 2, splits.length);
FileSplit tmp = (FileSplit) splits[0]; FileSplit tmp = (FileSplit) splits[0];
if (tmp.getPath().getName().equals("testCompressThenConcat.txt.gz")) { if (tmp.getPath()
.getName().equals("testdata/testCompressThenConcat.txt.gz")) {
System.out.println(" (swapping)"); System.out.println(" (swapping)");
splits[0] = splits[1]; splits[0] = splits[1];
splits[1] = tmp; splits[1] = tmp;
@ -481,7 +486,8 @@ public void testBzip2() throws IOException {
// copy prebuilt (correct!) version of concat.bz2 to HDFS // copy prebuilt (correct!) version of concat.bz2 to HDFS
final String fn = "concat" + bzip2.getDefaultExtension(); final String fn = "concat" + bzip2.getDefaultExtension();
Path fnLocal = new Path(System.getProperty("test.concat.data", "/tmp"), fn); Path fnLocal = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn);
Path fnHDFS = new Path(workDir, fn); Path fnHDFS = new Path(workDir, fn);
localFs.copyFromLocalFile(fnLocal, fnHDFS); localFs.copyFromLocalFile(fnLocal, fnHDFS);
@ -531,13 +537,15 @@ public void testMoreBzip2() throws IOException {
// copy single-member test file to HDFS // copy single-member test file to HDFS
String fn1 = "testConcatThenCompress.txt" + bzip2.getDefaultExtension(); String fn1 = "testConcatThenCompress.txt" + bzip2.getDefaultExtension();
Path fnLocal1 = new Path(System.getProperty("test.concat.data","/tmp"),fn1); Path fnLocal1 = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn1);
Path fnHDFS1 = new Path(workDir, fn1); Path fnHDFS1 = new Path(workDir, fn1);
localFs.copyFromLocalFile(fnLocal1, fnHDFS1); localFs.copyFromLocalFile(fnLocal1, fnHDFS1);
// copy multiple-member test file to HDFS // copy multiple-member test file to HDFS
String fn2 = "testCompressThenConcat.txt" + bzip2.getDefaultExtension(); String fn2 = "testCompressThenConcat.txt" + bzip2.getDefaultExtension();
Path fnLocal2 = new Path(System.getProperty("test.concat.data","/tmp"),fn2); Path fnLocal2 = new Path(
System.getProperty("test.concat.data", DEFAULT_WORK_DIR), fn2);
Path fnHDFS2 = new Path(workDir, fn2); Path fnHDFS2 = new Path(workDir, fn2);
localFs.copyFromLocalFile(fnLocal2, fnHDFS2); localFs.copyFromLocalFile(fnLocal2, fnHDFS2);
@ -549,21 +557,6 @@ public void testMoreBzip2() throws IOException {
assertEquals("concat bytes available", 2567, in1.available()); assertEquals("concat bytes available", 2567, in1.available());
assertEquals("concat bytes available", 3056, in2.available()); assertEquals("concat bytes available", 3056, in2.available());
/*
// FIXME
// The while-loop below dies at the beginning of the 2nd concatenated
// member (after 17 lines successfully read) with:
//
// java.io.IOException: bad block header
// at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(
// CBZip2InputStream.java:527)
//
// It is not critical to concatenated-gzip support, HADOOP-6835, so it's
// simply commented out for now (and HADOOP-6852 filed). If and when the
// latter issue is resolved--perhaps by fixing an error here--this code
// should be reenabled. Note that the doMultipleBzip2BufferSizes() test
// below uses the same testCompressThenConcat.txt.bz2 file but works fine.
CompressionInputStream cin2 = bzip2.createInputStream(in2); CompressionInputStream cin2 = bzip2.createInputStream(in2);
LineReader in = new LineReader(cin2); LineReader in = new LineReader(cin2);
Text out = new Text(); Text out = new Text();
@ -578,7 +571,6 @@ public void testMoreBzip2() throws IOException {
5346, totalBytes); 5346, totalBytes);
assertEquals("total uncompressed lines in concatenated test file", assertEquals("total uncompressed lines in concatenated test file",
84, lineNum); 84, lineNum);
*/
// test CBZip2InputStream with lots of different input-buffer sizes // test CBZip2InputStream with lots of different input-buffer sizes
doMultipleBzip2BufferSizes(jobConf); doMultipleBzip2BufferSizes(jobConf);
@ -645,7 +637,8 @@ private static void doMultipleBzip2BufferSizes(JobConf jConf)
// this tests both files (testCompressThenConcat, testConcatThenCompress); all // this tests both files (testCompressThenConcat, testConcatThenCompress); all
// should work with existing Java bzip2 decoder and any future native version // should work with existing Java bzip2 decoder and any future native version
private static void doSingleBzip2BufferSize(JobConf jConf) throws IOException { private static void doSingleBzip2BufferSize(JobConf jConf)
throws IOException {
TextInputFormat format = new TextInputFormat(); TextInputFormat format = new TextInputFormat();
format.configure(jConf); format.configure(jConf);
format.setMinSplitSize(5500); // work around 256-byte/22-splits issue format.setMinSplitSize(5500); // work around 256-byte/22-splits issue
@ -654,7 +647,8 @@ private static void doSingleBzip2BufferSize(JobConf jConf) throws IOException {
InputSplit[] splits = format.getSplits(jConf, 100); InputSplit[] splits = format.getSplits(jConf, 100);
assertEquals("compressed splits == 2", 2, splits.length); assertEquals("compressed splits == 2", 2, splits.length);
FileSplit tmp = (FileSplit) splits[0]; FileSplit tmp = (FileSplit) splits[0];
if (tmp.getPath().getName().equals("testCompressThenConcat.txt.gz")) { if (tmp.getPath()
.getName().equals("testdata/testCompressThenConcat.txt.gz")) {
System.out.println(" (swapping)"); System.out.println(" (swapping)");
splits[0] = splits[1]; splits[0] = splits[1];
splits[1] = tmp; splits[1] = tmp;