diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java index 6bee6b84e5..dab366adbf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java @@ -22,22 +22,36 @@ import java.io.IOException; import java.io.InputStream; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.compress.Decompressor; @InterfaceAudience.Public @InterfaceStability.Evolving public class DecompressorStream extends CompressionInputStream { + /** + * The maximum input buffer size. + */ + private static final int MAX_INPUT_BUFFER_SIZE = 512; + /** + * MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to + * use when skipping. See {@link java.io.InputStream}. + */ + private static final int MAX_SKIP_BUFFER_SIZE = 2048; + + private byte[] skipBytes; + private byte[] oneByte = new byte[1]; + protected Decompressor decompressor = null; protected byte[] buffer; protected boolean eof = false; protected boolean closed = false; private int lastBytesSent = 0; - public DecompressorStream(InputStream in, Decompressor decompressor, - int bufferSize) - throws IOException { + @VisibleForTesting + DecompressorStream(InputStream in, Decompressor decompressor, + int bufferSize, int skipBufferSize) + throws IOException { super(in); if (decompressor == null) { @@ -48,11 +62,18 @@ public DecompressorStream(InputStream in, Decompressor decompressor, this.decompressor = decompressor; buffer = new byte[bufferSize]; + skipBytes = new byte[skipBufferSize]; + } + + public DecompressorStream(InputStream in, Decompressor decompressor, + int bufferSize) + throws IOException { + this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE); } public DecompressorStream(InputStream in, Decompressor decompressor) - throws IOException { - this(in, decompressor, 512); + throws IOException { + this(in, decompressor, MAX_INPUT_BUFFER_SIZE); } /** @@ -64,8 +85,7 @@ public DecompressorStream(InputStream in, Decompressor decompressor) protected DecompressorStream(InputStream in) throws IOException { super(in); } - - private byte[] oneByte = new byte[1]; + @Override public int read() throws IOException { checkStream(); @@ -86,7 +106,7 @@ public int read(byte[] b, int off, int len) throws IOException { } protected int decompress(byte[] b, int off, int len) throws IOException { - int n = 0; + int n; while ((n = decompressor.decompress(b, off, len)) == 0) { if (decompressor.needsDictionary()) { @@ -170,7 +190,6 @@ public void resetState() throws IOException { decompressor.reset(); } - private byte[] skipBytes = new byte[512]; @Override public long skip(long n) throws IOException { // Sanity checks @@ -178,7 +197,7 @@ public long skip(long n) throws IOException { throw new IllegalArgumentException("negative skip length"); } checkStream(); - + // Read 'n' bytes int skipped = 0; while (skipped < n) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeCompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeCompressor.java new file mode 100644 index 0000000000..931297c3f9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeCompressor.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * A fake compressor + * Its input and output is the same. + */ +class FakeCompressor implements Compressor { + + private boolean finish; + private boolean finished; + private int nread; + private int nwrite; + + private byte[] userBuf; + private int userBufOff; + private int userBufLen; + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + int n = Math.min(len, userBufLen); + if (userBuf != null && b != null) + System.arraycopy(userBuf, userBufOff, b, off, n); + userBufOff += n; + userBufLen -= n; + nwrite += n; + + if (finish && userBufLen <= 0) + finished = true; + + return n; + } + + @Override + public void end() { + // nop + } + + @Override + public void finish() { + finish = true; + } + + @Override + public boolean finished() { + return finished; + } + + @Override + public long getBytesRead() { + return nread; + } + + @Override + public long getBytesWritten() { + return nwrite; + } + + @Override + public boolean needsInput() { + return userBufLen <= 0; + } + + @Override + public void reset() { + finish = false; + finished = false; + nread = 0; + nwrite = 0; + userBuf = null; + userBufOff = 0; + userBufLen = 0; + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + // nop + } + + @Override + public void setInput(byte[] b, int off, int len) { + nread += len; + userBuf = b; + userBufOff = off; + userBufLen = len; + } + + @Override + public void reinit(Configuration conf) { + // nop + } + +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeDecompressor.java new file mode 100644 index 0000000000..31a584f35b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/FakeDecompressor.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +/** + * A fake decompressor, just like FakeCompressor + * Its input and output is the same. + */ +class FakeDecompressor implements Decompressor { + + private boolean finish; + private boolean finished; + private int nread; + private int nwrite; + + private byte[] userBuf; + private int userBufOff; + private int userBufLen; + + @Override + public int decompress(byte[] b, int off, int len) throws IOException { + int n = Math.min(len, userBufLen); + if (userBuf != null && b != null) + System.arraycopy(userBuf, userBufOff, b, off, n); + userBufOff += n; + userBufLen -= n; + nwrite += n; + + if (finish && userBufLen <= 0) + finished = true; + + return n; + } + + @Override + public void end() { + // nop + } + + @Override + public boolean finished() { + return finished; + } + + public long getBytesRead() { + return nread; + } + + public long getBytesWritten() { + return nwrite; + } + + @Override + public boolean needsDictionary() { + return false; + } + + @Override + public boolean needsInput() { + return userBufLen <= 0; + } + + @Override + public void reset() { + finish = false; + finished = false; + nread = 0; + nwrite = 0; + userBuf = null; + userBufOff = 0; + userBufLen = 0; + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + // nop + } + + @Override + public void setInput(byte[] b, int off, int len) { + nread += len; + userBuf = b; + userBufOff = off; + userBufLen = len; + } + + @Override + public int getRemaining() { + return 0; + } + +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBlockDecompressorStream.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBlockDecompressorStream.java index 43bd413b81..c976572877 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBlockDecompressorStream.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestBlockDecompressorStream.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; public class TestBlockDecompressorStream { @@ -67,187 +66,12 @@ private void testRead(int bufLen) throws IOException { bytesIn = new ByteArrayInputStream(buf); // get decompression stream - BlockDecompressorStream blockDecompressorStream = - new BlockDecompressorStream(bytesIn, new FakeDecompressor(), 1024); - try { + try (BlockDecompressorStream blockDecompressorStream = + new BlockDecompressorStream(bytesIn, new FakeDecompressor(), 1024)) { assertEquals("return value is not -1", -1 , blockDecompressorStream.read()); } catch (IOException e) { fail("unexpected IOException : " + e); - } finally { - blockDecompressorStream.close(); } } -} - -/** - * A fake compressor - * Its input and output is the same. - */ -class FakeCompressor implements Compressor{ - - private boolean finish; - private boolean finished; - int nread; - int nwrite; - - byte [] userBuf; - int userBufOff; - int userBufLen; - - @Override - public int compress(byte[] b, int off, int len) throws IOException { - int n = Math.min(len, userBufLen); - if (userBuf != null && b != null) - System.arraycopy(userBuf, userBufOff, b, off, n); - userBufOff += n; - userBufLen -= n; - nwrite += n; - - if (finish && userBufLen <= 0) - finished = true; - - return n; - } - - @Override - public void end() { - // nop - } - - @Override - public void finish() { - finish = true; - } - - @Override - public boolean finished() { - return finished; - } - - @Override - public long getBytesRead() { - return nread; - } - - @Override - public long getBytesWritten() { - return nwrite; - } - - @Override - public boolean needsInput() { - return userBufLen <= 0; - } - - @Override - public void reset() { - finish = false; - finished = false; - nread = 0; - nwrite = 0; - userBuf = null; - userBufOff = 0; - userBufLen = 0; - } - - @Override - public void setDictionary(byte[] b, int off, int len) { - // nop - } - - @Override - public void setInput(byte[] b, int off, int len) { - nread += len; - userBuf = b; - userBufOff = off; - userBufLen = len; - } - - @Override - public void reinit(Configuration conf) { - // nop - } - -} - -/** - * A fake decompressor, just like FakeCompressor - * Its input and output is the same. - */ -class FakeDecompressor implements Decompressor { - - private boolean finish; - private boolean finished; - int nread; - int nwrite; - - byte [] userBuf; - int userBufOff; - int userBufLen; - - @Override - public int decompress(byte[] b, int off, int len) throws IOException { - int n = Math.min(len, userBufLen); - if (userBuf != null && b != null) - System.arraycopy(userBuf, userBufOff, b, off, n); - userBufOff += n; - userBufLen -= n; - nwrite += n; - - if (finish && userBufLen <= 0) - finished = true; - - return n; - } - - @Override - public void end() { - // nop - } - - @Override - public boolean finished() { - return finished; - } - - @Override - public boolean needsDictionary() { - return false; - } - - @Override - public boolean needsInput() { - return userBufLen <= 0; - } - - @Override - public void reset() { - finish = false; - finished = false; - nread = 0; - nwrite = 0; - userBuf = null; - userBufOff = 0; - userBufLen = 0; - } - - @Override - public void setDictionary(byte[] b, int off, int len) { - // nop - } - - @Override - public void setInput(byte[] b, int off, int len) { - nread += len; - userBuf = b; - userBufOff = off; - userBufLen = len; - } - - @Override - public int getRemaining() { - return 0; - } - } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestDecompressorStream.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestDecompressorStream.java new file mode 100644 index 0000000000..5a41e7ffb2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestDecompressorStream.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; + +public class TestDecompressorStream { + private static final String TEST_STRING = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + private ByteArrayInputStream bytesIn; + private Decompressor decompressor; + private DecompressorStream decompressorStream; + + @Before + public void setUp() throws IOException { + bytesIn = new ByteArrayInputStream(TEST_STRING.getBytes()); + decompressor = new FakeDecompressor(); + decompressorStream = + new DecompressorStream(bytesIn, decompressor, 20, 13); + } + + @Test + public void testReadOneByte() throws IOException { + for (int i = 0; i < TEST_STRING.length(); ++i) { + assertThat(decompressorStream.read(), is((int) TEST_STRING.charAt(i))); + } + try { + int ret = decompressorStream.read(); + fail("Not reachable but got ret " + ret); + } catch (EOFException e) { + // Expect EOF exception + } + } + + @Test + public void testReadBuffer() throws IOException { + // 32 buf.length < 52 TEST_STRING.length() + byte[] buf = new byte[32]; + int bytesToRead = TEST_STRING.length(); + int i = 0; + while (bytesToRead > 0) { + int n = Math.min(bytesToRead, buf.length); + int bytesRead = decompressorStream.read(buf, 0, n); + assertTrue(bytesRead > 0 && bytesRead <= n); + assertThat(new String(buf, 0, bytesRead), + is(TEST_STRING.substring(i, i + bytesRead))); + bytesToRead = bytesToRead - bytesRead; + i = i + bytesRead; + } + try { + int ret = decompressorStream.read(buf, 0, buf.length); + fail("Not reachable but got ret " + ret); + } catch (EOFException e) { + // Expect EOF exception + } + } + + @Test + public void testSkip() throws IOException { + assertThat(decompressorStream.skip(12), is(12L)); + assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(12))); + assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(13))); + assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(14))); + assertThat(decompressorStream.skip(10), is(10L)); + assertThat(decompressorStream.read(), is((int)TEST_STRING.charAt(25))); + try { + long ret = decompressorStream.skip(1000); + fail("Not reachable but got ret " + ret); + } catch (EOFException e) { + // Expect EOF exception + } + } +} \ No newline at end of file