MAPREDUCE-5976. native-task: should not fail to build if snappy is missing (Manu Zhang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1616115 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sean Zhong 2014-08-06 07:32:49 +00:00
parent 7ecaa81d27
commit 432f641bc2
14 changed files with 117 additions and 397 deletions

View File

@ -10,3 +10,4 @@ MAPREDUCE-5991. native-task should not run unit tests if native profile is not e
MAPREDUCE-5995. native-task: Revert changes to Text internals (todd)
MAPREDUCE-6005. native-task: Fix some valgrind errors (Binglin Chang)
MAPREDUCE-5984. native-task: Reuse lz4 sources in hadoop-common (Binglin Chang)
MAPREDUCE-5976. native-task: should not fail to build if snappy is missing (Manu Zhang)

View File

@ -122,7 +122,6 @@ CHECK_FUNCTION_EXISTS(memset HAVE_MEMSET)
CHECK_FUNCTION_EXISTS(strchr HAVE_STRCHR)
CHECK_FUNCTION_EXISTS(strtoul HAVE_STRTOUL)
SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
set_find_shared_library_version("1")
find_library(SNAPPY_LIBRARY
@ -139,6 +138,7 @@ if (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
set(SNAPPY_SOURCE_FILES
"${D}/src/codec/SnappyCodec.cc")
else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
set(SNAPPY_LIBRARY "")
set(SNAPPY_INCLUDE_DIR "")
set(SNAPPY_SOURCE_FILES "")
IF(REQUIRE_SNAPPY)
@ -146,6 +146,8 @@ else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
ENDIF(REQUIRE_SNAPPY)
endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
include_directories(
${GENERATED_JAVAH}
${D}
@ -154,15 +156,11 @@ include_directories(
${D}/src/lib
${D}/test
${CMAKE_CURRENT_SOURCE_DIR}
#${CMAKE_CURRENT_SOURCE_DIR}/src
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
${SNAPPY_INCLUDE_DIR}
)
#SET(CMAKE_SOURCE_DIR "/cygdrive/c/Users/tianlunz/repo/hadoop-2.2.0-src/hadoop-common-project/hadoop-common/src")
CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)

View File

@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@ -80,29 +81,22 @@ public void init(Context context) throws IOException, ClassNotFoundException {
Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class);
if (comparatorClass != null && !Platforms.define(comparatorClass)) {
String message = "Native output collector don't support customized java comparator "
String message = "Native output collector doesn't support customized java comparator "
+ job.get(MRJobConfig.KEY_COMPARATOR);
LOG.error(message);
throw new InvalidJobConfException(message);
}
if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false) == true) {
if (!isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) {
String message = "Native output collector don't support compression codec "
+ job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support Gzip, Lz4, snappy";
LOG.error(message);
throw new InvalidJobConfException(message);
}
}
if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
String message = "Native-Task don't support sort class " + job.get(Constants.MAP_SORT_CLASS);
String message = "Native-Task doesn't support sort class " + job.get(Constants.MAP_SORT_CLASS);
LOG.error(message);
throw new InvalidJobConfException(message);
}
if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false) == true) {
String message = "Native-Task don't support secure shuffle";
String message = "Native-Task doesn't support secure shuffle";
LOG.error(message);
throw new InvalidJobConfException(message);
}
@ -116,7 +110,7 @@ public void init(Context context) throws IOException, ClassNotFoundException {
LOG.error(message);
throw new InvalidJobConfException(message);
} else if (!Platforms.support(keyCls.getName(), serializer, job)) {
String message = "Native output collector don't support this key, this key is not comparable in native "
String message = "Native output collector doesn't support this key, this key is not comparable in native "
+ keyCls.getName();
LOG.error(message);
throw new InvalidJobConfException(message);
@ -129,6 +123,14 @@ public void init(Context context) throws IOException, ClassNotFoundException {
final boolean ret = NativeRuntime.isNativeLibraryLoaded();
if (ret) {
if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false)) {
String codec = job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC);
if (!NativeRuntime.supportsCompressionCodec(codec.getBytes(Charsets.UTF_8))) {
String message = "Native output collector doesn't support compression codec " + codec;
LOG.error(message);
throw new InvalidJobConfException(message);
}
}
NativeRuntime.configure(job);
final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
@ -159,12 +161,4 @@ public void init(Context context) throws IOException, ClassNotFoundException {
LOG.info("Native output collector can be successfully enabled!");
}
private boolean isCodecSupported(String string) {
if ("org.apache.hadoop.io.compress.SnappyCodec".equals(string)
|| "org.apache.hadoop.io.compress.GzipCodec".equals(string)
|| "org.apache.hadoop.io.compress.Lz4Codec".equals(string)) {
return true;
}
return false;
}
}

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.apache.hadoop.mapred.nativetask.util.SnappyUtil;
import org.apache.hadoop.util.VersionInfo;
/**
@ -48,11 +47,6 @@ public class NativeRuntime {
static {
try {
if (false == SnappyUtil.isNativeSnappyLoaded(conf)) {
throw new IOException("Snappy library cannot be loaded");
} else {
LOG.info("Snappy native library is available");
}
System.loadLibrary("nativetask");
LOG.info("Nativetask JNI library loaded.");
nativeLibraryLoaded = true;
@ -161,6 +155,11 @@ public static void reportStatus(TaskReporter reporter) throws IOException {
*** The following are JNI Apis
********************************************************/
/**
* Check whether the native side has compression codec support built in
*/
public native static boolean supportsCompressionCodec(byte[] codec);
/**
* Config the native runtime with mapreduce job configurations.
*

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.compress.SnappyCodec;
public class SnappyUtil {
public static boolean isNativeSnappyLoaded(Configuration conf) {
return SnappyCodec.isNativeCodeLoaded() && conf.getBoolean(
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
}
}

View File

@ -16,11 +16,15 @@
* limitations under the License.
*/
#include "snappy-c.h"
#include "config.h"
#if defined HADOOP_SNAPPY_LIBRARY
#include "commons.h"
#include "NativeTask.h"
#include "SnappyCodec.h"
#include <snappy-c.h>
namespace NativeTask {
SnappyCompressStream::SnappyCompressStream(OutputStream * stream, uint32_t bufferSizeHint)
@ -89,5 +93,6 @@ uint32_t SnappyDecompressStream::decompressOneBlock(uint32_t compressedSize, voi
uint64_t SnappyDecompressStream::maxCompressedLength(uint64_t origLength) {
return snappy_max_compressed_length(origLength);
}
} // namespace NativeTask
#endif // define HADOOP_SNAPPY_LIBRARY

View File

@ -1,138 +0,0 @@
/*
* Copyright 2011 Martin Gieseking <martin.gieseking@uos.de>.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Plain C interface (a wrapper around the C++ implementation).
*/
#ifndef UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_
#define UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stddef.h>
/*
* Return values; see the documentation for each function to know
* what each can return.
*/
typedef enum {
SNAPPY_OK = 0,
SNAPPY_INVALID_INPUT = 1,
SNAPPY_BUFFER_TOO_SMALL = 2
} snappy_status;
/*
* Takes the data stored in "input[0..input_length-1]" and stores
* it in the array pointed to by "compressed".
*
* <compressed_length> signals the space available in "compressed".
* If it is not at least equal to "snappy_max_compressed_length(input_length)",
* SNAPPY_BUFFER_TOO_SMALL is returned. After successful compression,
* <compressed_length> contains the true length of the compressed output,
* and SNAPPY_OK is returned.
*
* Example:
* size_t output_length = snappy_max_compressed_length(input_length);
* char* output = (char*)malloc(output_length);
* if (snappy_compress(input, input_length, output, &output_length)
* == SNAPPY_OK) {
* ... Process(output, output_length) ...
* }
* free(output);
*/
snappy_status snappy_compress(const char* input,
size_t input_length,
char* compressed,
size_t* compressed_length);
/*
* Given data in "compressed[0..compressed_length-1]" generated by
* calling the snappy_compress routine, this routine stores
* the uncompressed data to
* uncompressed[0..uncompressed_length-1].
* Returns failure (a value not equal to SNAPPY_OK) if the message
* is corrupted and could not be decrypted.
*
* <uncompressed_length> signals the space available in "uncompressed".
* If it is not at least equal to the value returned by
* snappy_uncompressed_length for this stream, SNAPPY_BUFFER_TOO_SMALL
* is returned. After successful decompression, <uncompressed_length>
* contains the true length of the decompressed output.
*
* Example:
* size_t output_length;
* if (snappy_uncompressed_length(input, input_length, &output_length)
* != SNAPPY_OK) {
* ... fail ...
* }
* char* output = (char*)malloc(output_length);
* if (snappy_uncompress(input, input_length, output, &output_length)
* == SNAPPY_OK) {
* ... Process(output, output_length) ...
* }
* free(output);
*/
snappy_status snappy_uncompress(const char* compressed,
size_t compressed_length,
char* uncompressed,
size_t* uncompressed_length);
/*
* Returns the maximal size of the compressed representation of
* input data that is "source_length" bytes in length.
*/
size_t snappy_max_compressed_length(size_t source_length);
/*
* REQUIRES: "compressed[]" was produced by snappy_compress()
* Returns SNAPPY_OK and stores the length of the uncompressed data in
* *result normally. Returns SNAPPY_INVALID_INPUT on parsing error.
* This operation takes O(1) time.
*/
snappy_status snappy_uncompressed_length(const char* compressed,
size_t compressed_length,
size_t* result);
/*
* Check if the contents of "compressed[]" can be uncompressed successfully.
* Does not return the uncompressed data; if so, returns SNAPPY_OK,
* or if not, returns SNAPPY_INVALID_INPUT.
* Takes time proportional to compressed_length, but is usually at least a
* factor of four faster than actual decompression.
*/
snappy_status snappy_validate_compressed_buffer(const char* compressed,
size_t compressed_length);
#ifdef __cplusplus
} // extern "C"
#endif
#endif /* UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_ */

