diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a2c6ef25dd..03f2267c79 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1508,6 +1508,9 @@ Release 0.23.7 - UNRELEASED OPTIMIZATIONS + HADOOP-8462. Native-code implementation of bzip2 codec. (Govind Kamat via + jlowe) + BUG FIXES HADOOP-9302. HDFS docs not linked from top level (Andy Isaacson via diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 9113349766..373ba7cff3 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -464,6 +464,7 @@ false + false @@ -507,6 +508,8 @@ org.apache.hadoop.io.compress.zlib.ZlibCompressor org.apache.hadoop.io.compress.zlib.ZlibDecompressor + org.apache.hadoop.io.compress.bzip2.Bzip2Compressor + org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor org.apache.hadoop.security.JniBasedUnixGroupsMapping org.apache.hadoop.io.nativeio.NativeIO org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping @@ -532,7 +535,7 @@ - + diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index 051337e2e2..7449610b9a 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -97,6 +97,23 @@ set(T main/native/src/test/org/apache/hadoop) GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME) +SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES) +set_find_shared_library_version("1") +find_package(BZip2 QUIET) +if (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES) + GET_FILENAME_COMPONENT(HADOOP_BZIP2_LIBRARY ${BZIP2_LIBRARIES} NAME) + set(BZIP2_SOURCE_FILES + "${D}/io/compress/bzip2/Bzip2Compressor.c" + "${D}/io/compress/bzip2/Bzip2Decompressor.c") +else (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES) + set(BZIP2_SOURCE_FILES "") + set(BZIP2_INCLUDE_DIR "") + IF(REQUIRE_BZIP2) + MESSAGE(FATAL_ERROR "Required bzip2 library and/or header files could not be found.") + ENDIF(REQUIRE_BZIP2) +endif (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES) +SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES) + INCLUDE(CheckFunctionExists) INCLUDE(CheckCSourceCompiles) INCLUDE(CheckLibraryExists) @@ -136,6 +153,7 @@ include_directories( ${CMAKE_BINARY_DIR} ${JNI_INCLUDE_DIRS} ${ZLIB_INCLUDE_DIRS} + ${BZIP2_INCLUDE_DIR} ${SNAPPY_INCLUDE_DIR} ${D}/util ) @@ -155,6 +173,7 @@ add_dual_library(hadoop ${SNAPPY_SOURCE_FILES} ${D}/io/compress/zlib/ZlibCompressor.c ${D}/io/compress/zlib/ZlibDecompressor.c + ${BZIP2_SOURCE_FILES} ${D}/io/nativeio/NativeIO.c ${D}/io/nativeio/errno_enum.c ${D}/io/nativeio/file_descriptor.c diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake index e720d30657..020017c02f 100644 --- a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -19,6 +19,7 @@ #define CONFIG_H #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@" +#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@" #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@" #cmakedefine HAVE_SYNC_FILE_RANGE #cmakedefine HAVE_POSIX_FADVISE diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 35f7cb43ea..42e96cfdc5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -23,108 +23,156 @@ import java.io.InputStream; import java.io.OutputStream; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.compress.bzip2.BZip2Constants; -import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor; -import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor; import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; +import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; /** - * This class provides CompressionOutputStream and CompressionInputStream for - * compression and decompression. Currently we dont have an implementation of - * the Compressor and Decompressor interfaces, so those methods of - * CompressionCodec which have a Compressor or Decompressor type argument, throw - * UnsupportedOperationException. + * This class provides output and input streams for bzip2 compression + * and decompression. It uses the native bzip2 library on the system + * if possible, else it uses a pure-Java implementation of the bzip2 + * algorithm. The configuration parameter + * io.compression.codec.bzip2.library can be used to control this + * behavior. + * + * In the pure-Java mode, the Compressor and Decompressor interfaces + * are not implemented. Therefore, in that mode, those methods of + * CompressionCodec which have a Compressor or Decompressor type + * argument, throw UnsupportedOperationException. + * + * Currently, support for splittability is available only in the + * pure-Java mode; therefore, if a SplitCompressionInputStream is + * requested, the pure-Java implementation is used, regardless of the + * setting of the configuration parameter mentioned above. */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class BZip2Codec implements SplittableCompressionCodec { +public class BZip2Codec implements Configurable, SplittableCompressionCodec { private static final String HEADER = "BZ"; private static final int HEADER_LEN = HEADER.length(); private static final String SUB_HEADER = "h9"; private static final int SUB_HEADER_LEN = SUB_HEADER.length(); + private Configuration conf; + /** - * Creates a new instance of BZip2Codec + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this objec. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Creates a new instance of BZip2Codec. */ public BZip2Codec() { } /** - * Creates CompressionOutputStream for BZip2 - * - * @param out - * The output Stream - * @return The BZip2 CompressionOutputStream - * @throws java.io.IOException - * Throws IO exception - */ + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to, to have it + * compressed + * @throws IOException + */ @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return new BZip2CompressionOutputStream(out); + return createOutputStream(out, createCompressor()); } /** - * Creates a compressor using given OutputStream. + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. * - * @return CompressionOutputStream - @throws java.io.IOException + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to, to have it + * compressed + * @throws IOException */ @Override public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException { - return createOutputStream(out); + return Bzip2Factory.isNativeBzip2Loaded(conf) ? + new CompressorStream(out, compressor, + conf.getInt("io.file.buffer.size", 4*1024)) : + new BZip2CompressionOutputStream(out); } /** - * This functionality is currently not supported. - * - * @return BZip2DummyCompressor.class - */ + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ @Override - public Class getCompressorType() { - return BZip2DummyCompressor.class; + public Class getCompressorType() { + return Bzip2Factory.getBzip2CompressorType(conf); } /** - * This functionality is currently not supported. - * - * @return Compressor - */ + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ @Override public Compressor createCompressor() { - return new BZip2DummyCompressor(); + return Bzip2Factory.getBzip2Compressor(conf); } /** - * Creates CompressionInputStream to be used to read off uncompressed data. - * - * @param in - * The InputStream - * @return Returns CompressionInputStream for BZip2 - * @throws java.io.IOException - * Throws IOException - */ + * Create a {@link CompressionInputStream} that will read from the given + * input stream and return a stream for uncompressed data. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return new BZip2CompressionInputStream(in); + return createInputStream(in, createDecompressor()); } /** - * This functionality is currently not supported. - * - * @return CompressionInputStream - */ + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}, and return a + * stream for uncompressed data. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ @Override public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { - return createInputStream(in); + return Bzip2Factory.isNativeBzip2Loaded(conf) ? + new DecompressorStream(in, decompressor, + conf.getInt("io.file.buffer.size", 4*1024)) : + new BZip2CompressionInputStream(in); } /** @@ -139,7 +187,6 @@ public CompressionInputStream createInputStream(InputStream in, * * @return CompressionInputStream for BZip2 aligned at block boundaries */ - @Override public SplitCompressionInputStream createInputStream(InputStream seekableIn, Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException { @@ -184,23 +231,23 @@ public SplitCompressionInputStream createInputStream(InputStream seekableIn, } /** - * This functionality is currently not supported. - * - * @return BZip2DummyDecompressor.class - */ + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ @Override - public Class getDecompressorType() { - return BZip2DummyDecompressor.class; + public Class getDecompressorType() { + return Bzip2Factory.getBzip2DecompressorType(conf); } /** - * This functionality is currently not supported. - * - * @return Decompressor - */ + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ @Override public Decompressor createDecompressor() { - return new BZip2DummyDecompressor(); + return Bzip2Factory.getBzip2Decompressor(conf); } /** @@ -236,7 +283,6 @@ private void writeStreamHeader() throws IOException { } } - @Override public void finish() throws IOException { if (needsReset) { // In the case that nothing is written to this stream, we still need to @@ -256,14 +302,12 @@ private void internalReset() throws IOException { } } - @Override public void resetState() throws IOException { // Cannot write to out at this point because out might not be ready // yet, as in SequenceFile.Writer implementation. needsReset = true; } - @Override public void write(int b) throws IOException { if (needsReset) { internalReset(); @@ -271,7 +315,6 @@ public void write(int b) throws IOException { this.output.write(b); } - @Override public void write(byte[] b, int off, int len) throws IOException { if (needsReset) { internalReset(); @@ -279,7 +322,6 @@ public void write(byte[] b, int off, int len) throws IOException { this.output.write(b, off, len); } - @Override public void close() throws IOException { if (needsReset) { // In the case that nothing is written to this stream, we still need to @@ -397,7 +439,6 @@ private BufferedInputStream readStreamHeader() throws IOException { }// end of method - @Override public void close() throws IOException { if (!needsReset) { input.close(); @@ -433,7 +474,6 @@ public void close() throws IOException { * */ - @Override public int read(byte[] b, int off, int len) throws IOException { if (needsReset) { internalReset(); @@ -457,7 +497,6 @@ public int read(byte[] b, int off, int len) throws IOException { } - @Override public int read() throws IOException { byte b[] = new byte[1]; int result = this.read(b, 0, 1); @@ -472,7 +511,6 @@ private void internalReset() throws IOException { } } - @Override public void resetState() throws IOException { // Cannot read from bufferedIn at this point because bufferedIn // might not be ready @@ -480,7 +518,6 @@ public void resetState() throws IOException { needsReset = true; } - @Override public long getPos() { return this.compressedStreamPosition; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java new file mode 100644 index 0000000000..6f50247613 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.bzip2; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link Compressor} based on the popular + * bzip2 compression algorithm. + * http://www.bzip2.org/ + * + */ +public class Bzip2Compressor implements Compressor { + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; + + // The default values for the block size and work factor are the same + // those in Julian Seward's original bzip2 implementation. + static final int DEFAULT_BLOCK_SIZE = 9; + static final int DEFAULT_WORK_FACTOR = 30; + + private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class); + + // HACK - Use this as a global lock in the JNI layer. + private static Class clazz = Bzip2Compressor.class; + + private long stream; + private int blockSize; + private int workFactor; + private int directBufferSize; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private Buffer uncompressedDirectBuf = null; + private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0; + private boolean keepUncompressedBuf = false; + private Buffer compressedDirectBuf = null; + private boolean finish, finished; + + /** + * Creates a new compressor with a default values for the + * compression block size and work factor. Compressed data will be + * generated in bzip2 format. + */ + public Bzip2Compressor() { + this(DEFAULT_BLOCK_SIZE, DEFAULT_WORK_FACTOR, DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Creates a new compressor, taking settings from the configuration. + */ + public Bzip2Compressor(Configuration conf) { + this(Bzip2Factory.getBlockSize(conf), + Bzip2Factory.getWorkFactor(conf), + DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Creates a new compressor using the specified block size. + * Compressed data will be generated in bzip2 format. + * + * @param blockSize The block size to be used for compression. This is + * an integer from 1 through 9, which is multiplied by 100,000 to + * obtain the actual block size in bytes. + * @param workFactor This parameter is a threshold that determines when a + * fallback algorithm is used for pathological data. It ranges from + * 0 to 250. + * @param directBufferSize Size of the direct buffer to be used. + */ + public Bzip2Compressor(int blockSize, int workFactor, + int directBufferSize) { + this.blockSize = blockSize; + this.workFactor = workFactor; + this.directBufferSize = directBufferSize; + stream = init(blockSize, workFactor); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration. It will reset the compressor's block size and + * and work factor. + * + * @param conf Configuration storing new settings + */ + @Override + public synchronized void reinit(Configuration conf) { + reset(); + end(stream); + if (conf == null) { + stream = init(blockSize, workFactor); + return; + } + blockSize = Bzip2Factory.getBlockSize(conf); + workFactor = Bzip2Factory.getWorkFactor(conf); + stream = init(blockSize, workFactor); + if(LOG.isDebugEnabled()) { + LOG.debug("Reinit compressor with new compression configuration"); + } + } + + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + uncompressedDirectBufOff = 0; + setInputFromSavedData(); + + // Reinitialize bzip2's output direct buffer. + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + // Copy enough data from userBuf to uncompressedDirectBuf. + synchronized void setInputFromSavedData() { + int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); + ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len); + userBufLen -= len; + userBufOff += len; + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized boolean needsInput() { + // Compressed data still available? + if (compressedDirectBuf.remaining() > 0) { + return false; + } + + // Uncompressed data available in either the direct buffer or user buffer? + if (keepUncompressedBuf && uncompressedDirectBufLen > 0) + return false; + + if (uncompressedDirectBuf.remaining() > 0) { + // Check if we have consumed all data in the user buffer. + if (userBufLen <= 0) { + return true; + } else { + // Copy enough data from userBuf to uncompressedDirectBuf. + setInputFromSavedData(); + return uncompressedDirectBuf.remaining() > 0; + } + } + + return false; + } + + @Override + public synchronized void finish() { + finish = true; + } + + @Override + public synchronized boolean finished() { + // Check if bzip2 says it has finished and + // all compressed data has been consumed. + return (finished && compressedDirectBuf.remaining() == 0); + } + + @Override + public synchronized int compress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is compressed data. + int n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + ((ByteBuffer)compressedDirectBuf).get(b, off, n); + return n; + } + + // Re-initialize bzip2's output direct buffer. + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress the data. + n = deflateBytesDirect(); + compressedDirectBuf.limit(n); + + // Check if bzip2 has consumed the entire input buffer. + // Set keepUncompressedBuf properly. + if (uncompressedDirectBufLen <= 0) { // bzip2 consumed all input + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { + keepUncompressedBuf = true; + } + + // Get at most 'len' bytes. + n = Math.min(n, len); + ((ByteBuffer)compressedDirectBuf).get(b, off, n); + + return n; + } + + /** + * Returns the total number of compressed bytes output so far. + * + * @return the total (non-negative) number of compressed bytes output so far + */ + @Override + public synchronized long getBytesWritten() { + checkStream(); + return getBytesWritten(stream); + } + + /** + * Returns the total number of uncompressed bytes input so far.

+ * + * @return the total (non-negative) number of uncompressed bytes input so far + */ + @Override + public synchronized long getBytesRead() { + checkStream(); + return getBytesRead(stream); + } + + @Override + public synchronized void reset() { + checkStream(); + end(stream); + stream = init(blockSize, workFactor); + finish = false; + finished = false; + uncompressedDirectBuf.rewind(); + uncompressedDirectBufOff = uncompressedDirectBufLen = 0; + keepUncompressedBuf = false; + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + @Override + public synchronized void end() { + if (stream != 0) { + end(stream); + stream = 0; + } + } + + static void initSymbols(String libname) { + initIDs(libname); + } + + private void checkStream() { + if (stream == 0) + throw new NullPointerException(); + } + + private native static void initIDs(String libname); + private native static long init(int blockSize, int workFactor); + private native int deflateBytesDirect(); + private native static long getBytesRead(long strm); + private native static long getBytesWritten(long strm); + private native static void end(long strm); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java new file mode 100644 index 0000000000..672090209d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.bzip2; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.compress.Decompressor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link Decompressor} based on the popular + * bzip2 compression algorithm. + * http://www.bzip2.org/ + * + */ +public class Bzip2Decompressor implements Decompressor { + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; + + private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class); + + // HACK - Use this as a global lock in the JNI layer. + private static Class clazz = Bzip2Decompressor.class; + + private long stream; + private boolean conserveMemory; + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int compressedDirectBufOff, compressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finished; + + /** + * Creates a new decompressor. + */ + public Bzip2Decompressor(boolean conserveMemory, int directBufferSize) { + this.conserveMemory = conserveMemory; + this.directBufferSize = directBufferSize; + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + + stream = init(conserveMemory ? 1 : 0); + } + + public Bzip2Decompressor() { + this(false, DEFAULT_DIRECT_BUFFER_SIZE); + } + + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + setInputFromSavedData(); + + // Reinitialize bzip2's output direct buffer. + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + synchronized void setInputFromSavedData() { + compressedDirectBufOff = 0; + compressedDirectBufLen = userBufLen; + if (compressedDirectBufLen > directBufferSize) { + compressedDirectBufLen = directBufferSize; + } + + // Reinitialize bzip2's input direct buffer. + compressedDirectBuf.rewind(); + ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to bzip2. + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; + } + + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if bzip2 has consumed all input. + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input. + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; + } + + @Override + public synchronized boolean needsDictionary() { + return false; + } + + @Override + public synchronized boolean finished() { + // Check if bzip2 says it has finished and + // all compressed data has been consumed. + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + @Override + public synchronized int decompress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is uncompressed data. + int n = uncompressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + ((ByteBuffer)uncompressedDirectBuf).get(b, off, n); + return n; + } + + // Re-initialize bzip2's output direct buffer. + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Decompress the data. + n = finished ? 0 : inflateBytesDirect(); + uncompressedDirectBuf.limit(n); + + // Get at most 'len' bytes. + n = Math.min(n, len); + ((ByteBuffer)uncompressedDirectBuf).get(b, off, n); + + return n; + } + + /** + * Returns the total number of uncompressed bytes output so far. + * + * @return the total (non-negative) number of uncompressed bytes output so far + */ + public synchronized long getBytesWritten() { + checkStream(); + return getBytesWritten(stream); + } + + /** + * Returns the total number of compressed bytes input so far.

+ * + * @return the total (non-negative) number of compressed bytes input so far + */ + public synchronized long getBytesRead() { + checkStream(); + return getBytesRead(stream); + } + + /** + * Returns the number of bytes remaining in the input buffers; normally + * called when finished() is true to determine amount of post-gzip-stream + * data.

+ * + * @return the total (non-negative) number of unprocessed bytes in input + */ + @Override + public synchronized int getRemaining() { + checkStream(); + return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf + } + + /** + * Resets everything including the input buffers (user and direct).

+ */ + @Override + public synchronized void reset() { + checkStream(); + end(stream); + stream = init(conserveMemory ? 1 : 0); + finished = false; + compressedDirectBufOff = compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + @Override + public synchronized void end() { + if (stream != 0) { + end(stream); + stream = 0; + } + } + + static void initSymbols(String libname) { + initIDs(libname); + } + + private void checkStream() { + if (stream == 0) + throw new NullPointerException(); + } + + private native static void initIDs(String libname); + private native static long init(int conserveMemory); + private native int inflateBytesDirect(); + private native static long getBytesRead(long strm); + private native static long getBytesWritten(long strm); + private native static int getRemaining(long strm); + private native static void end(long strm); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java new file mode 100644 index 0000000000..80dc4e93ba --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.bzip2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.util.NativeCodeLoader; + +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor; +import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor; +import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor; +import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor; + +/** + * A collection of factories to create the right + * bzip2 compressor/decompressor instances. + * + */ +public class Bzip2Factory { + private static final Log LOG = LogFactory.getLog(Bzip2Factory.class); + + private static String bzip2LibraryName = ""; + private static boolean nativeBzip2Loaded; + + /** + * Check if native-bzip2 code is loaded & initialized correctly and + * can be loaded for this job. + * + * @param conf configuration + * @return true if native-bzip2 is loaded & initialized + * and can be loaded for this job, else false + */ + public static boolean isNativeBzip2Loaded(Configuration conf) { + String libname = conf.get("io.compression.codec.bzip2.library", + "system-native"); + if (!bzip2LibraryName.equals(libname)) { + nativeBzip2Loaded = false; + bzip2LibraryName = libname; + if (libname.equals("java-builtin")) { + LOG.info("Using pure-Java version of bzip2 library"); + } else if (conf.getBoolean( + CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, + CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT) && + NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library. + Bzip2Compressor.initSymbols(libname); + Bzip2Decompressor.initSymbols(libname); + nativeBzip2Loaded = true; + LOG.info("Successfully loaded & initialized native-bzip2 library " + + libname); + } catch (Throwable t) { + LOG.warn("Failed to load/initialize native-bzip2 library " + + libname + ", will use pure-Java version"); + } + } + } + return nativeBzip2Loaded; + } + + /** + * Return the appropriate type of the bzip2 compressor. + * + * @param conf configuration + * @return the appropriate type of the bzip2 compressor. + */ + public static Class + getBzip2CompressorType(Configuration conf) { + return isNativeBzip2Loaded(conf) ? + Bzip2Compressor.class : BZip2DummyCompressor.class; + } + + /** + * Return the appropriate implementation of the bzip2 compressor. + * + * @param conf configuration + * @return the appropriate implementation of the bzip2 compressor. + */ + public static Compressor getBzip2Compressor(Configuration conf) { + return isNativeBzip2Loaded(conf)? + new Bzip2Compressor(conf) : new BZip2DummyCompressor(); + } + + /** + * Return the appropriate type of the bzip2 decompressor. + * + * @param conf configuration + * @return the appropriate type of the bzip2 decompressor. + */ + public static Class + getBzip2DecompressorType(Configuration conf) { + return isNativeBzip2Loaded(conf) ? + Bzip2Decompressor.class : BZip2DummyDecompressor.class; + } + + /** + * Return the appropriate implementation of the bzip2 decompressor. + * + * @param conf configuration + * @return the appropriate implementation of the bzip2 decompressor. + */ + public static Decompressor getBzip2Decompressor(Configuration conf) { + return isNativeBzip2Loaded(conf) ? + new Bzip2Decompressor() : new BZip2DummyDecompressor(); + } + + public static void setBlockSize(Configuration conf, int blockSize) { + conf.setInt("bzip2.compress.blocksize", blockSize); + } + + public static int getBlockSize(Configuration conf) { + return conf.getInt("bzip2.compress.blocksize", + Bzip2Compressor.DEFAULT_BLOCK_SIZE); + } + + public static void setWorkFactor(Configuration conf, int workFactor) { + conf.setInt("bzip2.compress.workfactor", workFactor); + } + + public static int getWorkFactor(Configuration conf) { + return conf.getInt("bzip2.compress.workfactor", + Bzip2Compressor.DEFAULT_WORK_FACTOR); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c new file mode 100644 index 0000000000..8d0b005ab8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "org_apache_hadoop_io_compress_bzip2.h" +#include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h" + +static jfieldID Bzip2Compressor_clazz; +static jfieldID Bzip2Compressor_stream; +static jfieldID Bzip2Compressor_uncompressedDirectBuf; +static jfieldID Bzip2Compressor_uncompressedDirectBufOff; +static jfieldID Bzip2Compressor_uncompressedDirectBufLen; +static jfieldID Bzip2Compressor_compressedDirectBuf; +static jfieldID Bzip2Compressor_directBufferSize; +static jfieldID Bzip2Compressor_finish; +static jfieldID Bzip2Compressor_finished; + +static int (*dlsym_BZ2_bzCompressInit)(bz_stream*, int, int, int); +static int (*dlsym_BZ2_bzCompress)(bz_stream*, int); +static int (*dlsym_BZ2_bzCompressEnd)(bz_stream*); + +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs( + JNIEnv *env, jclass class, jstring libname) +{ + const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL); + if (strcmp(bzlib_name, "system-native") == 0) + bzlib_name = HADOOP_BZIP2_LIBRARY; + // Load the native library. + void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL); + if (!libbz2) { + THROW(env, "java/lang/UnsatisfiedLinkError", + "Cannot load bzip2 native library"); + return; + } + + // Locate the requisite symbols from libbz2.so. + dlerror(); // Clear any existing error. + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressInit, env, libbz2, + "BZ2_bzCompressInit"); + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompress, env, libbz2, + "BZ2_bzCompress"); + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressEnd, env, libbz2, + "BZ2_bzCompressEnd"); + + // Initialize the requisite fieldIds. + Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", + "Ljava/lang/Class;"); + Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); + Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z"); + Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); + Bzip2Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, + "uncompressedDirectBuf", + "Ljava/nio/Buffer;"); + Bzip2Compressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class, + "uncompressedDirectBufOff", + "I"); + Bzip2Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class, + "uncompressedDirectBufLen", + "I"); + Bzip2Compressor_compressedDirectBuf = (*env)->GetFieldID(env, class, + "compressedDirectBuf", + "Ljava/nio/Buffer;"); + Bzip2Compressor_directBufferSize = (*env)->GetFieldID(env, class, + "directBufferSize", "I"); +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_init( + JNIEnv *env, jclass class, jint blockSize, jint workFactor) +{ + // Create a bz_stream. + bz_stream *stream = malloc(sizeof(bz_stream)); + if (!stream) { + THROW(env, "java/lang/OutOfMemoryError", NULL); + return (jlong)0; + } + memset((void*)stream, 0, sizeof(bz_stream)); + + // Initialize stream. + int rv = (*dlsym_BZ2_bzCompressInit)(stream, blockSize, 0, workFactor); + if (rv != BZ_OK) { + // Contingency - Report error by throwing appropriate exceptions. + free(stream); + stream = NULL; + + switch (rv) { + case BZ_MEM_ERROR: + { + THROW(env, "java/lang/OutOfMemoryError", NULL); + } + break; + case BZ_PARAM_ERROR: + { + THROW(env, + "java/lang/IllegalArgumentException", + NULL); + } + break; + default: + { + THROW(env, "java/lang/InternalError", NULL); + } + break; + } + } + + return JLONG(stream); +} + +JNIEXPORT jint JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect( + JNIEnv *env, jobject this) +{ + // Get members of Bzip2Compressor. + bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this, + Bzip2Compressor_stream)); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + jobject clazz = (*env)->GetStaticObjectField(env, this, + Bzip2Compressor_clazz); + jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, + Bzip2Compressor_uncompressedDirectBuf); + jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, + Bzip2Compressor_uncompressedDirectBufOff); + jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, + Bzip2Compressor_uncompressedDirectBufLen); + + jobject compressed_direct_buf = (*env)->GetObjectField(env, this, + Bzip2Compressor_compressedDirectBuf); + jint compressed_direct_buf_len = (*env)->GetIntField(env, this, + Bzip2Compressor_directBufferSize); + + jboolean finish = (*env)->GetBooleanField(env, this, + Bzip2Compressor_finish); + + // Get the input and output direct buffers. + LOCK_CLASS(env, clazz, "Bzip2Compressor"); + char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, + uncompressed_direct_buf); + char* compressed_bytes = (*env)->GetDirectBufferAddress(env, + compressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Bzip2Compressor"); + + if (!uncompressed_bytes || !compressed_bytes) { + return (jint)0; + } + + // Re-calibrate the bz_stream. + stream->next_in = uncompressed_bytes + uncompressed_direct_buf_off; + stream->avail_in = uncompressed_direct_buf_len; + stream->next_out = compressed_bytes; + stream->avail_out = compressed_direct_buf_len; + + // Compress. + int rv = dlsym_BZ2_bzCompress(stream, finish ? BZ_FINISH : BZ_RUN); + + jint no_compressed_bytes = 0; + switch (rv) { + // Contingency? - Report error by throwing appropriate exceptions. + case BZ_STREAM_END: + { + (*env)->SetBooleanField(env, this, + Bzip2Compressor_finished, + JNI_TRUE); + } // cascade + case BZ_RUN_OK: + case BZ_FINISH_OK: + { + uncompressed_direct_buf_off += + uncompressed_direct_buf_len - stream->avail_in; + (*env)->SetIntField(env, this, + Bzip2Compressor_uncompressedDirectBufOff, + uncompressed_direct_buf_off); + (*env)->SetIntField(env, this, + Bzip2Compressor_uncompressedDirectBufLen, + stream->avail_in); + no_compressed_bytes = + compressed_direct_buf_len - stream->avail_out; + } + break; + default: + { + THROW(env, "java/lang/InternalError", NULL); + } + break; + } + + return no_compressed_bytes; +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesRead( + JNIEnv *env, jclass class, jlong stream) +{ + const bz_stream* strm = BZSTREAM(stream); + return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32; +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesWritten( + JNIEnv *env, jclass class, jlong stream) +{ + const bz_stream* strm = BZSTREAM(stream); + return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32; +} + +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_end( + JNIEnv *env, jclass class, jlong stream) +{ + if (dlsym_BZ2_bzCompressEnd(BZSTREAM(stream)) != BZ_OK) { + THROW(env, "java/lang/InternalError", NULL); + } else { + free(BZSTREAM(stream)); + } +} + +/** + * vim: sw=2: ts=2: et: + */ + diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c new file mode 100644 index 0000000000..b6c5213524 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "org_apache_hadoop_io_compress_bzip2.h" +#include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h" + +static jfieldID Bzip2Decompressor_clazz; +static jfieldID Bzip2Decompressor_stream; +static jfieldID Bzip2Decompressor_compressedDirectBuf; +static jfieldID Bzip2Decompressor_compressedDirectBufOff; +static jfieldID Bzip2Decompressor_compressedDirectBufLen; +static jfieldID Bzip2Decompressor_uncompressedDirectBuf; +static jfieldID Bzip2Decompressor_directBufferSize; +static jfieldID Bzip2Decompressor_finished; + +static int (*dlsym_BZ2_bzDecompressInit)(bz_stream*, int, int); +static int (*dlsym_BZ2_bzDecompress)(bz_stream*); +static int (*dlsym_BZ2_bzDecompressEnd)(bz_stream*); + +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs( + JNIEnv *env, jclass class, jstring libname) +{ + const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL); + if (strcmp(bzlib_name, "system-native") == 0) + bzlib_name = HADOOP_BZIP2_LIBRARY; + // Load the native library. + void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL); + if (!libbz2) { + THROW(env, "java/lang/UnsatisfiedLinkError", + "Cannot load bzip2 native library"); + return; + } + + // Locate the requisite symbols from libbz2.so. + dlerror(); // Clear any existing error. + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressInit, env, libbz2, + "BZ2_bzDecompressInit"); + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompress, env, libbz2, + "BZ2_bzDecompress"); + LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressEnd, env, libbz2, + "BZ2_bzDecompressEnd"); + + // Initialize the requisite fieldIds. + Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", + "Ljava/lang/Class;"); + Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); + Bzip2Decompressor_finished = (*env)->GetFieldID(env, class, + "finished", "Z"); + Bzip2Decompressor_compressedDirectBuf = (*env)->GetFieldID(env, class, + "compressedDirectBuf", + "Ljava/nio/Buffer;"); + Bzip2Decompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class, + "compressedDirectBufOff", "I"); + Bzip2Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class, + "compressedDirectBufLen", "I"); + Bzip2Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class, + "uncompressedDirectBuf", + "Ljava/nio/Buffer;"); + Bzip2Decompressor_directBufferSize = (*env)->GetFieldID(env, class, + "directBufferSize", "I"); +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_init( + JNIEnv *env, jclass cls, jint conserveMemory) +{ + bz_stream *stream = malloc(sizeof(bz_stream)); + if (stream == 0) { + THROW(env, "java/lang/OutOfMemoryError", NULL); + return (jlong)0; + } + memset((void*)stream, 0, sizeof(bz_stream)); + + int rv = dlsym_BZ2_bzDecompressInit(stream, 0, conserveMemory); + + if (rv != BZ_OK) { + // Contingency - Report error by throwing appropriate exceptions. + free(stream); + stream = NULL; + + switch (rv) { + case BZ_MEM_ERROR: + { + THROW(env, "java/lang/OutOfMemoryError", NULL); + } + break; + default: + { + THROW(env, "java/lang/InternalError", NULL); + } + break; + } + } + + return JLONG(stream); +} + +JNIEXPORT jint JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect( + JNIEnv *env, jobject this) +{ + // Get members of Bzip2Decompressor. + bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this, + Bzip2Decompressor_stream)); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + jobject clazz = (*env)->GetStaticObjectField(env, this, + Bzip2Decompressor_clazz); + jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, + this, Bzip2Decompressor_compressedDirectBuf); + jint compressed_direct_buf_off = (*env)->GetIntField(env, this, + Bzip2Decompressor_compressedDirectBufOff); + jint compressed_direct_buf_len = (*env)->GetIntField(env, this, + Bzip2Decompressor_compressedDirectBufLen); + + jarray uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env, + this, Bzip2Decompressor_uncompressedDirectBuf); + jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this, + Bzip2Decompressor_directBufferSize); + + // Get the input and output direct buffers. + LOCK_CLASS(env, clazz, "Bzip2Decompressor"); + char* compressed_bytes = (*env)->GetDirectBufferAddress(env, + compressed_direct_buf); + char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, + uncompressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Bzip2Decompressor"); + + if (!compressed_bytes || !uncompressed_bytes) { + return (jint)0; + } + + // Re-calibrate the bz_stream. + stream->next_in = compressed_bytes + compressed_direct_buf_off; + stream->avail_in = compressed_direct_buf_len; + stream->next_out = uncompressed_bytes; + stream->avail_out = uncompressed_direct_buf_len; + + // Decompress. + int rv = dlsym_BZ2_bzDecompress(stream); + + // Contingency? - Report error by throwing appropriate exceptions. + int no_decompressed_bytes = 0; + switch (rv) { + case BZ_STREAM_END: + { + (*env)->SetBooleanField(env, this, + Bzip2Decompressor_finished, + JNI_TRUE); + } // cascade down + case BZ_OK: + { + compressed_direct_buf_off += + compressed_direct_buf_len - stream->avail_in; + (*env)->SetIntField(env, this, + Bzip2Decompressor_compressedDirectBufOff, + compressed_direct_buf_off); + (*env)->SetIntField(env, this, + Bzip2Decompressor_compressedDirectBufLen, + stream->avail_in); + no_decompressed_bytes = + uncompressed_direct_buf_len - stream->avail_out; + } + break; + case BZ_DATA_ERROR: + case BZ_DATA_ERROR_MAGIC: + { + THROW(env, "java/io/IOException", NULL); + } + break; + case BZ_MEM_ERROR: + { + THROW(env, "java/lang/OutOfMemoryError", NULL); + } + break; + default: + { + THROW(env, "java/lang/InternalError", NULL); + } + break; + } + + return no_decompressed_bytes; +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesRead( + JNIEnv *env, jclass cls, jlong stream) +{ + const bz_stream* strm = BZSTREAM(stream); + return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32; +} + +JNIEXPORT jlong JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesWritten( + JNIEnv *env, jclass cls, jlong stream) +{ + const bz_stream* strm = BZSTREAM(stream); + return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32; +} + +JNIEXPORT jint JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getRemaining( + JNIEnv *env, jclass cls, jlong stream) +{ + return (BZSTREAM(stream))->avail_in; +} + +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_end( + JNIEnv *env, jclass cls, jlong stream) +{ + if (dlsym_BZ2_bzDecompressEnd(BZSTREAM(stream)) != BZ_OK) { + THROW(env, "java/lang/InternalError", 0); + } else { + free(BZSTREAM(stream)); + } +} + +/** + * vim: sw=2: ts=2: et: + */ + diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h new file mode 100644 index 0000000000..fa525bdedb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H +#define ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H + +#include +#include +#include +#include +#include + +#include "org_apache_hadoop.h" + +#define HADOOP_BZIP2_LIBRARY "libbz2.so.1" + + +/* A helper macro to convert the java 'stream-handle' to a bz_stream pointer. */ +#define BZSTREAM(stream) ((bz_stream*)((ptrdiff_t)(stream))) + +/* A helper macro to convert the bz_stream pointer to the java 'stream-handle'. */ +#define JLONG(stream) ((jlong)((ptrdiff_t)(stream))) + +#endif //ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 7a457982c7..dfbd7b98d5 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -317,6 +317,20 @@ are discovered using a Java ServiceLoader. + + io.compression.codec.bzip2.library + system-native + The native-code library to be used for compression and + decompression by the bzip2 codec. This library could be specified + either by by name or the full pathname. In the former case, the + library is located by the dynamic linker, usually searching the + directories specified in the environment variable LD_LIBRARY_PATH. + + The value of "system-native" indicates that the default system + library should be used. To indicate that the algorithm should + operate entirely in Java, specify "java-builtin". + + io.serializations org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 9a24886c15..598985be0b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -61,6 +61,7 @@ import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.ReflectionUtils; @@ -94,12 +95,33 @@ public void testGzipCodec() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec"); } - @Test + @Test(timeout=20000) public void testBZip2Codec() throws IOException { + Configuration conf = new Configuration(); + conf.set("io.compression.codec.bzip2.library", "java-builtin"); codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec"); } + @Test(timeout=20000) + public void testBZip2NativeCodec() throws IOException { + Configuration conf = new Configuration(); + conf.set("io.compression.codec.bzip2.library", "system-native"); + if (NativeCodeLoader.isNativeCodeLoaded()) { + if (Bzip2Factory.isNativeBzip2Loaded(conf)) { + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec"); + codecTest(conf, seed, count, + "org.apache.hadoop.io.compress.BZip2Codec"); + conf.set("io.compression.codec.bzip2.library", "java-builtin"); + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec"); + codecTest(conf, seed, count, + "org.apache.hadoop.io.compress.BZip2Codec"); + } else { + LOG.warn("Native hadoop library available but native bzip2 is not"); + } + } + } + @Test public void testSnappyCodec() throws IOException { if (SnappyCodec.isNativeCodeLoaded()) { @@ -457,14 +479,37 @@ public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundExce sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000); } - @Test + @Test(timeout=20000) public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { + Configuration conf = new Configuration(); + conf.set("io.compression.codec.bzip2.library", "java-builtin"); sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100); sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000); } + @Test(timeout=20000) + public void testSequenceFileBZip2NativeCodec() throws IOException, + ClassNotFoundException, InstantiationException, + IllegalAccessException { + Configuration conf = new Configuration(); + conf.set("io.compression.codec.bzip2.library", "system-native"); + if (NativeCodeLoader.isNativeCodeLoaded()) { + if (Bzip2Factory.isNativeBzip2Loaded(conf)) { + sequenceFileCodecTest(conf, 0, + "org.apache.hadoop.io.compress.BZip2Codec", 100); + sequenceFileCodecTest(conf, 100, + "org.apache.hadoop.io.compress.BZip2Codec", 100); + sequenceFileCodecTest(conf, 200000, + "org.apache.hadoop.io.compress.BZip2Codec", + 1000000); + } else { + LOG.warn("Native hadoop library available but native bzip2 is not"); + } + } + } + @Test public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {