HADOOP-17125. Use snappy-java in SnappyCodec (#2297)

This switches the SnappyCodec to use the java-snappy codec, rather than the native one.

To use the codec, snappy-java.jar (from org.xerial.snappy) needs to be on the classpath.

This comesin as an avro dependency,  so it is already on the hadoop-common classpath,
as well as in hadoop-common/lib.
The version used is now managed in the hadoop-project POM; initially 1.1.7.7

Contributed by DB Tsai and Liang-Chi Hsieh

Change-Id: Id52a404a0005480e68917cd17f0a27b7744aea4e
This commit is contained in:
Liang-Chi Hsieh 2020-10-06 09:07:54 -07:00 committed by Steve Loughran
parent 3ae78e40bf
commit 8f60a90688
22 changed files with 109 additions and 561 deletions

View File

@ -75,7 +75,7 @@ Installing required packages for clean install of Ubuntu 14.04 LTS Desktop:
Optional packages: Optional packages:
* Snappy compression * Snappy compression (only used for hadoop-mapreduce-client-nativetask)
$ sudo apt-get install snappy libsnappy-dev $ sudo apt-get install snappy libsnappy-dev
* Intel ISA-L library for erasure coding * Intel ISA-L library for erasure coding
Please refer to https://01.org/intel%C2%AE-storage-acceleration-library-open-source-version Please refer to https://01.org/intel%C2%AE-storage-acceleration-library-open-source-version
@ -161,7 +161,8 @@ Maven build goals:
Snappy is a compression library that can be utilized by the native code. Snappy is a compression library that can be utilized by the native code.
It is currently an optional component, meaning that Hadoop can be built with It is currently an optional component, meaning that Hadoop can be built with
or without this dependency. or without this dependency. Snappy library as optional dependency is only
used for hadoop-mapreduce-client-nativetask.
* Use -Drequire.snappy to fail the build if libsnappy.so is not found. * Use -Drequire.snappy to fail the build if libsnappy.so is not found.
If this option is not specified and the snappy library is missing, If this option is not specified and the snappy library is missing,

View File

@ -111,9 +111,6 @@ for i in "$@"; do
--openssllibbundle=*) --openssllibbundle=*)
OPENSSLLIBBUNDLE=${i#*=} OPENSSLLIBBUNDLE=${i#*=}
;; ;;
--snappybinbundle=*)
SNAPPYBINBUNDLE=${i#*=}
;;
--snappylib=*) --snappylib=*)
SNAPPYLIB=${i#*=} SNAPPYLIB=${i#*=}
;; ;;
@ -176,8 +173,6 @@ if [[ -d "${BIN_DIR}" ]] ; then
exit 1 exit 1
fi fi
bundle_native_bin "${SNAPPYBINBUNDLE}" "${SNAPPYLIBBUNDLE}" "snappy.lib" "snappy" "${SNAPPYLIB}"
bundle_native_bin "${ZSTDBINBUNDLE}" "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}" bundle_native_bin "${ZSTDBINBUNDLE}" "${ZSTDLIBBUNDLE}" "zstd.lib" "zstd" "${ZSTDLIB}"
bundle_native_bin "${OPENSSLBINBUNDLE}" "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" bundle_native_bin "${OPENSSLBINBUNDLE}" "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"

View File

@ -362,6 +362,11 @@
<artifactId>wildfly-openssl-java</artifactId> <artifactId>wildfly-openssl-java</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -641,10 +646,6 @@
</activation> </activation>
<properties> <properties>
<require.bzip2>false</require.bzip2> <require.bzip2>false</require.bzip2>
<snappy.prefix></snappy.prefix>
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
<zstd.prefix></zstd.prefix> <zstd.prefix></zstd.prefix>
<zstd.lib></zstd.lib> <zstd.lib></zstd.lib>
<zstd.include></zstd.include> <zstd.include></zstd.include>
@ -698,11 +699,7 @@
<GENERATED_JAVAH>${project.build.directory}/native/javah</GENERATED_JAVAH> <GENERATED_JAVAH>${project.build.directory}/native/javah</GENERATED_JAVAH>
<JVM_ARCH_DATA_MODEL>${sun.arch.data.model}</JVM_ARCH_DATA_MODEL> <JVM_ARCH_DATA_MODEL>${sun.arch.data.model}</JVM_ARCH_DATA_MODEL>
<REQUIRE_BZIP2>${require.bzip2}</REQUIRE_BZIP2> <REQUIRE_BZIP2>${require.bzip2}</REQUIRE_BZIP2>
<REQUIRE_SNAPPY>${require.snappy}</REQUIRE_SNAPPY>
<REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD> <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD>
<CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX>
<CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB>
<CUSTOM_SNAPPY_INCLUDE>${snappy.include} </CUSTOM_SNAPPY_INCLUDE>
<CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX> <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX>
<CUSTOM_ZSTD_LIB>${zstd.lib} </CUSTOM_ZSTD_LIB> <CUSTOM_ZSTD_LIB>${zstd.lib} </CUSTOM_ZSTD_LIB>
<CUSTOM_ZSTD_INCLUDE>${zstd.include} </CUSTOM_ZSTD_INCLUDE> <CUSTOM_ZSTD_INCLUDE>${zstd.include} </CUSTOM_ZSTD_INCLUDE>
@ -757,14 +754,9 @@
</os> </os>
</activation> </activation>
<properties> <properties>
<snappy.prefix></snappy.prefix>
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
<require.isal>false</require.isal> <require.isal>false</require.isal>
<isal.prefix></isal.prefix> <isal.prefix></isal.prefix>
<isal.lib></isal.lib> <isal.lib></isal.lib>
<require.snappy>false</require.snappy>
<bundle.snappy.in.bin>true</bundle.snappy.in.bin>
<zstd.prefix></zstd.prefix> <zstd.prefix></zstd.prefix>
<zstd.lib></zstd.lib> <zstd.lib></zstd.lib>
<zstd.include></zstd.include> <zstd.include></zstd.include>
@ -864,10 +856,6 @@
<argument>/nologo</argument> <argument>/nologo</argument>
<argument>/p:Configuration=Release</argument> <argument>/p:Configuration=Release</argument>
<argument>/p:OutDir=${project.build.directory}/bin/</argument> <argument>/p:OutDir=${project.build.directory}/bin/</argument>
<argument>/p:CustomSnappyPrefix=${snappy.prefix}</argument>
<argument>/p:CustomSnappyLib=${snappy.lib}</argument>
<argument>/p:CustomSnappyInclude=${snappy.include}</argument>
<argument>/p:RequireSnappy=${require.snappy}</argument>
<argument>/p:CustomZstdPrefix=${zstd.prefix}</argument> <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument>
<argument>/p:CustomZstdLib=${zstd.lib}</argument> <argument>/p:CustomZstdLib=${zstd.lib}</argument>
<argument>/p:CustomZstdInclude=${zstd.include}</argument> <argument>/p:CustomZstdInclude=${zstd.include}</argument>

View File

@ -67,33 +67,6 @@ else()
endif() endif()
set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
# Require snappy.
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("1")
find_library(SNAPPY_LIBRARY
NAMES snappy
PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/lib
${CUSTOM_SNAPPY_PREFIX}/lib64 ${CUSTOM_SNAPPY_LIB})
set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
find_path(SNAPPY_INCLUDE_DIR
NAMES snappy.h
PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/include
${CUSTOM_SNAPPY_INCLUDE})
if(SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
get_filename_component(HADOOP_SNAPPY_LIBRARY ${SNAPPY_LIBRARY} NAME)
set(SNAPPY_SOURCE_FILES
"${SRC}/io/compress/snappy/SnappyCompressor.c"
"${SRC}/io/compress/snappy/SnappyDecompressor.c")
set(REQUIRE_SNAPPY ${REQUIRE_SNAPPY}) # Stop warning about unused variable.
message(STATUS "Found Snappy: ${SNAPPY_LIBRARY}")
else()
set(SNAPPY_INCLUDE_DIR "")
set(SNAPPY_SOURCE_FILES "")
if(REQUIRE_SNAPPY)
message(FATAL_ERROR "Required snappy library could not be found. SNAPPY_LIBRARY=${SNAPPY_LIBRARY}, SNAPPY_INCLUDE_DIR=${SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_INCLUDE_DIR=${CUSTOM_SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_PREFIX=${CUSTOM_SNAPPY_PREFIX}, CUSTOM_SNAPPY_INCLUDE=${CUSTOM_SNAPPY_INCLUDE}")
endif()
endif()
# Require zstandard # Require zstandard
SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("1") hadoop_set_find_shared_library_version("1")
@ -253,7 +226,6 @@ include_directories(
${JNI_INCLUDE_DIRS} ${JNI_INCLUDE_DIRS}
${ZLIB_INCLUDE_DIRS} ${ZLIB_INCLUDE_DIRS}
${BZIP2_INCLUDE_DIR} ${BZIP2_INCLUDE_DIR}
${SNAPPY_INCLUDE_DIR}
${ISAL_INCLUDE_DIR} ${ISAL_INCLUDE_DIR}
${ZSTD_INCLUDE_DIR} ${ZSTD_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}
@ -269,7 +241,6 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/lz4/lz4.c ${SRC}/io/compress/lz4/lz4.c
${SRC}/io/compress/lz4/lz4hc.c ${SRC}/io/compress/lz4/lz4hc.c
${ISAL_SOURCE_FILES} ${ISAL_SOURCE_FILES}
${SNAPPY_SOURCE_FILES}
${ZSTD_SOURCE_FILES} ${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES} ${OPENSSL_SOURCE_FILES}
${SRC}/io/compress/zlib/ZlibCompressor.c ${SRC}/io/compress/zlib/ZlibCompressor.c

View File

@ -92,7 +92,7 @@ public interface Decompressor {
* {@link #needsInput()} should be called in order to determine if more * {@link #needsInput()} should be called in order to determine if more
* input data is required. * input data is required.
* *
* @param b Buffer for the compressed data * @param b Buffer for the uncompressed data
* @param off Start offset of the data * @param off Start offset of the data
* @param len Size of the buffer * @param len Size of the buffer
* @return The actual number of bytes of uncompressed data. * @return The actual number of bytes of uncompressed data.

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;
/** /**
* This class creates snappy compressors/decompressors. * This class creates snappy compressors/decompressors.
@ -56,37 +55,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
return conf; return conf;
} }
/**
* Are the native snappy libraries loaded &amp; initialized?
*/
public static void checkNativeCodeLoaded() {
if (!NativeCodeLoader.buildSupportsSnappy()) {
throw new RuntimeException("native snappy library not available: " +
"this version of libhadoop was built without " +
"snappy support.");
}
if (!NativeCodeLoader.isNativeCodeLoaded()) {
throw new RuntimeException("Failed to load libhadoop.");
}
if (!SnappyCompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native snappy library not available: " +
"SnappyCompressor has not been loaded.");
}
if (!SnappyDecompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native snappy library not available: " +
"SnappyDecompressor has not been loaded.");
}
}
public static boolean isNativeCodeLoaded() {
return SnappyCompressor.isNativeCodeLoaded() &&
SnappyDecompressor.isNativeCodeLoaded();
}
public static String getLibraryName() {
return SnappyCompressor.getLibraryName();
}
/** /**
* Create a {@link CompressionOutputStream} that will write to the given * Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}. * {@link OutputStream}.
@ -115,7 +83,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
public CompressionOutputStream createOutputStream(OutputStream out, public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) Compressor compressor)
throws IOException { throws IOException {
checkNativeCodeLoaded();
int bufferSize = conf.getInt( int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT); CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@ -133,7 +100,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
*/ */
@Override @Override
public Class<? extends Compressor> getCompressorType() { public Class<? extends Compressor> getCompressorType() {
checkNativeCodeLoaded();
return SnappyCompressor.class; return SnappyCompressor.class;
} }
@ -144,7 +110,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
*/ */
@Override @Override
public Compressor createCompressor() { public Compressor createCompressor() {
checkNativeCodeLoaded();
int bufferSize = conf.getInt( int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT); CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@ -179,7 +144,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
public CompressionInputStream createInputStream(InputStream in, public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) Decompressor decompressor)
throws IOException { throws IOException {
checkNativeCodeLoaded();
return new BlockDecompressorStream(in, decompressor, conf.getInt( return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT)); CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
@ -192,7 +156,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
*/ */
@Override @Override
public Class<? extends Decompressor> getDecompressorType() { public Class<? extends Decompressor> getDecompressorType() {
checkNativeCodeLoaded();
return SnappyDecompressor.class; return SnappyDecompressor.class;
} }
@ -203,7 +166,6 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
*/ */
@Override @Override
public Decompressor createDecompressor() { public Decompressor createDecompressor() {
checkNativeCodeLoaded();
int bufferSize = conf.getInt( int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT); CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
@ -215,7 +177,7 @@ public class SnappyCodec implements Configurable, CompressionCodec, DirectDecomp
*/ */
@Override @Override
public DirectDecompressor createDirectDecompressor() { public DirectDecompressor createDirectDecompressor() {
return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null; return new SnappyDirectDecompressor();
} }
/** /**

View File

@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
/** /**
* A {@link Compressor} based on the snappy compression algorithm. * A {@link Compressor} based on the snappy compression algorithm.
@ -48,24 +48,6 @@ public class SnappyCompressor implements Compressor {
private long bytesRead = 0L; private long bytesRead = 0L;
private long bytesWritten = 0L; private long bytesWritten = 0L;
private static boolean nativeSnappyLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded() &&
NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
nativeSnappyLoaded = true;
} catch (Throwable t) {
LOG.error("failed to load SnappyCompressor", t);
}
}
}
public static boolean isNativeCodeLoaded() {
return nativeSnappyLoaded;
}
/** /**
* Creates a new compressor. * Creates a new compressor.
* *
@ -225,7 +207,7 @@ public class SnappyCompressor implements Compressor {
} }
// Compress data // Compress data
n = compressBytesDirect(); n = compressDirectBuf();
compressedDirectBuf.limit(n); compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // snappy consumes all buffer input uncompressedDirectBuf.clear(); // snappy consumes all buffer input
@ -291,9 +273,16 @@ public class SnappyCompressor implements Compressor {
public void end() { public void end() {
} }
private native static void initIDs(); private int compressDirectBuf() throws IOException {
if (uncompressedDirectBufLen == 0) {
private native int compressBytesDirect(); return 0;
} else {
public native static String getLibraryName(); // Set the position and limit of `uncompressedDirectBuf` for reading
uncompressedDirectBuf.limit(uncompressedDirectBufLen).position(0);
int size = Snappy.compress((ByteBuffer) uncompressedDirectBuf,
(ByteBuffer) compressedDirectBuf);
uncompressedDirectBufLen = 0;
return size;
}
}
} }

View File

@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor; import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
/** /**
* A {@link Decompressor} based on the snappy compression algorithm. * A {@link Decompressor} based on the snappy compression algorithm.
@ -45,24 +45,6 @@ public class SnappyDecompressor implements Decompressor {
private int userBufOff = 0, userBufLen = 0; private int userBufOff = 0, userBufLen = 0;
private boolean finished; private boolean finished;
private static boolean nativeSnappyLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded() &&
NativeCodeLoader.buildSupportsSnappy()) {
try {
initIDs();
nativeSnappyLoaded = true;
} catch (Throwable t) {
LOG.error("failed to load SnappyDecompressor", t);
}
}
}
public static boolean isNativeCodeLoaded() {
return nativeSnappyLoaded;
}
/** /**
* Creates a new compressor. * Creates a new compressor.
* *
@ -201,7 +183,7 @@ public class SnappyDecompressor implements Decompressor {
* {@link #needsInput()} should be called in order to determine if more * {@link #needsInput()} should be called in order to determine if more
* input data is required. * input data is required.
* *
* @param b Buffer for the compressed data * @param b Buffer for the uncompressed data
* @param off Start offset of the data * @param off Start offset of the data
* @param len Size of the buffer * @param len Size of the buffer
* @return The actual number of bytes of compressed data. * @return The actual number of bytes of compressed data.
@ -232,7 +214,7 @@ public class SnappyDecompressor implements Decompressor {
uncompressedDirectBuf.limit(directBufferSize); uncompressedDirectBuf.limit(directBufferSize);
// Decompress data // Decompress data
n = decompressBytesDirect(); n = decompressDirectBuf();
uncompressedDirectBuf.limit(n); uncompressedDirectBuf.limit(n);
if (userBufLen <= 0) { if (userBufLen <= 0) {
@ -276,10 +258,20 @@ public class SnappyDecompressor implements Decompressor {
// do nothing // do nothing
} }
private native static void initIDs(); private int decompressDirectBuf() throws IOException {
if (compressedDirectBufLen == 0) {
return 0;
} else {
// Set the position and limit of `compressedDirectBuf` for reading
compressedDirectBuf.limit(compressedDirectBufLen).position(0);
int size = Snappy.uncompress((ByteBuffer) compressedDirectBuf,
(ByteBuffer) uncompressedDirectBuf);
compressedDirectBufLen = 0;
compressedDirectBuf.clear();
return size;
}
}
private native int decompressBytesDirect();
int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException { int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
assert (this instanceof SnappyDirectDecompressor); assert (this instanceof SnappyDirectDecompressor);
@ -298,7 +290,7 @@ public class SnappyDecompressor implements Decompressor {
directBufferSize = dst.remaining(); directBufferSize = dst.remaining();
int n = 0; int n = 0;
try { try {
n = decompressBytesDirect(); n = decompressDirectBuf();
presliced.position(presliced.position() + n); presliced.position(presliced.position() + n);
// SNAPPY always consumes the whole buffer or throws an exception // SNAPPY always consumes the whole buffer or throws an exception
src.position(src.limit()); src.position(src.limit());

View File

@ -74,11 +74,6 @@ public final class NativeCodeLoader {
return nativeCodeLoaded; return nativeCodeLoaded;
} }
/**
* Returns true only if this build was compiled with support for snappy.
*/
public static native boolean buildSupportsSnappy();
/** /**
* Returns true only if this build was compiled with support for ISA-L. * Returns true only if this build was compiled with support for ISA-L.
*/ */

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher; import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -67,7 +66,6 @@ public class NativeLibraryChecker {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded(); boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded();
boolean zlibLoaded = false; boolean zlibLoaded = false;
boolean snappyLoaded = false;
boolean isalLoaded = false; boolean isalLoaded = false;
boolean zStdLoaded = false; boolean zStdLoaded = false;
boolean pmdkLoaded = false; boolean pmdkLoaded = false;
@ -80,7 +78,6 @@ public class NativeLibraryChecker {
String openSslDetail = ""; String openSslDetail = "";
String hadoopLibraryName = ""; String hadoopLibraryName = "";
String zlibLibraryName = ""; String zlibLibraryName = "";
String snappyLibraryName = "";
String isalDetail = ""; String isalDetail = "";
String pmdkDetail = ""; String pmdkDetail = "";
String zstdLibraryName = ""; String zstdLibraryName = "";
@ -99,11 +96,6 @@ public class NativeLibraryChecker {
if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) { if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
zstdLibraryName = ZStandardCodec.getLibraryName(); zstdLibraryName = ZStandardCodec.getLibraryName();
} }
snappyLoaded = NativeCodeLoader.buildSupportsSnappy() &&
SnappyCodec.isNativeCodeLoaded();
if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
snappyLibraryName = SnappyCodec.getLibraryName();
}
isalDetail = ErasureCodeNative.getLoadingFailureReason(); isalDetail = ErasureCodeNative.getLoadingFailureReason();
if (isalDetail != null) { if (isalDetail != null) {
@ -152,7 +144,6 @@ public class NativeLibraryChecker {
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName); System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName); System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName); System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName); System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName); System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail); System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
@ -164,7 +155,7 @@ public class NativeLibraryChecker {
} }
if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) || if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
(checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded (checkAll && !(zlibLoaded && lz4Loaded
&& bzip2Loaded && isalLoaded && zStdLoaded))) { && bzip2Loaded && isalLoaded && zStdLoaded))) {
// return 1 to indicated check failed // return 1 to indicated check failed
ExitUtil.terminate(1); ExitUtil.terminate(1);

View File

@ -17,7 +17,7 @@
limitations under the License. limitations under the License.
--> -->
<Project DefaultTargets="CheckRequireSnappy;CheckRequireIsal;Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <Project DefaultTargets="CheckRequireIsal;Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations"> <ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Release|Win32"> <ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration> <Configuration>Release</Configuration>
@ -69,15 +69,6 @@
<TargetName>hadoop</TargetName> <TargetName>hadoop</TargetName>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
<SnappyLib Condition="Exists('$(CustomSnappyPrefix)\snappy.dll')">$(CustomSnappyPrefix)</SnappyLib>
<SnappyLib Condition="Exists('$(CustomSnappyPrefix)\lib\snappy.dll') And '$(SnappyLib)' == ''">$(CustomSnappyPrefix)\lib</SnappyLib>
<SnappyLib Condition="Exists('$(CustomSnappyPrefix)\bin\snappy.dll') And '$(SnappyLib)' == ''">$(CustomSnappyPrefix)\bin</SnappyLib>
<SnappyLib Condition="Exists('$(CustomSnappyLib)') And '$(SnappyLib)' == ''">$(CustomSnappyLib)</SnappyLib>
<SnappyInclude Condition="Exists('$(CustomSnappyPrefix)\snappy.h')">$(CustomSnappyPrefix)</SnappyInclude>
<SnappyInclude Condition="Exists('$(CustomSnappyPrefix)\include\snappy.h') And '$(SnappyInclude)' == ''">$(CustomSnappyPrefix)\include</SnappyInclude>
<SnappyInclude Condition="Exists('$(CustomSnappyInclude)') And '$(SnappyInclude)' == ''">$(CustomSnappyInclude)</SnappyInclude>
<SnappyEnabled Condition="'$(SnappyLib)' != '' And '$(SnappyInclude)' != ''">true</SnappyEnabled>
<IncludePath Condition="'$(SnappyEnabled)' == 'true'">$(SnappyInclude);$(IncludePath)</IncludePath>
<IncludePath Condition="Exists('$(ZLIB_HOME)')">$(ZLIB_HOME);$(IncludePath)</IncludePath> <IncludePath Condition="Exists('$(ZLIB_HOME)')">$(ZLIB_HOME);$(IncludePath)</IncludePath>
</PropertyGroup> </PropertyGroup>
<PropertyGroup> <PropertyGroup>
@ -87,11 +78,6 @@
<IsalLib Condition="Exists('$(CustomIsalLib)') And '$(IsalLib)' == ''">$(CustomIsalLib)</IsalLib> <IsalLib Condition="Exists('$(CustomIsalLib)') And '$(IsalLib)' == ''">$(CustomIsalLib)</IsalLib>
<IsalEnabled Condition="'$(IsalLib)' != ''">true</IsalEnabled> <IsalEnabled Condition="'$(IsalLib)' != ''">true</IsalEnabled>
</PropertyGroup> </PropertyGroup>
<Target Name="CheckRequireSnappy">
<Error
Text="Required snappy library could not be found. SnappyLibrary=$(SnappyLibrary), SnappyInclude=$(SnappyInclude), CustomSnappyLib=$(CustomSnappyLib), CustomSnappyInclude=$(CustomSnappyInclude), CustomSnappyPrefix=$(CustomSnappyPrefix)"
Condition="'$(RequireSnappy)' == 'true' And '$(SnappyEnabled)' != 'true'" />
</Target>
<Target Name="CheckRequireIsal"> <Target Name="CheckRequireIsal">
<Error <Error
Text="Required ISA-L library could not be found. CustomIsalLib=$(CustomIsalLib), CustomIsalPrefix=$(CustomIsalPrefix)" Text="Required ISA-L library could not be found. CustomIsalLib=$(CustomIsalLib), CustomIsalPrefix=$(CustomIsalPrefix)"
@ -140,12 +126,6 @@
</Link> </Link>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="src\org\apache\hadoop\io\compress\snappy\SnappyCompressor.c" Condition="'$(SnappyEnabled)' == 'true'">
<AdditionalOptions>/D HADOOP_SNAPPY_LIBRARY=L\"snappy.dll\"</AdditionalOptions>
</ClCompile>
<ClCompile Include="src\org\apache\hadoop\io\compress\snappy\SnappyDecompressor.c" Condition="'$(SnappyEnabled)' == 'true'">
<AdditionalOptions>/D HADOOP_SNAPPY_LIBRARY=L\"snappy.dll\"</AdditionalOptions>
</ClCompile>
<ClCompile Include="src\org\apache\hadoop\io\compress\zlib\ZlibCompressor.c" Condition="Exists('$(ZLIB_HOME)')" /> <ClCompile Include="src\org\apache\hadoop\io\compress\zlib\ZlibCompressor.c" Condition="Exists('$(ZLIB_HOME)')" />
<ClCompile Include="src\org\apache\hadoop\io\compress\zlib\ZlibDecompressor.c" Condition="Exists('$(ZLIB_HOME)')" /> <ClCompile Include="src\org\apache\hadoop\io\compress\zlib\ZlibDecompressor.c" Condition="Exists('$(ZLIB_HOME)')" />
<ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c" /> <ClCompile Include="src\org\apache\hadoop\io\compress\lz4\lz4.c" />
@ -157,7 +137,6 @@
<ClCompile Include="src\org\apache\hadoop\security\JniBasedUnixGroupsMappingWin.c" /> <ClCompile Include="src\org\apache\hadoop\security\JniBasedUnixGroupsMappingWin.c" />
<ClCompile Include="src\org\apache\hadoop\util\bulk_crc32.c" /> <ClCompile Include="src\org\apache\hadoop\util\bulk_crc32.c" />
<ClCompile Include="src\org\apache\hadoop\util\NativeCodeLoader.c"> <ClCompile Include="src\org\apache\hadoop\util\NativeCodeLoader.c">
<AdditionalOptions Condition="'$(SnappyEnabled)' == 'true'">/D HADOOP_SNAPPY_LIBRARY=L\"snappy.dll\"</AdditionalOptions>
<AdditionalOptions Condition="'$(IsalEnabled)' == 'true'">/D HADOOP_ISAL_LIBRARY=L\"isa-l.dll\"</AdditionalOptions> <AdditionalOptions Condition="'$(IsalEnabled)' == 'true'">/D HADOOP_ISAL_LIBRARY=L\"isa-l.dll\"</AdditionalOptions>
</ClCompile> </ClCompile>
<ClCompile Include="src\org\apache\hadoop\util\NativeCrc32.c" /> <ClCompile Include="src\org\apache\hadoop\util\NativeCrc32.c" />
@ -181,7 +160,6 @@
<ItemGroup> <ItemGroup>
<ClInclude Include="..\src\org\apache\hadoop\util\crc32c_tables.h" /> <ClInclude Include="..\src\org\apache\hadoop\util\crc32c_tables.h" />
<ClInclude Include="..\src\org\apache\hadoop\util\crc32_zlib_polynomial_tables.h" /> <ClInclude Include="..\src\org\apache\hadoop\util\crc32_zlib_polynomial_tables.h" />
<ClInclude Include="src\org\apache\hadoop\io\compress\snappy\org_apache_hadoop_io_compress_snappy.h" />
<ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib_ZlibCompressor.h" /> <ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib_ZlibCompressor.h" />
<ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h" /> <ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h" />
<ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib.h" /> <ClInclude Include="src\org\apache\hadoop\io\compress\zlib\org_apache_hadoop_io_compress_zlib.h" />

View File

@ -1,166 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop_io_compress_snappy.h"
#if defined HADOOP_SNAPPY_LIBRARY
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef UNIX
#include <dlfcn.h>
#include "config.h"
#endif // UNIX
#ifdef WINDOWS
#include "winutils.h"
#endif
#include "org_apache_hadoop_io_compress_snappy_SnappyCompressor.h"
#define JINT_MAX 0x7fffffff
static jfieldID SnappyCompressor_uncompressedDirectBuf;
static jfieldID SnappyCompressor_uncompressedDirectBufLen;
static jfieldID SnappyCompressor_compressedDirectBuf;
static jfieldID SnappyCompressor_directBufferSize;
#ifdef UNIX
static snappy_status (*dlsym_snappy_compress)(const char*, size_t, char*, size_t*);
#endif
#ifdef WINDOWS
typedef snappy_status (__cdecl *__dlsym_snappy_compress)(const char*, size_t, char*, size_t*);
static __dlsym_snappy_compress dlsym_snappy_compress;
#endif
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_initIDs
(JNIEnv *env, jclass clazz){
#ifdef UNIX
// Load libsnappy.so
void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libsnappy) {
char msg[1000];
snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
THROW(env, "java/lang/UnsatisfiedLinkError", msg);
return;
}
#endif
#ifdef WINDOWS
HMODULE libsnappy = LoadLibrary(HADOOP_SNAPPY_LIBRARY);
if (!libsnappy) {
THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load snappy.dll");
return;
}
#endif
// Locate the requisite symbols from libsnappy.so
#ifdef UNIX
dlerror(); // Clear any existing error
LOAD_DYNAMIC_SYMBOL(dlsym_snappy_compress, env, libsnappy, "snappy_compress");
#endif
#ifdef WINDOWS
LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_compress, dlsym_snappy_compress, env, libsnappy, "snappy_compress");
#endif
SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBufLen", "I");
SnappyCompressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyCompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
"directBufferSize", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_compressBytesDirect
(JNIEnv *env, jobject thisj){
const char* uncompressed_bytes;
char* compressed_bytes;
snappy_status ret;
// Get members of SnappyCompressor
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_directBufferSize);
size_t buf_len;
// Get the input direct buffer
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (uncompressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (compressed_bytes == 0) {
return (jint)0;
}
/* size_t should always be 4 bytes or larger. */
buf_len = (size_t)compressed_direct_buf_len;
ret = dlsym_snappy_compress(uncompressed_bytes, uncompressed_direct_buf_len,
compressed_bytes, &buf_len);
if (ret != SNAPPY_OK){
THROW(env, "java/lang/InternalError", "Could not compress data. Buffer length is too small.");
return 0;
}
if (buf_len > JINT_MAX) {
THROW(env, "java/lang/InternalError", "Invalid return buffer length.");
return 0;
}
(*env)->SetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen, 0);
return (jint)buf_len;
}
JNIEXPORT jstring JNICALL
Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_getLibraryName(JNIEnv *env, jclass class) {
#ifdef UNIX
if (dlsym_snappy_compress) {
Dl_info dl_info;
if(dladdr(
dlsym_snappy_compress,
&dl_info)) {
return (*env)->NewStringUTF(env, dl_info.dli_fname);
}
}
return (*env)->NewStringUTF(env, HADOOP_SNAPPY_LIBRARY);
#endif
#ifdef WINDOWS
LPWSTR filename = NULL;
GetLibraryName(dlsym_snappy_compress, &filename);
if (filename != NULL) {
return (*env)->NewString(env, filename, (jsize) wcslen(filename));
} else {
return (*env)->NewStringUTF(env, "Unavailable");
}
#endif
}
#endif //define HADOOP_SNAPPY_LIBRARY

