Revert "HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)"
This reverts commit 05b43f2057
.
This commit is contained in:
parent
05b43f2057
commit
cd30687a15
@ -113,12 +113,6 @@ private void setInputFromSavedData() {
|
|||||||
compressedDirectBuf.put(
|
compressedDirectBuf.put(
|
||||||
userBuf, userBufOff, bytesInCompressedBuffer);
|
userBuf, userBufOff, bytesInCompressedBuffer);
|
||||||
|
|
||||||
// Set the finished to false when compressedDirectBuf still
|
|
||||||
// contains some bytes.
|
|
||||||
if (compressedDirectBuf.position() > 0 && finished) {
|
|
||||||
finished = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
userBufOff += bytesInCompressedBuffer;
|
userBufOff += bytesInCompressedBuffer;
|
||||||
userBufferBytesToConsume -= bytesInCompressedBuffer;
|
userBufferBytesToConsume -= bytesInCompressedBuffer;
|
||||||
}
|
}
|
||||||
@ -192,13 +186,6 @@ public int decompress(byte[] b, int off, int len)
|
|||||||
0,
|
0,
|
||||||
directBufferSize
|
directBufferSize
|
||||||
);
|
);
|
||||||
|
|
||||||
// Set the finished to false when compressedDirectBuf still
|
|
||||||
// contains some bytes.
|
|
||||||
if (remaining > 0 && finished) {
|
|
||||||
finished = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
uncompressedDirectBuf.limit(n);
|
uncompressedDirectBuf.limit(n);
|
||||||
|
|
||||||
// Get at most 'len' bytes
|
// Get at most 'len' bytes
|
||||||
|
@ -234,65 +234,6 @@ public void testCompressorDecompressorLogicWithCompressionStreams()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify decompressor logic with some finish operation in compress.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testCompressorDecompressorWithFinish() throws Exception {
|
|
||||||
DataOutputStream deflateOut = null;
|
|
||||||
DataInputStream inflateIn = null;
|
|
||||||
int byteSize = 1024 * 100;
|
|
||||||
byte[] bytes = generate(byteSize);
|
|
||||||
int firstLength = 1024 * 30;
|
|
||||||
|
|
||||||
int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
|
|
||||||
try {
|
|
||||||
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
|
|
||||||
CompressionOutputStream deflateFilter =
|
|
||||||
new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
|
|
||||||
bufferSize);
|
|
||||||
|
|
||||||
deflateOut =
|
|
||||||
new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
|
||||||
|
|
||||||
// Write some data and finish.
|
|
||||||
deflateOut.write(bytes, 0, firstLength);
|
|
||||||
deflateFilter.finish();
|
|
||||||
deflateOut.flush();
|
|
||||||
|
|
||||||
// ResetState then write some data and finish.
|
|
||||||
deflateFilter.resetState();
|
|
||||||
deflateOut.write(bytes, firstLength, firstLength);
|
|
||||||
deflateFilter.finish();
|
|
||||||
deflateOut.flush();
|
|
||||||
|
|
||||||
// ResetState then write some data and finish.
|
|
||||||
deflateFilter.resetState();
|
|
||||||
deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
|
|
||||||
deflateFilter.finish();
|
|
||||||
deflateOut.flush();
|
|
||||||
|
|
||||||
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
|
|
||||||
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
|
|
||||||
compressedDataBuffer.getLength());
|
|
||||||
|
|
||||||
CompressionInputStream inflateFilter =
|
|
||||||
new DecompressorStream(deCompressedDataBuffer,
|
|
||||||
new ZStandardDecompressor(bufferSize), bufferSize);
|
|
||||||
|
|
||||||
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
|
|
||||||
|
|
||||||
byte[] result = new byte[byteSize];
|
|
||||||
inflateIn.read(result);
|
|
||||||
assertArrayEquals(
|
|
||||||
"original array not equals compress/decompressed array", bytes,
|
|
||||||
result);
|
|
||||||
} finally {
|
|
||||||
IOUtils.closeStream(deflateOut);
|
|
||||||
IOUtils.closeStream(inflateIn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
|
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
|
||||||
MultithreadedTestUtil.TestContext ctx =
|
MultithreadedTestUtil.TestContext ctx =
|
||||||
|
Loading…
Reference in New Issue
Block a user