diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d070558722..449e853113 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -441,6 +441,9 @@ Release 2.6.0 - UNRELEASED HADOOP-9921. daemon scripts should remove pid file on stop call after stop or process is found not running ( vinayakumarb ) + HADOOP-10591. Compression codecs must used pooled direct buffers or + deallocate direct buffers when stream is closed (cmccabe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 42e96cfdc5..37b97f2a64 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -100,7 +100,8 @@ public BZip2Codec() { } @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -153,7 +154,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java index af2ff20b39..f37aadfcb5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; /** * This class encapsulates a streaming compression/decompression pair. @@ -113,4 +114,58 @@ CompressionInputStream createInputStream(InputStream in, * @return the extension including the '.' */ String getDefaultExtension(); + + static class Util { + /** + * Create an output stream with a codec taken from the global CodecPool. + * + * @param codec The codec to use to create the output stream. + * @param conf The configuration to use if we need to create a new codec. + * @param out The output stream to wrap. + * @return The new output stream + * @throws IOException + */ + static CompressionOutputStream createOutputStreamWithCodecPool( + CompressionCodec codec, Configuration conf, OutputStream out) + throws IOException { + Compressor compressor = CodecPool.getCompressor(codec, conf); + CompressionOutputStream stream = null; + try { + stream = codec.createOutputStream(out, compressor); + } finally { + if (stream == null) { + CodecPool.returnCompressor(compressor); + } else { + stream.setTrackedCompressor(compressor); + } + } + return stream; + } + + /** + * Create an input stream with a codec taken from the global CodecPool. + * + * @param codec The codec to use to create the input stream. + * @param conf The configuration to use if we need to create a new codec. + * @param in The input stream to wrap. + * @return The new input stream + * @throws IOException + */ + static CompressionInputStream createInputStreamWithCodecPool( + CompressionCodec codec, Configuration conf, InputStream in) + throws IOException { + Decompressor decompressor = CodecPool.getDecompressor(codec); + CompressionInputStream stream = null; + try { + stream = codec.createInputStream(in, decompressor); + } finally { + if (stream == null) { + CodecPool.returnDecompressor(decompressor); + } else { + stream.setTrackedDecompressor(decompressor); + } + } + return stream; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java index 4491819d72..cf3ac401cd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java @@ -41,6 +41,8 @@ public abstract class CompressionInputStream extends InputStream implements Seek protected final InputStream in; protected long maxAvailableData = 0L; + private Decompressor trackedDecompressor; + /** * Create a compression input stream that reads * the decompressed bytes from the given stream. @@ -58,6 +60,10 @@ protected CompressionInputStream(InputStream in) throws IOException { @Override public void close() throws IOException { in.close(); + if (trackedDecompressor != null) { + CodecPool.returnDecompressor(trackedDecompressor); + trackedDecompressor = null; + } } /** @@ -112,4 +118,8 @@ public void seek(long pos) throws UnsupportedOperationException { public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException { throw new UnsupportedOperationException(); } + + void setTrackedDecompressor(Decompressor decompressor) { + trackedDecompressor = decompressor; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java index 9bd6b84f98..00e272a9cc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java @@ -34,7 +34,13 @@ public abstract class CompressionOutputStream extends OutputStream { * The output stream to be compressed. */ protected final OutputStream out; - + + /** + * If non-null, this is the Compressor object that we should call + * CodecPool#returnCompressor on when this stream is closed. + */ + private Compressor trackedCompressor; + /** * Create a compression output stream that writes * the compressed bytes to the given stream. @@ -43,11 +49,19 @@ public abstract class CompressionOutputStream extends OutputStream { protected CompressionOutputStream(OutputStream out) { this.out = out; } - + + void setTrackedCompressor(Compressor compressor) { + trackedCompressor = compressor; + } + @Override public void close() throws IOException { finish(); out.close(); + if (trackedCompressor != null) { + CodecPool.returnCompressor(trackedCompressor); + trackedCompressor = null; + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java index dc02dcaf42..0e6f02cc9f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java @@ -51,14 +51,8 @@ public Configuration getConf() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - // This may leak memory if called in a loop. The createCompressor() call - // may cause allocation of an untracked direct-backed buffer if native - // libs are being used (even if you close the stream). A Compressor - // object should be reused between successive calls. - LOG.warn("DefaultCodec.createOutputStream() may leak memory. " - + "Create a compressor first."); - return new CompressorStream(out, createCompressor(), - conf.getInt("io.file.buffer.size", 4*1024)); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } @Override @@ -82,8 +76,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return new DecompressorStream(in, createDecompressor(), - conf.getInt("io.file.buffer.size", 4*1024)); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java index 487f29bec6..c493f1705d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -159,10 +159,11 @@ public void resetState() throws IOException { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return (ZlibFactory.isNativeZlibLoaded(conf)) ? - new CompressorStream(out, createCompressor(), - conf.getInt("io.file.buffer.size", 4*1024)) : - new GzipOutputStream(out); + if (!ZlibFactory.isNativeZlibLoaded(conf)) { + return new GzipOutputStream(out); + } + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } @Override @@ -192,8 +193,9 @@ public Class getCompressorType() { @Override public CompressionInputStream createInputStream(InputStream in) - throws IOException { - return createInputStream(in, null); + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java index 4b0ea796b7..61462c08dd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java @@ -84,7 +84,8 @@ public static String getLibraryName() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -157,7 +158,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java index 402f8c8e99..8d2fa1a6fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java @@ -95,7 +95,8 @@ public static String getLibraryName() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -158,7 +159,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /**