From 3c00093cb554176ffc4bf6cebca6d2953099bfe9 Mon Sep 17 00:00:00 2001 From: skyskyhu Date: Fri, 17 May 2024 10:27:39 +0800 Subject: [PATCH] HADOOP-19167 Bug Fix: Change of Codec configuration does not work (#6807) --- .../apache/hadoop/io/compress/CodecPool.java | 4 +++ .../hadoop/io/compress/TestCodecPool.java | 35 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index 5b1826f9e3..283dcd622f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; @@ -152,6 +153,9 @@ public static Compressor getCompressor(CompressionCodec codec, Configuration con compressor = codec.createCompressor(); LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); } else { + if (conf == null && codec instanceof Configurable) { + conf = ((Configurable)codec).getConf(); + } compressor.reinit(conf); if(LOG.isDebugEnabled()) { LOG.debug("Got recycled compressor"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 4b18ee6047..ac6aff7427 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -22,6 +22,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -32,7 +34,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.ReflectionUtils; import org.junit.Before; import org.junit.Test; @@ -86,6 +91,36 @@ public void testCompressorNotReturnSameInstance() { } } + @Test(timeout = 10000) + public void testCompressorConf() throws Exception { + DefaultCodec codec1 = new DefaultCodec(); + Configuration conf = new Configuration(); + ZlibFactory.setCompressionLevel(conf, CompressionLevel.TWO); + codec1.setConf(conf); + Compressor comp1 = CodecPool.getCompressor(codec1); + CodecPool.returnCompressor(comp1); + + DefaultCodec codec2 = new DefaultCodec(); + Configuration conf2 = new Configuration(); + CompressionLevel newCompressionLevel = CompressionLevel.THREE; + ZlibFactory.setCompressionLevel(conf2, newCompressionLevel); + codec2.setConf(conf2); + Compressor comp2 = CodecPool.getCompressor(codec2); + List fields = ReflectionUtils.getDeclaredFieldsIncludingInherited(comp2.getClass()); + for (Field field : fields) { + if (field.getName().equals("level")) { + field.setAccessible(true); + Object levelValue = field.get(comp2); + if (levelValue instanceof CompressionLevel) { + assertEquals(newCompressionLevel, levelValue); + } else { + assertEquals(3, levelValue); + } + } + } + CodecPool.returnCompressor(comp2); + } + @Test(timeout = 10000) public void testDecompressorPoolCounts() { // Get two decompressors and return them