From 8f9ab998e273259c1e7a3ed53ba37d767e02b6bb Mon Sep 17 00:00:00 2001
From: "Arun C. Murthy"
Date: Sun, 5 Oct 2014 07:38:21 -0700
Subject: [PATCH] HADOOP-10681. Remove unnecessary synchronization from Snappy
 & Zlib codecs. Contributed by Gopal Vijayaraghavan.

---
 .../hadoop-common/CHANGES.txt                 |  3 +
 .../io/compress/snappy/SnappyCompressor.java  | 24 ++---
 .../compress/snappy/SnappyDecompressor.java   | 26 +++---
 .../io/compress/zlib/ZlibCompressor.java      | 24 ++---
 .../io/compress/zlib/ZlibDecompressor.java    | 30 +++----
 .../hadoop/io/compress/TestCodecPool.java     | 87 +++++++++++++++++++
 6 files changed, 142 insertions(+), 52 deletions(-)
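Note (commentary only; git am ignores text between the diffstat and the
first "diff --git" line): the synchronized keywords removed below are
per-instance locks. A pooled Compressor/Decompressor is leased to one
caller at a time through CodecPool, so method-level locking on the
instance itself only adds overhead on the (de)compression hot path. A
minimal sketch of that lease/return contract follows; it is illustrative
only, and the class name and the DefaultCodec choice are assumptions, not
taken from this patch:

    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CodecPoolExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec =
            ReflectionUtils.newInstance(DefaultCodec.class, conf);
        // Lease an instance; the pool hands it to one caller at a time,
        // so the instance itself needs no method-level synchronization.
        Compressor compressor = CodecPool.getCompressor(codec);
        try {
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          CompressionOutputStream out =
              codec.createOutputStream(baos, compressor);
          out.write("hello, codec pool".getBytes("UTF-8"));
          out.close(); // finishes the stream, flushing compressed bytes
          System.out.println("compressed size: " + baos.size());
        } finally {
          // Return the instance; the pool's own synchronization
          // publishes it safely to the next thread.
          CodecPool.returnCompressor(compressor);
        }
      }
    }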
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6a45540e5d..f025615ade 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -632,6 +632,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10731. Remove @date JavaDoc comment in ProgramDriver class (Henry
     Saputra via aw)
 
+    HADOOP-10681. Remove unnecessary synchronization from Snappy & Zlib
+    codecs. (Gopal Vijayaraghavan via acmurthy)
+
   BUG FIXES
 
     HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
index 376ea0685b..ab45f25058 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
@@ -100,7 +100,7 @@ public SnappyCompressor() {
    * @param len Length
    */
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -127,7 +127,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
    * aside to be loaded by this function while the compressed data are
    * consumed.
    */
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     if (0 >= userBufLen) {
       return;
     }
@@ -146,7 +146,7 @@ synchronized void setInputFromSavedData() {
    * Does nothing.
    */
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     // do nothing
   }
 
@@ -158,7 +158,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
    * #setInput() should be called in order to provide more input.
    */
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     return !(compressedDirectBuf.remaining() > 0
         || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
   }
@@ -168,7 +168,7 @@ public synchronized boolean needsInput() {
    * with the current contents of the input buffer.
    */
   @Override
-  public synchronized void finish() {
+  public void finish() {
     finish = true;
   }
 
@@ -180,7 +180,7 @@ public synchronized void finish() {
    * data output stream has been reached.
    */
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if all uncompressed data has been consumed
     return (finish && finished && compressedDirectBuf.remaining() == 0);
   }
@@ -197,7 +197,7 @@ public synchronized boolean finished() {
    * @return The actual number of bytes of compressed data.
    */
   @Override
-  public synchronized int compress(byte[] b, int off, int len)
+  public int compress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -250,7 +250,7 @@ public synchronized int compress(byte[] b, int off, int len)
    * Resets compressor so that a new set of input data can be processed.
    */
   @Override
-  public synchronized void reset() {
+  public void reset() {
     finish = false;
     finished = false;
     uncompressedDirectBuf.clear();
@@ -268,7 +268,7 @@ public synchronized void reset() {
    * @param conf Configuration from which new setting are fetched
    */
   @Override
-  public synchronized void reinit(Configuration conf) {
+  public void reinit(Configuration conf) {
     reset();
   }
 
@@ -276,7 +276,7 @@ public synchronized void reinit(Configuration conf) {
    * Return number of bytes given to this compressor since last reset.
    */
   @Override
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     return bytesRead;
   }
 
@@ -284,7 +284,7 @@ public synchronized long getBytesRead() {
    * Return number of bytes consumed by callers of compress since last reset.
    */
   @Override
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     return bytesWritten;
   }
 
@@ -292,7 +292,7 @@ public synchronized long getBytesWritten() {
    * Closes the compressor and discards any unprocessed input.
    */
   @Override
-  public synchronized void end() {
+  public void end() {
   }
 
   private native static void initIDs();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
index a86771746c..b5f5acf866 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
@@ -103,7 +103,7 @@ public SnappyDecompressor() {
    * @param len Length
    */
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -127,7 +127,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
    * aside to be loaded by this function while the compressed data are
    * consumed.
    */
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
 
     // Reinitialize snappy's input direct buffer
@@ -144,7 +144,7 @@ synchronized void setInputFromSavedData() {
    * Does nothing.
    */
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     // do nothing
   }
 
@@ -158,7 +158,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
    * order to provide more input.
    */
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
@@ -183,7 +183,7 @@ public synchronized boolean needsInput() {
    * @return false.
    */
   @Override
-  public synchronized boolean needsDictionary() {
+  public boolean needsDictionary() {
     return false;
   }
 
@@ -195,7 +195,7 @@
    * data output stream has been reached.
    */
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
 
@@ -212,7 +212,7 @@ public synchronized boolean finished() {
    * @throws IOException
    */
   @Override
-  public synchronized int decompress(byte[] b, int off, int len)
+  public int decompress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -257,13 +257,13 @@ public synchronized int decompress(byte[] b, int off, int len)
    * @return 0.
    */
   @Override
-  public synchronized int getRemaining() {
+  public int getRemaining() {
     // Never use this function in BlockDecompressorStream.
     return 0;
   }
 
   @Override
-  public synchronized void reset() {
+  public void reset() {
     finished = false;
     compressedDirectBufLen = 0;
     uncompressedDirectBuf.limit(directBufferSize);
@@ -276,7 +276,7 @@ public synchronized void reset() {
    * input data can be processed.
    */
   @Override
-  public synchronized void end() {
+  public void end() {
     // do nothing
   }
 
@@ -333,7 +333,7 @@ public void reset() {
     private boolean endOfInput;
 
     @Override
-    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+    public void decompress(ByteBuffer src, ByteBuffer dst)
        throws IOException {
       assert dst.isDirect() : "dst.isDirect()";
       assert src.isDirect() : "src.isDirect()";
@@ -343,13 +343,13 @@ public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
     }
 
     @Override
-    public synchronized void setDictionary(byte[] b, int off, int len) {
+    public void setDictionary(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
 
     @Override
-    public synchronized int decompress(byte[] b, int off, int len) {
+    public int decompress(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
index ce7f68dcc7..6799403b16 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
@@ -243,7 +243,7 @@ public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy,
    * @param conf Configuration storing new settings
    */
   @Override
-  public synchronized void reinit(Configuration conf) {
+  public void reinit(Configuration conf) {
     reset();
     if (conf == null) {
       return;
@@ -260,7 +260,7 @@
   }
 
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
    if (b== null) {
      throw new NullPointerException();
    }
@@ -280,7 +280,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
   }
 
   //copy enough data from userBuf to uncompressedDirectBuf
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
     ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
     userBufLen -= len;
@@ -289,7 +289,7 @@ synchronized void setInputFromSavedData() {
     userBufOff += len;
   }
 
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
      throw new NullPointerException();
    }
@@ -300,7 +300,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
   }
 
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (compressedDirectBuf.remaining() > 0) {
       return false;
@@ -329,19 +329,19 @@ public synchronized boolean needsInput() {
   }
 
   @Override
-  public synchronized void finish() {
+  public void finish() {
     finish = true;
   }
 
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if 'zlib' says its 'finished' and
     // all compressed data has been consumed
     return (finished && compressedDirectBuf.remaining() == 0);
   }
 
   @Override
-  public synchronized int compress(byte[] b, int off, int len)
+  public int compress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -392,7 +392,7 @@ public synchronized int compress(byte[] b, int off, int len)
    * @return the total (non-negative) number of compressed bytes output so far
    */
   @Override
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     checkStream();
     return getBytesWritten(stream);
   }
@@ -403,13 +403,13 @@ public synchronized long getBytesWritten() {
    * @return the total (non-negative) number of uncompressed bytes input so far
    */
   @Override
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
   @Override
-  public synchronized void reset() {
+  public void reset() {
     checkStream();
     reset(stream);
     finish = false;
@@ -423,7 +423,7 @@ public synchronized void reset() {
   }
 
   @Override
-  public synchronized void end() {
+  public void end() {
     if (stream != 0) {
       end(stream);
       stream = 0;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
index 575ce3c327..89c879a03c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
@@ -120,7 +120,7 @@ public ZlibDecompressor() {
   }
 
   @Override
-  public synchronized void setInput(byte[] b, int off, int len) {
+  public void setInput(byte[] b, int off, int len) {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -139,7 +139,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
     uncompressedDirectBuf.position(directBufferSize);
   }
 
-  synchronized void setInputFromSavedData() {
+  void setInputFromSavedData() {
     compressedDirectBufOff = 0;
     compressedDirectBufLen = userBufLen;
     if (compressedDirectBufLen > directBufferSize) {
@@ -157,7 +157,7 @@ synchronized void setInputFromSavedData() {
   }
 
   @Override
-  public synchronized void setDictionary(byte[] b, int off, int len) {
+  public void setDictionary(byte[] b, int off, int len) {
     if (stream == 0 || b == null) {
       throw new NullPointerException();
     }
@@ -169,7 +169,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
   }
 
   @Override
-  public synchronized boolean needsInput() {
+  public boolean needsInput() {
     // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
@@ -189,19 +189,19 @@ public synchronized boolean needsInput() {
   }
 
   @Override
-  public synchronized boolean needsDictionary() {
+  public boolean needsDictionary() {
     return needDict;
   }
 
   @Override
-  public synchronized boolean finished() {
+  public boolean finished() {
     // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
 
   @Override
-  public synchronized int decompress(byte[] b, int off, int len)
+  public int decompress(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();
@@ -240,7 +240,7 @@ public synchronized int decompress(byte[] b, int off, int len)
    *
    * @return the total (non-negative) number of uncompressed bytes output so far
    */
-  public synchronized long getBytesWritten() {
+  public long getBytesWritten() {
     checkStream();
     return getBytesWritten(stream);
   }
@@ -250,7 +250,7 @@ public synchronized long getBytesWritten() {
    *
    * @return the total (non-negative) number of compressed bytes input so far
    */
-  public synchronized long getBytesRead() {
+  public long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
@@ -263,7 +263,7 @@ public synchronized long getBytesRead() {
    * @return the total (non-negative) number of unprocessed bytes in input
    */
   @Override
-  public synchronized int getRemaining() {
+  public int getRemaining() {
     checkStream();
     return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
   }
@@ -272,7 +272,7 @@ public synchronized int getRemaining() {
    * Resets everything including the input buffers (user and direct).
    */
   @Override
-  public synchronized void reset() {
+  public void reset() {
     checkStream();
     reset(stream);
     finished = false;
@@ -284,7 +284,7 @@ public synchronized void reset() {
   }
 
   @Override
-  public synchronized void end() {
+  public void end() {
     if (stream != 0) {
       end(stream);
       stream = 0;
@@ -372,7 +372,7 @@ public void reset() {
     private boolean endOfInput;
 
     @Override
-    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+    public void decompress(ByteBuffer src, ByteBuffer dst)
        throws IOException {
       assert dst.isDirect() : "dst.isDirect()";
       assert src.isDirect() : "src.isDirect()";
@@ -382,13 +382,13 @@ public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
     }
 
     @Override
-    public synchronized void setDictionary(byte[] b, int off, int len) {
+    public void setDictionary(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
 
     @Override
-    public synchronized int decompress(byte[] b, int off, int len) {
+    public int decompress(byte[] b, int off, int len) {
       throw new UnsupportedOperationException(
           "byte[] arrays are not supported for DirectDecompressor");
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
index 551f282889..5cacebfe78 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
@@ -19,6 +19,18 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,4 +79,79 @@ public void testDecompressorPoolCounts() {
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
   }
+
+  @Test(timeout = 1000)
+  public void testMultiThreadedCompressorPool() throws InterruptedException {
+    final int iterations = 4;
+    ExecutorService threadpool = Executors.newFixedThreadPool(3);
+    final LinkedBlockingDeque<Compressor> queue = new LinkedBlockingDeque<Compressor>(
+        2 * iterations);
+
+    Callable<Boolean> consumer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Compressor c = queue.take();
+        CodecPool.returnCompressor(c);
+        return c != null;
+      }
+    };
+
+    Callable<Boolean> producer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Compressor c = CodecPool.getCompressor(codec);
+        queue.put(c);
+        return c != null;
+      }
+    };
+
+    for (int i = 0; i < iterations; i++) {
+      threadpool.submit(consumer);
+      threadpool.submit(producer);
+    }
+
+    // wait for completion
+    threadpool.shutdown();
+    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
+
+    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
+  }
+
+  @Test(timeout = 1000)
+  public void testMultiThreadedDecompressorPool() throws InterruptedException {
+    final int iterations = 4;
+    ExecutorService threadpool = Executors.newFixedThreadPool(3);
+    final LinkedBlockingDeque<Decompressor> queue
+        = new LinkedBlockingDeque<Decompressor>(
+        2 * iterations);
+
+    Callable<Boolean> consumer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Decompressor dc = queue.take();
+        CodecPool.returnDecompressor(dc);
+        return dc != null;
+      }
+    };
+
+    Callable<Boolean> producer = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Decompressor c = CodecPool.getDecompressor(codec);
+        queue.put(c);
+        return c != null;
+      }
+    };
+
+    for (int i = 0; i < iterations; i++) {
+      threadpool.submit(consumer);
+      threadpool.submit(producer);
+    }
+
+    // wait for completion
+    threadpool.shutdown();
+    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
+
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedDecompressorsCount(codec));
+  }
 }
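
Note (commentary after the patch, not part of it): with the instance-level
locks gone, a Compressor or Decompressor is safe only while it is confined
to a single thread, which is exactly the contract the new multi-threaded
tests above exercise through CodecPool. A minimal sketch of that
per-thread lease pattern; illustrative only, and the class name and
DefaultCodec choice are assumptions, not taken from this patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class PerThreadLeaseExample {
      public static void main(String[] args) throws InterruptedException {
        final CompressionCodec codec = ReflectionUtils.newInstance(
            DefaultCodec.class, new Configuration());
        ExecutorService workers = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
          workers.submit(new Runnable() {
            @Override
            public void run() {
              // Each worker leases its own instance, so no two threads
              // ever touch the same Compressor concurrently; that is
              // what makes the removed synchronization unnecessary.
              Compressor c = CodecPool.getCompressor(codec);
              try {
                c.reset(); // thread-confined use of the leased instance
              } finally {
                CodecPool.returnCompressor(c); // hand back via the pool
              }
            }
          });
        }
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
      }
    }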