From b96cc8fdbf016afe7c87903fa5b96e61302588e0 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 19 Nov 2013 15:38:27 +0000 Subject: [PATCH] HADOOP-10047. Add a direct-buffer based apis for compression. Contributed by Gopal V. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543456 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/io/compress/DirectCompressor.java | 69 ++++++++ .../io/compress/DirectDecompressor.java | 71 ++++++++ .../io/compress/zlib/ZlibCompressor.java | 108 ++++++++++++- .../io/compress/zlib/ZlibDecompressor.java | 106 +++++++++++- .../zlib/TestZlibCompressorDecompressor.java | 152 ++++++++++++++++++ 6 files changed, 507 insertions(+), 2 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2b9aeb88a5..9338eb3fba 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -387,6 +387,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) + HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via + acmurthy) + BUG FIXES HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java new file mode 100644 index 0000000000..e3f1add235 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface DirectCompressor extends Compressor { + /** + * Example usage + *
+   * {@code
+   * private void compress(DirectCompressor comp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
+   *    ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
+   *    outBB.clear();
+   *    // in.get() returns a buffer with remaining() > 0, or null at EOF
+   *    // if the producer fills it with put(), it must flip() before returning
+   *    ByteBuffer inBB = in.get();       
+   *    while(!comp.finished()) {
+   *      comp.compress(outBB, inBB);
+   *      if(outBB.remaining() == 0) {
+   *        // flush the buffer only when it is full
+   *        outBB.flip();
+   *        // the consumer must fully drain the buffer, because it is reused
+   *        out.put(outBB);
+   *        outBB.clear();
+   *      }
+   *      if(inBB != null && inBB.remaining() == 0) {
+   *        inBB = in.get();
+   *        if(inBB == null) {
+   *          // EOF
+   *          comp.finish();
+   *        }
+   *      }
+   *    }
+   *    
+   *    if(outBB.position() > 0) {
+   *      outBB.flip();
+   *      out.put(outBB);
+   *      outBB.clear();
+   *    }
+   *  }
+   * } </pre>
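+   * <p>
+   * The {@code ByteBufferProducer} and {@code ByteBufferConsumer} types used above
+   * are not part of this API; they stand in for any caller-side source and sink of
+   * buffers. A minimal sketch of what the example assumes:
+   * <pre> {@code
+   * interface ByteBufferProducer {
+   *   // returns a buffer with remaining() > 0, or null at EOF
+   *   ByteBuffer get() throws IOException;
+   * }
+   * interface ByteBufferConsumer {
+   *   // must fully drain the buffer before returning, since the caller reuses it
+   *   void put(ByteBuffer buf) throws IOException;
+   * }
+   * } </pre>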
+ * @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0 + * @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0 + * @return bytes stored into dst + * @throws IOException if compression fails + */ + public int compress(ByteBuffer dst, ByteBuffer src) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java new file mode 100644 index 0000000000..3707e15dfb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface DirectDecompressor extends Decompressor { + /** + * Example usage + * + *
+   * {@code
+   * private void decompress(DirectDecompressor decomp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
+   *    ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
+   *    outBB.clear();
+   *    // in.get() returns a buffer with remaining() > 0, or null at EOF
+   *    // if the producer fills it with put(), it must flip() before returning
+   *    ByteBuffer inBB = in.get();
+   *    if(inBB == null) {
+   *      // no input at all - nothing to decompress
+   *      return;
+   *    }
+   *    while(!decomp.finished()) {
+   *      decomp.decompress(outBB, inBB);
+   *      if(outBB.remaining() == 0) {
+   *        // flush when the buffer is full
+   *        outBB.flip();
+   *        // the consumer must fully drain the buffer, because it is reused
+   *        out.put(outBB);
+   *        outBB.clear();
+   *      }
+   *      if(inBB != null && inBB.remaining() == 0) {
+   *        // in.get() returns null at EOF
+   *        inBB = in.get();
+   *      }
+   *    }
+   *    
+   *    if(outBB.position() > 0) {
+   *      outBB.flip();
+   *      out.put(outBB);
+   *      outBB.clear();
+   *    }
+   *  }
+   * } </pre>
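+   * <p>
+   * When the entire compressed stream is already available in one direct buffer,
+   * a single call may be enough. A minimal sketch (assuming {@code src} holds the
+   * complete stream and {@code dst} is a direct buffer large enough for the output):
+   * <pre> {@code
+   * ByteBuffer src = ...; // direct buffer, compressed bytes between position and limit
+   * ByteBuffer dst = ByteBuffer.allocateDirect(64*1024);
+   * int n = decomp.decompress(dst, src); // bytes written into dst; dst.position() advances by n
+   * dst.flip();                          // dst is now ready to be read
+   * } </pre>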
+ * @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0 + * @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0 + * @return bytes stored into dst (dst.postion += more) + * @throws IOException if compression fails + */ + public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java index ce7f68dcc7..71d826b421 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.DirectCompressor; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.commons.logging.Log; @@ -35,7 +36,7 @@ * http://www.zlib.net/ * */ -public class ZlibCompressor implements Compressor { +public class ZlibCompressor implements Compressor,DirectCompressor { private static final Log LOG = LogFactory.getLog(ZlibCompressor.class); @@ -420,6 +421,7 @@ public synchronized void reset() { compressedDirectBuf.limit(directBufferSize); compressedDirectBuf.position(directBufferSize); userBufOff = userBufLen = 0; + userBuf = null; } @Override @@ -435,6 +437,110 @@ private void checkStream() { throw new NullPointerException(); } + private int put(ByteBuffer dst, ByteBuffer src) { + // this will lop off data from src[pos:limit] into dst[pos:limit] + int l1 = src.remaining(); + int l2 = dst.remaining(); + int pos1 = src.position(); + int pos2 = dst.position(); + int len = Math.min(l1, l2); + + if (len == 0) { + return 0; + } + + ByteBuffer slice = src.slice(); + slice.limit(len); + dst.put(slice); + src.position(pos1 + len); + return len; + } + + public int compress(ByteBuffer dst, ByteBuffer src) throws IOException { + assert dst.remaining() > 0 : "dst.remaining() == 0"; + int n = 0; + + /* fast path for clean state and direct buffers */ + /* TODO: reset should free userBuf? 
*/ + if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) { + /* + * TODO: fix these assumptions in inflateDirect(), eventually by allowing + * it to read position()/limit() directly + */ + boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize); + boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0); + /* use the buffers directly */ + if(cleanDst && cleanState) { + Buffer originalCompressed = compressedDirectBuf; + Buffer originalUncompressed = uncompressedDirectBuf; + int originalBufferSize = directBufferSize; + uncompressedDirectBuf = src; + uncompressedDirectBufOff = src.position(); + uncompressedDirectBufLen = src.remaining(); + compressedDirectBuf = dst; + directBufferSize = dst.remaining(); + // Compress data + n = deflateBytesDirect(); + // we move dst.position() forward, not limit() + // unlike the local buffer case, which moves it when we put() into the dst + dst.position(n); + if(uncompressedDirectBufLen > 0) { + src.position(uncompressedDirectBufOff); + } else { + src.position(src.limit()); + } + compressedDirectBuf = originalCompressed; + uncompressedDirectBuf = originalUncompressed; + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + directBufferSize = originalBufferSize; + return n; + } + } + + // Check if there is compressed data + if (compressedDirectBuf.remaining() > 0) { + n = put(dst, (ByteBuffer) compressedDirectBuf); + } + + if (dst.remaining() == 0) { + return n; + } else { + needsInput(); + + // if we have drained userBuf, read from src (ideally, do not mix buffer + // modes, but sometimes you can) + if (userBufLen == 0 && src != null && src.remaining() > 0) { + put((ByteBuffer) uncompressedDirectBuf, src); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + // Re-initialize the zlib's output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress data + int more = deflateBytesDirect(); + + compressedDirectBuf.limit(more); + + // Check if zlib consumed all input buffer + // set keepUncompressedBuf properly + if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { // zlib did not consume all input buffer + keepUncompressedBuf = true; + } + + // fill the dst buffer from compressedDirectBuf + int fill = put(dst, ((ByteBuffer) compressedDirectBuf)); + return n + fill; + } + } + private native static void initIDs(); private native static long init(int level, int strategy, int windowBits); private native static void setDictionary(long strm, byte[] b, int off, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java index ba67571998..42e0306fea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; import org.apache.hadoop.util.NativeCodeLoader; /** @@ -31,7 +32,7 @@ * http://www.zlib.net/ * 
*/ -public class ZlibDecompressor implements Decompressor { +public class ZlibDecompressor implements Decompressor,DirectDecompressor { private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; // HACK - Use this as a global lock in the JNI layer @@ -280,6 +281,7 @@ public synchronized void reset() { uncompressedDirectBuf.limit(directBufferSize); uncompressedDirectBuf.position(directBufferSize); userBufOff = userBufLen = 0; + userBuf = null; } @Override @@ -299,6 +301,108 @@ private void checkStream() { if (stream == 0) throw new NullPointerException(); } + + private int put(ByteBuffer dst, ByteBuffer src) { + // this will lop off data from src[pos:limit] into dst[pos:limit], using the + // min() of both remaining() + int l1 = src.remaining(); + int l2 = dst.remaining(); + int pos1 = src.position(); + int pos2 = dst.position(); + int len = Math.min(l1, l2); + + if (len == 0) { + return 0; + } + + ByteBuffer slice = src.slice(); + slice.limit(len); + dst.put(slice); + src.position(pos1 + len); + return len; + } + + public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException { + assert dst.remaining() > 0 : "dst.remaining == 0"; + int n = 0; + + /* fast path for clean state and direct buffers */ + if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) { + /* + * TODO: fix these assumptions in inflateDirect(), eventually by allowing + * it to read position()/limit() directly + */ + boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize); + boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0); + /* use the buffers directly */ + if(cleanDst && cleanState) { + Buffer originalCompressed = compressedDirectBuf; + Buffer originalUncompressed = uncompressedDirectBuf; + int originalBufferSize = directBufferSize; + compressedDirectBuf = src; + compressedDirectBufOff = src.position(); + compressedDirectBufLen = src.remaining(); + uncompressedDirectBuf = dst; + directBufferSize = dst.remaining(); + // Compress data + n = inflateBytesDirect(); + dst.position(n); + if(compressedDirectBufLen > 0) { + src.position(compressedDirectBufOff); + } else { + src.position(src.limit()); + } + compressedDirectBuf = originalCompressed; + uncompressedDirectBuf = originalUncompressed; + compressedDirectBufOff = 0; + compressedDirectBufLen = 0; + directBufferSize = originalBufferSize; + return n; + } + } + + // Check if there is compressed data + if (uncompressedDirectBuf.remaining() > 0) { + n = put(dst, (ByteBuffer) uncompressedDirectBuf); + } + + if (dst.remaining() == 0) { + return n; + } else { + if (needsInput()) { + // this does not update buffers if we have no userBuf + if (userBufLen <= 0) { + compressedDirectBufOff = 0; + compressedDirectBufLen = 0; + compressedDirectBuf.rewind().limit(directBufferSize); + } + if (src != null) { + assert src.remaining() > 0 : "src.remaining() == 0"; + } + } + + // if we have drained userBuf, read from src (ideally, do not mix buffer + // modes, but sometimes you can) + if (userBufLen == 0 && src != null && src.remaining() > 0) { + compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src); + } + + // Re-initialize the zlib's output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Compress data + int more = inflateBytesDirect(); + + uncompressedDirectBuf.limit(more); + + // Get atmost 'len' bytes + int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf)); + return n + fill; + } 
+ } + + private native static void initIDs(); private native static long init(int windowBits); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java index 6e792d1e4e..8454b4038d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java @@ -19,8 +19,13 @@ import static org.junit.Assert.*; import static org.junit.Assume.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Console; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -33,8 +38,12 @@ import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; +import org.apache.log4j.ConsoleAppender; import org.junit.Before; import org.junit.Test; + +import sun.util.logging.resources.logging; + import com.google.common.collect.ImmutableSet; public class TestZlibCompressorDecompressor { @@ -150,6 +159,149 @@ public void testZlibCompressDecompress() { } } + private void compressDecompressLoop(int rawDataSize, int inSize, int outSize) + throws IOException { + byte[] rawData = null; + rawData = generate(rawDataSize); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize); + ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize); + ZlibCompressor compressor = new ZlibCompressor(); + ZlibDecompressor decompressor = new ZlibDecompressor(); + outBuf.clear(); + /* compression loop */ + int off = 0; + int len = rawDataSize; + int min = Math.min(inBuf.remaining(), len); + if (min > 0) { + inBuf.put(rawData, off, min); + } + inBuf.flip(); + len -= min; + off += min; + while (!compressor.finished()) { + compressor.compress(outBuf, inBuf); + if (outBuf.remaining() == 0) { + // flush when the buffer is full + outBuf.flip(); + while (outBuf.remaining() > 0) { + baos.write(outBuf.get()); + } + outBuf.clear(); + } + if (inBuf != null && inBuf.remaining() == 0) { + inBuf.clear(); + if (len > 0) { + min = Math.min(inBuf.remaining(), len); + inBuf.put(rawData, off, min); + inBuf.flip(); + len -= min; + off += min; + } else { + inBuf = null; + compressor.finish(); + } + } + } + + outBuf.flip(); + if (outBuf.remaining() > 0) { + while (outBuf.remaining() > 0) { + baos.write(outBuf.get()); + } + outBuf.clear(); + } + + compressor.end(); + + byte[] compressed = baos.toByteArray(); + ByteBuffer expected = ByteBuffer.wrap(rawData); + outBuf.clear(); + inBuf = ByteBuffer.allocateDirect(inSize); + inBuf.clear(); + + // zlib always has header + if (compressed.length != 0) { + off = 0; + len = compressed.length; + min = Math.min(inBuf.remaining(), len); + inBuf.put(compressed, off, min); + inBuf.flip(); + len -= min; + off += min; + while (!decompressor.finished()) { + decompressor.decompress(outBuf, inBuf); + if (outBuf.remaining() == 0) { + outBuf.flip(); + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + } + + if (inBuf != null && 
inBuf.remaining() == 0) { + inBuf.clear(); + if (len > 0) { + min = Math.min(inBuf.remaining(), len); + inBuf.put(compressed, off, min); + inBuf.flip(); + len -= min; + off += min; + } + } + } + } + + outBuf.flip(); + if (outBuf.remaining() > 0) { + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + } + + assertEquals(0, expected.remaining()); + } + + @Test + public void testZlibDirectCompressDecompress() { + int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, + 1024 * 1024 }; + try { + // 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf) + compressDecompressLoop(0, 4096, 4096); + compressDecompressLoop(0, 1, 1); + compressDecompressLoop(1, 1, 2); + compressDecompressLoop(1, 2, 1); + compressDecompressLoop(2, 3, 2); + + for (int i = 0; i < size.length; i++) { + compressDecompressLoop(size[i], 4096, 4096); + compressDecompressLoop(size[i], 1, 1); + compressDecompressLoop(size[i], 1, 2); + compressDecompressLoop(size[i], 2, 1); + compressDecompressLoop(size[i], 3, 2); + compressDecompressLoop(size[i], size[i], 4096); + compressDecompressLoop(size[i], size[i] - 1, 4096); + compressDecompressLoop(size[i], size[i] + 1, 4096); + compressDecompressLoop(size[i], 4096, size[i]); + compressDecompressLoop(size[i], 4096, size[i] - 1); + compressDecompressLoop(size[i], 4096, size[i] + 1); + compressDecompressLoop(size[i], size[i] - 1, size[i] - 1); + + compressDecompressLoop(size[i], size[i] / 2, 4096); + compressDecompressLoop(size[i], size[i] / 2 - 1, 4096); + compressDecompressLoop(size[i], size[i] / 2 + 1, 4096); + compressDecompressLoop(size[i], 4096, size[i] / 2); + compressDecompressLoop(size[i], 4096, size[i] / 2 - 1); + compressDecompressLoop(size[i], 4096, size[i] / 2 + 1); + compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1); + } + } catch (IOException ex) { + fail("testZlibDirectCompressDecompress ex !!!" + ex); + } + } + @Test public void testZlibCompressorDecompressorSetDictionary() { Configuration conf = new Configuration();