HADOOP-10591. Compression codecs must used pooled direct buffers or deallocate direct buffers when stream is closed (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611423 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-07-17 18:17:18 +00:00
parent 1e7ce76bba
commit ef9e24f826
9 changed files with 108 additions and 24 deletions

View File

@ -441,6 +441,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-9921. daemon scripts should remove pid file on stop call after stop
or process is found not running ( vinayakumarb )
HADOOP-10591. Compression codecs must used pooled direct buffers or
deallocate direct buffers when stream is closed (cmccabe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -100,7 +100,8 @@ public BZip2Codec() { }
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return createOutputStream(out, createCompressor());
return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out);
}
/**
@ -153,7 +154,8 @@ public Compressor createCompressor() {
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, createDecompressor());
return CompressionCodec.Util.
createInputStreamWithCodecPool(this, conf, in);
}
/**

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
* This class encapsulates a streaming compression/decompression pair.
@ -113,4 +114,58 @@ CompressionInputStream createInputStream(InputStream in,
* @return the extension including the '.'
*/
String getDefaultExtension();
static class Util {
/**
* Create an output stream with a codec taken from the global CodecPool.
*
* @param codec The codec to use to create the output stream.
* @param conf The configuration to use if we need to create a new codec.
* @param out The output stream to wrap.
* @return The new output stream
* @throws IOException
*/
static CompressionOutputStream createOutputStreamWithCodecPool(
CompressionCodec codec, Configuration conf, OutputStream out)
throws IOException {
Compressor compressor = CodecPool.getCompressor(codec, conf);
CompressionOutputStream stream = null;
try {
stream = codec.createOutputStream(out, compressor);
} finally {
if (stream == null) {
CodecPool.returnCompressor(compressor);
} else {
stream.setTrackedCompressor(compressor);
}
}
return stream;
}
/**
* Create an input stream with a codec taken from the global CodecPool.
*
* @param codec The codec to use to create the input stream.
* @param conf The configuration to use if we need to create a new codec.
* @param in The input stream to wrap.
* @return The new input stream
* @throws IOException
*/
static CompressionInputStream createInputStreamWithCodecPool(
CompressionCodec codec, Configuration conf, InputStream in)
throws IOException {
Decompressor decompressor = CodecPool.getDecompressor(codec);
CompressionInputStream stream = null;
try {
stream = codec.createInputStream(in, decompressor);
} finally {
if (stream == null) {
CodecPool.returnDecompressor(decompressor);
} else {
stream.setTrackedDecompressor(decompressor);
}
}
return stream;
}
}
}

View File

@ -41,6 +41,8 @@ public abstract class CompressionInputStream extends InputStream implements Seek
protected final InputStream in;
protected long maxAvailableData = 0L;
private Decompressor trackedDecompressor;
/**
* Create a compression input stream that reads
* the decompressed bytes from the given stream.
@ -58,6 +60,10 @@ protected CompressionInputStream(InputStream in) throws IOException {
@Override
public void close() throws IOException {
in.close();
if (trackedDecompressor != null) {
CodecPool.returnDecompressor(trackedDecompressor);
trackedDecompressor = null;
}
}
/**
@ -112,4 +118,8 @@ public void seek(long pos) throws UnsupportedOperationException {
public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}
void setTrackedDecompressor(Decompressor decompressor) {
trackedDecompressor = decompressor;
}
}

View File

@ -35,6 +35,12 @@ public abstract class CompressionOutputStream extends OutputStream {
*/
protected final OutputStream out;
/**
* If non-null, this is the Compressor object that we should call
* CodecPool#returnCompressor on when this stream is closed.
*/
private Compressor trackedCompressor;
/**
* Create a compression output stream that writes
* the compressed bytes to the given stream.
@ -44,10 +50,18 @@ protected CompressionOutputStream(OutputStream out) {
this.out = out;
}
void setTrackedCompressor(Compressor compressor) {
trackedCompressor = compressor;
}
@Override
public void close() throws IOException {
finish();
out.close();
if (trackedCompressor != null) {
CodecPool.returnCompressor(trackedCompressor);
trackedCompressor = null;
}
}
@Override

View File

@ -51,14 +51,8 @@ public Configuration getConf() {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
// This may leak memory if called in a loop. The createCompressor() call
// may cause allocation of an untracked direct-backed buffer if native
// libs are being used (even if you close the stream). A Compressor
// object should be reused between successive calls.
LOG.warn("DefaultCodec.createOutputStream() may leak memory. "
+ "Create a compressor first.");
return new CompressorStream(out, createCompressor(),
conf.getInt("io.file.buffer.size", 4*1024));
return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out);
}
@Override
@ -82,8 +76,8 @@ public Compressor createCompressor() {
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return new DecompressorStream(in, createDecompressor(),
conf.getInt("io.file.buffer.size", 4*1024));
return CompressionCodec.Util.
createInputStreamWithCodecPool(this, conf, in);
}
@Override

View File

@ -159,10 +159,11 @@ public void resetState() throws IOException {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return (ZlibFactory.isNativeZlibLoaded(conf)) ?
new CompressorStream(out, createCompressor(),
conf.getInt("io.file.buffer.size", 4*1024)) :
new GzipOutputStream(out);
if (!ZlibFactory.isNativeZlibLoaded(conf)) {
return new GzipOutputStream(out);
}
return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out);
}
@Override
@ -192,8 +193,9 @@ public Class<? extends Compressor> getCompressorType() {
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, null);
throws IOException {
return CompressionCodec.Util.
createInputStreamWithCodecPool(this, conf, in);
}
@Override

View File

@ -84,7 +84,8 @@ public static String getLibraryName() {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return createOutputStream(out, createCompressor());
return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out);
}
/**
@ -157,7 +158,8 @@ public Compressor createCompressor() {
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, createDecompressor());
return CompressionCodec.Util.
createInputStreamWithCodecPool(this, conf, in);
}
/**

View File

@ -95,7 +95,8 @@ public static String getLibraryName() {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return createOutputStream(out, createCompressor());
return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out);
}
/**
@ -158,7 +159,8 @@ public Compressor createCompressor() {
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, createDecompressor());
return CompressionCodec.Util.
createInputStreamWithCodecPool(this, conf, in);
}
/**