View File

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop_io_compress_snappy.h"
#if defined HADOOP_SNAPPY_LIBRARY
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef UNIX
#include <dlfcn.h>
#include "config.h"
#endif
#include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
static jfieldID SnappyDecompressor_compressedDirectBuf;
static jfieldID SnappyDecompressor_compressedDirectBufLen;
static jfieldID SnappyDecompressor_uncompressedDirectBuf;
static jfieldID SnappyDecompressor_directBufferSize;
#ifdef UNIX
static snappy_status (*dlsym_snappy_uncompress)(const char*, size_t, char*, size_t*);
#endif
#ifdef WINDOWS
typedef snappy_status (__cdecl *__dlsym_snappy_uncompress)(const char*, size_t, char*, size_t*);
static __dlsym_snappy_uncompress dlsym_snappy_uncompress;
#endif
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_initIDs
(JNIEnv *env, jclass clazz){
// Load libsnappy.so
#ifdef UNIX
void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libsnappy) {
char* msg = (char*)malloc(1000);
snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
THROW(env, "java/lang/UnsatisfiedLinkError", msg);
return;
}
#endif
#ifdef WINDOWS
HMODULE libsnappy = LoadLibrary(HADOOP_SNAPPY_LIBRARY);
if (!libsnappy) {
THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load snappy.dll");
return;
}
#endif
// Locate the requisite symbols from libsnappy.so
#ifdef UNIX
dlerror(); // Clear any existing error
LOAD_DYNAMIC_SYMBOL(dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
#endif
#ifdef WINDOWS
LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_uncompress, dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
#endif
SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz,
"compressedDirectBufLen", "I");
SnappyDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
"directBufferSize", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_decompressBytesDirect
(JNIEnv *env, jobject thisj){
const char* compressed_bytes = NULL;
char* uncompressed_bytes = NULL;
snappy_status ret;
// Get members of SnappyDecompressor
jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf);
size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize);
// Get the input direct buffer
compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (compressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (uncompressed_bytes == 0) {
return (jint)0;
}
ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len,
uncompressed_bytes, &uncompressed_direct_buf_len);
if (ret == SNAPPY_BUFFER_TOO_SMALL){
THROW(env, "java/lang/InternalError", "Could not decompress data. Buffer length is too small.");
} else if (ret == SNAPPY_INVALID_INPUT){
THROW(env, "java/lang/InternalError", "Could not decompress data. Input is invalid.");
} else if (ret != SNAPPY_OK){
THROW(env, "java/lang/InternalError", "Could not decompress data.");
}
(*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);
return (jint)uncompressed_direct_buf_len;
}
#endif //define HADOOP_SNAPPY_LIBRARY

