From 432f641bc2f6c7fdb67da578233cbe07fdfbf1ea Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 6 Aug 2014 07:32:49 +0000 Subject: [PATCH] MAPREDUCE-5976. native-task: should not fail to build if snappy is missing (Manu Zhang) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1616115 13f79535-47bb-0310-9956-ffa450edef68 --- .../CHANGES.MAPREDUCE-2841.txt | 1 + .../src/CMakeLists.txt | 8 +- .../NativeMapOutputCollectorDelegator.java | 34 ++-- .../mapred/nativetask/NativeRuntime.java | 11 +- .../mapred/nativetask/util/SnappyUtil.java | 32 ---- .../src/main/native/src/codec/SnappyCodec.cc | 9 +- .../src/main/native/src/codec/snappy-c.h | 138 --------------- .../src/main/native/src/codec/snappy.h | 163 ------------------ .../src/main/native/src/lib/Compressions.cc | 9 + .../native/src/lib/NativeRuntimeJniImpl.cc | 25 +++ ...e_hadoop_mapred_nativetask_NativeRuntime.h | 8 + .../src/main/native/test/TestCompressions.cc | 69 ++++---- .../src/main/native/test/TestIFile.cc | 3 + .../main/native/test/lib/TestReadBuffer.cc | 4 +- 14 files changed, 117 insertions(+), 397 deletions(-) delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt index 9dddcd5d12..4b77262caf 100644 --- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt +++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt @@ -10,3 +10,4 @@ MAPREDUCE-5991. native-task should not run unit tests if native profile is not e MAPREDUCE-5995. native-task: Revert changes to Text internals (todd) MAPREDUCE-6005. native-task: Fix some valgrind errors (Binglin Chang) MAPREDUCE-5984. native-task: Reuse lz4 sources in hadoop-common (Binglin Chang) +MAPREDUCE-5976. native-task: should not fail to build if snappy is missing (Manu Zhang) \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt index 36dbd9c306..f38021dc3c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt @@ -122,7 +122,6 @@ CHECK_FUNCTION_EXISTS(memset HAVE_MEMSET) CHECK_FUNCTION_EXISTS(strchr HAVE_STRCHR) CHECK_FUNCTION_EXISTS(strtoul HAVE_STRTOUL) - SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES) set_find_shared_library_version("1") find_library(SNAPPY_LIBRARY @@ -139,6 +138,7 @@ if (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR) set(SNAPPY_SOURCE_FILES "${D}/src/codec/SnappyCodec.cc") else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR) + set(SNAPPY_LIBRARY "") set(SNAPPY_INCLUDE_DIR "") set(SNAPPY_SOURCE_FILES "") IF(REQUIRE_SNAPPY) @@ -146,6 +146,8 @@ else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR) ENDIF(REQUIRE_SNAPPY) endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR) +CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h) + include_directories( ${GENERATED_JAVAH} ${D} @@ -154,15 +156,11 @@ include_directories( ${D}/src/lib ${D}/test ${CMAKE_CURRENT_SOURCE_DIR} - #${CMAKE_CURRENT_SOURCE_DIR}/src ${CMAKE_BINARY_DIR} ${JNI_INCLUDE_DIRS} ${SNAPPY_INCLUDE_DIR} ) -#SET(CMAKE_SOURCE_DIR "/cygdrive/c/Users/tianlunz/repo/hadoop-2.2.0-src/hadoop-common-project/hadoop-common/src") -CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h) - SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java index a48b5c34fe..beb3851a12 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import com.google.common.base.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -80,29 +81,22 @@ public void init(Context context) throws IOException, ClassNotFoundException { Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class); if (comparatorClass != null && !Platforms.define(comparatorClass)) { - String message = "Native output collector don't support customized java comparator " + String message = "Native output collector doesn't support customized java comparator " + job.get(MRJobConfig.KEY_COMPARATOR); LOG.error(message); throw new InvalidJobConfException(message); } - if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false) == true) { - if (!isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) { - String message = "Native output collector don't support compression codec " - + job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support Gzip, Lz4, snappy"; - LOG.error(message); - throw new InvalidJobConfException(message); - } - } + if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) { - String message = "Native-Task don't support sort class " + job.get(Constants.MAP_SORT_CLASS); + String message = "Native-Task doesn't support sort class " + job.get(Constants.MAP_SORT_CLASS); LOG.error(message); throw new InvalidJobConfException(message); } if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false) == true) { - String message = "Native-Task don't support secure shuffle"; + String message = "Native-Task doesn't support secure shuffle"; LOG.error(message); throw new InvalidJobConfException(message); } @@ -116,7 +110,7 @@ public void init(Context context) throws IOException, ClassNotFoundException { LOG.error(message); throw new InvalidJobConfException(message); } else if (!Platforms.support(keyCls.getName(), serializer, job)) { - String message = "Native output collector don't support this key, this key is not comparable in native " + String message = "Native output collector doesn't support this key, this key is not comparable in native " + keyCls.getName(); LOG.error(message); throw new InvalidJobConfException(message); @@ -129,6 +123,14 @@ public void init(Context context) throws IOException, ClassNotFoundException { final boolean ret = NativeRuntime.isNativeLibraryLoaded(); if (ret) { + if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false)) { + String codec = job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC); + if (!NativeRuntime.supportsCompressionCodec(codec.getBytes(Charsets.UTF_8))) { + String message = "Native output collector doesn't support compression codec " + codec; + LOG.error(message); + throw new InvalidJobConfException(message); + } + } NativeRuntime.configure(job); final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL, @@ -159,12 +161,4 @@ public void init(Context context) throws IOException, ClassNotFoundException { LOG.info("Native output collector can be successfully enabled!"); } - private boolean isCodecSupported(String string) { - if ("org.apache.hadoop.io.compress.SnappyCodec".equals(string) - || "org.apache.hadoop.io.compress.GzipCodec".equals(string) - || "org.apache.hadoop.io.compress.Lz4Codec".equals(string)) { - return true; - } - return false; - } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java index 65ce652cd7..5ce3c89646 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java @@ -31,7 +31,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Task.TaskReporter; import org.apache.hadoop.mapred.nativetask.util.ConfigUtil; -import org.apache.hadoop.mapred.nativetask.util.SnappyUtil; import org.apache.hadoop.util.VersionInfo; /** @@ -48,11 +47,6 @@ public class NativeRuntime { static { try { - if (false == SnappyUtil.isNativeSnappyLoaded(conf)) { - throw new IOException("Snappy library cannot be loaded"); - } else { - LOG.info("Snappy native library is available"); - } System.loadLibrary("nativetask"); LOG.info("Nativetask JNI library loaded."); nativeLibraryLoaded = true; @@ -161,6 +155,11 @@ public static void reportStatus(TaskReporter reporter) throws IOException { *** The following are JNI Apis ********************************************************/ + /** + * Check whether the native side has compression codec support built in + */ + public native static boolean supportsCompressionCodec(byte[] codec); + /** * Config the native runtime with mapreduce job configurations. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java deleted file mode 100644 index fb15960293..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/util/SnappyUtil.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred.nativetask.util; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.compress.SnappyCodec; - -public class SnappyUtil { - - public static boolean isNativeSnappyLoaded(Configuration conf) { - return SnappyCodec.isNativeCodeLoaded() && conf.getBoolean( - CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, - CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT); - } - -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc index bf2cde2baf..050403302e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc @@ -16,11 +16,15 @@ * limitations under the License. */ -#include "snappy-c.h" +#include "config.h" + +#if defined HADOOP_SNAPPY_LIBRARY #include "commons.h" #include "NativeTask.h" #include "SnappyCodec.h" +#include + namespace NativeTask { SnappyCompressStream::SnappyCompressStream(OutputStream * stream, uint32_t bufferSizeHint) @@ -89,5 +93,6 @@ uint32_t SnappyDecompressStream::decompressOneBlock(uint32_t compressedSize, voi uint64_t SnappyDecompressStream::maxCompressedLength(uint64_t origLength) { return snappy_max_compressed_length(origLength); } - } // namespace NativeTask + +#endif // define HADOOP_SNAPPY_LIBRARY diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h deleted file mode 100644 index 4ccc742491..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy-c.h +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2011 Martin Gieseking . - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Plain C interface (a wrapper around the C++ implementation). - */ - -#ifndef UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_ -#define UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -/* - * Return values; see the documentation for each function to know - * what each can return. - */ -typedef enum { - SNAPPY_OK = 0, - SNAPPY_INVALID_INPUT = 1, - SNAPPY_BUFFER_TOO_SMALL = 2 -} snappy_status; - -/* - * Takes the data stored in "input[0..input_length-1]" and stores - * it in the array pointed to by "compressed". - * - * signals the space available in "compressed". - * If it is not at least equal to "snappy_max_compressed_length(input_length)", - * SNAPPY_BUFFER_TOO_SMALL is returned. After successful compression, - * contains the true length of the compressed output, - * and SNAPPY_OK is returned. - * - * Example: - * size_t output_length = snappy_max_compressed_length(input_length); - * char* output = (char*)malloc(output_length); - * if (snappy_compress(input, input_length, output, &output_length) - * == SNAPPY_OK) { - * ... Process(output, output_length) ... - * } - * free(output); - */ -snappy_status snappy_compress(const char* input, - size_t input_length, - char* compressed, - size_t* compressed_length); - -/* - * Given data in "compressed[0..compressed_length-1]" generated by - * calling the snappy_compress routine, this routine stores - * the uncompressed data to - * uncompressed[0..uncompressed_length-1]. - * Returns failure (a value not equal to SNAPPY_OK) if the message - * is corrupted and could not be decrypted. - * - * signals the space available in "uncompressed". - * If it is not at least equal to the value returned by - * snappy_uncompressed_length for this stream, SNAPPY_BUFFER_TOO_SMALL - * is returned. After successful decompression, - * contains the true length of the decompressed output. - * - * Example: - * size_t output_length; - * if (snappy_uncompressed_length(input, input_length, &output_length) - * != SNAPPY_OK) { - * ... fail ... - * } - * char* output = (char*)malloc(output_length); - * if (snappy_uncompress(input, input_length, output, &output_length) - * == SNAPPY_OK) { - * ... Process(output, output_length) ... - * } - * free(output); - */ -snappy_status snappy_uncompress(const char* compressed, - size_t compressed_length, - char* uncompressed, - size_t* uncompressed_length); - -/* - * Returns the maximal size of the compressed representation of - * input data that is "source_length" bytes in length. - */ -size_t snappy_max_compressed_length(size_t source_length); - -/* - * REQUIRES: "compressed[]" was produced by snappy_compress() - * Returns SNAPPY_OK and stores the length of the uncompressed data in - * *result normally. Returns SNAPPY_INVALID_INPUT on parsing error. - * This operation takes O(1) time. - */ -snappy_status snappy_uncompressed_length(const char* compressed, - size_t compressed_length, - size_t* result); - -/* - * Check if the contents of "compressed[]" can be uncompressed successfully. - * Does not return the uncompressed data; if so, returns SNAPPY_OK, - * or if not, returns SNAPPY_INVALID_INPUT. - * Takes time proportional to compressed_length, but is usually at least a - * factor of four faster than actual decompression. - */ -snappy_status snappy_validate_compressed_buffer(const char* compressed, - size_t compressed_length); - -#ifdef __cplusplus -} // extern "C" -#endif - -#endif /* UTIL_SNAPPY_OPENSOURCE_SNAPPY_C_H_ */ \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h deleted file mode 100644 index 03ef6ce5ba..0000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2005 and onwards Google Inc. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// -// A light-weight compression algorithm. It is designed for speed of -// compression and decompression, rather than for the utmost in space -// savings. -// -// For getting better compression ratios when you are compressing data -// with long repeated sequences or compressing data that is similar to -// other data, while still compressing fast, you might look at first -// using BMDiff and then compressing the output of BMDiff with -// Snappy. - -#ifndef UTIL_SNAPPY_SNAPPY_H__ -#define UTIL_SNAPPY_SNAPPY_H__ - -#include -#include - -#include "snappy-stubs-public.h" - -namespace snappy { - class Source; - class Sink; - - // ------------------------------------------------------------------------ - // Generic compression/decompression routines. - // ------------------------------------------------------------------------ - - // Compress the bytes read from "*source" and append to "*sink". Return the - // number of bytes written. - size_t Compress(Source* source, Sink* sink); - - // Find the uncompressed length of the given stream, as given by the header. - // Note that the true length could deviate from this; the stream could e.g. - // be truncated. - // - // Also note that this leaves "*source" in a state that is unsuitable for - // further operations, such as RawUncompress(). You will need to rewind - // or recreate the source yourself before attempting any further calls. - bool GetUncompressedLength(Source* source, uint32* result); - - // ------------------------------------------------------------------------ - // Higher-level string based routines (should be sufficient for most users) - // ------------------------------------------------------------------------ - - // Sets "*output" to the compressed version of "input[0,input_length-1]". - // Original contents of *output are lost. - // - // REQUIRES: "input[]" is not an alias of "*output". - size_t Compress(const char* input, size_t input_length, string* output); - - // Decompresses "compressed[0,compressed_length-1]" to "*uncompressed". - // Original contents of "*uncompressed" are lost. - // - // REQUIRES: "compressed[]" is not an alias of "*uncompressed". - // - // returns false if the message is corrupted and could not be decompressed - bool Uncompress(const char* compressed, size_t compressed_length, - string* uncompressed); - - - // ------------------------------------------------------------------------ - // Lower-level character array based routines. May be useful for - // efficiency reasons in certain circumstances. - // ------------------------------------------------------------------------ - - // REQUIRES: "compressed" must point to an area of memory that is at - // least "MaxCompressedLength(input_length)" bytes in length. - // - // Takes the data stored in "input[0..input_length]" and stores - // it in the array pointed to by "compressed". - // - // "*compressed_length" is set to the length of the compressed output. - // - // Example: - // char* output = new char[snappy::MaxCompressedLength(input_length)]; - // size_t output_length; - // RawCompress(input, input_length, output, &output_length); - // ... Process(output, output_length) ... - // delete [] output; - void RawCompress(const char* input, - size_t input_length, - char* compressed, - size_t* compressed_length); - - // Given data in "compressed[0..compressed_length-1]" generated by - // calling the Snappy::Compress routine, this routine - // stores the uncompressed data to - // uncompressed[0..GetUncompressedLength(compressed)-1] - // returns false if the message is corrupted and could not be decrypted - bool RawUncompress(const char* compressed, size_t compressed_length, - char* uncompressed); - - // Given data from the byte source 'compressed' generated by calling - // the Snappy::Compress routine, this routine stores the uncompressed - // data to - // uncompressed[0..GetUncompressedLength(compressed,compressed_length)-1] - // returns false if the message is corrupted and could not be decrypted - bool RawUncompress(Source* compressed, char* uncompressed); - - // Returns the maximal size of the compressed representation of - // input data that is "source_bytes" bytes in length; - size_t MaxCompressedLength(size_t source_bytes); - - // REQUIRES: "compressed[]" was produced by RawCompress() or Compress() - // Returns true and stores the length of the uncompressed data in - // *result normally. Returns false on parsing error. - // This operation takes O(1) time. - bool GetUncompressedLength(const char* compressed, size_t compressed_length, - size_t* result); - - // Returns true iff the contents of "compressed[]" can be uncompressed - // successfully. Does not return the uncompressed data. Takes - // time proportional to compressed_length, but is usually at least - // a factor of four faster than actual decompression. - bool IsValidCompressedBuffer(const char* compressed, - size_t compressed_length); - - // The size of a compression block. Note that many parts of the compression - // code assumes that kBlockSize <= 65536; in particular, the hash table - // can only store 16-bit offsets, and EmitCopy() also assumes the offset - // is 65535 bytes or less. Note also that if you change this, it will - // affect the framing format (see framing_format.txt). - // - // Note that there might be older data around that is compressed with larger - // block sizes, so the decompression code should not rely on the - // non-existence of long backreferences. - static const int kBlockLog = 16; - static const size_t kBlockSize = 1 << kBlockLog; - - static const int kMaxHashTableBits = 14; - static const size_t kMaxHashTableSize = 1 << kMaxHashTableBits; - -} // end namespace snappy - - -#endif // UTIL_SNAPPY_SNAPPY_H__ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc index e2ce730967..0f954e00bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Compressions.cc @@ -17,6 +17,7 @@ */ #include "commons.h" +#include "config.h" #include "SyncUtils.h" #include "Compressions.h" #include "codec/GzipCodec.h" @@ -110,7 +111,11 @@ CompressStream * Compressions::getCompressionStream(const string & codec, Output return new GzipCompressStream(stream, bufferSizeHint); } if (codec == SnappyCodec.name) { +#if defined HADOOP_SNAPPY_LIBRARY return new SnappyCompressStream(stream, bufferSizeHint); +#else + THROW_EXCEPTION(UnsupportException, "Snappy library is not loaded"); +#endif } if (codec == Lz4Codec.name) { return new Lz4CompressStream(stream, bufferSizeHint); @@ -124,7 +129,11 @@ DecompressStream * Compressions::getDecompressionStream(const string & codec, In return new GzipDecompressStream(stream, bufferSizeHint); } if (codec == SnappyCodec.name) { +#if defined HADOOP_SNAPPY_LIBRARY return new SnappyDecompressStream(stream, bufferSizeHint); +#else + THROW_EXCEPTION(UnsupportException, "Snappy library is not loaded"); +#endif } if (codec == Lz4Codec.name) { return new Lz4DecompressStream(stream, bufferSizeHint); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc index 3e815651b6..28aeaf35f9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc @@ -19,6 +19,7 @@ #ifndef QUICK_BUILD #include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h" #endif +#include "config.h" #include "commons.h" #include "jniutils.h" #include "NativeObjectFactory.h" @@ -29,6 +30,30 @@ using namespace NativeTask; // NativeRuntime JNI methods /////////////////////////////////////////////////////////////// +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: supportCompressionCodec + * Signature: ([B)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_supportsCompressionCodec + (JNIEnv *jenv, jclass clazz, jbyteArray codec) +{ + const std::string codecString = JNU_ByteArrayToString(jenv, codec); + if ("org.apache.hadoop.io.compress.GzipCodec" == codecString) { + return JNI_TRUE; + } else if ("org.apache.hadoop.io.compress.Lz4Codec" == codecString) { + return JNI_TRUE; + } else if ("org.apache.hadoop.io.compress.SnappyCodec" == codecString) { +#if defined HADOOP_SNAPPY_LIBRARY + return JNI_TRUE; +#else + return JNI_FALSE; +#endif + } else { + return JNI_FALSE; + } +} + /* * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime * Method: JNIRelease diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h index 6d780cfbb6..915c5321df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h @@ -7,6 +7,14 @@ #ifdef __cplusplus extern "C" { #endif +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime + * Method: supportCompressionCodec + * Signature: ([B)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_supportsCompressionCodec + (JNIEnv *, jclass, jbyteArray); + /* * Class: org_apache_hadoop_mapred_nativetask_NativeRuntime * Method: JNIRelease diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc index 68e8368561..de02ac76e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestCompressions.cc @@ -17,7 +17,7 @@ */ #include "lz4.h" -#include "snappy.h" +#include "config.h" #include "commons.h" #include "Path.h" #include "BufferStream.h" @@ -25,9 +25,29 @@ #include "Compressions.h" #include "test_commons.h" -void TestCodec(const string & codec, const string & data, char * buff, char * buff2, size_t buffLen, - uint32_t buffhint) { +#if defined HADOOP_SNAPPY_LIBRARY +#include +#endif + +void TestCodec(const string & codec) { + string data; + size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024); + uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024); + string type = TestConfig.get("compression.input.type", "bytes"); Timer timer; + GenerateKVTextLength(data, length, type); + LOG("%s", timer.getInterval("Generate data").c_str()); + + InputBuffer inputBuffer = InputBuffer(data); + size_t buffLen = data.length() / 2 * 3; + + timer.reset(); + char * buff = new char[buffLen]; + char * buff2 = new char[buffLen]; + memset(buff, 0, buffLen); + memset(buff2, 0, buffLen); + LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str()); + OutputBuffer outputBuffer = OutputBuffer(buff, buffLen); CompressStream * compressor = Compressions::getCompressionStream(codec, &outputBuffer, buffhint); @@ -57,35 +77,10 @@ void TestCodec(const string & codec, const string & data, char * buff, char * bu ASSERT_EQ(data.length(), total); ASSERT_EQ(0, memcmp(data.c_str(), buff2, total)); - delete compressor; - delete decompressor; -} - -TEST(Perf, Compressions) { - string data; - size_t length = TestConfig.getInt("compression.input.length", 100 * 1024 * 1024); - uint32_t buffhint = TestConfig.getInt("compression.buffer.hint", 128 * 1024); - string type = TestConfig.get("compression.input.type", "bytes"); - Timer timer; - GenerateKVTextLength(data, length, type); - LOG("%s", timer.getInterval("Generate data").c_str()); - - InputBuffer inputBuffer = InputBuffer(data); - size_t buffLen = data.length() / 2 * 3; - - timer.reset(); - char * buff = new char[buffLen]; - char * buff2 = new char[buffLen]; - memset(buff, 0, buffLen); - memset(buff2, 0, buffLen); - LOG("%s", timer.getInterval("memset buffer to prevent missing page").c_str()); - - TestCodec("org.apache.hadoop.io.compress.SnappyCodec", data, buff, buff2, buffLen, buffhint); - TestCodec("org.apache.hadoop.io.compress.Lz4Codec", data, buff, buff2, buffLen, buffhint); - TestCodec("org.apache.hadoop.io.compress.GzipCodec", data, buff, buff2, buffLen, buffhint); - delete[] buff; delete[] buff2; + delete compressor; + delete decompressor; } TEST(Perf, CompressionUtil) { @@ -173,6 +168,9 @@ public: } }; +TEST(Perf, GzipCodec) { + TestCodec("org.apache.hadoop.io.compress.GzipCodec"); +} void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t blockSize, int times) { @@ -226,6 +224,12 @@ TEST(Perf, RawCompressionLz4) { printf("%s - Total\n", total.toString().c_str()); } +TEST(Perf, Lz4Codec) { + TestCodec("org.apache.hadoop.io.compress.Lz4Codec"); +} + +#if defined HADOOP_SNAPPY_LIBRARY + void MeasureSingleFileSnappy(const string & path, CompressResult & total, size_t blockSize, int times) { string data; @@ -278,3 +282,8 @@ TEST(Perf, RawCompressionSnappy) { printf("%s - Total\n", total.toString().c_str()); } +TEST(Perf, SnappyCodec) { + TestCodec("org.apache.hadoop.io.compress.SnappyCodec"); +} + +#endif // define HADOOP_SNAPPY_LIBRARY diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc index 22885b2e14..1d8b5200fc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc @@ -18,6 +18,7 @@ #include #include "commons.h" +#include "config.h" #include "BufferStream.h" #include "FileSystem.h" #include "IFile.h" @@ -90,7 +91,9 @@ TEST(IFile, WriteRead) { TestIFileReadWrite(TextType, partition, size, kvs); TestIFileReadWrite(BytesType, partition, size, kvs); TestIFileReadWrite(UnknownType, partition, size, kvs); +#if defined HADOOP_SNAPPY_LIBRARY TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec"); +#endif } void TestIFileWriteRead2(vector > & kvs, char * buff, size_t buffsize, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc index 648fe809c8..cbb636f532 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc @@ -17,6 +17,7 @@ */ #include "commons.h" +#include "config.h" #include "BufferStream.h" #include "Buffers.h" #include "test_commons.h" @@ -43,6 +44,7 @@ TEST(Buffers, AppendRead) { } } +#if defined HADOOP_SNAPPY_LIBRARY TEST(Buffers, AppendReadSnappy) { string codec = "org.apache.hadoop.io.compress.SnappyCodec"; vector data; @@ -64,4 +66,4 @@ TEST(Buffers, AppendReadSnappy) { ASSERT_EQ(data[i], string(rd, data[i].length())); } } - +#endif