View File

@ -1,163 +0,0 @@
// Copyright 2005 and onwards Google Inc.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// A light-weight compression algorithm. It is designed for speed of
// compression and decompression, rather than for the utmost in space
// savings.
//
// For getting better compression ratios when you are compressing data
// with long repeated sequences or compressing data that is similar to
// other data, while still compressing fast, you might look at first
// using BMDiff and then compressing the output of BMDiff with
// Snappy.
#ifndef UTIL_SNAPPY_SNAPPY_H__
#define UTIL_SNAPPY_SNAPPY_H__
#include <stddef.h>
#include <string>
#include "snappy-stubs-public.h"
namespace snappy {
class Source;
class Sink;
// ------------------------------------------------------------------------
// Generic compression/decompression routines.
// ------------------------------------------------------------------------
// Compress the bytes read from "*source" and append to "*sink". Return the
// number of bytes written.
size_t Compress(Source* source, Sink* sink);
// Find the uncompressed length of the given stream, as given by the header.
// Note that the true length could deviate from this; the stream could e.g.
// be truncated.
//
// Also note that this leaves "*source" in a state that is unsuitable for
// further operations, such as RawUncompress(). You will need to rewind
// or recreate the source yourself before attempting any further calls.
bool GetUncompressedLength(Source* source, uint32* result);
// ------------------------------------------------------------------------
// Higher-level string based routines (should be sufficient for most users)
// ------------------------------------------------------------------------
// Sets "*output" to the compressed version of "input[0,input_length-1]".
// Original contents of *output are lost.
//
// REQUIRES: "input[]" is not an alias of "*output".
size_t Compress(const char* input, size_t input_length, string* output);
// Decompresses "compressed[0,compressed_length-1]" to "*uncompressed".
// Original contents of "*uncompressed" are lost.
//
// REQUIRES: "compressed[]" is not an alias of "*uncompressed".
//
// returns false if the message is corrupted and could not be decompressed
bool Uncompress(const char* compressed, size_t compressed_length,
string* uncompressed);
// ------------------------------------------------------------------------
// Lower-level character array based routines. May be useful for
// efficiency reasons in certain circumstances.
// ------------------------------------------------------------------------
// REQUIRES: "compressed" must point to an area of memory that is at
// least "MaxCompressedLength(input_length)" bytes in length.
//
// Takes the data stored in "input[0..input_length]" and stores
// it in the array pointed to by "compressed".
//
// "*compressed_length" is set to the length of the compressed output.
//
// Example:
// char* output = new char[snappy::MaxCompressedLength(input_length)];
// size_t output_length;
// RawCompress(input, input_length, output, &output_length);
// ... Process(output, output_length) ...
// delete [] output;
void RawCompress(const char* input,
size_t input_length,
char* compressed,
size_t* compressed_length);
// Given data in "compressed[0..compressed_length-1]" generated by
// calling the Snappy::Compress routine, this routine
// stores the uncompressed data to
// uncompressed[0..GetUncompressedLength(compressed)-1]
// returns false if the message is corrupted and could not be decrypted
bool RawUncompress(const char* compressed, size_t compressed_length,
char* uncompressed);
// Given data from the byte source 'compressed' generated by calling
// the Snappy::Compress routine, this routine stores the uncompressed
// data to
// uncompressed[0..GetUncompressedLength(compressed,compressed_length)-1]
// returns false if the message is corrupted and could not be decrypted
bool RawUncompress(Source* compressed, char* uncompressed);
// Returns the maximal size of the compressed representation of
// input data that is "source_bytes" bytes in length;
size_t MaxCompressedLength(size_t source_bytes);
// REQUIRES: "compressed[]" was produced by RawCompress() or Compress()
// Returns true and stores the length of the uncompressed data in
// *result normally. Returns false on parsing error.
// This operation takes O(1) time.
bool GetUncompressedLength(const char* compressed, size_t compressed_length,
size_t* result);
// Returns true iff the contents of "compressed[]" can be uncompressed
// successfully. Does not return the uncompressed data. Takes
// time proportional to compressed_length, but is usually at least
// a factor of four faster than actual decompression.
bool IsValidCompressedBuffer(const char* compressed,
size_t compressed_length);
// The size of a compression block. Note that many parts of the compression
// code assumes that kBlockSize <= 65536; in particular, the hash table
// can only store 16-bit offsets, and EmitCopy() also assumes the offset
// is 65535 bytes or less. Note also that if you change this, it will
// affect the framing format (see framing_format.txt).
//
// Note that there might be older data around that is compressed with larger
// block sizes, so the decompression code should not rely on the
// non-existence of long backreferences.
static const int kBlockLog = 16;
static const size_t kBlockSize = 1 << kBlockLog;
static const int kMaxHashTableBits = 14;
static const size_t kMaxHashTableSize = 1 << kMaxHashTableBits;
} // end namespace snappy
#endif // UTIL_SNAPPY_SNAPPY_H__

