diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java index bc9d29cb4f..adf2fe629f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java @@ -113,6 +113,12 @@ private void setInputFromSavedData() { compressedDirectBuf.put( userBuf, userBufOff, bytesInCompressedBuffer); + // Set the finished to false when compressedDirectBuf still + // contains some bytes. + if (compressedDirectBuf.position() > 0 && finished) { + finished = false; + } + userBufOff += bytesInCompressedBuffer; userBufferBytesToConsume -= bytesInCompressedBuffer; } @@ -186,6 +192,13 @@ public int decompress(byte[] b, int off, int len) 0, directBufferSize ); + + // Set the finished to false when compressedDirectBuf still + // contains some bytes. + if (remaining > 0 && finished) { + finished = false; + } + uncompressedDirectBuf.limit(n); // Get at most 'len' bytes diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java index dcfb7e9e32..653225b708 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java @@ -234,6 +234,65 @@ public void testCompressorDecompressorLogicWithCompressionStreams() } } + /** + * Verify decompressor logic with some finish operation in compress. + */ + @Test + public void testCompressorDecompressorWithFinish() throws Exception { + DataOutputStream deflateOut = null; + DataInputStream inflateIn = null; + int byteSize = 1024 * 100; + byte[] bytes = generate(byteSize); + int firstLength = 1024 * 30; + + int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT; + try { + DataOutputBuffer compressedDataBuffer = new DataOutputBuffer(); + CompressionOutputStream deflateFilter = + new CompressorStream(compressedDataBuffer, new ZStandardCompressor(), + bufferSize); + + deflateOut = + new DataOutputStream(new BufferedOutputStream(deflateFilter)); + + // Write some data and finish. + deflateOut.write(bytes, 0, firstLength); + deflateFilter.finish(); + deflateOut.flush(); + + // ResetState then write some data and finish. + deflateFilter.resetState(); + deflateOut.write(bytes, firstLength, firstLength); + deflateFilter.finish(); + deflateOut.flush(); + + // ResetState then write some data and finish. + deflateFilter.resetState(); + deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2); + deflateFilter.finish(); + deflateOut.flush(); + + DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); + deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, + compressedDataBuffer.getLength()); + + CompressionInputStream inflateFilter = + new DecompressorStream(deCompressedDataBuffer, + new ZStandardDecompressor(bufferSize), bufferSize); + + inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter)); + + byte[] result = new byte[byteSize]; + inflateIn.read(result); + assertArrayEquals( + "original array not equals compress/decompressed array", bytes, + result); + } finally { + IOUtils.closeStream(deflateOut); + IOUtils.closeStream(inflateIn); + } + } + @Test public void testZStandardCompressDecompressInMultiThreads() throws Exception { MultithreadedTestUtil.TestContext ctx =