MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool multiple times (Sergey Murylev via raviprak)

This commit is contained in:
Ravi Prakash 2014-11-14 03:45:53 -08:00
parent d005404ef7
commit 1a1dcce827
7 changed files with 150 additions and 31 deletions

View File

@ -17,9 +17,9 @@
*/ */
package org.apache.hadoop.io.compress; package org.apache.hadoop.io.compress;
import java.util.ArrayList; import java.util.HashSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.Set;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -47,15 +47,15 @@ public class CodecPool {
* A global compressor pool used to save the expensive * A global compressor pool used to save the expensive
* construction/destruction of (possibly native) decompression codecs. * construction/destruction of (possibly native) decompression codecs.
*/ */
private static final Map<Class<Compressor>, List<Compressor>> compressorPool = private static final Map<Class<Compressor>, Set<Compressor>> compressorPool =
new HashMap<Class<Compressor>, List<Compressor>>(); new HashMap<Class<Compressor>, Set<Compressor>>();
/** /**
* A global decompressor pool used to save the expensive * A global decompressor pool used to save the expensive
* construction/destruction of (possibly native) decompression codecs. * construction/destruction of (possibly native) decompression codecs.
*/ */
private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool =
new HashMap<Class<Decompressor>, List<Decompressor>>(); new HashMap<Class<Decompressor>, Set<Decompressor>>();
private static <T> LoadingCache<Class<T>, AtomicInteger> createCache( private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
Class<T> klass) { Class<T> klass) {
@ -80,20 +80,21 @@ public class CodecPool {
private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
createCache(Decompressor.class); createCache(Decompressor.class);
private static <T> T borrow(Map<Class<T>, List<T>> pool, private static <T> T borrow(Map<Class<T>, Set<T>> pool,
Class<? extends T> codecClass) { Class<? extends T> codecClass) {
T codec = null; T codec = null;
// Check if an appropriate codec is available // Check if an appropriate codec is available
List<T> codecList; Set<T> codecSet;
synchronized (pool) { synchronized (pool) {
codecList = pool.get(codecClass); codecSet = pool.get(codecClass);
} }
if (codecList != null) { if (codecSet != null) {
synchronized (codecList) { synchronized (codecSet) {
if (!codecList.isEmpty()) { if (!codecSet.isEmpty()) {
codec = codecList.remove(codecList.size() - 1); codec = codecSet.iterator().next();
codecSet.remove(codec);
} }
} }
} }
@ -101,22 +102,23 @@ public class CodecPool {
return codec; return codec;
} }
private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) {
if (codec != null) { if (codec != null) {
Class<T> codecClass = ReflectionUtils.getClass(codec); Class<T> codecClass = ReflectionUtils.getClass(codec);
List<T> codecList; Set<T> codecSet;
synchronized (pool) { synchronized (pool) {
codecList = pool.get(codecClass); codecSet = pool.get(codecClass);
if (codecList == null) { if (codecSet == null) {
codecList = new ArrayList<T>(); codecSet = new HashSet<T>();
pool.put(codecClass, codecList); pool.put(codecClass, codecSet);
} }
} }
synchronized (codecList) { synchronized (codecSet) {
codecList.add(codec); return codecSet.add(codec);
} }
} }
return false;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -200,8 +202,9 @@ public class CodecPool {
return; return;
} }
compressor.reset(); compressor.reset();
payback(compressorPool, compressor); if (payback(compressorPool, compressor)) {
updateLeaseCount(compressorCounts, compressor, -1); updateLeaseCount(compressorCounts, compressor, -1);
}
} }
/** /**
@ -219,8 +222,9 @@ public class CodecPool {
return; return;
} }
decompressor.reset(); decompressor.reset();
payback(decompressorPool, decompressor); if (payback(decompressorPool, decompressor)) {
updateLeaseCount(decompressorCounts, decompressor, -1); updateLeaseCount(decompressorCounts, decompressor, -1);
}
} }
/** /**

View File

@ -19,15 +19,9 @@ package org.apache.hadoop.io.compress;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,6 +29,9 @@ import org.apache.hadoop.conf.Configuration;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
public class TestCodecPool { public class TestCodecPool {
private final String LEASE_COUNT_ERR = private final String LEASE_COUNT_ERR =
"Incorrect number of leased (de)compressors"; "Incorrect number of leased (de)compressors";
@ -61,6 +58,25 @@ public class TestCodecPool {
CodecPool.returnCompressor(comp1); CodecPool.returnCompressor(comp1);
assertEquals(LEASE_COUNT_ERR, 0, assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedCompressorsCount(codec)); CodecPool.getLeasedCompressorsCount(codec));
CodecPool.returnCompressor(comp1);
assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedCompressorsCount(codec));
}
@Test(timeout = 1000)
public void testCompressorNotReturnSameInstance() {
Compressor comp = CodecPool.getCompressor(codec);
CodecPool.returnCompressor(comp);
CodecPool.returnCompressor(comp);
Set<Compressor> compressors = new HashSet<Compressor>();
for (int i = 0; i < 10; ++i) {
compressors.add(CodecPool.getCompressor(codec));
}
assertEquals(10, compressors.size());
for (Compressor compressor : compressors) {
CodecPool.returnCompressor(compressor);
}
} }
@Test(timeout = 1000) @Test(timeout = 1000)
@ -78,6 +94,10 @@ public class TestCodecPool {
CodecPool.returnDecompressor(decomp1); CodecPool.returnDecompressor(decomp1);
assertEquals(LEASE_COUNT_ERR, 0, assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedDecompressorsCount(codec)); CodecPool.getLeasedDecompressorsCount(codec));
CodecPool.returnDecompressor(decomp1);
assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedCompressorsCount(codec));
} }
@Test(timeout = 1000) @Test(timeout = 1000)
@ -154,4 +174,19 @@ public class TestCodecPool {
assertEquals(LEASE_COUNT_ERR, 0, assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedDecompressorsCount(codec)); CodecPool.getLeasedDecompressorsCount(codec));
} }
@Test(timeout = 1000)
public void testDecompressorNotReturnSameInstance() {
Decompressor decomp = CodecPool.getDecompressor(codec);
CodecPool.returnDecompressor(decomp);
CodecPool.returnDecompressor(decomp);
Set<Decompressor> decompressors = new HashSet<Decompressor>();
for (int i = 0; i < 10; ++i) {
decompressors.add(CodecPool.getDecompressor(codec));
}
assertEquals(10, decompressors.size());
for (Decompressor decompressor : decompressors) {
CodecPool.returnDecompressor(decompressor);
}
}
} }

View File

@ -236,6 +236,9 @@ Release 2.7.0 - UNRELEASED
BUG FIXES BUG FIXES
MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool
multiple times (Sergey Murylev via raviprak)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -284,6 +284,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
} finally { } finally {
if (decompressor != null) { if (decompressor != null) {
CodecPool.returnDecompressor(decompressor); CodecPool.returnDecompressor(decompressor);
decompressor = null;
} }
} }
} }

View File

@ -232,6 +232,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
} finally { } finally {
if (decompressor != null) { if (decompressor != null) {
CodecPool.returnDecompressor(decompressor); CodecPool.returnDecompressor(decompressor);
decompressor = null;
} }
} }
} }

View File

@ -27,12 +27,17 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.junit.Test; import org.junit.Test;
public class TestLineRecordReader { public class TestLineRecordReader {
@ -225,4 +230,36 @@ public class TestLineRecordReader {
assertTrue("BOM is not skipped", skipBOM); assertTrue("BOM is not skipped", skipBOM);
} }
@Test
public void testMultipleClose() throws IOException {
URL testFileUrl = getClass().getClassLoader().
getResource("recordSpanningMultipleSplits.txt.bz2");
assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
testFileUrl);
File testFile = new File(testFileUrl.getFile());
Path testFilePath = new Path(testFile.getAbsolutePath());
long testFileSize = testFile.length();
Configuration conf = new Configuration();
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
(String[])null);
LineRecordReader reader = new LineRecordReader(conf, split);
LongWritable key = new LongWritable();
Text value = new Text();
//noinspection StatementWithEmptyBody
while (reader.next(key, value)) ;
reader.close();
reader.close();
BZip2Codec codec = new BZip2Codec();
codec.setConf(conf);
Set<Decompressor> decompressors = new HashSet<Decompressor>();
for (int i = 0; i < 10; ++i) {
decompressors.add(CodecPool.getDecompressor(codec));
}
assertEquals(10, decompressors.size());
}
} }

View File

@ -27,10 +27,15 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@ -231,4 +236,37 @@ public class TestLineRecordReader {
assertTrue("BOM is not skipped", skipBOM); assertTrue("BOM is not skipped", skipBOM);
} }
@Test
public void testMultipleClose() throws IOException {
URL testFileUrl = getClass().getClassLoader().
getResource("recordSpanningMultipleSplits.txt.bz2");
assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
testFileUrl);
File testFile = new File(testFileUrl.getFile());
Path testFilePath = new Path(testFile.getAbsolutePath());
long testFileSize = testFile.length();
Configuration conf = new Configuration();
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
// read the data and check whether BOM is skipped
FileSplit split = new FileSplit(testFilePath, 0, testFileSize, null);
LineRecordReader reader = new LineRecordReader();
reader.initialize(split, context);
//noinspection StatementWithEmptyBody
while (reader.nextKeyValue()) ;
reader.close();
reader.close();
BZip2Codec codec = new BZip2Codec();
codec.setConf(conf);
Set<Decompressor> decompressors = new HashSet<Decompressor>();
for (int i = 0; i < 10; ++i) {
decompressors.add(CodecPool.getDecompressor(codec));
}
assertEquals(10, decompressors.size());
}
} }