HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)

Co-authored-by: xuzq <xuzengqiang@kuaishou.com>
Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
Ashutosh Gupta 2021-12-28 18:14:38 +05:30 committed by GitHub
parent dba139cd0f
commit caab29ec88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 0 deletions

View File

@ -113,6 +113,12 @@ private void setInputFromSavedData() {
compressedDirectBuf.put( compressedDirectBuf.put(
userBuf, userBufOff, bytesInCompressedBuffer); userBuf, userBufOff, bytesInCompressedBuffer);
// Set the finished to false when compressedDirectBuf still
// contains some bytes.
if (compressedDirectBuf.position() > 0 && finished) {
finished = false;
}
userBufOff += bytesInCompressedBuffer; userBufOff += bytesInCompressedBuffer;
userBufferBytesToConsume -= bytesInCompressedBuffer; userBufferBytesToConsume -= bytesInCompressedBuffer;
} }
@ -186,6 +192,13 @@ public int decompress(byte[] b, int off, int len)
0, 0,
directBufferSize directBufferSize
); );
// Set the finished to false when compressedDirectBuf still
// contains some bytes.
if (remaining > 0 && finished) {
finished = false;
}
uncompressedDirectBuf.limit(n); uncompressedDirectBuf.limit(n);
// Get at most 'len' bytes // Get at most 'len' bytes

View File

@ -231,6 +231,65 @@ public void testCompressorDecompressorLogicWithCompressionStreams()
} }
} }
/**
* Verify decompressor logic with some finish operation in compress.
*/
@Test
public void testCompressorDecompressorWithFinish() throws Exception {
DataOutputStream deflateOut = null;
DataInputStream inflateIn = null;
int byteSize = 1024 * 100;
byte[] bytes = generate(byteSize);
int firstLength = 1024 * 30;
int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
try {
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter =
new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
bufferSize);
deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
// Write some data and finish.
deflateOut.write(bytes, 0, firstLength);
deflateFilter.finish();
deflateOut.flush();
// ResetState then write some data and finish.
deflateFilter.resetState();
deflateOut.write(bytes, firstLength, firstLength);
deflateFilter.finish();
deflateOut.flush();
// ResetState then write some data and finish.
deflateFilter.resetState();
deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
deflateFilter.finish();
deflateOut.flush();
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter =
new DecompressorStream(deCompressedDataBuffer,
new ZStandardDecompressor(bufferSize), bufferSize);
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
byte[] result = new byte[byteSize];
inflateIn.read(result);
assertArrayEquals(
"original array not equals compress/decompressed array", bytes,
result);
} finally {
IOUtils.closeStream(deflateOut);
IOUtils.closeStream(inflateIn);
}
}
@Test @Test
public void testZStandardCompressDecompressInMultiThreads() throws Exception { public void testZStandardCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = MultithreadedTestUtil.TestContext ctx =