View File

@ -1,33 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
#define ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
#include "org_apache_hadoop.h"
#ifdef UNIX
#include <dlfcn.h>
#endif
#include <jni.h>
#include <snappy-c.h>
#include <stddef.h>
#endif //ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H

View File

@ -47,7 +47,7 @@ Components
The native hadoop library includes various components: The native hadoop library includes various components:
* Compression Codecs (bzip2, lz4, snappy, zlib) * Compression Codecs (bzip2, lz4, zlib)
* Native IO utilities for [HDFS Short-Circuit Local Reads](../hadoop-hdfs/ShortCircuitLocalReads.html) and [Centralized Cache Management in HDFS](../hadoop-hdfs/CentralizedCacheManagement.html) * Native IO utilities for [HDFS Short-Circuit Local Reads](../hadoop-hdfs/ShortCircuitLocalReads.html) and [Centralized Cache Management in HDFS](../hadoop-hdfs/CentralizedCacheManagement.html)
* CRC32 checksum implementation * CRC32 checksum implementation
@ -117,7 +117,6 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr
Native library checking: Native library checking:
hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0 hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
zlib: true /lib/x86_64-linux-gnu/libz.so.1 zlib: true /lib/x86_64-linux-gnu/libz.so.1
snappy: true /usr/lib/libsnappy.so.1
zstd: true /usr/lib/libzstd.so.1 zstd: true /usr/lib/libzstd.so.1
lz4: true revision:99 lz4: true revision:99
bzip2: false bzip2: false

