HADOOP-17868. Add more tests for BuiltInGzipCompressor (#3336)

This commit is contained in:
Liang-Chi Hsieh 2021-09-22 07:59:28 -07:00 committed by GitHub
parent c7e7b2f907
commit 138add2cb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -726,6 +726,173 @@ public void testGzipCompatibility() throws IOException {
assertArrayEquals(b, dflchk);
}
/**
 * Compresses random data by driving a codec-created {@link Compressor}
 * directly (no stream wrapper) and checks that the codec's decompressor
 * restores the original bytes.
 *
 * <p>Runs 100 iterations, each with a freshly logged seed, an input of up to
 * 256 KiB and randomly sized output chunks of up to 1023 bytes.
 *
 * @throws IOException if compression or decompression fails
 */
@Test
public void testGzipCompatibilityWithCompressor() throws IOException {
  // don't use native libs
  ZlibFactory.setNativeZlibLoaded(false);
  Configuration hadoopConf = new Configuration();
  CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, hadoopConf);
  Random r = new Random();

  for (int i = 0; i < 100; i++) {
    Compressor compressor = codec.createCompressor();
    assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor")
        .isInstanceOf(BuiltInGzipCompressor.class);

    // Re-seed from a value that is logged so a failing iteration can be replayed.
    long randomSeed = r.nextLong();
    r.setSeed(randomSeed);
    LOG.info("seed: {}", randomSeed);

    int inputSize = r.nextInt(256 * 1024 + 1);
    byte[] b = new byte[inputSize];
    r.nextBytes(b);

    // NOTE(review): the extra 1024 bytes are presumably headroom for gzip
    // framing on incompressible input - confirm.
    byte[] output = new byte[inputSize + 1024];
    int outputOff = 0;
    try {
      compressor.setInput(b, 0, b.length);
      compressor.finish();
      // Drain the compressor through randomly sized (possibly empty) chunks.
      while (!compressor.finished()) {
        byte[] buf = new byte[r.nextInt(1024)];
        int compressed = compressor.compress(buf, 0, buf.length);
        System.arraycopy(buf, 0, output, outputOff, compressed);
        outputOff += compressed;
      }
    } finally {
      // Release the compressor explicitly instead of leaving it to the GC.
      compressor.end();
    }

    DataInputBuffer gzbuf = new DataInputBuffer();
    gzbuf.reset(output, outputOff);

    Decompressor decom = codec.createDecompressor();
    assertThat(decom).as("decompressor should not be null").isNotNull();
    assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor")
        .isInstanceOf(BuiltInGzipDecompressor.class);
    try (InputStream gzin = codec.createInputStream(gzbuf, decom);
         DataOutputBuffer dflbuf = new DataOutputBuffer()) {
      dflbuf.reset();
      IOUtils.copyBytes(gzin, dflbuf, 4096);
      final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
      assertThat(b).as("check decompressed output").isEqualTo(dflchk);
    } finally {
      // Runs after the streams above are closed; releases the decompressor.
      decom.end();
    }
  }
}
/**
 * Compresses the same random data with a codec-created {@link Compressor}
 * (driven directly) and with {@link GZIPOutputStream}, then checks that both
 * outputs have the same length and that both decompress back to the input.
 *
 * <p>Runs 100 iterations, each with a freshly logged seed and an input of up
 * to 256 KiB.
 *
 * @throws IOException if compression or decompression fails
 */
@Test
public void testGzipCompatibilityWithCompressorAndGZIPOutputStream() throws IOException {
  // don't use native libs
  ZlibFactory.setNativeZlibLoaded(false);
  Configuration hadoopConf = new Configuration();
  CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, hadoopConf);
  Random r = new Random();

  for (int i = 0; i < 100; i++) {
    Compressor compressor = codec.createCompressor();
    assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor")
        .isInstanceOf(BuiltInGzipCompressor.class);

    // Re-seed from a value that is logged so a failing iteration can be replayed.
    long randomSeed = r.nextLong();
    r.setSeed(randomSeed);
    LOG.info("seed: {}", randomSeed);

    int inputSize = r.nextInt(256 * 1024 + 1);
    byte[] b = new byte[inputSize];
    r.nextBytes(b);

    byte[] output = new byte[inputSize + 1024];
    int outputOff = 0;
    try {
      compressor.setInput(b, 0, b.length);
      compressor.finish();
      // Drain the compressor through randomly sized (possibly empty) chunks.
      while (!compressor.finished()) {
        byte[] buf = new byte[r.nextInt(1024)];
        int compressed = compressor.compress(buf, 0, buf.length);
        System.arraycopy(buf, 0, output, outputOff, compressed);
        outputOff += compressed;
      }
    } finally {
      // Release the compressor explicitly instead of leaving it to the GC.
      compressor.end();
    }

    try (DataOutputBuffer dflbuf = new DataOutputBuffer();
         GZIPOutputStream gzout = new GZIPOutputStream(dflbuf)) {
      gzout.write(b);
      // Close before reading the buffer so the complete gzip stream has been
      // written; the implicit close at the end of the try is then a no-op.
      gzout.close();
      final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
      LOG.info("output: {}", outputOff);
      LOG.info("dflchk: {}", dflchk.length);
      // The JDK stream is the reference: its length is the expected value.
      assertThat(outputOff).as("check compressed data length").isEqualTo(dflchk.length);
      uncompressGzipOutput(b, output, outputOff, codec);
      uncompressGzipOutput(b, dflchk, dflchk.length, codec);
    }
  }
}
/**
 * Compresses the same random data through the codec's
 * {@link CompressionOutputStream} and through {@link GZIPOutputStream}, then
 * checks that both outputs have the same length and that both decompress
 * back to the input.
 *
 * <p>Runs 100 iterations, each with a freshly logged seed and an input of up
 * to 256 KiB.
 *
 * @throws IOException if compression or decompression fails
 */
