HADOOP-5879. Read compression level and strategy from Configuration for
gzip compression. Contributed by He Yongqiang git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@814455 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3617ed0bad
commit
254afbfe96
@ -553,6 +553,9 @@ Trunk (unreleased changes)
|
||||
HADOOP-6252. Provide a method to determine if a deprecated key is set in
|
||||
config file. (Jakob Homan via suresh)
|
||||
|
||||
HADOOP-5879. Read compression level and strategy from Configuration for
|
||||
gzip compression. (He Yongqiang via cdouglas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-5595. NameNode does not need to run a replicator to choose a
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
@ -91,20 +92,26 @@ private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
|
||||
*
|
||||
* @param codec the <code>CompressionCodec</code> for which to get the
|
||||
* <code>Compressor</code>
|
||||
* @param conf the <code>Configuration</code> object holding the settings used to create or reinitialize the compressor
|
||||
* @return <code>Compressor</code> for the given
|
||||
* <code>CompressionCodec</code> from the pool or a new one
|
||||
*/
|
||||
public static Compressor getCompressor(CompressionCodec codec) {
|
||||
public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
|
||||
Compressor compressor = borrow(compressorPool, codec.getCompressorType());
|
||||
if (compressor == null) {
|
||||
compressor = codec.createCompressor();
|
||||
LOG.info("Got brand-new compressor");
|
||||
} else {
|
||||
compressor.reinit(conf);
|
||||
LOG.debug("Got recycled compressor");
|
||||
}
|
||||
return compressor;
|
||||
}
|
||||
|
||||
public static Compressor getCompressor(CompressionCodec codec) {
|
||||
return getCompressor(codec, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a {@link Decompressor} for the given {@link CompressionCodec} from the
|
||||
* pool or a new one.
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Specification of a stream-based 'compressor' which can be
|
||||
* plugged into a {@link CompressionOutputStream} to compress data.
|
||||
@ -102,5 +104,13 @@ public interface Compressor {
|
||||
/**
|
||||
* Closes the compressor and discards any unprocessed input.
|
||||
*/
|
||||
public void end();
|
||||
public void end();
|
||||
|
||||
/**
 * Prepare the compressor to be used in a new stream with settings defined in
 * the given Configuration.
 *
 * @param conf Configuration from which the new settings are fetched
 *             (NOTE(review): callers such as CodecPool may pass null —
 *             implementations appear to tolerate it; confirm the contract)
 */
public void reinit(Configuration conf);
|
||||
}
|
||||
|
@ -22,8 +22,11 @@
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||
import org.apache.hadoop.io.compress.zlib.*;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
|
||||
|
||||
/**
|
||||
* This class creates gzip compressors/decompressors.
|
||||
@ -155,7 +158,7 @@ public CompressionOutputStream createOutputStream(OutputStream out,
|
||||
|
||||
public Compressor createCompressor() {
|
||||
return (ZlibFactory.isNativeZlibLoaded(conf))
|
||||
? new GzipZlibCompressor()
|
||||
? new GzipZlibCompressor(conf)
|
||||
: null;
|
||||
}
|
||||
|
||||
@ -206,6 +209,13 @@ public GzipZlibCompressor() {
|
||||
ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
|
||||
ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
|
||||
}
|
||||
|
||||
/**
 * Creates a gzip-format compressor whose compression level and strategy
 * are read from the supplied configuration.
 *
 * @param conf source of the compression level and strategy settings
 */
public GzipZlibCompressor(Configuration conf) {
  super(ZlibFactory.getCompressionLevel(conf),
      ZlibFactory.getCompressionStrategy(conf),
      ZlibCompressor.CompressionHeader.GZIP_FORMAT,
      64 * 1024);
}
|
||||
}
|
||||
|
||||
static final class GzipZlibDecompressor extends ZlibDecompressor {
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
|
||||
/**
|
||||
@ -77,4 +78,9 @@ public void setInput(byte[] b, int off, int len) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
public void reinit(Configuration conf) {
  // No-op: this implementation has no compression settings to refresh from
  // the configuration. NOTE(review): the enclosing class is not visible in
  // this hunk — confirm that ignoring reinit is intentional here.
}
|
||||
|
||||
}
|
||||
|
@ -21,7 +21,9 @@
|
||||
import java.io.IOException;
|
||||
import java.util.zip.Deflater;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
/**
|
||||
* A wrapper around java.util.zip.Deflater to make it conform
|
||||
@ -46,4 +48,30 @@ public synchronized int compress(byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
return super.deflate(b, off, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* reinit the compressor with the given configuration. It will reset the
|
||||
* compressor's compression level and compression strategy. Different from
|
||||
* <tt>ZlibCompressor</tt>, <tt>BuiltInZlibDeflater</tt> only support three
|
||||
* kind of compression strategy: FILTERED, HUFFMAN_ONLY and DEFAULT_STRATEGY.
|
||||
* It will use DEFAULT_STRATEGY as default if the configured compression
|
||||
* strategy is not supported.
|
||||
*/
|
||||
@Override
|
||||
public void reinit(Configuration conf) {
|
||||
reset();
|
||||
if (conf == null) {
|
||||
return;
|
||||
}
|
||||
setLevel(ZlibFactory.getCompressionLevel(conf).compressionLevel());
|
||||
final ZlibCompressor.CompressionStrategy strategy =
|
||||
ZlibFactory.getCompressionStrategy(conf);
|
||||
try {
|
||||
setStrategy(strategy.compressionStrategy());
|
||||
} catch (IllegalArgumentException ill) {
|
||||
Log.warn(strategy + " not supported by BuiltInZlibDeflater.");
|
||||
setStrategy(DEFAULT_STRATEGY);
|
||||
}
|
||||
Log.debug("Reinit compressor with new compression configuration");
|
||||
}
|
||||
}
|
||||
|
@ -22,8 +22,10 @@
|
||||
import java.nio.Buffer;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
/**
|
||||
* A {@link Compressor} based on the popular
|
||||
@ -40,7 +42,7 @@ public class ZlibCompressor implements Compressor {
|
||||
private long stream;
|
||||
private CompressionLevel level;
|
||||
private CompressionStrategy strategy;
|
||||
private CompressionHeader windowBits;
|
||||
private final CompressionHeader windowBits;
|
||||
private int directBufferSize;
|
||||
private byte[] userBuf = null;
|
||||
private int userBufOff = 0, userBufLen = 0;
|
||||
@ -178,6 +180,31 @@ static boolean isNativeZlibLoaded() {
|
||||
return nativeZlibLoaded;
|
||||
}
|
||||
|
||||
// NOTE(review): this method body is empty — all four parameters are ignored
// and it has no effect. It looks like an unused leftover from refactoring;
// confirm intent before relying on it, or remove it in a follow-up.
protected final void construct(CompressionLevel level, CompressionStrategy strategy,
    CompressionHeader header, int directBufferSize) {
}
|
||||
|
||||
/**
|
||||
* Creates a new compressor with the default compression level.
|
||||
* Compressed data will be generated in ZLIB format.
|
||||
*/
|
||||
/**
 * Creates a new compressor with the default compression level and strategy;
 * compressed data will be generated in ZLIB format.
 */
public ZlibCompressor() {
  this(CompressionLevel.DEFAULT_COMPRESSION,
      CompressionStrategy.DEFAULT_STRATEGY,
      CompressionHeader.DEFAULT_HEADER,
      DEFAULT_DIRECT_BUFFER_SIZE);
}
|
||||
|
||||
/**
|
||||
* Creates a new compressor, taking settings from the configuration.
|
||||
*/
|
||||
/**
 * Creates a new compressor whose compression level and strategy are taken
 * from the given configuration; header and buffer size use the defaults.
 *
 * @param conf source of the compression level and strategy settings
 */
public ZlibCompressor(Configuration conf) {
  this(ZlibFactory.getCompressionLevel(conf),
      ZlibFactory.getCompressionStrategy(conf),
      CompressionHeader.DEFAULT_HEADER,
      DEFAULT_DIRECT_BUFFER_SIZE);
}
|
||||
|
||||
/**
|
||||
* Creates a new compressor using the specified compression level.
|
||||
* Compressed data will be generated in ZLIB format.
|
||||
@ -192,28 +219,38 @@ public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy,
|
||||
this.level = level;
|
||||
this.strategy = strategy;
|
||||
this.windowBits = header;
|
||||
this.directBufferSize = directBufferSize;
|
||||
|
||||
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
|
||||
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
|
||||
compressedDirectBuf.position(directBufferSize);
|
||||
|
||||
stream = init(this.level.compressionLevel(),
|
||||
this.strategy.compressionStrategy(),
|
||||
this.windowBits.windowBits());
|
||||
|
||||
this.directBufferSize = directBufferSize;
|
||||
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
|
||||
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
|
||||
compressedDirectBuf.position(directBufferSize);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new compressor with the default compression level.
|
||||
* Compressed data will be generated in ZLIB format.
|
||||
* Prepare the compressor to be used in a new stream with settings defined in
|
||||
* the given Configuration. It will reset the compressor's compression level
|
||||
* and compression strategy.
|
||||
*
|
||||
* @param conf Configuration storing new settings
|
||||
*/
|
||||
public ZlibCompressor() {
|
||||
this(CompressionLevel.DEFAULT_COMPRESSION,
|
||||
CompressionStrategy.DEFAULT_STRATEGY,
|
||||
CompressionHeader.DEFAULT_HEADER,
|
||||
DEFAULT_DIRECT_BUFFER_SIZE);
|
||||
@Override
|
||||
public synchronized void reinit(Configuration conf) {
|
||||
reset();
|
||||
if (conf == null) {
|
||||
return;
|
||||
}
|
||||
end(stream);
|
||||
level = ZlibFactory.getCompressionLevel(conf);
|
||||
strategy = ZlibFactory.getCompressionStrategy(conf);
|
||||
stream = init(level.compressionLevel(),
|
||||
strategy.compressionStrategy(),
|
||||
windowBits.windowBits());
|
||||
Log.debug("Reinit compressor with new compression configuration");
|
||||
}
|
||||
|
||||
|
||||
public synchronized void setInput(byte[] b, int off, int len) {
|
||||
if (b== null) {
|
||||
throw new NullPointerException();
|
||||
|
@ -23,6 +23,8 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
|
||||
/**
|
||||
@ -106,5 +108,25 @@ public static Decompressor getZlibDecompressor(Configuration conf) {
|
||||
return (isNativeZlibLoaded(conf)) ?
|
||||
new ZlibDecompressor() : new BuiltInZlibInflater();
|
||||
}
|
||||
|
||||
|
||||
public static void setCompressionStrategy(Configuration conf,
|
||||
CompressionStrategy strategy) {
|
||||
conf.setEnum("zlib.compress.strategy", strategy);
|
||||
}
|
||||
|
||||
public static CompressionStrategy getCompressionStrategy(Configuration conf) {
|
||||
return conf.getEnum("zlib.compress.strategy",
|
||||
CompressionStrategy.DEFAULT_STRATEGY);
|
||||
}
|
||||
|
||||
public static void setCompressionLevel(Configuration conf,
|
||||
CompressionLevel level) {
|
||||
conf.setEnum("zlib.compress.level", level);
|
||||
}
|
||||
|
||||
public static CompressionLevel getCompressionLevel(Configuration conf) {
|
||||
return conf.getEnum("zlib.compress.level",
|
||||
CompressionLevel.DEFAULT_COMPRESSION);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,9 +20,11 @@
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -39,6 +41,8 @@
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
|
||||
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
|
||||
import org.apache.hadoop.util.LineReader;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
@ -77,6 +81,15 @@ public void testBZip2Codec() throws IOException {
|
||||
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGzipCodecWithParam() throws IOException {
|
||||
Configuration conf = new Configuration(this.conf);
|
||||
ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
|
||||
ZlibFactory.setCompressionStrategy(conf, CompressionStrategy.HUFFMAN_ONLY);
|
||||
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
|
||||
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
|
||||
}
|
||||
|
||||
private static void codecTest(Configuration conf, int seed, int count,
|
||||
String codecClass)
|
||||
throws IOException {
|
||||
@ -262,6 +275,54 @@ public void testCodecPoolGzipReuse() throws Exception {
|
||||
assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
|
||||
}
|
||||
|
||||
private static void gzipReinitTest(Configuration conf, CompressionCodec codec)
|
||||
throws IOException {
|
||||
// Add codec to cache
|
||||
ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
|
||||
ZlibFactory.setCompressionStrategy(conf,
|
||||
CompressionStrategy.DEFAULT_STRATEGY);
|
||||
Compressor c1 = CodecPool.getCompressor(codec);
|
||||
CodecPool.returnCompressor(c1);
|
||||
// reset compressor's compression level to perform no compression
|
||||
ZlibFactory.setCompressionLevel(conf, CompressionLevel.NO_COMPRESSION);
|
||||
Compressor c2 = CodecPool.getCompressor(codec, conf);
|
||||
// ensure same compressor placed earlier
|
||||
assertTrue("Got mismatched ZlibCompressor", c1 == c2);
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
CompressionOutputStream cos = null;
|
||||
// write trivially compressable data
|
||||
byte[] b = new byte[1 << 15];
|
||||
Arrays.fill(b, (byte) 43);
|
||||
try {
|
||||
cos = codec.createOutputStream(bos, c2);
|
||||
cos.write(b);
|
||||
} finally {
|
||||
if (cos != null) {
|
||||
cos.close();
|
||||
}
|
||||
CodecPool.returnCompressor(c2);
|
||||
}
|
||||
byte[] outbytes = bos.toByteArray();
|
||||
// verify data were not compressed
|
||||
assertTrue("Compressed bytes contrary to configuration",
|
||||
outbytes.length >= b.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCodecPoolCompressorReinit() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("hadoop.native.lib", true);
|
||||
if (ZlibFactory.isNativeZlibLoaded(conf)) {
|
||||
GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
|
||||
gzipReinitTest(conf, gzc);
|
||||
} else {
|
||||
LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
|
||||
}
|
||||
conf.setBoolean("hadoop.native.lib", false);
|
||||
DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
|
||||
gzipReinitTest(conf, dfc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
|
||||
InstantiationException, IllegalAccessException {
|
||||
|
Loading…
Reference in New Issue
Block a user