HADOOP-17825. Add BuiltInGzipCompressor (#3250)

Currently, GzipCodec only supports BuiltInGzipDecompressor, if native zlib is not loaded. So, without Hadoop native codec installed, saving SequenceFile using GzipCodec will throw exception like "SequenceFile doesn't work with GzipCodec without native-hadoop code!"

Same as other codecs which we migrated to using prepared packages (lz4, snappy), it will be better if we support GzipCodec generally without Hadoop native codec installed. Similar to BuiltInGzipDecompressor, we can use Java Deflater to support BuiltInGzipCompressor.
This commit is contained in:
Liang-Chi Hsieh 2021-08-16 10:08:03 -07:00 committed by GitHub
parent 6342d5e523
commit 6014a089fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 379 additions and 37 deletions

View File

@ -36,8 +36,6 @@
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
@ -47,7 +45,6 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;
@ -1180,14 +1177,6 @@ public static Option syncInterval(int value) {
new Metadata() : metadataOption.getValue();
this.compress = compressionTypeOption.getValue();
final CompressionCodec codec = compressionTypeOption.getCodec();
if (codec != null &&
(codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded(conf)) {
throw new IllegalArgumentException("SequenceFile doesn't work with " +
"GzipCodec without native-hadoop " +
"code!");
}
this.syncInterval = (syncIntervalOption == null) ?
SYNC_INTERVAL :
syncIntervalOption.getValue();

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
@ -154,14 +155,14 @@ public CompressionOutputStream createOutputStream(OutputStream out,
public Compressor createCompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
? new GzipZlibCompressor(conf)
: null;
: new BuiltInGzipCompressor(conf);
}
@Override
public Class<? extends Compressor> getCompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibCompressor.class
: null;
: BuiltInGzipCompressor.class;
}
@Override

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zlib;
import java.io.IOException;
import java.util.zip.Checksum;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.util.DataChecksum;
/**
* A {@link Compressor} based on the popular gzip compressed file format.
* http://www.gzip.org/
*/
@DoNotPool
public class BuiltInGzipCompressor implements Compressor {
/**
* Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
* details.
*/
private static final byte[] GZIP_HEADER = new byte[]{
0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
// The trailer will be overwritten based on crc and output size.
private static final byte[] GZIP_TRAILER = new byte[]{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
private static final int GZIP_HEADER_LEN = GZIP_HEADER.length;
private static final int GZIP_TRAILER_LEN = GZIP_TRAILER.length;
private Deflater deflater;
private int headerOff = 0;
private int trailerOff = 0;
private int numExtraBytesWritten = 0;
private int currentBufLen = 0;
private int accuBufLen = 0;
private final Checksum crc = DataChecksum.newCrc32();
private BuiltInGzipDecompressor.GzipStateLabel state;
public BuiltInGzipCompressor(Configuration conf) {
init(conf);
}
@Override
public boolean finished() {
// Only if the trailer is also written, it is thought as finished.
return state == BuiltInGzipDecompressor.GzipStateLabel.FINISHED && deflater.finished();
}
@Override
public boolean needsInput() {
return deflater.needsInput() && state != BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC;
}
@Override
public int compress(byte[] b, int off, int len) throws IOException {
if (finished()) {
throw new IOException("compress called on finished compressor");
}
int compressedBytesWritten = 0;
if (currentBufLen <= 0) {
return compressedBytesWritten;
}
// If we are not within uncompressed data yet, output the header.
if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) {
int outputHeaderSize = writeHeader(b, off, len);
numExtraBytesWritten += outputHeaderSize;
compressedBytesWritten += outputHeaderSize;
if (outputHeaderSize == len) {
return compressedBytesWritten;
}
off += outputHeaderSize;
len -= outputHeaderSize;
}
if (state == BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM) {
// now compress it into b[]
int deflated = deflater.deflate(b, off, len);
compressedBytesWritten += deflated;
off += deflated;
len -= deflated;
// All current input are processed. And `finished` is called. Going to output trailer.
if (deflater.finished()) {
state = BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC;
fillTrailer();
} else {
return compressedBytesWritten;
}
}
if (state == BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC) {
int outputTrailerSize = writeTrailer(b, off, len);
numExtraBytesWritten += outputTrailerSize;
compressedBytesWritten += outputTrailerSize;
}
return compressedBytesWritten;
}
@Override
public long getBytesRead() {
return deflater.getTotalIn();
}
@Override
public long getBytesWritten() {
return numExtraBytesWritten + deflater.getTotalOut();
}
@Override
public void end() {
deflater.end();
}
@Override
public void finish() {
deflater.finish();
}
private void init(Configuration conf) {
ZlibCompressor.CompressionLevel level = ZlibFactory.getCompressionLevel(conf);
ZlibCompressor.CompressionStrategy strategy = ZlibFactory.getCompressionStrategy(conf);
// 'true' (nowrap) => Deflater will handle raw deflate stream only
deflater = new Deflater(level.compressionLevel(), true);
deflater.setStrategy(strategy.compressionStrategy());
state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
}
@Override
public void reinit(Configuration conf) {
init(conf);
numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
crc.reset();
accuBufLen = 0;
}
@Override
public void reset() {
deflater.reset();
state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
crc.reset();
accuBufLen = 0;
}
@Override
public void setDictionary(byte[] b, int off, int len) {
deflater.setDictionary(b, off, len);
}
@Override
public void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
deflater.setInput(b, off, len);
crc.update(b, off, len); // CRC-32 is on uncompressed data
currentBufLen = len;
accuBufLen += currentBufLen;
}
private int writeHeader(byte[] b, int off, int len) {
if (len <= 0) {
return 0;
}
int n = Math.min(len, GZIP_HEADER_LEN - headerOff);
System.arraycopy(GZIP_HEADER, headerOff, b, off, n);
headerOff += n;
// Completes header output.
if (headerOff == GZIP_HEADER_LEN) {
state = BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM;
}
return n;
}
private void fillTrailer() {
if (state == BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC) {
int streamCrc = (int) crc.getValue();
GZIP_TRAILER[0] = (byte) (streamCrc & 0x000000ff);
GZIP_TRAILER[1] = (byte) ((streamCrc & 0x0000ff00) >> 8);
GZIP_TRAILER[2] = (byte) ((streamCrc & 0x00ff0000) >> 16);
GZIP_TRAILER[3] = (byte) ((streamCrc & 0xff000000) >> 24);
GZIP_TRAILER[4] = (byte) (accuBufLen & 0x000000ff);
GZIP_TRAILER[5] = (byte) ((accuBufLen & 0x0000ff00) >> 8);
GZIP_TRAILER[6] = (byte) ((accuBufLen & 0x00ff0000) >> 16);
GZIP_TRAILER[7] = (byte) ((accuBufLen & 0xff000000) >> 24);
crc.reset();
accuBufLen = 0;
}
}
private int writeTrailer(byte[] b, int off, int len) {
if (len <= 0) {
return 0;
}
int n = Math.min(len, GZIP_TRAILER_LEN - trailerOff);
System.arraycopy(GZIP_TRAILER, trailerOff, b, off, n);
trailerOff += n;
if (trailerOff == GZIP_TRAILER_LEN) {
state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
}
return n;
}
}

View File

@ -68,7 +68,7 @@ public class BuiltInGzipDecompressor implements Decompressor {
* (Technically, the private variables localBuf through hasHeaderCRC are
* also part of the state, so this enum is merely the label for it.)
*/
private enum GzipStateLabel {
public enum GzipStateLabel {
/**
* Immediately prior to or (strictly) within the 10-byte basic gzip header.
*/
@ -93,6 +93,10 @@ private enum GzipStateLabel {
* Immediately prior to or within the main compressed (deflate) data stream.
*/
DEFLATE_STREAM,
/**
* Immediately prior to or within the main uncompressed (inflate) data stream.
*/
INFLATE_STREAM,
/**
* Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
*/

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
@ -101,6 +102,14 @@ public void testDefaultCodec() throws IOException {
@Test
public void testGzipCodec() throws IOException {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
}
// without hadoop-native installed.
ZlibFactory.setNativeZlibLoaded(false);
assumeTrue(ZlibFactory.isNativeZlibLoaded(conf));
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
}
@ -467,15 +476,15 @@ public void testCodecInitWithCompressionLevel() throws Exception {
"org.apache.hadoop.io.compress.GzipCodec");
codecTestWithNOCompression(conf,
"org.apache.hadoop.io.compress.DefaultCodec");
} else {
LOG.warn("testCodecInitWithCompressionLevel for native skipped"
+ ": native libs not loaded");
}
conf = new Configuration();
// don't use native libs
ZlibFactory.setNativeZlibLoaded(false);
LOG.info("testCodecInitWithCompressionLevel without native libs");
codecTestWithNOCompression( conf,
"org.apache.hadoop.io.compress.DefaultCodec");
codecTestWithNOCompression(conf,
"org.apache.hadoop.io.compress.GzipCodec");
}
@Test
@ -550,6 +559,23 @@ public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundExce
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DeflateCodec", 1000000);
}
@Test
public void testSequenceFileGzipCodec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
if (ZlibFactory.isNativeZlibLoaded(conf)) {
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 5);
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
}
// without hadoop-native installed.
ZlibFactory.setNativeZlibLoaded(false);
assumeTrue(ZlibFactory.isNativeZlibLoaded(conf));
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 5);
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
}
private static void sequenceFileCodecTest(Configuration conf, int lines,
String codecClass, int blockSize)
throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
@ -853,16 +879,16 @@ private void testGzipCodecWrite(boolean useNative) throws IOException {
// and try to read it back via the regular GZIPInputStream.
// Use native libs per the parameter
Configuration conf = new Configuration();
Configuration hadoopConf = new Configuration();
if (useNative) {
assumeTrue(ZlibFactory.isNativeZlibLoaded(conf));
assumeTrue(ZlibFactory.isNativeZlibLoaded(hadoopConf));
} else {
assertFalse("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
ZlibFactory.isNativeZlibLoaded(hadoopConf));
}
// Ensure that the CodecPool has a BuiltInZlibDeflater in it.
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(hadoopConf);
assertNotNull("zlibCompressor is null!", zlibCompressor);
assertTrue("ZlibFactory returned unexpected deflator",
useNative ? zlibCompressor instanceof ZlibCompressor
@ -871,37 +897,47 @@ private void testGzipCodecWrite(boolean useNative) throws IOException {
CodecPool.returnCompressor(zlibCompressor);
// Create a GZIP text file via the Compressor interface.
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
assertTrue("Codec for .gz file is not GzipCodec",
codec instanceof GzipCodec);
final String msg = "This is the message we are going to compress.";
final String fileName = new Path(GenericTestUtils.getTempPath(
"testGzipCodecWrite.txt.gz")).toString();
BufferedWriter w = null;
Compressor gzipCompressor = CodecPool.getCompressor(codec);
if (null != gzipCompressor) {
// If it gives us back a Compressor, we should be able to use this
// to write files we can then read back with Java's gzip tools.
OutputStream os = new CompressorStream(new FileOutputStream(fileName),
gzipCompressor);
w = new BufferedWriter(new OutputStreamWriter(os));
w.write(msg);
w.close();
CodecPool.returnCompressor(gzipCompressor);
// When it gives us back a Compressor, we should be able to use this
// to write files we can then read back with Java's gzip tools.
OutputStream os = new CompressorStream(new FileOutputStream(fileName),
gzipCompressor);
verifyGzipFile(fileName, msg);
// Call `write` multiple times.
int bufferSize = 10000;
char[] inputBuffer = new char[bufferSize];
Random rand = new Random();
for (int i = 0; i < bufferSize; i++) {
inputBuffer[i] = (char) ('a' + rand.nextInt(26));
}
w = new BufferedWriter(new OutputStreamWriter(os), bufferSize);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
w.write(inputBuffer);
sb.append(inputBuffer);
}
w.close();
CodecPool.returnCompressor(gzipCompressor);
verifyGzipFile(fileName, sb.toString());
// Create a gzip text file via codec.getOutputStream().
w = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(new FileOutputStream(fileName))));
w.write(msg);
codec.createOutputStream(new FileOutputStream(fileName))));
for (int i = 0; i < 10; i++) {
w.write(inputBuffer);
}
w.close();
verifyGzipFile(fileName, msg);
verifyGzipFile(fileName, sb.toString());
}
@Test
@ -915,6 +951,57 @@ public void testGzipCodecWriteJava() throws IOException {
public void testGzipNativeCodecWrite() throws IOException {
testGzipCodecWrite(true);
}
@Test
public void testCodecPoolAndGzipCompressor() {
// BuiltInZlibDeflater should not be used as the GzipCodec compressor.
// Assert that this is the case.
// Don't use native libs for this test.
Configuration conf = new Configuration();
ZlibFactory.setNativeZlibLoaded(false);
assertFalse("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
// This should give us a BuiltInZlibDeflater.
Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
assertNotNull("zlibCompressor is null!", zlibCompressor);
assertTrue("ZlibFactory returned unexpected deflator",
zlibCompressor instanceof BuiltInZlibDeflater);
// its createOutputStream() just wraps the existing stream in a
// java.util.zip.GZIPOutputStream.
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
assertTrue("Codec for .gz file is not GzipCodec",
codec instanceof GzipCodec);
// make sure we don't get a null compressor
Compressor codecCompressor = codec.createCompressor();
if (null == codecCompressor) {
fail("Got null codecCompressor");
}
// Asking the CodecPool for a compressor for GzipCodec
// should not return null
Compressor poolCompressor = CodecPool.getCompressor(codec);
if (null == poolCompressor) {
fail("Got null poolCompressor");
}
// return a couple compressors
CodecPool.returnCompressor(zlibCompressor);
CodecPool.returnCompressor(poolCompressor);
Compressor poolCompressor2 = CodecPool.getCompressor(codec);
if (poolCompressor.getClass() == BuiltInGzipCompressor.class) {
if (poolCompressor == poolCompressor2) {
fail("Reused java gzip compressor in pool");
}
} else {
if (poolCompressor != poolCompressor2) {
fail("Did not reuse native gzip compressor in pool");
}
}
}
@Test
public void testCodecPoolAndGzipDecompressor() {
// BuiltInZlibInflater should not be used as the GzipCodec decompressor.