@Test
public void testGzipCompatibilityWithCompressorStreamAndGZIPOutputStream() throws IOException {
  // don't use native libs
  ZlibFactory.setNativeZlibLoaded(false);
  Configuration hadoopConf = new Configuration();
  CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, hadoopConf);
  Random r = new Random();

  for (int i = 0; i < 100; i++) {
    Compressor compressor = codec.createCompressor();
    assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor")
        .isInstanceOf(BuiltInGzipCompressor.class);

    try (DataOutputBuffer dflbuf = new DataOutputBuffer()) {
      // Re-seed from a value that is logged so a failing iteration can be replayed.
      long randomSeed = r.nextLong();
      r.setSeed(randomSeed);
      LOG.info("seed: {}", randomSeed);

      int inputSize = r.nextInt(256 * 1024 + 1);
      byte[] b = new byte[inputSize];
      r.nextBytes(b);

      // try-with-resources closes the codec stream even if write() throws;
      // the buffer is only read after the stream has been closed.
      try (CompressionOutputStream compressionOutputStream =
               codec.createOutputStream(dflbuf, compressor)) {
        compressionOutputStream.write(b);
      }
      final byte[] output = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());

      // Reuse the same buffer for the reference GZIPOutputStream output.
      dflbuf.reset();
      try (GZIPOutputStream gzout = new GZIPOutputStream(dflbuf)) {
        gzout.write(b);
        // Close before reading the buffer so the complete gzip stream has
        // been written; the implicit close afterwards is then a no-op.
        gzout.close();
        final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
        LOG.info("output: {}", output.length);
        LOG.info("dflchk: {}", dflchk.length);
        assertThat(output.length).as("check compressed data length").isEqualTo(dflchk.length);
        uncompressGzipOutput(b, output, output.length, codec);
        uncompressGzipOutput(b, dflchk, dflchk.length, codec);
      }
    } finally {
      // Release the compressor explicitly instead of leaving it to the GC.
      compressor.end();
    }
  }
}
/**
 * Decompresses gzip data with a decompressor created by {@code codec} and
 * asserts that the result equals {@code origin}.
 *
 * @param origin the expected uncompressed bytes
 * @param output buffer holding the compressed data, starting at index 0
 * @param outputLen number of valid compressed bytes in {@code output}
 * @param codec codec used to create the decompressor and the input stream
 * @throws IOException if reading or decompressing the data fails
 */
private void uncompressGzipOutput(
    byte[] origin, byte[] output, int outputLen, CompressionCodec codec) throws IOException {
  DataInputBuffer gzbuf = new DataInputBuffer();
  gzbuf.reset(output, outputLen);

  Decompressor decom = codec.createDecompressor();
  assertThat(decom).as("decompressor should not be null").isNotNull();
  assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor")
      .isInstanceOf(BuiltInGzipDecompressor.class);

  // Close both streams even when the copy or the assertion fails.
  try (InputStream gzin = codec.createInputStream(gzbuf, decom);
       DataOutputBuffer dflbuf = new DataOutputBuffer()) {
    dflbuf.reset();
    IOUtils.copyBytes(gzin, dflbuf, 4096);
    final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
    assertThat(origin).as("check decompressed output").isEqualTo(dflchk);
  } finally {
    // Runs after the streams above are closed; releases the decompressor.
    decom.end();
  }
}
void GzipConcatTest(Configuration conf,
Class<? extends Decompressor> decomClass) throws IOException {
Random r = new Random();