View File

@ -79,27 +79,6 @@ public class CompressDecompressTester<T extends Compressor, E extends Decompress
}; };
} }
private static boolean isNativeSnappyLoadable() {
boolean snappyAvailable = false;
boolean loaded = false;
try {
System.loadLibrary("snappy");
logger.warn("Snappy native library is available");
snappyAvailable = true;
boolean hadoopNativeAvailable = NativeCodeLoader.isNativeCodeLoaded();
loaded = snappyAvailable && hadoopNativeAvailable;
if (loaded) {
logger.info("Snappy native library loaded");
} else {
logger.warn("Snappy native library not loaded");
}
} catch (Throwable t) {
logger.warn("Failed to load snappy: ", t);
return false;
}
return loaded;
}
public static <T extends Compressor, E extends Decompressor> CompressDecompressTester<T, E> of( public static <T extends Compressor, E extends Decompressor> CompressDecompressTester<T, E> of(
byte[] rawData) { byte[] rawData) {
return new CompressDecompressTester<T, E>(rawData); return new CompressDecompressTester<T, E>(rawData);
@ -432,7 +411,7 @@ public class CompressDecompressTester<T extends Compressor, E extends Decompress
joiner.join(name, "byte arrays not equals error !!!"), joiner.join(name, "byte arrays not equals error !!!"),
originalRawData, decompressOut.toByteArray()); originalRawData, decompressOut.toByteArray());
} catch (Exception ex) { } catch (Exception ex) {
fail(joiner.join(name, ex.getMessage())); throw new AssertionError(name + ex, ex);
} finally { } finally {
try { try {
compressedOut.close(); compressedOut.close();
@ -504,11 +483,10 @@ public class CompressDecompressTester<T extends Compressor, E extends Decompress
else if (compressor.getClass().isAssignableFrom(ZlibCompressor.class)) { else if (compressor.getClass().isAssignableFrom(ZlibCompressor.class)) {
return ZlibFactory.isNativeZlibLoaded(new Configuration()); return ZlibFactory.isNativeZlibLoaded(new Configuration());
} } else if (compressor.getClass().isAssignableFrom(SnappyCompressor.class)) {
else if (compressor.getClass().isAssignableFrom(SnappyCompressor.class)
&& isNativeSnappyLoadable())
return true; return true;
}
return false; return false;
} }

View File

@ -77,7 +77,6 @@ import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -135,10 +134,8 @@ public class TestCodec {
@Test @Test
public void testSnappyCodec() throws IOException { public void testSnappyCodec() throws IOException {
if (SnappyCodec.isNativeCodeLoaded()) { codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec"); codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
}
} }
@Test @Test
@ -614,7 +611,6 @@ public class TestCodec {
*/ */
@Test @Test
public void testSnappyMapFile() throws Exception { public void testSnappyMapFile() throws Exception {
Assume.assumeTrue(SnappyCodec.isNativeCodeLoaded());
codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100); codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100);
} }

View File

@ -33,13 +33,13 @@ import java.lang.reflect.Array;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.test.MultithreadedTestUtil; import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Assert; import org.junit.Assert;
@ -48,8 +48,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assume.*;
public class TestSnappyCompressorDecompressor { public class TestSnappyCompressorDecompressor {
public static final Logger LOG = public static final Logger LOG =
@ -57,7 +55,6 @@ public class TestSnappyCompressorDecompressor {
@Before @Before
public void before() { public void before() {
assumeTrue(SnappyCodec.isNativeCodeLoaded());
} }
@Test @Test
@ -356,8 +353,9 @@ public class TestSnappyCompressorDecompressor {
@Test @Test
public void testSnappyDirectBlockCompression() { public void testSnappyDirectBlockCompression() {
int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 }; int[] size = new int[] {
assumeTrue(SnappyCodec.isNativeCodeLoaded()); 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024
};
try { try {
for (int i = 0; i < size.length; i++) { for (int i = 0; i < size.length; i++) {
compressDecompressLoop(size[i]); compressDecompressLoop(size[i]);
@ -446,4 +444,52 @@ public class TestSnappyCompressorDecompressor {
ctx.waitFor(60000); ctx.waitFor(60000);
} }
@Test
public void testSnappyCompatibility() throws Exception {
// HADOOP-17125. Using snappy-java in SnappyCodec. These strings are raw
// data and compressed data using previous native Snappy codec. We use
// updated Snappy codec to decode it and check if it matches.
String rawData = "010a06030a040a0c0109020c0a010204020d02000b010701080605" +
"080b090902060a080502060a0d06070908080a0c0105030904090d050908000" +
"40c090c0d0d0804000d00040b0b0d010d060907020a030a0c09000409050801" +
"07040d0c01060a0b09070a04000b01040b09000e0e00020b06050b060e030e0" +
"a07050d06050d";
String compressed = "8001f07f010a06030a040a0c0109020c0a010204020d02000b0" +
"10701080605080b090902060a080502060a0d06070908080a0c010503090409" +
"0d05090800040c090c0d0d0804000d00040b0b0d010d060907020a030a0c090" +
"0040905080107040d0c01060a0b09070a04000b01040b09000e0e00020b0605" +
"0b060e030e0a07050d06050d";
byte[] rawDataBytes = Hex.decodeHex(rawData);
byte[] compressedBytes = Hex.decodeHex(compressed);
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedBytes.length);
inBuf.put(compressedBytes, 0, compressedBytes.length);
inBuf.flip();
ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataBytes.length);
ByteBuffer expected = ByteBuffer.wrap(rawDataBytes);
SnappyDecompressor.SnappyDirectDecompressor decompressor =
new SnappyDecompressor.SnappyDirectDecompressor();
outBuf.clear();
while(!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
} }

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.*;
import org.apache.hadoop.crypto.OpensslCipher; import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -52,9 +51,6 @@ public class TestNativeCodeLoader {
// library names are depended on platform and build envs // library names are depended on platform and build envs
// so just check names are available // so just check names are available
assertFalse(ZlibFactory.getLibraryName().isEmpty()); assertFalse(ZlibFactory.getLibraryName().isEmpty());
if (NativeCodeLoader.buildSupportsSnappy()) {
assertFalse(SnappyCodec.getLibraryName().isEmpty());
}
if (NativeCodeLoader.buildSupportsOpenssl()) { if (NativeCodeLoader.buildSupportsOpenssl()) {
assertFalse(OpensslCipher.getLibraryName().isEmpty()); assertFalse(OpensslCipher.getLibraryName().isEmpty());
} }

View File

@ -183,6 +183,7 @@
<excludes> <excludes>
<exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude> <exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
<exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude> <exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
<exclude>**/jobTokenPassword</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -40,7 +40,6 @@
<hadoop.component>UNDEF</hadoop.component> <hadoop.component>UNDEF</hadoop.component>
<snappy.lib></snappy.lib> <snappy.lib></snappy.lib>
<bundle.snappy>false</bundle.snappy> <bundle.snappy>false</bundle.snappy>
<bundle.snappy.in.bin>false</bundle.snappy.in.bin>
<zstd.lib></zstd.lib> <zstd.lib></zstd.lib>
<bundle.zstd>false</bundle.zstd> <bundle.zstd>false</bundle.zstd>
<bundle.zstd.in.bin>false</bundle.zstd.in.bin> <bundle.zstd.in.bin>false</bundle.zstd.in.bin>
@ -341,7 +340,6 @@
<argument>--openssllib=${openssl.lib}</argument> <argument>--openssllib=${openssl.lib}</argument>
<argument>--opensslbinbundle=${bundle.openssl.in.bin}</argument> <argument>--opensslbinbundle=${bundle.openssl.in.bin}</argument>
<argument>--openssllibbundle=${bundle.openssl}</argument> <argument>--openssllibbundle=${bundle.openssl}</argument>
<argument>--snappybinbundle=${bundle.snappy.in.bin}</argument>
<argument>--snappylib=${snappy.lib}</argument> <argument>--snappylib=${snappy.lib}</argument>
<argument>--snappylibbundle=${bundle.snappy}</argument> <argument>--snappylibbundle=${bundle.snappy}</argument>
<argument>--zstdbinbundle=${bundle.zstd.in.bin}</argument> <argument>--zstdbinbundle=${bundle.zstd.in.bin}</argument>

View File

@ -141,6 +141,7 @@
<metrics.version>3.2.4</metrics.version> <metrics.version>3.2.4</metrics.version>
<netty3.version>3.10.6.Final</netty3.version> <netty3.version>3.10.6.Final</netty3.version>
<netty4.version>4.1.50.Final</netty4.version> <netty4.version>4.1.50.Final</netty4.version>
<snappy-java.version>1.1.7.7</snappy-java.version>
<!-- Maven protoc compiler --> <!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version> <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
@ -1710,6 +1711,11 @@
<artifactId>jna</artifactId> <artifactId>jna</artifactId>
<version>${jna.version}</version> <version>${jna.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
@ -2193,7 +2199,6 @@
<!-- will use a native entropy provider. This will not really --> <!-- will use a native entropy provider. This will not really -->
<!-- attempt to open a file at this path. --> <!-- attempt to open a file at this path. -->
<java.security.egd>file:/dev/urandom</java.security.egd> <java.security.egd>file:/dev/urandom</java.security.egd>
<bundle.snappy.in.bin>true</bundle.snappy.in.bin>
<bundle.zstd.in.bin>true</bundle.zstd.in.bin> <bundle.zstd.in.bin>true</bundle.zstd.in.bin>
<bundle.openssl.in.bin>true</bundle.openssl.in.bin> <bundle.openssl.in.bin>true</bundle.openssl.in.bin>
</properties> </properties>
@ -2205,7 +2210,6 @@
<configuration> <configuration>
<environmentVariables> <environmentVariables>
<!-- Specify where to look for the native DLL on Windows --> <!-- Specify where to look for the native DLL on Windows -->
<PATH>${env.PATH};${hadoop.common.build.dir}/bin;${snappy.lib}</PATH>
<PATH>${env.PATH};${hadoop.common.build.dir}/bin;${zstd.lib}</PATH> <PATH>${env.PATH};${hadoop.common.build.dir}/bin;${zstd.lib}</PATH>
<PATH>${env.PATH};${hadoop.common.build.dir}/bin;${openssl.lib}</PATH> <PATH>${env.PATH};${hadoop.common.build.dir}/bin;${openssl.lib}</PATH>
<PATH>${env.PATH};${hadoop.common.build.dir}/bin;${isal.lib}</PATH> <PATH>${env.PATH};${hadoop.common.build.dir}/bin;${isal.lib}</PATH>