diff --git a/BUILDING.txt b/BUILDING.txt index 7afc3f0bb2..a1721ba6db 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -83,6 +83,8 @@ Optional packages: $ sudo apt-get install libjansson-dev * Linux FUSE $ sudo apt-get install fuse libfuse-dev +* ZStandard compression + $ sudo apt-get install zstd ---------------------------------------------------------------------------------- Maven main modules: @@ -155,6 +157,29 @@ Maven build goals: and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the bundling and building will fail. + + ZStandard build options: + + ZStandard is a compression library that can be utilized by the native code. + It is currently an optional component, meaning that Hadoop can be built with + or without this dependency. + + * Use -Drequire.zstd to fail the build if libzstd.so is not found. + If this option is not specified and the zstd library is missing, we silently + build a version of libhadoop.so that cannot make use of zstd. + + * Use -Dzstd.prefix to specify a nonstandard location for the libzstd + header files and library files. You do not need this option if you have + installed zstandard using a package manager. + + * Use -Dzstd.lib to specify a nonstandard location for the libzstd library + files. As with zstd.prefix, you do not need this option if you have + installed zstd using a package manager. + + * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into + the final tar file. This option requires that -Dzstd.lib is also given, + and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the + bundling and building will fail. + OpenSSL build options: OpenSSL includes a crypto library that can be utilized by the native code. diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs index efe250bc45..67d2edf22d 100755 --- a/dev-support/bin/dist-copynativelibs +++ b/dev-support/bin/dist-copynativelibs @@ -114,6 +114,15 @@ for i in "$@"; do --snappylibbundle=*) SNAPPYLIBBUNDLE=${i#*=} ;; + --zstdbinbundle=*) + ZSTDBINBUNDLE=${i#*=} + ;; + --zstdlib=*) + ZSTDLIB=${i#*=} + ;; + --zstdlibbundle=*) + ZSTDLIBBUNDLE=${i#*=} + ;; esac done @@ -139,6 +148,8 @@ if [[ -d "${LIB_DIR}" ]]; then bundle_native_lib "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}" + bundle_native_lib "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}" + bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}" @@ -159,6 +170,8 @@ if [[ -d "${BIN_DIR}" ]] ; then bundle_native_bin "${SNAPPYBINBUNDLE}" "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}" + bundle_native_bin "${ZSTDBINBUNDLE}" "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}" + bundle_native_bin "${OPENSSLBINBUNDLE}" "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" fi diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index c9b282fb24..b616074043 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -595,6 +595,10 @@ false + + + + false @@ -652,6 +656,8 @@ org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping org.apache.hadoop.io.compress.snappy.SnappyCompressor org.apache.hadoop.io.compress.snappy.SnappyDecompressor + org.apache.hadoop.io.compress.zstd.ZStandardCompressor + org.apache.hadoop.io.compress.zstd.ZStandardDecompressor org.apache.hadoop.io.compress.lz4.Lz4Compressor org.apache.hadoop.io.compress.lz4.Lz4Decompressor org.apache.hadoop.io.erasurecode.ErasureCodeNative @@ -685,9 +691,13 @@
${sun.arch.data.model} ${require.bzip2} ${require.snappy} + ${require.zstd} ${snappy.prefix} ${snappy.lib} ${snappy.include} + ${zstd.prefix} + ${zstd.lib} + ${zstd.include} ${require.isal} ${isal.prefix} ${isal.lib} @@ -745,6 +755,11 @@ false true + + + + false + true @@ -794,6 +809,8 @@ org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping org.apache.hadoop.io.compress.snappy.SnappyCompressor org.apache.hadoop.io.compress.snappy.SnappyDecompressor + org.apache.hadoop.io.compress.zstd.ZStandardCompressor + org.apache.hadoop.io.compress.zstd.ZStandardDecompressor org.apache.hadoop.io.compress.lz4.Lz4Compressor org.apache.hadoop.io.compress.lz4.Lz4Decompressor org.apache.hadoop.io.erasurecode.ErasureCodeNative @@ -850,6 +867,10 @@ /p:CustomSnappyPrefix=${snappy.prefix} /p:CustomSnappyLib=${snappy.lib} /p:CustomSnappyInclude=${snappy.include} /p:RequireSnappy=${require.snappy} + /p:CustomZstdPrefix=${zstd.prefix} + /p:CustomZstdLib=${zstd.lib} + /p:CustomZstdInclude=${zstd.include} + /p:RequireZstd=${require.zstd} /p:CustomOpensslPrefix=${openssl.prefix} /p:CustomOpensslLib=${openssl.lib} /p:CustomOpensslInclude=${openssl.include} diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index 8317a46162..10b0f23ddf 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -94,6 +94,33 @@ else() endif() endif() +# Require zstandard +SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) +hadoop_set_find_shared_library_version("1") +find_library(ZSTD_LIBRARY + NAMES zstd + PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib + ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB}) +SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) +find_path(ZSTD_INCLUDE_DIR + NAMES zstd.h + PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include + ${CUSTOM_ZSTD_INCLUDE}) +if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) + GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME) + set(ZSTD_SOURCE_FILES + "${SRC}/io/compress/zstd/ZStandardCompressor.c" + "${SRC}/io/compress/zstd/ZStandardDecompressor.c") + set(REQUIRE_ZSTD ${REQUIRE_ZSTD}) # Stop warning about unused variable. + message(STATUS "Found ZStandard: ${ZSTD_LIBRARY}") +else () + set(ZSTD_INCLUDE_DIR "") + set(ZSTD_SOURCE_FILES "") + IF(REQUIRE_ZSTD) + MESSAGE(FATAL_ERROR "Required zstandard library could not be found.
ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}") + ENDIF(REQUIRE_ZSTD) +endif () + set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) hadoop_set_find_shared_library_version("2") find_library(ISAL_LIBRARY @@ -208,6 +235,7 @@ include_directories( ${BZIP2_INCLUDE_DIR} ${SNAPPY_INCLUDE_DIR} ${ISAL_INCLUDE_DIR} + ${ZSTD_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${SRC}/util ) @@ -222,6 +250,7 @@ hadoop_add_dual_library(hadoop ${SRC}/io/compress/lz4/lz4hc.c ${ISAL_SOURCE_FILES} ${SNAPPY_SOURCE_FILES} + ${ZSTD_SOURCE_FILES} ${OPENSSL_SOURCE_FILES} ${SRC}/io/compress/zlib/ZlibCompressor.c ${SRC}/io/compress/zlib/ZlibDecompressor.c diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake index 445cc33e4d..40aa467373 100644 --- a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -21,6 +21,7 @@ #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@" #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@" #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@" +#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@" #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@" #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@" #cmakedefine HAVE_SYNC_FILE_RANGE diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index fe522b3c64..b8a60d6a49 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT = 256 * 1024; + /** ZStandard compression level. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY = + "io.compression.codec.zstd.level"; + + /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */ + public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3; + + /** ZStandard buffer size. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY = + "io.compression.codec.zstd.buffersize"; + + /** ZStandard buffer size; a value of 0 means use the buffer + * size that the zstd library recommends.
*/ + public static final int + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0; + /** Internal buffer size for Lz4 compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY = "io.compression.codec.lz4.buffersize"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java index 8cb0b2a7a1..3808003de2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java @@ -95,7 +95,7 @@ public interface Decompressor { * @param b Buffer for the compressed data * @param off Start offset of the data * @param len Size of the buffer - * @return The actual number of bytes of compressed data. + * @return The actual number of bytes of uncompressed data. * @throws IOException */ public int decompress(byte[] b, int off, int len) throws IOException; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java new file mode 100644 index 0000000000..c56bbba3b5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.zstd.ZStandardCompressor; +import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; + +/** + * This class creates zstd compressors/decompressors. + */ +public class ZStandardCodec implements + Configurable, CompressionCodec, DirectDecompressionCodec { + private Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this object. 
+ */ + @Override + public Configuration getConf() { + return conf; + } + + public static void checkNativeCodeLoaded() { + if (!NativeCodeLoader.isNativeCodeLoaded() || + !NativeCodeLoader.buildSupportsZstd()) { + throw new RuntimeException("native zStandard library " + + "not available: this version of libhadoop was built " + + "without zstd support."); + } + if (!ZStandardCompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardCompressor has not been loaded."); + } + if (!ZStandardDecompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardDecompressor has not been loaded."); + } + } + + public static boolean isNativeCodeLoaded() { + return ZStandardCompressor.isNativeCodeLoaded() + && ZStandardDecompressor.isNativeCodeLoaded(); + } + + public static String getLibraryName() { + return ZStandardCompressor.getLibraryName(); + } + + public static int getCompressionLevel(Configuration conf) { + return conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT); + } + + public static int getCompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardCompressor.getRecommendedBufferSize() : + bufferSize; + } + + public static int getDecompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardDecompressor.getRecommendedBufferSize() : + bufferSize; + } + + private static int getBufferSize(Configuration conf) { + return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return Util. + createOutputStreamWithCodecPool(this, conf, out); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + checkNativeCodeLoaded(); + return new CompressorStream(out, compressor, + getCompressionBufferSize(conf)); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class<? extends Compressor> getCompressorType() { + checkNativeCodeLoaded(); + return ZStandardCompressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + checkNativeCodeLoaded(); + return new ZStandardCompressor( + getCompressionLevel(conf), getCompressionBufferSize(conf)); + } + + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream.
+ * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return Util. + createInputStreamWithCodecPool(this, conf, in); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + checkNativeCodeLoaded(); + return new DecompressorStream(in, decompressor, + getDecompressionBufferSize(conf)); + } + + /** + * Get the type of {@link Decompressor} needed by + * this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class<? extends Decompressor> getDecompressorType() { + checkNativeCodeLoaded(); + return ZStandardDecompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + checkNativeCodeLoaded(); + return new ZStandardDecompressor(getDecompressionBufferSize(conf)); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return .zst. + */ + @Override + public String getDefaultExtension() { + return ".zst"; + } + + @Override + public DirectDecompressor createDirectDecompressor() { + return new ZStandardDecompressor.ZStandardDirectDecompressor( + getDecompressionBufferSize(conf) + ); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java new file mode 100644 index 0000000000..eb2121aedf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.io.compress.zstd; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link Compressor} based on the zStandard compression algorithm. + * https://github.com/facebook/zstd + */ +public class ZStandardCompressor implements Compressor { + + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardCompressor.class); + + private long stream; + private int level; + private int directBufferSize; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private ByteBuffer uncompressedDirectBuf = null; + private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0; + private boolean keepUncompressedBuf = false; + private ByteBuffer compressedDirectBuf = null; + private boolean finish, finished; + private long bytesRead = 0; + private long bytesWritten = 0; + + private static boolean nativeZStandardLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + } + + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + } + + public static int getRecommendedBufferSize() { + return getStreamSize(); + } + + @VisibleForTesting + ZStandardCompressor() { + this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + } + + /** + * Creates a new compressor with the default compression level. + * Compressed data will be generated in ZStandard format. + */ + public ZStandardCompressor(int level, int bufferSize) { + this(level, bufferSize, bufferSize); + } + + @VisibleForTesting + ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) { + this.level = level; + stream = create(); + this.directBufferSize = outputBufferSize; + uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize); + compressedDirectBuf.position(outputBufferSize); + reset(); + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration. It will reset the compressor's compression level + * and compression strategy. 
+ * + * @param conf Configuration storing new settings + */ + @Override + public void reinit(Configuration conf) { + if (conf == null) { + return; + } + level = ZStandardCodec.getCompressionLevel(conf); + reset(); + LOG.debug("Reinit compressor with new compression configuration"); + } + + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + uncompressedDirectBufOff = 0; + setInputFromSavedData(); + + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + //copy enough data from userBuf to uncompressedDirectBuf + private void setInputFromSavedData() { + int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); + uncompressedDirectBuf.put(userBuf, userBufOff, len); + userBufLen -= len; + userBufOff += len; + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support is not enabled"); + } + + @Override + public boolean needsInput() { + // Consume remaining compressed data? + if (compressedDirectBuf.remaining() > 0) { + return false; + } + + // have we consumed all input + if (keepUncompressedBuf && uncompressedDirectBufLen > 0) { + return false; + } + + if (uncompressedDirectBuf.remaining() > 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + // copy enough data from userBuf to uncompressedDirectBuf + setInputFromSavedData(); + // uncompressedDirectBuf is not full + return uncompressedDirectBuf.remaining() > 0; + } + } + + return false; + } + + @Override + public void finish() { + finish = true; + } + + @Override + public boolean finished() { + // Check if 'zstd' says its 'finished' and all compressed + // data has been consumed + return (finished && compressedDirectBuf.remaining() == 0); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is compressed data + int n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return n; + } + + // Re-initialize the output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + // Compress data + n = deflateBytesDirect( + uncompressedDirectBuf, + uncompressedDirectBufOff, + uncompressedDirectBufLen, + compressedDirectBuf, + directBufferSize + ); + compressedDirectBuf.limit(n); + + // Check if we have consumed all input buffer + if (uncompressedDirectBufLen <= 0) { + // consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { + // did not consume all input buffer + keepUncompressedBuf = true; + } + + // Get at most 'len' bytes + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return n; + } + + /** + * Returns the total number of compressed bytes output so far. 
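+ * (The counter is updated by the native deflateBytesDirect call.)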
+ * + * @return the total (non-negative) number of compressed bytes output so far + */ + @Override + public long getBytesWritten() { + checkStream(); + return bytesWritten; + } + + /** + *
+ * Returns the total number of uncompressed bytes input so far.
+ * + * @return the total (non-negative) number of uncompressed bytes input so far + */ + @Override + public long getBytesRead() { + checkStream(); + return bytesRead; + } + + @Override + public void reset() { + checkStream(); + init(level, stream); + finish = false; + finished = false; + bytesRead = 0; + bytesWritten = 0; + uncompressedDirectBuf.rewind(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + keepUncompressedBuf = false; + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufLen = 0; + } + + @Override + public void end() { + if (stream != 0) { + end(stream); + stream = 0; + } + } + + private void checkStream() { + if (stream == 0) { + throw new NullPointerException(); + } + } + + private native static long create(); + private native static void init(int level, long stream); + private native int deflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstLen); + private static native int getStreamSize(); + private native static void end(long strm); + private native static void initIDs(); + public native static String getLibraryName(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java new file mode 100644 index 0000000000..73d73e1c47 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.zstd; + +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link Decompressor} based on the zStandard compression algorithm. 
+ * https://github.com/facebook/zstd + */ +public class ZStandardDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardDecompressor.class); + + private long stream; + private int directBufferSize; + private ByteBuffer compressedDirectBuf = null; + private int compressedDirectBufOff, bytesInCompressedBuffer; + private ByteBuffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufferBytesToConsume = 0; + private boolean finished; + private int remaining = 0; + + private static boolean nativeZStandardLoaded = false; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + } + + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + } + + public static int getRecommendedBufferSize() { + return getStreamSize(); + } + + public ZStandardDecompressor() { + this(getStreamSize()); + } + + /** + * Creates a new decompressor. + */ + public ZStandardDecompressor(int bufferSize) { + this.directBufferSize = bufferSize; + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + stream = create(); + reset(); + } + + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufferBytesToConsume = len; + + setInputFromSavedData(); + + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + private void setInputFromSavedData() { + compressedDirectBufOff = 0; + bytesInCompressedBuffer = userBufferBytesToConsume; + if (bytesInCompressedBuffer > directBufferSize) { + bytesInCompressedBuffer = directBufferSize; + } + + compressedDirectBuf.rewind(); + compressedDirectBuf.put( + userBuf, userBufOff, bytesInCompressedBuffer); + + userBufOff += bytesInCompressedBuffer; + userBufferBytesToConsume -= bytesInCompressedBuffer; + } + + // dictionary is not supported + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support is not enabled"); + } + + @Override + public boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if we have consumed all input + if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) { + // Check if we have consumed all user-input + if (userBufferBytesToConsume <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + return false; + } + + // dictionary is not supported. 
+ @Override + public boolean needsDictionary() { + return false; + } + + @Override + public boolean finished() { + // finished == true if ZSTD_decompressStream() returns 0 + // also check we have nothing left in our buffer + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + @Override + public int decompress(byte[] b, int off, int len) + throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is uncompressed data + int n = uncompressedDirectBuf.remaining(); + if (n > 0) { + return populateUncompressedBuffer(b, off, len, n); + } + + // Re-initialize the output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Decompress data + n = inflateBytesDirect( + compressedDirectBuf, + compressedDirectBufOff, + bytesInCompressedBuffer, + uncompressedDirectBuf, + 0, + directBufferSize + ); + uncompressedDirectBuf.limit(n); + + // Get at most 'len' bytes + return populateUncompressedBuffer(b, off, len, n); + } + + /** + *
+ * Returns the number of bytes remaining in the input buffers; + * normally called when finished() is true to determine amount of post-stream + * data.
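+ * The value is the sum of the unconsumed bytes in the caller's buffer and + * the count reported by the native decompressor.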
+ * + * @return the total (non-negative) number of unprocessed bytes in input + */ + @Override + public int getRemaining() { + checkStream(); + // userBuf + compressedDirectBuf + return userBufferBytesToConsume + remaining; + } + + /** + * Resets everything including the input buffers (user and direct). + */ + @Override + public void reset() { + checkStream(); + init(stream); + remaining = 0; + finished = false; + compressedDirectBufOff = 0; + bytesInCompressedBuffer = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufferBytesToConsume = 0; + } + + @Override + public void end() { + if (stream != 0) { + free(stream); + stream = 0; + } + } + + @Override + protected void finalize() { + reset(); + } + + private void checkStream() { + if (stream == 0) { + throw new NullPointerException("Stream not initialized"); + } + } + + private int populateUncompressedBuffer(byte[] b, int off, int len, int n) { + n = Math.min(n, len); + uncompressedDirectBuf.get(b, off, n); + return n; + } + + private native static void initIDs(); + private native static long create(); + private native static void init(long stream); + private native int inflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstOffset, int dstLen); + private native static void free(long strm); + private native static int getStreamSize(); + + int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException { + assert + (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor); + + int originalPosition = dst.position(); + int n = inflateBytesDirect( + src, src.position(), src.remaining(), dst, dst.position(), + dst.remaining() + ); + dst.position(originalPosition + n); + if (bytesInCompressedBuffer > 0) { + src.position(compressedDirectBufOff); + } else { + src.position(src.limit()); + } + return n; + } + + /** + * A {@link DirectDecompressor} for ZStandard + * https://github.com/facebook/zstd. 
+ */ + public static class ZStandardDirectDecompressor + extends ZStandardDecompressor implements DirectDecompressor { + + public ZStandardDirectDecompressor(int directBufferSize) { + super(directBufferSize); + } + + @Override + public boolean finished() { + return (endOfInput && super.finished()); + } + + @Override + public void reset() { + super.reset(); + endOfInput = true; + } + + private boolean endOfInput; + + @Override + public void decompress(ByteBuffer src, ByteBuffer dst) + throws IOException { + assert dst.isDirect() : "dst.isDirect()"; + assert src.isDirect() : "src.isDirect()"; + assert dst.remaining() > 0 : "dst.remaining() > 0"; + this.inflateDirect(src, dst); + endOfInput = !src.hasRemaining(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } + + @Override + public int decompress(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java new file mode 100644 index 0000000000..9069070f73 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.compress.zstd; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java index dd04a19f48..c38133611f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java @@ -84,6 +84,11 @@ public static boolean isNativeCodeLoaded() { */ public static native boolean buildSupportsIsal(); + /** + * Returns true only if this build was compiled with support for ZStandard. + */ + public static native boolean buildSupportsZstd(); + /** * Returns true only if this build was compiled with support for openssl. 
*/ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java index e166becc4a..776839c17f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.util; +import org.apache.hadoop.io.compress.ZStandardCodec; import org.apache.hadoop.io.erasurecode.ErasureCodeNative; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.OpensslCipher; @@ -67,6 +68,7 @@ public static void main(String[] args) { boolean zlibLoaded = false; boolean snappyLoaded = false; boolean isalLoaded = false; + boolean zStdLoaded = false; // lz4 is linked within libhadoop boolean lz4Loaded = nativeHadoopLoaded; boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf); @@ -78,6 +80,7 @@ public static void main(String[] args) { String zlibLibraryName = ""; String snappyLibraryName = ""; String isalDetail = ""; + String zstdLibraryName = ""; String lz4LibraryName = ""; String bzip2LibraryName = ""; String winutilsPath = null; @@ -88,7 +91,11 @@ public static void main(String[] args) { if (zlibLoaded) { zlibLibraryName = ZlibFactory.getLibraryName(); } - + zStdLoaded = NativeCodeLoader.buildSupportsZstd() && + ZStandardCodec.isNativeCodeLoaded(); + if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) { + zstdLibraryName = ZStandardCodec.getLibraryName(); + } snappyLoaded = NativeCodeLoader.buildSupportsSnappy() && SnappyCodec.isNativeCodeLoaded(); if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) { @@ -135,6 +142,7 @@ public static void main(String[] args) { System.out.println("Native library checking:"); System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName); System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName); + System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName); System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName); System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName); System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName); @@ -146,7 +154,8 @@ public static void main(String[] args) { } if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) || - (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded && isalLoaded))) { + (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded + && bzip2Loaded && isalLoaded && zStdLoaded))) { // return 1 to indicated check failed ExitUtil.terminate(1); } diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c new file mode 100644 index 0000000000..04f2a3e661 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "org_apache_hadoop_io_compress_zstd.h" + +#if defined HADOOP_ZSTD_LIBRARY + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif + +#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h" + +static jfieldID ZStandardCompressor_stream; +static jfieldID ZStandardCompressor_uncompressedDirectBufOff; +static jfieldID ZStandardCompressor_uncompressedDirectBufLen; +static jfieldID ZStandardCompressor_directBufferSize; +static jfieldID ZStandardCompressor_finish; +static jfieldID ZStandardCompressor_finished; +static jfieldID ZStandardCompressor_bytesWritten; +static jfieldID ZStandardCompressor_bytesRead; + +#ifdef UNIX +static size_t (*dlsym_ZSTD_CStreamInSize)(void); +static size_t (*dlsym_ZSTD_CStreamOutSize)(void); +static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void); +static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * (*dlsym_ZSTD_getErrorName)(size_t); +#endif + +#ifdef WINDOWS +typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void); +typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); + +static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize; +static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize; +static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream; +static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream; +static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream; +static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream; +static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +#endif + +// Load libzstd.so from disk +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) { +#ifdef UNIX + // Load libzstd.so + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(10000); + snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); + THROW(env, "java/lang/InternalError", msg); + return; + } +#endif + +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); + return; + } +#endif + +#ifdef UNIX + // load dynamic symbols + dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); +#endif + +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); +#endif + + // load fields + ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z"); + ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); + ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I"); + ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I"); + ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); + ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J"); + ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J"); +} + +// Create the compression stream +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jobject this) { + ZSTD_CStream* const stream = dlsym_ZSTD_createCStream(); + if (stream == NULL) { + THROW(env, "java/lang/InternalError", "Error creating the stream"); + return (jlong)0; + } + return (jlong) stream; +} + +// Initialize the compression stream +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jobject this, jint level, jlong stream) { + size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level); + if
(dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +// free the compression stream +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject this, jlong stream) { + size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect +(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len ) { + ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead); + jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten); + jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish); + + // Get the input direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); + return (jint) 0; + } + + // Get the output direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); + return (jint) 0; + } + + ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off }; + ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 }; + + size_t size = dlsym_ZSTD_compressStream(stream, &output, &input); + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + if (finish && input.pos == input.size) { + // end the stream, flush and write the frame epilogue + size = dlsym_ZSTD_endStream(stream, &output); + if (!size) { + (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE); + } + } else { + // need to flush the output buffer + // this also updates the output buffer position. 
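+ // (a nonzero return from ZSTD_flushStream is the number of bytes still + // held in zstd's internal buffers, not an error; real errors are caught + // by the ZSTD_isError check below)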
+ size = dlsym_ZSTD_flushStream(stream, &output); + } + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + + bytes_read += input.pos; + bytes_written += output.pos; + (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read); + (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written); + + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos); + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos); + return (jint) output.pos; +} + +JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName +(JNIEnv *env, jclass class) { +#ifdef UNIX + if (dlsym_ZSTD_isError) { + Dl_info dl_info; + if (dladdr( dlsym_ZSTD_isError, &dl_info)) { + return (*env)->NewStringUTF(env, dl_info.dli_fname); + } + } + return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY); +#endif +#ifdef WINDOWS + LPWSTR filename = NULL; + GetLibraryName(dlsym_ZSTD_isError, &filename); + if (filename != NULL) { + return (*env)->NewString(env, filename, (jsize) wcslen(filename)); + } else { + return (*env)->NewStringUTF(env, "Unavailable"); + } +#endif +} + +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize +(JNIEnv *env, jobject this) { + int x = (int) dlsym_ZSTD_CStreamInSize(); + int y = (int) dlsym_ZSTD_CStreamOutSize(); + return (x >= y) ? x : y; +} + +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c new file mode 100644 index 0000000000..123675688d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "org_apache_hadoop_io_compress_zstd.h" + +#if defined HADOOP_ZSTD_LIBRARY + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif + +#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h" + +static jfieldID ZStandardDecompressor_stream; +static jfieldID ZStandardDecompressor_compressedDirectBufOff; +static jfieldID ZStandardDecompressor_bytesInCompressedBuffer; +static jfieldID ZStandardDecompressor_directBufferSize; +static jfieldID ZStandardDecompressor_finished; +static jfieldID ZStandardDecompressor_remaining; + +#ifdef UNIX +static size_t (*dlsym_ZSTD_DStreamOutSize)(void); +static size_t (*dlsym_ZSTD_DStreamInSize)(void); +static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void); +static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * (*dlsym_ZSTD_getErrorName)(size_t); +#endif + +#ifdef WINDOWS +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void); +typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); + +static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize; +static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize; +static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream; +static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream; +static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream; +static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream; +static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +#endif + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) { + // Load libzstd.so +#ifdef UNIX + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(1000); + snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); + THROW(env, "java/lang/UnsatisfiedLinkError", msg); + return; + } +#endif + +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); + return; + } +#endif + +#ifdef UNIX + dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); +
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); +#endif + +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); +#endif + + ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); + ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I"); + ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I"); + ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); + ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I"); +} + +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) { + ZSTD_DStream * stream = dlsym_ZSTD_createDStream(); + if (stream == NULL) { + THROW(env, "java/lang/InternalError", "Error creating stream"); + return (jlong) 0; + } + return (jlong) stream; +} + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) { + size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } + (*env)->SetLongField(env, this, ZStandardDecompressor_remaining, 0); +} + + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) { + size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +} + +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect +(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, 
jint uncompressed_direct_buf_len) { + ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + + // Get the input direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); + return (jint) 0; + } + + // Get the output direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); + return (jint) 0; + } + uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off; + + ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off }; + ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 }; + + size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input); + + // check for errors + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + int remaining = input.size - input.pos; + (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining); + + // the entire frame has been decoded + if (size == 0) { + (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE); + size_t result = dlsym_ZSTD_initDStream(stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return (jint) 0; + } + } + (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos); + (*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size); + return (jint) output.pos; +} + +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize +(JNIEnv *env, jclass obj) { + int x = (int) dlsym_ZSTD_DStreamInSize(); + int y = (int) dlsym_ZSTD_DStreamOutSize(); + return (x >= y) ? x : y; +} + +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h new file mode 100644 index 0000000000..78fc0a4a0b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <dlfcn.h>
+#endif
+
+#include <jni.h>
+#include <stddef.h>
+#include <zstd.h>
+
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
index ae8263aac6..1bd7fa18bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
@@ -39,6 +39,17 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup
 #endif
 }
 
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd
+  (JNIEnv *env, jclass clazz)
+{
+#ifdef HADOOP_ZSTD_LIBRARY
+  return JNI_TRUE;
+#else
+  return JNI_FALSE;
+#endif
+}
+
+
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl
   (JNIEnv *env, jclass clazz)
 {
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
index df46e32b3b..99b6fb2a38 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
+++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
@@ -17,4 +17,5 @@ org.apache.hadoop.io.compress.DeflateCodec
 org.apache.hadoop.io.compress.GzipCodec
 org.apache.hadoop.io.compress.Lz4Codec
 org.apache.hadoop.io.compress.SnappyCodec
+org.apache.hadoop.io.compress.ZStandardCodec
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
index 04ff4265cd..e4f720cee8 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
@@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr
    hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
    zlib: true /lib/x86_64-linux-gnu/libz.so.1
    snappy: true /usr/lib/libsnappy.so.1
+   zstd: true /usr/lib/libzstd.so.1
    lz4: true revision:99
    bzip2: false
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 5443ca768b..3955aa20b9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -504,6 +504,18 @@ public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundExcept
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
   }
 
+  @Test(timeout=20000)
+  public void testSequenceFileZStandardCodec() throws Exception {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    Configuration conf = new Configuration();
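+    // cover empty, small, and large sequence files (0, 100, and 200000
+    // records) so both trivial and multi-buffer native paths are exercised
+    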
sequenceFileCodecTest(conf, 0, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 100, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 200000, + "org.apache.hadoop.io.compress.ZStandardCodec", 1000000); + } + @Test(timeout=20000) public void testSequenceFileBZip2NativeCodec() throws IOException, ClassNotFoundException, InstantiationException, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java index 2d75a2d2f8..dd7bdd21f5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java @@ -37,6 +37,7 @@ import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; public class TestCompressionStreamReuse { private static final Log LOG = LogFactory @@ -58,6 +59,13 @@ public void testGzipCompressStreamReuse() throws IOException { "org.apache.hadoop.io.compress.GzipCodec"); } + @Test + public void testZStandardCompressStreamReuse() throws IOException { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + resetStateTest(conf, seed, count, + "org.apache.hadoop.io.compress.ZStandardCodec"); + } + @Test public void testGzipCompressStreamReuseWithParam() throws IOException { Configuration conf = new Configuration(this.conf); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java new file mode 100644 index 0000000000..04def24de7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.io.compress.zstd; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.CompressorStream; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DecompressorStream; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.hadoop.test.MultithreadedTestUtil; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +public class TestZStandardCompressorDecompressor { + private final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + private static final Random RANDOM = new Random(12345L); + private static final Configuration CONFIGURATION = new Configuration(); + private static File compressedFile; + private static File uncompressedFile; + + @BeforeClass + public static void beforeClass() throws Exception { + CONFIGURATION.setInt(IO_FILE_BUFFER_SIZE_KEY, 1024 * 64); + uncompressedFile = new File(TestZStandardCompressorDecompressor.class + .getResource("/zstd/test_file.txt").toURI()); + compressedFile = new File(TestZStandardCompressorDecompressor.class + .getResource("/zstd/test_file.txt.zst").toURI()); + } + + @Before + public void before() throws Exception { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + } + + @Test + public void testCompressionCompressesCorrectly() throws Exception { + int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile); + byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile); + assertEquals(uncompressedSize, bytes.length); + + Configuration conf = new Configuration(); + ZStandardCodec codec = new ZStandardCodec(); + codec.setConf(conf); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Compressor compressor = codec.createCompressor(); + CompressionOutputStream outputStream = + codec.createOutputStream(baos, compressor); + + for (byte aByte : bytes) { + outputStream.write(aByte); + } + + outputStream.finish(); + outputStream.close(); + assertEquals(uncompressedSize, compressor.getBytesRead()); + assertTrue(compressor.finished()); + + // just make sure we can decompress the file + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + Decompressor decompressor = codec.createDecompressor(); + CompressionInputStream inputStream = + codec.createInputStream(bais, decompressor); + byte[] 
buffer = new byte[100];
+    int n = buffer.length;
+    while ((n = inputStream.read(buffer, 0, n)) != -1) {
+      byteArrayOutputStream.write(buffer, 0, n);
+    }
+    assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCompressorSetInputNullPointerException() {
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    compressor.setInput(null, 0, 10);
+  }
+
+  //test on NullPointerException in {@code decompressor.setInput()}
+  @Test(expected = NullPointerException.class)
+  public void testDecompressorSetInputNullPointerException() {
+    ZStandardDecompressor decompressor =
+        new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
+    decompressor.setInput(null, 0, 10);
+  }
+
+  //test on ArrayIndexOutOfBoundsException in {@code compressor.setInput()}
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void testCompressorSetInputAIOBException() {
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    compressor.setInput(new byte[] {}, -5, 10);
+  }
+
+  //test on ArrayIndexOutOfBoundsException in {@code decompressor.setInput()}
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void testDecompressorSetInputAIOBException() {
+    ZStandardDecompressor decompressor =
+        new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
+    decompressor.setInput(new byte[] {}, -5, 10);
+  }
+
+  //test on NullPointerException in {@code compressor.compress()}
+  @Test(expected = NullPointerException.class)
+  public void testCompressorCompressNullPointerException() throws Exception {
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    byte[] bytes = generate(1024 * 6);
+    compressor.setInput(bytes, 0, bytes.length);
+    compressor.compress(null, 0, 0);
+  }
+
+  //test on NullPointerException in {@code decompressor.decompress()}
+  @Test(expected = NullPointerException.class)
+  public void testDecompressorDecompressNullPointerException() throws Exception {
+    ZStandardDecompressor decompressor =
+        new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
+    byte[] bytes = generate(1024 * 6);
+    decompressor.setInput(bytes, 0, bytes.length);
+    decompressor.decompress(null, 0, 0);
+  }
+
+  //test on ArrayIndexOutOfBoundsException in {@code compressor.compress()}
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void testCompressorCompressAIOBException() throws Exception {
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    byte[] bytes = generate(1024 * 6);
+    compressor.setInput(bytes, 0, bytes.length);
+    compressor.compress(new byte[] {}, 0, -1);
+  }
+
+  //test on ArrayIndexOutOfBoundsException in decompressor.decompress()
+  @Test(expected = ArrayIndexOutOfBoundsException.class)
+  public void testDecompressorDecompressAIOBException() throws Exception {
+    ZStandardDecompressor decompressor =
+        new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
+    byte[] bytes = generate(1024 * 6);
+    decompressor.setInput(bytes, 0, bytes.length);
+    decompressor.decompress(new byte[] {}, 0, -1);
+  }
+
+  // test ZStandardCompressor.compress() with input larger than the buffer
+  @Test
+  public void testSetInputWithBytesSizeMoreThanDefaultZStandardBufferSize()
+      throws Exception {
+    int bytesSize = 1024 * 2056 + 1;
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    byte[] bytes = generate(bytesSize);
+    assertTrue("compressor should need input", compressor.needsInput());
+    compressor.setInput(bytes, 0, bytes.length);
+    byte[] emptyBytes = new byte[bytesSize];
+    int cSize = compressor.compress(emptyBytes, 0, bytes.length);
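+    // the 2 MB+ input exceeds the compressor's internal buffer, so some
+    // compressed output should already be available at this point
+    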
assertTrue(cSize > 0);
+  }
+
+  // test compress/decompress process through
+  // CompressionOutputStream/CompressionInputStream API
+  @Test
+  public void testCompressorDecompressorLogicWithCompressionStreams()
+      throws Exception {
+    DataOutputStream deflateOut = null;
+    DataInputStream inflateIn = null;
+    int byteSize = 1024 * 100;
+    byte[] bytes = generate(byteSize);
+    int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+    try {
+      DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+      CompressionOutputStream deflateFilter =
+          new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
+              bufferSize);
+      deflateOut =
+          new DataOutputStream(new BufferedOutputStream(deflateFilter));
+      deflateOut.write(bytes, 0, bytes.length);
+      deflateOut.flush();
+      deflateFilter.finish();
+
+      DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+      deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+          compressedDataBuffer.getLength());
+
+      CompressionInputStream inflateFilter =
+          new DecompressorStream(deCompressedDataBuffer,
+              new ZStandardDecompressor(bufferSize), bufferSize);
+
+      inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
+
+      byte[] result = new byte[byteSize];
+      // readFully avoids a short read silently truncating the comparison
+      inflateIn.readFully(result);
+      assertArrayEquals("original array does not match the decompressed result",
+          result, bytes);
+    } finally {
+      IOUtils.closeQuietly(deflateOut);
+      IOUtils.closeQuietly(inflateIn);
+    }
+  }
+
+  @Test
+  public void testZStandardCompressDecompressInMultiThreads() throws Exception {
+    MultithreadedTestUtil.TestContext ctx =
+        new MultithreadedTestUtil.TestContext();
+    for (int i = 0; i < 10; i++) {
+      ctx.addThread(new MultithreadedTestUtil.TestingThread(ctx) {
+        @Override
+        public void doWork() throws Exception {
+          testCompressDecompress();
+        }
+      });
+    }
+    ctx.startThreads();
+
+    ctx.waitFor(60000);
+  }
+
+  @Test
+  public void testCompressDecompress() throws Exception {
+    int rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+    byte[] rawData = generate(rawDataSize);
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    ZStandardDecompressor decompressor =
+        new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
+    assertFalse(compressor.finished());
+    compressor.setInput(rawData, 0, rawData.length);
+    assertEquals(0, compressor.getBytesRead());
+    compressor.finish();
+
+    byte[] compressedResult = new byte[rawDataSize];
+    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    assertEquals(rawDataSize, compressor.getBytesRead());
+    assertTrue(cSize < rawDataSize);
+    decompressor.setInput(compressedResult, 0, cSize);
+    byte[] decompressedBytes = new byte[rawDataSize];
+    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
+    compressor.reset();
+    decompressor.reset();
+  }
+
+  @Test
+  public void testCompressingWithOneByteOutputBuffer() throws Exception {
+    int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile);
+    byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
+    assertEquals(uncompressedSize, bytes.length);
+
+    Configuration conf = new Configuration();
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(conf);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Compressor compressor =
+        new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
+    CompressionOutputStream outputStream =
+        codec.createOutputStream(baos, compressor);
+
+    for (byte aByte : bytes) {
+      outputStream.write(aByte);
+    }
+
+    outputStream.finish();
+    outputStream.close();
+    assertEquals(uncompressedSize, compressor.getBytesRead());
+    assertTrue(compressor.finished());
+
+    // just make sure we can decompress the file
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decompressor decompressor = codec.createDecompressor();
+    CompressionInputStream inputStream =
+        codec.createInputStream(bais, decompressor);
+    byte[] buffer = new byte[100];
+    int n = buffer.length;
+    while ((n = inputStream.read(buffer, 0, n)) != -1) {
+      byteArrayOutputStream.write(buffer, 0, n);
+    }
+    assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
+
+  }
+
+  @Test
+  public void testZStandardCompressDecompress() throws Exception {
+    int rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+    byte[] rawData = generate(rawDataSize);
+    ZStandardCompressor compressor = new ZStandardCompressor();
+    ZStandardDecompressor decompressor = new ZStandardDecompressor(rawDataSize);
+    assertTrue(compressor.needsInput());
+    assertFalse("compressor should not be finished before input is supplied",
+        compressor.finished());
+    compressor.setInput(rawData, 0, rawData.length);
+    compressor.finish();
+
+    byte[] compressedResult = new byte[rawDataSize];
+    int cSize = compressor.compress(compressedResult, 0, rawDataSize);
+    assertEquals(rawDataSize, compressor.getBytesRead());
+    assertTrue("compressed size should be less than the original size",
+        cSize < rawDataSize);
+    decompressor.setInput(compressedResult, 0, cSize);
+    byte[] decompressedBytes = new byte[rawDataSize];
+    decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
+    String decompressed = bytesToHex(decompressedBytes);
+    String original = bytesToHex(rawData);
+    assertEquals(original, decompressed);
+    compressor.reset();
+    decompressor.reset();
+  }
+
+  @Test
+  public void testDecompressingOutput() throws Exception {
+    byte[] expectedDecompressedResult =
+        FileUtils.readFileToByteArray(uncompressedFile);
+    ZStandardCodec codec = new ZStandardCodec();
+    codec.setConf(CONFIGURATION);
+    CompressionInputStream inputStream = codec
+        .createInputStream(FileUtils.openInputStream(compressedFile),
+            codec.createDecompressor());
+
+    byte[] toDecompress = new byte[100];
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    byte[] decompressedResult;
+    int totalFileSize = 0;
+    int result = toDecompress.length;
+    try {
+      while ((result = inputStream.read(toDecompress, 0, result)) != -1) {
+        baos.write(toDecompress, 0, result);
+        totalFileSize += result;
+      }
+      decompressedResult = baos.toByteArray();
+    } finally {
+      IOUtils.closeQuietly(baos);
+    }
+
+    assertEquals(decompressedResult.length, totalFileSize);
+    assertEquals(bytesToHex(expectedDecompressedResult),
+        bytesToHex(decompressedResult));
+  }
+
+  @Test
+  public void testZStandardDirectCompressDecompress() throws Exception {
+    int[] size = {1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
+    for (int aSize : size) {
+      System.out.println("aSize = " + aSize);
+      compressDecompressLoop(aSize);
+    }
+  }
+
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = generate(rawDataSize);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize + 12);
+    CompressionOutputStream deflateFilter =
+        new CompressorStream(baos, new ZStandardCompressor(), 4096);
+    DataOutputStream deflateOut =
+        new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
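+    // round-trip: compress rawData through the stream API, then verify it
+    // byte for byte with the direct ByteBuffer decompressor below
+    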
deflateOut.write(rawData, 0, rawData.length); + deflateOut.flush(); + deflateFilter.finish(); + byte[] compressedResult = baos.toByteArray(); + int compressedSize = compressedResult.length; + ZStandardDecompressor.ZStandardDirectDecompressor decompressor = + new ZStandardDecompressor.ZStandardDirectDecompressor(4096); + + ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize); + ByteBuffer outBuf = ByteBuffer.allocateDirect(8096); + + inBuf.put(compressedResult, 0, compressedSize); + inBuf.flip(); + + ByteBuffer expected = ByteBuffer.wrap(rawData); + + outBuf.clear(); + while (!decompressor.finished()) { + decompressor.decompress(inBuf, outBuf); + if (outBuf.remaining() == 0) { + outBuf.flip(); + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + } + } + outBuf.flip(); + while (outBuf.remaining() > 0) { + assertEquals(expected.get(), outBuf.get()); + } + outBuf.clear(); + + assertEquals(0, expected.remaining()); + } + + @Test + public void testReadingWithAStream() throws Exception { + FileInputStream inputStream = FileUtils.openInputStream(compressedFile); + ZStandardCodec codec = new ZStandardCodec(); + codec.setConf(CONFIGURATION); + Decompressor decompressor = codec.createDecompressor(); + CompressionInputStream cis = + codec.createInputStream(inputStream, decompressor); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] resultOfDecompression; + try { + byte[] buffer = new byte[100]; + int n; + while ((n = cis.read(buffer, 0, buffer.length)) != -1) { + baos.write(buffer, 0, n); + } + resultOfDecompression = baos.toByteArray(); + } finally { + IOUtils.closeQuietly(baos); + IOUtils.closeQuietly(cis); + } + + byte[] expected = FileUtils.readFileToByteArray(uncompressedFile); + assertEquals(bytesToHex(expected), bytesToHex(resultOfDecompression)); + } + + @Test + public void testDecompressReturnsWhenNothingToDecompress() throws Exception { + ZStandardDecompressor decompressor = + new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT); + int result = decompressor.decompress(new byte[10], 0, 10); + assertEquals(0, result); + } + + public static byte[] generate(int size) { + byte[] data = new byte[size]; + for (int i = 0; i < size; i++) { + data[i] = (byte) RANDOM.nextInt(16); + } + return data; + } + + private static String bytesToHex(byte[] bytes) { + char[] hexChars = new char[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt b/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt new file mode 100644 index 0000000000..60432f7a4e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +Apache License + +Version 2.0, January 2004 + +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. + +"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." 
+ +"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: + +You must give any other recipients of the Work or Derivative Works a copy of this License; and +You must cause any modified files to carry prominent notices stating that You changed the files; and +You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and +If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. + +You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. +5. Submission of Contributions. 
Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. 
+ +END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt.zst b/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt.zst new file mode 100644 index 0000000000..0384b4fce2 Binary files /dev/null and b/hadoop-common-project/hadoop-common/src/test/resources/zstd/test_file.txt.zst differ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml index 9f12986cbc..2c8264df2c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml @@ -109,6 +109,10 @@ false + + + + false @@ -197,6 +201,10 @@ ${snappy.prefix} ${snappy.lib} ${snappy.include} + ${require.zstd} + ${zstd.prefix} + ${zstd.lib} + ${zstd.include} diff --git a/hadoop-project-dist/pom.xml b/hadoop-project-dist/pom.xml index e0d2be9f0d..730ad05756 100644 --- a/hadoop-project-dist/pom.xml +++ b/hadoop-project-dist/pom.xml @@ -41,6 +41,9 @@ false false + + false + false false @@ -363,6 +366,9 @@ --snappybinbundle=${bundle.snappy.in.bin} --snappylib=${snappy.lib} --snappylibbundle=${bundle.snappy} + --zstdbinbundle=${bundle.zstd.in.bin} + --zstdlib=${zstd.lib} + --zstdlibbundle=${bundle.zstd} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a93529287e..4751fc3b84 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -1596,6 +1596,7 @@ file:/dev/urandom true + true true @@ -1607,6 +1608,7 @@ ${env.PATH};${hadoop.common.build.dir}/bin;${snappy.lib} + ${env.PATH};${hadoop.common.build.dir}/bin;${zstd.lib} ${env.PATH};${hadoop.common.build.dir}/bin;${openssl.lib} ${env.PATH};${hadoop.common.build.dir}/bin;${isal.lib}
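
----------------------------------------------------------------------------------
Usage note (illustrative sketch, not part of the patch): with a native build that
found libzstd, the ZStandardCodec registered in META-INF/services above is picked
up by the standard CompressionCodecFactory lookup. Assuming a file that uses the
codec's default ".zst" extension (the path and payload below are invented for the
example):

    import java.io.OutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class ZStandardWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path out = new Path("/tmp/example.zst");      // hypothetical output path
        // the factory maps the .zst suffix to ZStandardCodec through the
        // service registration added by this patch
        CompressionCodec codec =
            new CompressionCodecFactory(conf).getCodec(out);
        FileSystem fs = FileSystem.get(conf);
        try (OutputStream os = codec.createOutputStream(fs.create(out))) {
          os.write("hello zstd".getBytes("UTF-8"));   // toy payload
        }
      }
    }

Running "hadoop checknative -a" (see the NativeLibraries.md.vm hunk above) confirms
whether libzstd was actually found at runtime.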