View File

@ -17,6 +17,7 @@
*/
#include "commons.h"
#include "config.h"
#include "SyncUtils.h"
#include "Compressions.h"
#include "codec/GzipCodec.h"
@ -110,7 +111,11 @@ CompressStream * Compressions::getCompressionStream(const string & codec, Output
return new GzipCompressStream(stream, bufferSizeHint);
}
if (codec == SnappyCodec.name) {
#if defined HADOOP_SNAPPY_LIBRARY
return new SnappyCompressStream(stream, bufferSizeHint);
#else
THROW_EXCEPTION(UnsupportException, "Snappy library is not loaded");
#endif
}
if (codec == Lz4Codec.name) {
return new Lz4CompressStream(stream, bufferSizeHint);
@ -124,7 +129,11 @@ DecompressStream * Compressions::getDecompressionStream(const string & codec, In
return new GzipDecompressStream(stream, bufferSizeHint);
}
if (codec == SnappyCodec.name) {
#if defined HADOOP_SNAPPY_LIBRARY
return new SnappyDecompressStream(stream, bufferSizeHint);
#else
THROW_EXCEPTION(UnsupportException, "Snappy library is not loaded");
#endif
}
if (codec == Lz4Codec.name) {
return new Lz4DecompressStream(stream, bufferSizeHint);

View File

@ -19,6 +19,7 @@
#ifndef QUICK_BUILD
#include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h"
#endif
#include "config.h"
#include "commons.h"
#include "jniutils.h"
#include "NativeObjectFactory.h"
@ -29,6 +30,30 @@ using namespace NativeTask;
// NativeRuntime JNI methods
///////////////////////////////////////////////////////////////
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: supportCompressionCodec
* Signature: ([B)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_supportsCompressionCodec
(JNIEnv *jenv, jclass clazz, jbyteArray codec)
{
const std::string codecString = JNU_ByteArrayToString(jenv, codec);
if ("org.apache.hadoop.io.compress.GzipCodec" == codecString) {
return JNI_TRUE;
} else if ("org.apache.hadoop.io.compress.Lz4Codec" == codecString) {
return JNI_TRUE;
} else if ("org.apache.hadoop.io.compress.SnappyCodec" == codecString) {
#if defined HADOOP_SNAPPY_LIBRARY
return JNI_TRUE;
#else
return JNI_FALSE;
#endif
} else {
return JNI_FALSE;
}
}
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIRelease

View File

@ -7,6 +7,14 @@
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: supportCompressionCodec
* Signature: ([B)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_supportsCompressionCodec
(JNIEnv *, jclass, jbyteArray);
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIRelease

View File

@ -17,7 +17,7 @@
*/
#include "lz4.h"
#include "snappy.h"
#include "config.h"
#include "commons.h"
#include "Path.h"
#include "BufferStream.h"
@ -25,9 +25,29 @@
#include "Compressions.h"
#include "test_commons.h"
void TestCodec(const string & codec, const string & data, char * buff, char * buff2, size_t buffLen,
uint32_t buffhint) {
#if defined HADOOP_SNAPPY_LIBRARY
#include <snappy.h>
#endif
void TestCodec(const string & codec) {
string data;
size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024);
uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
string type = TestConfig.get("compression.input.type", "bytes");
Timer timer;
GenerateKVTextLength(data, length, type);
LOG("%s", timer.getInterval("Generate data").c_str());
InputBuffer inputBuffer = InputBuffer(data);
size_t buffLen = data.length() / 2 * 3;
timer.reset();
char * buff = new char[buffLen];
char * buff2 = new char[buffLen];
memset(buff, 0, buffLen);
memset(buff2, 0, buffLen);
LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str());
OutputBuffer outputBuffer = OutputBuffer(buff, buffLen);
CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint);
@ -57,35 +77,10 @@ void TestCodec(const string & codec, const string & data, char * buff, char * bu
ASSERT_EQ(data.length(), total);
ASSERT_EQ(0, memcmp(data.c_str(), buff2, total));
delete compressor;
delete decompressor;
}
TEST(Perf, Compressions) {
string data;
size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024);
uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024);
string type = TestConfig.get("compression.input.type", "bytes");
Timer timer;
GenerateKVTextLength(data, length, type);
LOG("%s", timer.getInterval("Generate data").c_str());
InputBuffer inputBuffer = InputBuffer(data);
size_t buffLen = data.length() / 2 * 3;
timer.reset();
char * buff = new char[buffLen];
char * buff2 = new char[buffLen];
memset(buff, 0, buffLen);
memset(buff2, 0, buffLen);
LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str());
TestCodec("org.apache.hadoop.io.compress.SnappyCodec", data, buff, buff2, buffLen, buffhint);
TestCodec("org.apache.hadoop.io.compress.Lz4Codec", data, buff, buff2, buffLen, buffhint);
TestCodec("org.apache.hadoop.io.compress.GzipCodec", data, buff, buff2, buffLen, buffhint);
delete[] buff;
delete[] buff2;
delete compressor;
delete decompressor;
}
TEST(Perf, CompressionUtil) {
@ -173,6 +168,9 @@ public:
}
};
TEST(Perf, GzipCodec) {
TestCodec("org.apache.hadoop.io.compress.GzipCodec");
}
void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize,
int times) {
@ -226,6 +224,12 @@ TEST(Perf, RawCompressionLz4) {
printf("%s - Total\n", total.toString().c_str());
}
TEST(Perf, Lz4Codec) {
TestCodec("org.apache.hadoop.io.compress.Lz4Codec");
}
#if defined HADOOP_SNAPPY_LIBRARY
void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize,
int times) {
string data;
@ -278,3 +282,8 @@ TEST(Perf, RawCompressionSnappy) {
printf("%s - Total\n", total.toString().c_str());
}
TEST(Perf, SnappyCodec) {
TestCodec("org.apache.hadoop.io.compress.SnappyCodec");
}
#endif // define HADOOP_SNAPPY_LIBRARY

View File

@ -18,6 +18,7 @@
#include <algorithm>
#include "commons.h"
#include "config.h"
#include "BufferStream.h"
#include "FileSystem.h"
#include "IFile.h"
@ -90,7 +91,9 @@ TEST(IFile, WriteRead) {
TestIFileReadWrite(TextType, partition, size, kvs);
TestIFileReadWrite(BytesType, partition, size, kvs);
TestIFileReadWrite(UnknownType, partition, size, kvs);
#if defined HADOOP_SNAPPY_LIBRARY
TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
#endif
}
void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,

View File

@ -17,6 +17,7 @@
*/
#include "commons.h"
#include "config.h"
#include "BufferStream.h"
#include "Buffers.h"
#include "test_commons.h"
@ -43,6 +44,7 @@ TEST(Buffers, AppendRead) {
}
}
#if defined HADOOP_SNAPPY_LIBRARY
TEST(Buffers, AppendReadSnappy) {
string codec = "org.apache.hadoop.io.compress.SnappyCodec";
vector<string> data;
@ -64,4 +66,4 @@ TEST(Buffers, AppendReadSnappy) {
ASSERT_EQ(data[i], string(rd, data[i].length()));
}
}
#endif