diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index aba8ce050c..ce83dd415c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -159,6 +159,8 @@ Release 0.23.1 - Unreleased HADOOP-7777 Implement a base class for DNSToSwitchMapping implementations that can offer extra topology information. (stevel) + HADOOP-7657. Add support for LZ4 compression. (Binglin Chang via todd) + IMPROVEMENTS HADOOP-7801. HADOOP_PREFIX cannot be overriden. (Bruno Mahé via tomwhite) diff --git a/hadoop-common-project/hadoop-common/LICENSE.txt b/hadoop-common-project/hadoop-common/LICENSE.txt index 111653695f..6ccfd09277 100644 --- a/hadoop-common-project/hadoop-common/LICENSE.txt +++ b/hadoop-common-project/hadoop-common/LICENSE.txt @@ -251,3 +251,34 @@ in src/main/native/src/org/apache/hadoop/util: * All rights reserved. Use of this source code is governed by a * BSD-style license that can be found in the LICENSE file. */ + + For src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c: + +/* + LZ4 - Fast LZ compression algorithm + Copyright (C) 2011, Yann Collet. + BSD License + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index aa3548e666..6a5e56fb48 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -536,6 +536,8 @@ org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping org.apache.hadoop.io.compress.snappy.SnappyCompressor org.apache.hadoop.io.compress.snappy.SnappyDecompressor + org.apache.hadoop.io.compress.lz4.Lz4Compressor + org.apache.hadoop.io.compress.lz4.Lz4Decompressor org.apache.hadoop.util.NativeCrc32 ${project.build.directory}/native/javah diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index aaac349cd2..7c9b25c957 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -93,7 +93,15 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */ public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT = 256 * 1024; - + + /** Internal buffer size for Snappy compressor/decompressors */ + public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY = + "io.compression.codec.lz4.buffersize"; + + /** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */ + public static final int IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT = + 256 * 1024; + /** * Service Authorization */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java new file mode 100644 index 0000000000..4613ebff40 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; +import org.apache.hadoop.io.compress.lz4.Lz4Decompressor; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.util.NativeCodeLoader; + +/** + * This class creates lz4 compressors/decompressors. + */ +public class Lz4Codec implements Configurable, CompressionCodec { + + static { + NativeCodeLoader.isNativeCodeLoaded(); + } + + Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this objec. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Are the native lz4 libraries loaded & initialized? + * + * @return true if loaded & initialized, otherwise false + */ + public static boolean isNativeCodeLoaded() { + return NativeCodeLoader.isNativeCodeLoaded(); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return createOutputStream(out, createCompressor()); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); + + int compressionOverhead = Math.max((int)(bufferSize * 0.01), 10); + + return new BlockCompressorStream(out, compressor, bufferSize, + compressionOverhead); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class getCompressorType() { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + + return Lz4Compressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); + return new Lz4Compressor(bufferSize); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return createInputStream(in, createDecompressor()); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + + return new BlockDecompressorStream(in, decompressor, conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT)); + } + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class getDecompressorType() { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + + return Lz4Decompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + if (!isNativeCodeLoaded()) { + throw new RuntimeException("native lz4 library not available"); + } + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); + return new Lz4Decompressor(bufferSize); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return .lz4. + */ + @Override + public String getDefaultExtension() { + return ".lz4"; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java new file mode 100644 index 0000000000..63a3afb7d4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lz4; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.util.NativeCodeLoader; + +/** + * A {@link Compressor} based on the lz4 compression algorithm. + * http://code.google.com/p/lz4/ + */ +public class Lz4Compressor implements Compressor { + private static final Log LOG = + LogFactory.getLog(Lz4Compressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + // HACK - Use this as a global lock in the JNI layer + @SuppressWarnings({"unchecked", "unused"}) + private static Class clazz = Lz4Compressor.class; + + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int uncompressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finish, finished; + + private long bytesRead = 0L; + private long bytesWritten = 0L; + + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + // Initialize the native library + try { + initIDs(); + } catch (Throwable t) { + // Ignore failure to load/initialize lz4 + LOG.warn(t.toString()); + } + } else { + LOG.error("Cannot load " + Lz4Compressor.class.getName() + + " without native hadoop library!"); + } + } + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. + */ + public Lz4Compressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + /** + * Creates a new compressor with the default buffer size. + */ + public Lz4Compressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for compression. + * This should be called whenever #needsInput() returns + * true indicating that more input data is required. + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + finished = false; + + if (len > uncompressedDirectBuf.remaining()) { + // save data; now !needsInput + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + } else { + ((ByteBuffer) uncompressedDirectBuf).put(b, off, len); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + bytesRead += len; + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + if (0 >= userBufLen) { + return; + } + finished = false; + + uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize); + ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff, + uncompressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += uncompressedDirectBufLen; + userBufLen -= uncompressedDirectBufLen; + } + + /** + * Does nothing. + */ + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing + } + + /** + * Returns true if the input data buffer is empty and + * #setInput() should be called to provide more input. + * + * @return true if the input data buffer is empty and + * #setInput() should be called in order to provide more input. + */ + @Override + public synchronized boolean needsInput() { + return !(compressedDirectBuf.remaining() > 0 + || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); + } + + /** + * When called, indicates that compression should end + * with the current contents of the input buffer. + */ + @Override + public synchronized void finish() { + finish = true; + } + + /** + * Returns true if the end of the compressed + * data output stream has been reached. + * + * @return true if the end of the compressed + * data output stream has been reached. + */ + @Override + public synchronized boolean finished() { + // Check if all uncompressed data has been consumed + return (finish && finished && compressedDirectBuf.remaining() == 0); + } + + /** + * Fills specified buffer with compressed data. Returns actual number + * of bytes of compressed data. A return value of 0 indicates that + * needsInput() should be called in order to determine if more input + * data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + */ + @Override + public synchronized int compress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check if there is compressed data + int n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + ((ByteBuffer) compressedDirectBuf).get(b, off, n); + bytesWritten += n; + return n; + } + + // Re-initialize the lz4's output direct-buffer + compressedDirectBuf.clear(); + compressedDirectBuf.limit(0); + if (0 == uncompressedDirectBuf.position()) { + // No compressed data, so we should have !needsInput or !finished + setInputFromSavedData(); + if (0 == uncompressedDirectBuf.position()) { + // Called without data; write nothing + finished = true; + return 0; + } + } + + // Compress data + n = compressBytesDirect(); + compressedDirectBuf.limit(n); + uncompressedDirectBuf.clear(); // lz4 consumes all buffer input + + // Set 'finished' if snapy has consumed all user-data + if (0 == userBufLen) { + finished = true; + } + + // Get atmost 'len' bytes + n = Math.min(n, len); + bytesWritten += n; + ((ByteBuffer) compressedDirectBuf).get(b, off, n); + + return n; + } + + /** + * Resets compressor so that a new set of input data can be processed. + */ + @Override + public synchronized void reset() { + finish = false; + finished = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufLen = 0; + compressedDirectBuf.clear(); + compressedDirectBuf.limit(0); + userBufOff = userBufLen = 0; + bytesRead = bytesWritten = 0L; + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ + @Override + public synchronized void reinit(Configuration conf) { + reset(); + } + + /** + * Return number of bytes given to this compressor since last reset. + */ + @Override + public synchronized long getBytesRead() { + return bytesRead; + } + + /** + * Return number of bytes consumed by callers of compress since last reset. + */ + @Override + public synchronized long getBytesWritten() { + return bytesWritten; + } + + /** + * Closes the compressor and discards any unprocessed input. + */ + @Override + public synchronized void end() { + } + + private native static void initIDs(); + + private native int compressBytesDirect(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java new file mode 100644 index 0000000000..0cf65e5144 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lz4; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.util.NativeCodeLoader; + +/** + * A {@link Decompressor} based on the lz4 compression algorithm. + * http://code.google.com/p/lz4/ + */ +public class Lz4Decompressor implements Decompressor { + private static final Log LOG = + LogFactory.getLog(Lz4Compressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + // HACK - Use this as a global lock in the JNI layer + @SuppressWarnings({"unchecked", "unused"}) + private static Class clazz = Lz4Decompressor.class; + + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int compressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finished; + + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + // Initialize the native library + try { + initIDs(); + } catch (Throwable t) { + // Ignore failure to load/initialize lz4 + LOG.warn(t.toString()); + } + } else { + LOG.error("Cannot load " + Lz4Compressor.class.getName() + + " without native hadoop library!"); + } + } + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. + */ + public Lz4Decompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + + } + + /** + * Creates a new decompressor with the default buffer size. + */ + public Lz4Decompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * true indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via b[] remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. With this requirement, an extra + * buffer-copy can be avoided.) + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + setInputFromSavedData(); + + // Reinitialize lz4's output direct-buffer + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + compressedDirectBufLen = Math.min(userBufLen, directBufferSize); + + // Reinitialize lz4's input direct buffer + compressedDirectBuf.rewind(); + ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; + } + + /** + * Does nothing. + */ + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing + } + + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ + @Override + public synchronized boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lz4 has consumed all input + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; + } + + /** + * Returns false. + * + * @return false. + */ + @Override + public synchronized boolean needsDictionary() { + return false; + } + + /** + * Returns true if the end of the decompressed + * data output stream has been reached. + * + * @return true if the end of the decompressed + * data output stream has been reached. + */ + @Override + public synchronized boolean finished() { + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + * @throws IOException + */ + @Override + public synchronized int decompress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + int n = 0; + + // Check if there is uncompressed data + n = uncompressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + ((ByteBuffer) uncompressedDirectBuf).get(b, off, n); + return n; + } + if (compressedDirectBufLen > 0) { + // Re-initialize the lz4's output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + + // Decompress data + n = decompressBytesDirect(); + uncompressedDirectBuf.limit(n); + + if (userBufLen <= 0) { + finished = true; + } + + // Get atmost 'len' bytes + n = Math.min(n, len); + ((ByteBuffer) uncompressedDirectBuf).get(b, off, n); + } + + return n; + } + + /** + * Returns 0. + * + * @return 0. + */ + @Override + public synchronized int getRemaining() { + // Never use this function in BlockDecompressorStream. + return 0; + } + + public synchronized void reset() { + finished = false; + compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. + */ + @Override + public synchronized void end() { + // do nothing + } + + private native static void initIDs(); + + private native int decompressBytesDirect(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/package-info.java new file mode 100644 index 0000000000..11827f1748 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.compress.lz4; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + diff --git a/hadoop-common-project/hadoop-common/src/main/native/Makefile.am b/hadoop-common-project/hadoop-common/src/main/native/Makefile.am index 3afc0b88a0..c4ca564c2b 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/Makefile.am +++ b/hadoop-common-project/hadoop-common/src/main/native/Makefile.am @@ -46,6 +46,9 @@ libhadoop_la_SOURCES = src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c \ src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c \ src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c \ src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c \ + src/org/apache/hadoop/io/compress/lz4/lz4.c \ + src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c \ + src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c \ src/org/apache/hadoop/security/getGroup.c \ src/org/apache/hadoop/security/JniBasedUnixGroupsMapping.c \ src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c \ diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c new file mode 100644 index 0000000000..d52a4f6b2a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if defined HAVE_CONFIG_H + #include +#endif + +#include "org_apache_hadoop.h" +#include "org_apache_hadoop_io_compress_lz4_Lz4Compressor.h" + +//**************************** +// Simple Functions +//**************************** + +extern int LZ4_compress (char* source, char* dest, int isize); + +/* +LZ4_compress() : + return : the number of bytes in compressed buffer dest + note : destination buffer must be already allocated. + To avoid any problem, size it to handle worst cases situations (input data not compressible) + Worst case size is : "inputsize + 0.4%", with "0.4%" being at least 8 bytes. + +*/ + +static jfieldID Lz4Compressor_clazz; +static jfieldID Lz4Compressor_uncompressedDirectBuf; +static jfieldID Lz4Compressor_uncompressedDirectBufLen; +static jfieldID Lz4Compressor_compressedDirectBuf; +static jfieldID Lz4Compressor_directBufferSize; + + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs +(JNIEnv *env, jclass clazz){ + + Lz4Compressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", + "Ljava/lang/Class;"); + Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz, + "uncompressedDirectBuf", + "Ljava/nio/Buffer;"); + Lz4Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, + "uncompressedDirectBufLen", "I"); + Lz4Compressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz, + "compressedDirectBuf", + "Ljava/nio/Buffer;"); + Lz4Compressor_directBufferSize = (*env)->GetFieldID(env, clazz, + "directBufferSize", "I"); +} + +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirect +(JNIEnv *env, jobject thisj){ + // Get members of Lz4Compressor + jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz); + jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf); + jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen); + jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf); + jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize); + + // Get the input direct buffer + LOCK_CLASS(env, clazz, "Lz4Compressor"); + const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Lz4Compressor"); + + if (uncompressed_bytes == 0) { + return (jint)0; + } + + // Get the output direct buffer + LOCK_CLASS(env, clazz, "Lz4Compressor"); + char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Lz4Compressor"); + + if (compressed_bytes == 0) { + return (jint)0; + } + + compressed_direct_buf_len = LZ4_compress(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len); + if (compressed_direct_buf_len < 0){ + THROW(env, "Ljava/lang/InternalError", "LZ4_compress failed"); + } + + (*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0); + + return (jint)compressed_direct_buf_len; +} + diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c new file mode 100644 index 0000000000..547b027cc1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if defined HAVE_CONFIG_H + #include +#endif + +#include "org_apache_hadoop.h" +#include "org_apache_hadoop_io_compress_lz4_Lz4Decompressor.h" + +int LZ4_uncompress_unknownOutputSize (char* source, char* dest, int isize, int maxOutputSize); + +/* +LZ4_uncompress_unknownOutputSize() : + isize : is the input size, therefore the compressed size + maxOutputSize : is the size of the destination buffer (which must be already allocated) + return : the number of bytes decoded in the destination buffer (necessarily <= maxOutputSize) + If the source stream is malformed, the function will stop decoding and return a negative result, indicating the byte position of the faulty instruction + This version never writes beyond dest + maxOutputSize, and is therefore protected against malicious data packets + note : This version is a bit slower than LZ4_uncompress +*/ + + +static jfieldID Lz4Decompressor_clazz; +static jfieldID Lz4Decompressor_compressedDirectBuf; +static jfieldID Lz4Decompressor_compressedDirectBufLen; +static jfieldID Lz4Decompressor_uncompressedDirectBuf; +static jfieldID Lz4Decompressor_directBufferSize; + +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs +(JNIEnv *env, jclass clazz){ + + Lz4Decompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", + "Ljava/lang/Class;"); + Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz, + "compressedDirectBuf", + "Ljava/nio/Buffer;"); + Lz4Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz, + "compressedDirectBufLen", "I"); + Lz4Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz, + "uncompressedDirectBuf", + "Ljava/nio/Buffer;"); + Lz4Decompressor_directBufferSize = (*env)->GetFieldID(env, clazz, + "directBufferSize", "I"); +} + +JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_decompressBytesDirect +(JNIEnv *env, jobject thisj){ + // Get members of Lz4Decompressor + jobject clazz = (*env)->GetStaticObjectField(env,thisj, Lz4Decompressor_clazz); + jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf); + jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen); + jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf); + size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize); + + // Get the input direct buffer + LOCK_CLASS(env, clazz, "Lz4Decompressor"); + const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Lz4Decompressor"); + + if (compressed_bytes == 0) { + return (jint)0; + } + + // Get the output direct buffer + LOCK_CLASS(env, clazz, "Lz4Decompressor"); + char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); + UNLOCK_CLASS(env, clazz, "Lz4Decompressor"); + + if (uncompressed_bytes == 0) { + return (jint)0; + } + + uncompressed_direct_buf_len = LZ4_uncompress_unknownOutputSize(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len); + if (uncompressed_direct_buf_len < 0) { + THROW(env, "Ljava/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed."); + } + + (*env)->SetIntField(env, thisj, Lz4Decompressor_compressedDirectBufLen, 0); + + return (jint)uncompressed_direct_buf_len; +} diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c new file mode 100644 index 0000000000..be15615a78 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c @@ -0,0 +1,645 @@ +/* + LZ4 - Fast LZ compression algorithm + Copyright (C) 2011, Yann Collet. + BSD License + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +//************************************** +// Copy from: +// URL: http://lz4.googlecode.com/svn/trunk/lz4.c +// Repository Root: http://lz4.googlecode.com/svn +// Repository UUID: 650e7d94-2a16-8b24-b05c-7c0b3f6821cd +// Revision: 43 +// Node Kind: file +// Last Changed Author: yann.collet.73@gmail.com +// Last Changed Rev: 43 +// Last Changed Date: 2011-12-16 15:41:46 -0800 (Fri, 16 Dec 2011) +// Sha1: 9db7b2c57698c528d79572e6bce2e7dc33fa5998 +//************************************** + +//************************************** +// Compilation Directives +//************************************** +#if __STDC_VERSION__ >= 199901L + /* "restrict" is a known keyword */ +#else +#define restrict // Disable restrict +#endif + + +//************************************** +// Includes +//************************************** +#include // for malloc +#include // for memset + + +//************************************** +// Performance parameter +//************************************** +// Increasing this value improves compression ratio +// Lowering this value reduces memory usage +// Lowering may also improve speed, typically on reaching cache size limits (L1 32KB for Intel, 64KB for AMD) +// Memory usage formula for 32 bits systems : N->2^(N+2) Bytes (examples : 17 -> 512KB ; 12 -> 16KB) +#define HASH_LOG 12 + + +//************************************** +// Basic Types +//************************************** +#if defined(_MSC_VER) // Visual Studio does not support 'stdint' natively +#define BYTE unsigned __int8 +#define U16 unsigned __int16 +#define U32 unsigned __int32 +#define S32 __int32 +#else +#include +#define BYTE uint8_t +#define U16 uint16_t +#define U32 uint32_t +#define S32 int32_t +#endif + + +//************************************** +// Constants +//************************************** +#define MINMATCH 4 +#define SKIPSTRENGTH 6 +#define STACKLIMIT 13 +#define HEAPMODE (HASH_LOG>STACKLIMIT) // Defines if memory is allocated into the stack (local variable), or into the heap (malloc()). +#define COPYTOKEN 4 +#define COPYLENGTH 8 +#define LASTLITERALS 5 +#define MFLIMIT (COPYLENGTH+MINMATCH) +#define MINLENGTH (MFLIMIT+1) + +#define MAXD_LOG 16 +#define MAX_DISTANCE ((1 << MAXD_LOG) - 1) + +#define HASHTABLESIZE (1 << HASH_LOG) +#define HASH_MASK (HASHTABLESIZE - 1) + +#define ML_BITS 4 +#define ML_MASK ((1U<v) +#define A16(x) (((U16_S *)(x))->v) + + +//************************************** +// Macros +//************************************** +#define LZ4_HASH_FUNCTION(i) (((i) * 2654435761U) >> ((MINMATCH*8)-HASH_LOG)) +#define LZ4_HASH_VALUE(p) LZ4_HASH_FUNCTION(A32(p)) +#define LZ4_COPYPACKET(s,d) A32(d) = A32(s); d+=4; s+=4; A32(d) = A32(s); d+=4; s+=4; +#define LZ4_WILDCOPY(s,d,e) do { LZ4_COPYPACKET(s,d) } while (dhashTable; + memset((void*)HashTable, 0, sizeof(srt->hashTable)); +#else + (void) ctx; +#endif + + + // First Byte + HashTable[LZ4_HASH_VALUE(ip)] = ip; + ip++; forwardH = LZ4_HASH_VALUE(ip); + + // Main Loop + for ( ; ; ) + { + int findMatchAttempts = (1U << skipStrength) + 3; + const BYTE* forwardIp = ip; + const BYTE* ref; + BYTE* token; + + // Find a match + do { + U32 h = forwardH; + int step = findMatchAttempts++ >> skipStrength; + ip = forwardIp; + forwardIp = ip + step; + + if (forwardIp > mflimit) { goto _last_literals; } + + forwardH = LZ4_HASH_VALUE(forwardIp); + ref = HashTable[h]; + HashTable[h] = ip; + + } while ((ref < ip - MAX_DISTANCE) || (A32(ref) != A32(ip))); + + // Catch up + while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; } + + // Encode Literal length + length = ip - anchor; + token = op++; + if (length>=(int)RUN_MASK) { *token=(RUN_MASK< 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } + else *token = (length<>8; } +#endif + + // Start Counting + ip+=MINMATCH; ref+=MINMATCH; // MinMatch verified + anchor = ip; + while (ip> 27]; +#else + if (A32(ref) == A32(ip)) { ip+=4; ref+=4; continue; } + if (A16(ref) == A16(ip)) { ip+=2; ref+=2; } + if (*ref == *ip) ip++; +#endif + goto _endCount; + } + if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; } + if ((ip=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } + else *token += len; + + // Test end of chunk + if (ip > mflimit) { anchor = ip; break; } + + // Fill table + HashTable[LZ4_HASH_VALUE(ip-2)] = ip-2; + + // Test next position + ref = HashTable[LZ4_HASH_VALUE(ip)]; + HashTable[LZ4_HASH_VALUE(ip)] = ip; + if ((ref > ip - (MAX_DISTANCE + 1)) && (A32(ref) == A32(ip))) { token = op++; *token=0; goto _next_match; } + + // Prepare next loop + anchor = ip++; + forwardH = LZ4_HASH_VALUE(ip); + } + +_last_literals: + // Encode Last Literals + { + int lastRun = iend - anchor; + if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK< 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } + else *op++ = (lastRun<> ((MINMATCH*8)-HASHLOG64K)) +#define LZ4_HASH64K_VALUE(p) LZ4_HASH64K_FUNCTION(A32(p)) +int LZ4_compress64kCtx(void** ctx, + char* source, + char* dest, + int isize) +{ +#if HEAPMODE + struct refTables *srt = (struct refTables *) (*ctx); + U16* HashTable; +#else + U16 HashTable[HASHTABLESIZE<<1] = {0}; +#endif + + const BYTE* ip = (BYTE*) source; + const BYTE* anchor = ip; + const BYTE* const base = ip; + const BYTE* const iend = ip + isize; + const BYTE* const mflimit = iend - MFLIMIT; +#define matchlimit (iend - LASTLITERALS) + + BYTE* op = (BYTE*) dest; + +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + const size_t DeBruijnBytePos[32] = { 0, 0, 3, 0, 3, 1, 3, 0, 3, 2, 2, 1, 3, 2, 0, 1, 3, 3, 1, 2, 2, 2, 2, 0, 3, 1, 2, 0, 1, 0, 1, 1 }; +#endif + int len, length; + const int skipStrength = SKIPSTRENGTH; + U32 forwardH; + + + // Init + if (isizehashTable); + memset((void*)HashTable, 0, sizeof(srt->hashTable)); +#else + (void) ctx; +#endif + + + // First Byte + ip++; forwardH = LZ4_HASH64K_VALUE(ip); + + // Main Loop + for ( ; ; ) + { + int findMatchAttempts = (1U << skipStrength) + 3; + const BYTE* forwardIp = ip; + const BYTE* ref; + BYTE* token; + + // Find a match + do { + U32 h = forwardH; + int step = findMatchAttempts++ >> skipStrength; + ip = forwardIp; + forwardIp = ip + step; + + if (forwardIp > mflimit) { goto _last_literals; } + + forwardH = LZ4_HASH64K_VALUE(forwardIp); + ref = base + HashTable[h]; + HashTable[h] = ip - base; + + } while (A32(ref) != A32(ip)); + + // Catch up + while ((ip>anchor) && (ref>(BYTE*)source) && (ip[-1]==ref[-1])) { ip--; ref--; } + + // Encode Literal length + length = ip - anchor; + token = op++; + if (length>=(int)RUN_MASK) { *token=(RUN_MASK< 254 ; len-=255) *op++ = 255; *op++ = (BYTE)len; } + else *token = (length<>8; } +#endif + + // Start Counting + ip+=MINMATCH; ref+=MINMATCH; // MinMatch verified + anchor = ip; + while (ip> 27]; +#else + if (A32(ref) == A32(ip)) { ip+=4; ref+=4; continue; } + if (A16(ref) == A16(ip)) { ip+=2; ref+=2; } + if (*ref == *ip) ip++; +#endif + goto _endCount; + } + if ((ip<(matchlimit-1)) && (A16(ref) == A16(ip))) { ip+=2; ref+=2; } + if ((ip=(int)ML_MASK) { *token+=ML_MASK; len-=ML_MASK; for(; len > 509 ; len-=510) { *op++ = 255; *op++ = 255; } if (len > 254) { len-=255; *op++ = 255; } *op++ = (BYTE)len; } + else *token += len; + + // Test end of chunk + if (ip > mflimit) { anchor = ip; break; } + + // Test next position + ref = base + HashTable[LZ4_HASH64K_VALUE(ip)]; + HashTable[LZ4_HASH64K_VALUE(ip)] = ip - base; + if (A32(ref) == A32(ip)) { token = op++; *token=0; goto _next_match; } + + // Prepare next loop + anchor = ip++; + forwardH = LZ4_HASH64K_VALUE(ip); + } + +_last_literals: + // Encode Last Literals + { + int lastRun = iend - anchor; + if (lastRun>=(int)RUN_MASK) { *op++=(RUN_MASK< 254 ; lastRun-=255) *op++ = 255; *op++ = (BYTE) lastRun; } + else *op++ = (lastRun<>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy literals + cpy = op+length; + if (cpy>oend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + memcpy(op, ip, length); + ip += length; + break; // Necessarily EOF + } + LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy; + + + // get offset +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + ref = cpy - A16(ip); ip+=2; +#else + { int delta = *ip++; delta += *ip++ << 8; ref = cpy - delta; } +#endif + + // get matchlength + if ((length=(token&ML_MASK)) == ML_MASK) { for (;*ip==255;length+=255) {ip++;} length += *ip++; } + + // copy repeated sequence + if (op-ref oend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH)); + while(op>ML_BITS)) == RUN_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy literals + cpy = op+length; + if (cpy>oend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + memcpy(op, ip, length); + op += length; + break; // Necessarily EOF + } + LZ4_WILDCOPY(ip, op, cpy); ip -= (op-cpy); op = cpy; + if (ip>=iend) break; // check EOF + + // get offset +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + ref = cpy - A16(ip); ip+=2; +#else + { int delta = *ip++; delta += *ip++ << 8; ref = cpy - delta; } +#endif + + // get matchlength + if ((length=(token&ML_MASK)) == ML_MASK) { for (;(len=*ip++)==255;length+=255){} length += len; } + + // copy repeated sequence + if (op-refoend-COPYLENGTH) + { + if (cpy > oend) goto _output_error; + LZ4_WILDCOPY(ref, op, (oend-COPYLENGTH)); + while(op SECURE_RANDOM = new ThreadLocal() { + @Override + protected SecureRandom initialValue() { + return new SecureRandom(); + } + }; - /** @return a pseudorandom number generator. */ + /** @return a pseudo random number generator. */ public static Random getRandom() { return RANDOM.get(); } + + /** @return a pseudo secure random number generator. */ + public static SecureRandom getSecureRandom() { + return SECURE_RANDOM.get(); + } /** * Compartor for sorting DataNodeInfo[] based on decommissioned states. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 9823ef72c3..af224f34cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -990,25 +990,15 @@ public static EnumSetWritable convert(int flag) { public static HdfsFileStatus convert(HdfsFileStatusProto fs) { if (fs == null) return null; - if (fs.hasLocations()) { - return new HdfsLocatedFileStatus( - fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), - fs.getBlockReplication(), fs.getBlocksize(), - fs.getModificationTime(), fs.getAccessTime(), - PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), - fs.getFileType().equals(FileType.IS_SYMLINK) ? - fs.getSymlink().toByteArray() : null, - fs.getPath().toByteArray(), - PBHelper.convert(fs.hasLocations() ? fs.getLocations() : null)); - } - return new HdfsFileStatus( - fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), - fs.getBlockReplication(), fs.getBlocksize(), - fs.getModificationTime(), fs.getAccessTime(), - PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), - fs.getFileType().equals(FileType.IS_SYMLINK) ? - fs.getSymlink().toByteArray() : null, - fs.getPath().toByteArray()); + return new HdfsLocatedFileStatus( + fs.getLength(), fs.getFileType().equals(FileType.IS_DIR), + fs.getBlockReplication(), fs.getBlocksize(), + fs.getModificationTime(), fs.getAccessTime(), + PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(), + fs.getFileType().equals(FileType.IS_SYMLINK) ? + fs.getSymlink().toByteArray() : null, + fs.getPath().toByteArray(), + fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null); } public static HdfsFileStatusProto convert(HdfsFileStatus fs) { @@ -1070,7 +1060,7 @@ public static DirectoryListing convert(DirectoryListingProto dl) { return null; List partList = dl.getPartialListingList(); return new DirectoryListing( - partList.isEmpty() ? new HdfsFileStatus[0] + partList.isEmpty() ? new HdfsLocatedFileStatus[0] : PBHelper.convert( partList.toArray(new HdfsFileStatusProto[partList.size()])), dl.getRemainingEntries()); @@ -1216,7 +1206,8 @@ public static UpgradeStatusReport convert(UpgradeStatusReportProto r) { public static CorruptFileBlocks convert(CorruptFileBlocksProto c) { if (c == null) return null; - return new CorruptFileBlocks((String[]) c.getFilesList().toArray(), + List fileList = c.getFilesList(); + return new CorruptFileBlocks(fileList.toArray(new String[fileList.size()]), c.getCookie()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 0ef132553d..5b905966e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; -import org.apache.hadoop.ipc.RPC; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 203b135290..2d1439ebde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; -import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; @@ -392,7 +392,7 @@ void join() { private synchronized void cleanUp() { shouldServiceRun = false; - RPC.stopProxy(bpNamenode); + IOUtils.cleanup(LOG, bpNamenode); bpos.shutdownActor(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index a666149beb..1f45a7bb56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -450,14 +450,14 @@ private void verifyBlock(ExtendedBlock block) { } private synchronized long getEarliestScanTime() { - if ( blockInfoSet.size() > 0 ) { + if (!blockInfoSet.isEmpty()) { return blockInfoSet.first().lastScanTime; } return Long.MAX_VALUE; } private synchronized boolean isFirstBlockProcessed() { - if (blockInfoSet.size() > 0 ) { + if (!blockInfoSet.isEmpty()) { long blockId = blockInfoSet.first().block.getBlockId(); if ((processedBlocks.get(blockId) != null) && (processedBlocks.get(blockId) == 1)) { @@ -471,7 +471,7 @@ private synchronized boolean isFirstBlockProcessed() { private void verifyFirstBlock() { Block block = null; synchronized (this) { - if ( blockInfoSet.size() > 0 ) { + if (!blockInfoSet.isEmpty()) { block = blockInfoSet.first().block; } } @@ -560,7 +560,7 @@ private boolean assignInitialVerificationTimes() { * lastModificationTime > 0. */ synchronized (this) { - if (blockInfoSet.size() > 0 ) { + if (!blockInfoSet.isEmpty()) { BlockScanInfo info; while ((info = blockInfoSet.first()).lastScanTime < 0) { delBlockInfo(info); @@ -630,7 +630,7 @@ public void scan() { } } if (((now - getEarliestScanTime()) >= scanPeriod) - || (!(this.isFirstBlockProcessed()))) { + || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) { verifyFirstBlock(); } else { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8878ae1879..3ac89aec6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -65,7 +65,6 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.PrivilegedExceptionAction; -import java.security.SecureRandom; import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; @@ -988,7 +987,7 @@ static String createNewStorageId(int port) { LOG.warn("Could not find ip address of \"default\" inteface."); } - int rand = new SecureRandom().nextInt(Integer.MAX_VALUE); + int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE); return "DS-" + rand + "-" + ip + "-" + port + "-" + System.currentTimeMillis(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 118e4d26de..e763f6f682 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -26,8 +26,6 @@ import java.io.OutputStream; import java.net.URI; import java.net.UnknownHostException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -992,13 +990,7 @@ String newBlockPoolID() throws UnknownHostException{ throw e; } - int rand = 0; - try { - rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE); - } catch (NoSuchAlgorithmException e) { - LOG.warn("Could not use SecureRandom"); - rand = DFSUtil.getRandom().nextInt(Integer.MAX_VALUE); - } + int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE); String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis(); return bpid; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java index f82986f331..f3350b988a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java @@ -19,10 +19,7 @@ import java.io.IOException; import java.net.UnknownHostException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; import java.util.Arrays; -import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -234,12 +231,7 @@ static private String getUniqueRackPrefix() { System.out.println("Could not find ip address of \"default\" inteface."); } - int rand = 0; - try { - rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE); - } catch (NoSuchAlgorithmException e) { - rand = (new Random()).nextInt(Integer.MAX_VALUE); - } + int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE); return "/Rack-" + rand + "-"+ ip + "-" + System.currentTimeMillis(); } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 010b34aa65..0887d51c86 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -331,6 +331,8 @@ Release 0.23.1 - Unreleased before the job started, so that it works properly with oozie throughout the job execution. (Robert Joseph Evans via vinodkv) + MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index e7bfe72f4c..fdebd3925b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -58,8 +58,15 @@ public class ConverterUtils { */ public static Path getPathFromYarnURL(URL url) throws URISyntaxException { String scheme = url.getScheme() == null ? "" : url.getScheme(); - String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort() - : ""; + + String authority = ""; + if (url.getHost() != null) { + authority = url.getHost(); + if (url.getPort() > 0) { + authority += ":" + url.getPort(); + } + } + return new Path( (new URI(scheme, authority, url.getFile(), null, null)).normalize()); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java new file mode 100644 index 0000000000..0e2eb14a40 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.util; + +import static org.junit.Assert.*; + +import java.net.URISyntaxException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.URL; +import org.junit.Test; + +public class TestConverterUtils { + + @Test + public void testConvertUrlWithNoPort() throws URISyntaxException { + Path expectedPath = new Path("hdfs://foo.com"); + URL url = ConverterUtils.getYarnUrlFromPath(expectedPath); + Path actualPath = ConverterUtils.getPathFromYarnURL(url); + assertEquals(expectedPath, actualPath); + } + +}