From 254afbfe96410e40b996de0bd6542e07651269ee Mon Sep 17 00:00:00 2001 From: Christopher Douglas Date: Mon, 14 Sep 2009 00:45:01 +0000 Subject: [PATCH] HADOOP-5879. Read compression level and strategy from Configuration for gzip compression. Contributed by He Yongqiang git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@814455 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../apache/hadoop/io/compress/CodecPool.java | 9 ++- .../apache/hadoop/io/compress/Compressor.java | 12 +++- .../apache/hadoop/io/compress/GzipCodec.java | 12 +++- .../compress/bzip2/BZip2DummyCompressor.java | 6 ++ .../io/compress/zlib/BuiltInZlibDeflater.java | 28 ++++++++ .../io/compress/zlib/ZlibCompressor.java | 69 ++++++++++++++----- .../hadoop/io/compress/zlib/ZlibFactory.java | 24 ++++++- .../apache/hadoop/io/compress/TestCodec.java | 61 ++++++++++++++++ 9 files changed, 204 insertions(+), 20 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index afe10ab8ff..36b4df4a73 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -553,6 +553,9 @@ Trunk (unreleased changes) HADOOP-6252. Provide a method to determine if a deprecated key is set in config file. (Jakob Homan via suresh) + HADOOP-5879. Read compression level and strategy from Configuration for + gzip compression. (He Yongqiang via cdouglas) + OPTIMIZATIONS HADOOP-5595. NameNode does not need to run a replicator to choose a diff --git a/src/java/org/apache/hadoop/io/compress/CodecPool.java b/src/java/org/apache/hadoop/io/compress/CodecPool.java index 8960b41ef3..dbf1f1525f 100644 --- a/src/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/src/java/org/apache/hadoop/io/compress/CodecPool.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; /** @@ -91,20 +92,26 @@ private static void payback(Map, List> pool, T codec) { * * @param codec the CompressionCodec for which to get the * Compressor + * @param conf the Configuration object which contains confs for creating or reinit the compressor * @return Compressor for the given * CompressionCodec from the pool or a new one */ - public static Compressor getCompressor(CompressionCodec codec) { + public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { Compressor compressor = borrow(compressorPool, codec.getCompressorType()); if (compressor == null) { compressor = codec.createCompressor(); LOG.info("Got brand-new compressor"); } else { + compressor.reinit(conf); LOG.debug("Got recycled compressor"); } return compressor; } + public static Compressor getCompressor(CompressionCodec codec) { + return getCompressor(codec, null); + } + /** * Get a {@link Decompressor} for the given {@link CompressionCodec} from the * pool or a new one. diff --git a/src/java/org/apache/hadoop/io/compress/Compressor.java b/src/java/org/apache/hadoop/io/compress/Compressor.java index 66bc4bfeed..0d05cdfa3e 100644 --- a/src/java/org/apache/hadoop/io/compress/Compressor.java +++ b/src/java/org/apache/hadoop/io/compress/Compressor.java @@ -20,6 +20,8 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; + /** * Specification of a stream-based 'compressor' which can be * plugged into a {@link CompressionOutputStream} to compress data. @@ -102,5 +104,13 @@ public interface Compressor { /** * Closes the compressor and discards any unprocessed input. */ - public void end(); + public void end(); + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ + public void reinit(Configuration conf); } diff --git a/src/java/org/apache/hadoop/io/compress/GzipCodec.java b/src/java/org/apache/hadoop/io/compress/GzipCodec.java index f7bb792488..e19cd6b375 100644 --- a/src/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/src/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -22,8 +22,11 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPInputStream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.zlib.*; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; /** * This class creates gzip compressors/decompressors. @@ -155,7 +158,7 @@ public CompressionOutputStream createOutputStream(OutputStream out, public Compressor createCompressor() { return (ZlibFactory.isNativeZlibLoaded(conf)) - ? new GzipZlibCompressor() + ? new GzipZlibCompressor(conf) : null; } @@ -206,6 +209,13 @@ public GzipZlibCompressor() { ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY, ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024); } + + public GzipZlibCompressor(Configuration conf) { + super(ZlibFactory.getCompressionLevel(conf), + ZlibFactory.getCompressionStrategy(conf), + ZlibCompressor.CompressionHeader.GZIP_FORMAT, + 64 * 1024); + } } static final class GzipZlibDecompressor extends ZlibDecompressor { diff --git a/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java b/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java index 143a3b2c3d..28548ee529 100644 --- a/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java +++ b/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; /** @@ -77,4 +78,9 @@ public void setInput(byte[] b, int off, int len) { throw new UnsupportedOperationException(); } + @Override + public void reinit(Configuration conf) { + // do nothing + } + } diff --git a/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java b/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java index f27e831a58..a017e1bc9a 100644 --- a/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java +++ b/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.zip.Deflater; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; +import org.mortbay.log.Log; /** * A wrapper around java.util.zip.Deflater to make it conform @@ -46,4 +48,30 @@ public synchronized int compress(byte[] b, int off, int len) throws IOException { return super.deflate(b, off, len); } + + /** + * reinit the compressor with the given configuration. It will reset the + * compressor's compression level and compression strategy. Different from + * ZlibCompressor, BuiltInZlibDeflater only support three + * kind of compression strategy: FILTERED, HUFFMAN_ONLY and DEFAULT_STRATEGY. + * It will use DEFAULT_STRATEGY as default if the configured compression + * strategy is not supported. + */ + @Override + public void reinit(Configuration conf) { + reset(); + if (conf == null) { + return; + } + setLevel(ZlibFactory.getCompressionLevel(conf).compressionLevel()); + final ZlibCompressor.CompressionStrategy strategy = + ZlibFactory.getCompressionStrategy(conf); + try { + setStrategy(strategy.compressionStrategy()); + } catch (IllegalArgumentException ill) { + Log.warn(strategy + " not supported by BuiltInZlibDeflater."); + setStrategy(DEFAULT_STRATEGY); + } + Log.debug("Reinit compressor with new compression configuration"); + } } diff --git a/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java index 754af216ad..25491495e7 100644 --- a/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java +++ b/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java @@ -22,8 +22,10 @@ import java.nio.Buffer; import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.util.NativeCodeLoader; +import org.mortbay.log.Log; /** * A {@link Compressor} based on the popular @@ -40,7 +42,7 @@ public class ZlibCompressor implements Compressor { private long stream; private CompressionLevel level; private CompressionStrategy strategy; - private CompressionHeader windowBits; + private final CompressionHeader windowBits; private int directBufferSize; private byte[] userBuf = null; private int userBufOff = 0, userBufLen = 0; @@ -178,6 +180,31 @@ static boolean isNativeZlibLoaded() { return nativeZlibLoaded; } + protected final void construct(CompressionLevel level, CompressionStrategy strategy, + CompressionHeader header, int directBufferSize) { + } + + /** + * Creates a new compressor with the default compression level. + * Compressed data will be generated in ZLIB format. + */ + public ZlibCompressor() { + this(CompressionLevel.DEFAULT_COMPRESSION, + CompressionStrategy.DEFAULT_STRATEGY, + CompressionHeader.DEFAULT_HEADER, + DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Creates a new compressor, taking settings from the configuration. + */ + public ZlibCompressor(Configuration conf) { + this(ZlibFactory.getCompressionLevel(conf), + ZlibFactory.getCompressionStrategy(conf), + CompressionHeader.DEFAULT_HEADER, + DEFAULT_DIRECT_BUFFER_SIZE); + } + /** * Creates a new compressor using the specified compression level. * Compressed data will be generated in ZLIB format. @@ -192,28 +219,38 @@ public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy, this.level = level; this.strategy = strategy; this.windowBits = header; - this.directBufferSize = directBufferSize; - - uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - compressedDirectBuf.position(directBufferSize); - stream = init(this.level.compressionLevel(), this.strategy.compressionStrategy(), this.windowBits.windowBits()); + + this.directBufferSize = directBufferSize; + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); } - + /** - * Creates a new compressor with the default compression level. - * Compressed data will be generated in ZLIB format. + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration. It will reset the compressor's compression level + * and compression strategy. + * + * @param conf Configuration storing new settings */ - public ZlibCompressor() { - this(CompressionLevel.DEFAULT_COMPRESSION, - CompressionStrategy.DEFAULT_STRATEGY, - CompressionHeader.DEFAULT_HEADER, - DEFAULT_DIRECT_BUFFER_SIZE); + @Override + public synchronized void reinit(Configuration conf) { + reset(); + if (conf == null) { + return; + } + end(stream); + level = ZlibFactory.getCompressionLevel(conf); + strategy = ZlibFactory.getCompressionStrategy(conf); + stream = init(level.compressionLevel(), + strategy.compressionStrategy(), + windowBits.windowBits()); + Log.debug("Reinit compressor with new compression configuration"); } - + public synchronized void setInput(byte[] b, int off, int len) { if (b== null) { throw new NullPointerException(); diff --git a/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java b/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java index e3ce3ec1af..bf16262b2d 100644 --- a/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java +++ b/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java @@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.util.NativeCodeLoader; /** @@ -106,5 +108,25 @@ public static Decompressor getZlibDecompressor(Configuration conf) { return (isNativeZlibLoaded(conf)) ? new ZlibDecompressor() : new BuiltInZlibInflater(); } - + + public static void setCompressionStrategy(Configuration conf, + CompressionStrategy strategy) { + conf.setEnum("zlib.compress.strategy", strategy); + } + + public static CompressionStrategy getCompressionStrategy(Configuration conf) { + return conf.getEnum("zlib.compress.strategy", + CompressionStrategy.DEFAULT_STRATEGY); + } + + public static void setCompressionLevel(Configuration conf, + CompressionLevel level) { + conf.setEnum("zlib.compress.level", level); + } + + public static CompressionLevel getCompressionLevel(Configuration conf) { + return conf.getEnum("zlib.compress.level", + CompressionLevel.DEFAULT_COMPRESSION); + } + } diff --git a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java index 8df4cf34e1..ae79af347e 100644 --- a/src/test/core/org/apache/hadoop/io/compress/TestCodec.java +++ b/src/test/core/org/apache/hadoop/io/compress/TestCodec.java @@ -20,9 +20,11 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -39,6 +41,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.ReflectionUtils; @@ -77,6 +81,15 @@ public void testBZip2Codec() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec"); } + @Test + public void testGzipCodecWithParam() throws IOException { + Configuration conf = new Configuration(this.conf); + ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION); + ZlibFactory.setCompressionStrategy(conf, CompressionStrategy.HUFFMAN_ONLY); + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec"); + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec"); + } + private static void codecTest(Configuration conf, int seed, int count, String codecClass) throws IOException { @@ -262,6 +275,54 @@ public void testCodecPoolGzipReuse() throws Exception { assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc)); } + private static void gzipReinitTest(Configuration conf, CompressionCodec codec) + throws IOException { + // Add codec to cache + ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION); + ZlibFactory.setCompressionStrategy(conf, + CompressionStrategy.DEFAULT_STRATEGY); + Compressor c1 = CodecPool.getCompressor(codec); + CodecPool.returnCompressor(c1); + // reset compressor's compression level to perform no compression + ZlibFactory.setCompressionLevel(conf, CompressionLevel.NO_COMPRESSION); + Compressor c2 = CodecPool.getCompressor(codec, conf); + // ensure same compressor placed earlier + assertTrue("Got mismatched ZlibCompressor", c1 == c2); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + CompressionOutputStream cos = null; + // write trivially compressable data + byte[] b = new byte[1 << 15]; + Arrays.fill(b, (byte) 43); + try { + cos = codec.createOutputStream(bos, c2); + cos.write(b); + } finally { + if (cos != null) { + cos.close(); + } + CodecPool.returnCompressor(c2); + } + byte[] outbytes = bos.toByteArray(); + // verify data were not compressed + assertTrue("Compressed bytes contrary to configuration", + outbytes.length >= b.length); + } + + @Test + public void testCodecPoolCompressorReinit() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean("hadoop.native.lib", true); + if (ZlibFactory.isNativeZlibLoaded(conf)) { + GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf); + gzipReinitTest(conf, gzc); + } else { + LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded"); + } + conf.setBoolean("hadoop.native.lib", false); + DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf); + gzipReinitTest(conf, dfc); + } + @Test public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {