diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index 11d88f13cd..bb566ded68 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.io.compress; -import java.util.ArrayList; +import java.util.HashSet; import java.util.HashMap; -import java.util.List; +import java.util.Set; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -47,15 +47,15 @@ public class CodecPool { * A global compressor pool used to save the expensive * construction/destruction of (possibly native) decompression codecs. */ - private static final Map, List> compressorPool = - new HashMap, List>(); + private static final Map, Set> compressorPool = + new HashMap, Set>(); /** * A global decompressor pool used to save the expensive * construction/destruction of (possibly native) decompression codecs. */ - private static final Map, List> decompressorPool = - new HashMap, List>(); + private static final Map, Set> decompressorPool = + new HashMap, Set>(); private static LoadingCache, AtomicInteger> createCache( Class klass) { @@ -80,20 +80,21 @@ public AtomicInteger load(Class key) throws Exception { private static final LoadingCache, AtomicInteger> decompressorCounts = createCache(Decompressor.class); - private static T borrow(Map, List> pool, + private static T borrow(Map, Set> pool, Class codecClass) { T codec = null; // Check if an appropriate codec is available - List codecList; + Set codecSet; synchronized (pool) { - codecList = pool.get(codecClass); + codecSet = pool.get(codecClass); } - if (codecList != null) { - synchronized (codecList) { - if (!codecList.isEmpty()) { - codec = codecList.remove(codecList.size() - 1); + if (codecSet != null) { + synchronized (codecSet) { + if (!codecSet.isEmpty()) { + codec = codecSet.iterator().next(); + codecSet.remove(codec); } } } @@ -101,22 +102,23 @@ private static T borrow(Map, List> pool, return codec; } - private static void payback(Map, List> pool, T codec) { + private static boolean payback(Map, Set> pool, T codec) { if (codec != null) { Class codecClass = ReflectionUtils.getClass(codec); - List codecList; + Set codecSet; synchronized (pool) { - codecList = pool.get(codecClass); - if (codecList == null) { - codecList = new ArrayList(); - pool.put(codecClass, codecList); + codecSet = pool.get(codecClass); + if (codecSet == null) { + codecSet = new HashSet(); + pool.put(codecClass, codecSet); } } - synchronized (codecList) { - codecList.add(codec); + synchronized (codecSet) { + return codecSet.add(codec); } } + return false; } @SuppressWarnings("unchecked") @@ -200,8 +202,9 @@ public static void returnCompressor(Compressor compressor) { return; } compressor.reset(); - payback(compressorPool, compressor); - updateLeaseCount(compressorCounts, compressor, -1); + if (payback(compressorPool, compressor)) { + updateLeaseCount(compressorCounts, compressor, -1); + } } /** @@ -219,8 +222,9 @@ public static void returnDecompressor(Decompressor decompressor) { return; } decompressor.reset(); - payback(decompressorPool, decompressor); - updateLeaseCount(decompressorCounts, decompressor, -1); + if (payback(decompressorPool, decompressor)) { + updateLeaseCount(decompressorCounts, decompressor, -1); + } } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 5cacebfe78..c889a59b26 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -19,15 +19,9 @@ import static org.junit.Assert.assertEquals; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -35,6 +29,9 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashSet; +import java.util.Set; + public class TestCodecPool { private final String LEASE_COUNT_ERR = "Incorrect number of leased (de)compressors"; @@ -61,6 +58,25 @@ public void testCompressorPoolCounts() { CodecPool.returnCompressor(comp1); assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec)); + + CodecPool.returnCompressor(comp1); + assertEquals(LEASE_COUNT_ERR, 0, + CodecPool.getLeasedCompressorsCount(codec)); + } + + @Test(timeout = 1000) + public void testCompressorNotReturnSameInstance() { + Compressor comp = CodecPool.getCompressor(codec); + CodecPool.returnCompressor(comp); + CodecPool.returnCompressor(comp); + Set compressors = new HashSet(); + for (int i = 0; i < 10; ++i) { + compressors.add(CodecPool.getCompressor(codec)); + } + assertEquals(10, compressors.size()); + for (Compressor compressor : compressors) { + CodecPool.returnCompressor(compressor); + } } @Test(timeout = 1000) @@ -78,6 +94,10 @@ public void testDecompressorPoolCounts() { CodecPool.returnDecompressor(decomp1); assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec)); + + CodecPool.returnDecompressor(decomp1); + assertEquals(LEASE_COUNT_ERR, 0, + CodecPool.getLeasedCompressorsCount(codec)); } @Test(timeout = 1000) @@ -154,4 +174,19 @@ public Boolean call() throws Exception { assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec)); } + + @Test(timeout = 1000) + public void testDecompressorNotReturnSameInstance() { + Decompressor decomp = CodecPool.getDecompressor(codec); + CodecPool.returnDecompressor(decomp); + CodecPool.returnDecompressor(decomp); + Set decompressors = new HashSet(); + for (int i = 0; i < 10; ++i) { + decompressors.add(CodecPool.getDecompressor(codec)); + } + assertEquals(10, decompressors.size()); + for (Decompressor decompressor : decompressors) { + CodecPool.returnDecompressor(decompressor); + } + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a8ca13ff5e..28595caa20 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -236,6 +236,9 @@ Release 2.7.0 - UNRELEASED BUG FIXES + MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool + multiple times (Sergey Murylev via raviprak) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java index 6b5c26ee47..ba075e5dfb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java @@ -284,6 +284,7 @@ public synchronized void close() throws IOException { } finally { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); + decompressor = null; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java index 880a1a2194..42e94ad0c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @@ -232,6 +232,7 @@ public synchronized void close() throws IOException { } finally { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); + decompressor = null; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index 7b664e93ac..a7a87c9ed0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -27,12 +27,17 @@ import java.io.IOException; import java.net.URL; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.Decompressor; import org.junit.Test; public class TestLineRecordReader { @@ -225,4 +230,36 @@ public void testStripBOM() throws IOException { assertTrue("BOM is not skipped", skipBOM); } + + @Test + public void testMultipleClose() throws IOException { + URL testFileUrl = getClass().getClassLoader(). + getResource("recordSpanningMultipleSplits.txt.bz2"); + assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2", + testFileUrl); + File testFile = new File(testFileUrl.getFile()); + Path testFilePath = new Path(testFile.getAbsolutePath()); + long testFileSize = testFile.length(); + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + FileSplit split = new FileSplit(testFilePath, 0, testFileSize, + (String[])null); + + LineRecordReader reader = new LineRecordReader(conf, split); + LongWritable key = new LongWritable(); + Text value = new Text(); + //noinspection StatementWithEmptyBody + while (reader.next(key, value)) ; + reader.close(); + reader.close(); + + BZip2Codec codec = new BZip2Codec(); + codec.setConf(conf); + Set decompressors = new HashSet(); + for (int i = 0; i < 10; ++i) { + decompressors.add(CodecPool.getDecompressor(codec)); + } + assertEquals(10, decompressors.size()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index a1b5147c0c..52fdc090ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -27,10 +27,15 @@ import java.io.IOException; import java.net.URL; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @@ -231,4 +236,37 @@ public void testStripBOM() throws IOException { assertTrue("BOM is not skipped", skipBOM); } + + @Test + public void testMultipleClose() throws IOException { + URL testFileUrl = getClass().getClassLoader(). + getResource("recordSpanningMultipleSplits.txt.bz2"); + assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2", + testFileUrl); + File testFile = new File(testFileUrl.getFile()); + Path testFilePath = new Path(testFile.getAbsolutePath()); + long testFileSize = testFile.length(); + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + // read the data and check whether BOM is skipped + FileSplit split = new FileSplit(testFilePath, 0, testFileSize, null); + LineRecordReader reader = new LineRecordReader(); + reader.initialize(split, context); + + //noinspection StatementWithEmptyBody + while (reader.nextKeyValue()) ; + reader.close(); + reader.close(); + + BZip2Codec codec = new BZip2Codec(); + codec.setConf(conf); + Set decompressors = new HashSet(); + for (int i = 0; i < 10; ++i) { + decompressors.add(CodecPool.getDecompressor(codec)); + } + assertEquals(10, decompressors.size()); + } }