From b20180ffa6c89396d9fcfec8b029b9c600503c3d Mon Sep 17 00:00:00 2001 From: Yi Liu Date: Sat, 24 May 2014 01:19:06 +0000 Subject: [PATCH] HADOOP-10603. Crypto input and output streams implementing Hadoop stream interfaces. Contributed by Yi Liu and Charles Lamb. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1597230 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES-fs-encryption.txt | 3 + .../hadoop/crypto/AESCTRCryptoCodec.java | 57 ++ .../org/apache/hadoop/crypto/CryptoCodec.java | 82 ++ .../hadoop/crypto/CryptoInputStream.java | 613 +++++++++++++++ .../hadoop/crypto/CryptoOutputStream.java | 291 +++++++ .../org/apache/hadoop/crypto/Decryptor.java | 75 ++ .../org/apache/hadoop/crypto/Encryptor.java | 75 ++ .../hadoop/crypto/JCEAESCTRCryptoCodec.java | 55 ++ .../hadoop/crypto/JCEAESCTRDecryptor.java | 84 +++ .../hadoop/crypto/JCEAESCTREncryptor.java | 84 +++ .../fs/CommonConfigurationKeysPublic.java | 11 + .../fs/crypto/CryptoFSDataInputStream.java | 37 + .../fs/crypto/CryptoFSDataOutputStream.java | 47 ++ .../src/main/resources/core-default.xml | 26 + .../hadoop/crypto/CryptoStreamsTestBase.java | 712 ++++++++++++++++++ .../hadoop/crypto/TestCryptoStreams.java | 376 +++++++++ .../crypto/TestCryptoStreamsForLocalFS.java | 114 +++ 17 files changed, 2742 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java diff --git a/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt index fb293606ad..e7bc580ff3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt +++ b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt @@ -8,6 +8,9 @@ fs-encryption (Unreleased) IMPROVEMENTS + HADOOP-10603. Crypto input and output streams implementing Hadoop stream + interfaces. 
(Yi Liu and Charles Lamb) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java new file mode 100644 index 0000000000..b76f1bf2f6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class AESCTRCryptoCodec extends CryptoCodec { + /** + * For AES, the algorithm block is fixed size of 128 bits. + * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard + */ + private static final int AES_BLOCK_SIZE = 16; + + @Override + public int getAlgorithmBlockSize() { + return AES_BLOCK_SIZE; + } + + /** + * IV is produced by combining initial IV and the counter using addition. + * IV length should be the same as {@link #AES_BLOCK_SIZE} + */ + @Override + public void calculateIV(byte[] initIV, long counter, byte[] IV) { + Preconditions.checkArgument(initIV.length == AES_BLOCK_SIZE); + Preconditions.checkArgument(IV.length == AES_BLOCK_SIZE); + + ByteBuffer buf = ByteBuffer.wrap(IV); + buf.put(initIV); + buf.order(ByteOrder.BIG_ENDIAN); + counter += buf.getLong(AES_BLOCK_SIZE - 8); + buf.putLong(AES_BLOCK_SIZE - 8, counter); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java new file mode 100644 index 0000000000..80d824d0e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.security.GeneralSecurityException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY; + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class CryptoCodec implements Configurable { + + public static CryptoCodec getInstance(Configuration conf) { + final Class klass = conf.getClass( + HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY, JCEAESCTRCryptoCodec.class, + CryptoCodec.class); + return ReflectionUtils.newInstance(klass, conf); + } + + /** + * Get block size of a block cipher. + * For different algorithms, the block size may be different. + * @return int block size + */ + public abstract int getAlgorithmBlockSize(); + + /** + * Get a {@link #org.apache.hadoop.crypto.Encryptor}. + * @return Encryptor + */ + public abstract Encryptor getEncryptor() throws GeneralSecurityException; + + /** + * Get a {@link #org.apache.hadoop.crypto.Decryptor}. + * @return Decryptor + */ + public abstract Decryptor getDecryptor() throws GeneralSecurityException; + + /** + * This interface is only for Counter (CTR) mode. Typically calculating + * IV(Initialization Vector) is up to Encryptor or Decryptor, for + * example {@link #javax.crypto.Cipher} will maintain encryption context + * internally when do encryption/decryption continuously using its + * Cipher#update interface. + *
<p/>
+ * In Hadoop, multiple nodes may read splits of a file, so decryption of + * a file is not continuous; even encryption may not be continuous. For + * each part, we need to calculate the counter from the file position. + *
<p/>
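+ * For example (illustrative arithmetic only): with the 16-byte AES block size, + * a reader starting at file position 4096 must begin decryption at + * counter = 4096/16 = 256.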
+ * Typically IV for a file position is produced by combining initial IV and + * the counter using any lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * + * @param initIV initial IV + * @param counter counter for input stream position + * @param IV the IV for input stream position + */ + public abstract void calculateIV(byte[] initIV, long counter, byte[] IV); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java new file mode 100644 index 0000000000..ffcf1846cf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -0,0 +1,613 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; +import org.apache.hadoop.fs.HasFileDescriptor; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.ByteBufferPool; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT; + +import com.google.common.base.Preconditions; + +/** + * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The decryption is buffer based. The key points of the decryption + * are (1) calculating the counter and (2) padding through stream position: + *
<p/>
+ * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize);
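+ * For example (illustrative arithmetic only): with the 16-byte AES block size, + * pos = 1000 gives counter = base + 62 and padding = 8. + *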
<p/>
+ * The underlying stream offset is maintained as state. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CryptoInputStream extends FilterInputStream implements + Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, + CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess { + private static final int MIN_BUFFER_SIZE = 512; + private static final byte[] oneByteBuf = new byte[1]; + private final CryptoCodec codec; + private final Decryptor decryptor; + /** + * Input data buffer. The data starts at inBuffer.position() and ends at + * to inBuffer.limit(). + */ + private ByteBuffer inBuffer; + /** + * The decrypted data buffer. The data starts at outBuffer.position() and + * ends at outBuffer.limit(); + */ + private ByteBuffer outBuffer; + private long streamOffset = 0; // Underlying stream offset. + /** + * Whether underlying stream supports + * {@link #org.apache.hadoop.fs.ByteBufferReadable} + */ + private Boolean usingByteBufferRead = null; + /** + * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} + * before any other data goes in. The purpose of padding is to put input data + * at proper position. + */ + private byte padding; + private boolean closed; + private final byte[] key; + private final byte[] initIV; + private byte[] iv; + + public CryptoInputStream(InputStream in, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv) throws IOException { + super(in); + Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, + "Minimum value of buffer size is 512."); + this.key = key; + this.initIV = iv; + this.iv = iv.clone(); + inBuffer = ByteBuffer.allocateDirect(bufferSize); + outBuffer = ByteBuffer.allocateDirect(bufferSize); + outBuffer.limit(0); + this.codec = codec; + try { + decryptor = codec.getDecryptor(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + if (in instanceof Seekable) { + streamOffset = ((Seekable) in).getPos(); + } + updateDecryptor(); + } + + public CryptoInputStream(InputStream in, CryptoCodec codec, + byte[] key, byte[] iv) throws IOException { + this(in, codec, getBufferSize(codec.getConf()), key, iv); + } + + public InputStream getWrappedStream() { + return in; + } + + /** + * Decryption is buffer based. + * If there is data in {@link #outBuffer}, then read it out of this buffer. + * If there is no data in {@link #outBuffer}, then read more from the + * underlying stream and do the decryption. + * @param b the buffer into which the decrypted data is read. + * @param off the buffer offset. + * @param len the maximum number of decrypted data bytes to read. + * @return int the total number of decrypted data bytes read into the buffer. + * @throws IOException + */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int remaining = outBuffer.remaining(); + if (remaining > 0) { + int n = Math.min(len, remaining); + outBuffer.get(b, off, n); + return n; + } else { + int n = 0; + /** + * Check whether the underlying stream is {@link ByteBufferReadable}, + * it can avoid bytes copy. 
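+ * The result of the first attempt is cached in usingByteBufferRead: a + * successful ByteBuffer read pins it to TRUE, while an + * UnsupportedOperationException pins it to FALSE and all subsequent reads + * fall back to a byte[] copy through readFromUnderlyingStream().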
+ */ + if (usingByteBufferRead == null) { + if (in instanceof ByteBufferReadable) { + try { + n = ((ByteBufferReadable) in).read(inBuffer); + usingByteBufferRead = Boolean.TRUE; + } catch (UnsupportedOperationException e) { + usingByteBufferRead = Boolean.FALSE; + } + } + if (!usingByteBufferRead.booleanValue()) { + n = readFromUnderlyingStream(); + } + } else { + if (usingByteBufferRead.booleanValue()) { + n = ((ByteBufferReadable) in).read(inBuffer); + } else { + n = readFromUnderlyingStream(); + } + } + if (n <= 0) { + return n; + } + + streamOffset += n; // Read n bytes + decrypt(); + n = Math.min(len, outBuffer.remaining()); + outBuffer.get(b, off, n); + return n; + } + } + + // Read data from underlying stream. + private int readFromUnderlyingStream() throws IOException { + int toRead = inBuffer.remaining(); + byte[] tmp = getTmpBuf(); + int n = in.read(tmp, 0, toRead); + if (n > 0) { + inBuffer.put(tmp, 0, n); + } + return n; + } + + private byte[] tmpBuf; + private byte[] getTmpBuf() { + if (tmpBuf == null) { + tmpBuf = new byte[inBuffer.capacity()]; + } + return tmpBuf; + } + + /** + * Do the decryption using {@link #inBuffer} as input and {@link #outBuffer} + * as output. + */ + private void decrypt() throws IOException { + Preconditions.checkState(inBuffer.position() >= padding); + if(inBuffer.position() == padding) { + // There is no real data in inBuffer. + return; + } + inBuffer.flip(); + outBuffer.clear(); + decryptor.decrypt(inBuffer, outBuffer); + inBuffer.clear(); + outBuffer.flip(); + if (padding > 0) { + /** + * The plain text and cipher text have 1:1 mapping, they start at same + * position. + */ + outBuffer.position(padding); + padding = 0; + } + if (decryptor.isContextReset()) { + /** + * Typically we will not get here. To improve performance in CTR mode, + * we rely on the decryptor maintaining context, for example calculating + * the counter. Unfortunately, some bad implementations can't maintain + * context so we need to re-init after doing decryption. + */ + updateDecryptor(); + } + } + + /** + * Update the {@link #decryptor}. Calculate the counter and {@link #padding}. + */ + private void updateDecryptor() throws IOException { + long counter = streamOffset / codec.getAlgorithmBlockSize(); + padding = (byte)(streamOffset % codec.getAlgorithmBlockSize()); + inBuffer.position(padding); // Set proper position for input data. + codec.calculateIV(initIV, counter, iv); + decryptor.init(key, iv); + } + + /** + * Reset the underlying stream offset; and clear {@link #inBuffer} and + * {@link #outBuffer}. Typically this happens when doing {@link #seek(long)} + * or {@link #skip(long)}. + */ + private void resetStreamOffset(long offset) throws IOException { + streamOffset = offset; + inBuffer.clear(); + outBuffer.clear(); + outBuffer.limit(0); + updateDecryptor(); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + super.close(); + freeBuffers(); + closed = true; + } + + /** + * Free the direct buffer manually. + */ + private void freeBuffers() { + sun.misc.Cleaner inBufferCleaner = + ((sun.nio.ch.DirectBuffer) inBuffer).cleaner(); + inBufferCleaner.clean(); + sun.misc.Cleaner outBufferCleaner = + ((sun.nio.ch.DirectBuffer) outBuffer).cleaner(); + outBufferCleaner.clean(); + } + + // Positioned read. 
+ @Override + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + checkStream(); + try { + int n = ((PositionedReadable) in).read(position, buffer, offset, length); + if (n > 0) { + /** + * Since this operation does not change the current offset of a file, + * streamOffset should be not changed and we need to restore the + * decryptor and outBuffer after decryption. + */ + decrypt(position, buffer, offset, length); + } + + return n; + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "positioned read."); + } + } + + /** + * Decrypt given length of data in buffer: start from offset. + * Output is also buffer and start from same offset. Restore the + * {@link #decryptor} and {@link #outBuffer} after decryption. + */ + private void decrypt(long position, byte[] buffer, int offset, int length) + throws IOException { + + byte[] tmp = getTmpBuf(); + int unread = outBuffer.remaining(); + if (unread > 0) { // Cache outBuffer + outBuffer.get(tmp, 0, unread); + } + long curOffset = streamOffset; + resetStreamOffset(position); + + int n = 0; + while (n < length) { + int toDecrypt = Math.min(length - n, inBuffer.remaining()); + inBuffer.put(buffer, offset + n, toDecrypt); + // Do decryption + decrypt(); + outBuffer.get(buffer, offset + n, toDecrypt); + n += toDecrypt; + } + + // After decryption + resetStreamOffset(curOffset); + if (unread > 0) { // Restore outBuffer + outBuffer.clear(); + outBuffer.put(tmp, 0, unread); + outBuffer.flip(); + } + } + + // Positioned read fully. + @Override + public void readFully(long position, byte[] buffer, int offset, int length) + throws IOException { + checkStream(); + try { + ((PositionedReadable) in).readFully(position, buffer, offset, length); + if (length > 0) { + /** + * Since this operation does not change the current offset of a file, + * streamOffset should be not changed and we need to restore the decryptor + * and outBuffer after decryption. + */ + decrypt(position, buffer, offset, length); + } + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "positioned readFully."); + } + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + readFully(position, buffer, 0, buffer.length); + } + + // Seek to a position. + @Override + public void seek(long pos) throws IOException { + Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset."); + checkStream(); + try { + // If target pos we have already read and decrypt. + if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { + int forward = (int) (pos - (streamOffset - outBuffer.remaining())); + if (forward > 0) { + outBuffer.position(outBuffer.position() + forward); + } + } else { + ((Seekable) in).seek(pos); + resetStreamOffset(pos); + } + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "seek."); + } + } + + // Skip n bytes + @Override + public long skip(long n) throws IOException { + Preconditions.checkArgument(n >= 0, "Negative skip length."); + checkStream(); + + if (n == 0) { + return 0; + } else if (n <= outBuffer.remaining()) { + int pos = outBuffer.position() + (int) n; + outBuffer.position(pos); + return n; + } else { + /** + * Subtract outBuffer.remaining() to see how many bytes we need to + * skip in underlying stream. 
We get real skipped bytes number of + * underlying stream then add outBuffer.remaining() to get skipped + * bytes number from user's view. + */ + n -= outBuffer.remaining(); + long skipped = in.skip(n); + if (skipped < 0) { + skipped = 0; + } + long pos = streamOffset + skipped; + skipped += outBuffer.remaining(); + resetStreamOffset(pos); + return skipped; + } + } + + // Get underlying stream position. + @Override + public long getPos() throws IOException { + checkStream(); + // Equals: ((Seekable) in).getPos() - outBuffer.remaining() + return streamOffset - outBuffer.remaining(); + } + + // ByteBuffer read. + @Override + public int read(ByteBuffer buf) throws IOException { + checkStream(); + if (in instanceof ByteBufferReadable) { + int unread = outBuffer.remaining(); + if (unread > 0) { // Have unread decrypted data in buffer. + int toRead = buf.remaining(); + if (toRead <= unread) { + int limit = outBuffer.limit(); + outBuffer.limit(outBuffer.position() + toRead); + buf.put(outBuffer); + outBuffer.limit(limit); + return toRead; + } else { + buf.put(outBuffer); + } + } + + int pos = buf.position(); + int n = ((ByteBufferReadable) in).read(buf); + if (n > 0) { + streamOffset += n; // Read n bytes + decrypt(buf, n, pos); + } + return n; + } + + throw new UnsupportedOperationException("ByteBuffer read unsupported " + + "by input stream."); + } + + /** + * Decrypt all data in buf: total n bytes from given start position. + * Output is also buf and same start position. + * buf.position() and buf.limit() should be unchanged after decryption. + */ + private void decrypt(ByteBuffer buf, int n, int start) + throws IOException { + int pos = buf.position(); + int limit = buf.limit(); + int len = 0; + while (len < n) { + buf.position(start + len); + buf.limit(start + len + Math.min(n - len, inBuffer.remaining())); + inBuffer.put(buf); + // Do decryption + decrypt(); + + buf.position(start + len); + buf.limit(limit); + len += outBuffer.remaining(); + buf.put(outBuffer); + } + buf.position(pos); + } + + @Override + public int available() throws IOException { + checkStream(); + + return in.available() + outBuffer.remaining(); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readLimit) { + } + + @Override + public void reset() throws IOException { + throw new IOException("Mark/reset not supported"); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + Preconditions.checkArgument(targetPos >= 0, + "Cannot seek to negative offset."); + checkStream(); + try { + boolean result = ((Seekable) in).seekToNewSource(targetPos); + resetStreamOffset(targetPos); + return result; + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "seekToNewSource."); + } + } + + @Override + public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, + EnumSet opts) throws IOException, + UnsupportedOperationException { + checkStream(); + try { + if (outBuffer.remaining() > 0) { + // Have some decrypted data unread, need to reset. + ((Seekable) in).seek(getPos()); + resetStreamOffset(getPos()); + } + ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). 
+ read(bufferPool, maxLength, opts); + if (buffer != null) { + int n = buffer.remaining(); + if (n > 0) { + streamOffset += buffer.remaining(); // Read n bytes + int pos = buffer.position(); + decrypt(buffer, n, pos); + } + } + return buffer; + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "enhanced byte buffer access."); + } + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + try { + ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "release buffer."); + } + } + + @Override + public void setReadahead(Long readahead) throws IOException, + UnsupportedOperationException { + try { + ((CanSetReadahead) in).setReadahead(readahead); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not support " + + "setting the readahead caching strategy."); + } + } + + @Override + public void setDropBehind(Boolean dropCache) throws IOException, + UnsupportedOperationException { + try { + ((CanSetDropBehind) in).setDropBehind(dropCache); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not " + + "support setting the drop-behind caching setting."); + } + } + + @Override + public FileDescriptor getFileDescriptor() throws IOException { + if (in instanceof HasFileDescriptor) { + return ((HasFileDescriptor) in).getFileDescriptor(); + } else if (in instanceof FileInputStream) { + return ((FileInputStream) in).getFD(); + } else { + return null; + } + } + + @Override + public int read() throws IOException { + return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff); + } + + private void checkStream() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + } + + private static int getBufferSize(Configuration conf) { + return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, + HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java new file mode 100644 index 0000000000..934c447935 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.crypto; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.Syncable; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT; + +import com.google.common.base.Preconditions; + +/** + * CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The encryption is buffer based. The key points of the encryption are + * (1) calculating counter and (2) padding through stream position. + *
<p/>
+ * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize);
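+ * For example (illustrative), an output stream created with streamOffset = 100 + * and the 16-byte AES block size starts at counter = 6 with padding = 4, so the + * first 4 bytes of the input buffer are reserved as padding before real data + * goes in. + *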
<p/>
+ * The underlying stream offset is maintained as state. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CryptoOutputStream extends FilterOutputStream implements + Syncable, CanSetDropBehind { + private static final int MIN_BUFFER_SIZE = 512; + private static final byte[] oneByteBuf = new byte[1]; + private final CryptoCodec codec; + private final Encryptor encryptor; + /** + * Input data buffer. The data starts at inBuffer.position() and ends at + * inBuffer.limit(). + */ + private ByteBuffer inBuffer; + /** + * Encrypted data buffer. The data starts at outBuffer.position() and ends at + * outBuffer.limit(); + */ + private ByteBuffer outBuffer; + private long streamOffset = 0; // Underlying stream offset. + /** + * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} + * before any other data goes in. The purpose of padding is to put input data + * at proper position. + */ + private byte padding; + private boolean closed; + private final byte[] key; + private final byte[] initIV; + private byte[] iv; + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv) throws IOException { + this(out, codec, bufferSize, key, iv, 0); + } + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv, long streamOffset) + throws IOException { + super(out); + Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, + "Minimum value of buffer size is 512."); + this.key = key; + this.initIV = iv; + this.iv = iv.clone(); + inBuffer = ByteBuffer.allocateDirect(bufferSize); + outBuffer = ByteBuffer.allocateDirect(bufferSize); + this.streamOffset = streamOffset; + this.codec = codec; + try { + encryptor = codec.getEncryptor(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + updateEncryptor(); + } + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + byte[] key, byte[] iv) throws IOException { + this(out, codec, key, iv, 0); + } + + public CryptoOutputStream(OutputStream out, CryptoCodec codec, + byte[] key, byte[] iv, long streamOffset) throws IOException { + this(out, codec, getBufferSize(codec.getConf()), key, iv, streamOffset); + } + + public OutputStream getWrappedStream() { + return out; + } + + /** + * Encryption is buffer based. + * If there is enough room in {@link #inBuffer}, then write to this buffer. + * If {@link #inBuffer} is full, then do encryption and write data to the + * underlying stream. + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @throws IOException + */ + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || off > b.length || + len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + while (len > 0) { + int remaining = inBuffer.remaining(); + if (len < remaining) { + inBuffer.put(b, off, len); + len = 0; + } else { + inBuffer.put(b, off, remaining); + off += remaining; + len -= remaining; + encrypt(); + } + } + } + + /** + * Do the encryption, input is {@link #inBuffer} and output is + * {@link #outBuffer}. + */ + private void encrypt() throws IOException { + Preconditions.checkState(inBuffer.position() >= padding); + if (inBuffer.position() == padding) { + // There is no real data in the inBuffer. 
+ return; + } + inBuffer.flip(); + outBuffer.clear(); + encryptor.encrypt(inBuffer, outBuffer); + inBuffer.clear(); + outBuffer.flip(); + if (padding > 0) { + /** + * The plain text and cipher text have 1:1 mapping, they start at same + * position. + */ + outBuffer.position(padding); + padding = 0; + } + int len = outBuffer.remaining(); + /** + * If underlying stream supports {@link ByteBuffer} write in future, needs + * refine here. + */ + final byte[] tmp = getTmpBuf(); + outBuffer.get(tmp, 0, len); + out.write(tmp, 0, len); + + streamOffset += len; + if (encryptor.isContextReset()) { + /** + * We will generally not get here. For CTR mode, to improve + * performance, we rely on the encryptor maintaining context, for + * example to calculate the counter. But some bad implementations + * can't maintain context, and need us to re-init after doing + * encryption. + */ + updateEncryptor(); + } + } + + /** + * Update the {@link #encryptor}: calculate counter and {@link #padding}. + */ + private void updateEncryptor() throws IOException { + long counter = streamOffset / codec.getAlgorithmBlockSize(); + padding = (byte)(streamOffset % codec.getAlgorithmBlockSize()); + inBuffer.position(padding); // Set proper position for input data. + codec.calculateIV(initIV, counter, iv); + encryptor.init(key, iv); + } + + private byte[] tmpBuf; + private byte[] getTmpBuf() { + if (tmpBuf == null) { + tmpBuf = new byte[outBuffer.capacity()]; + } + return tmpBuf; + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + super.close(); + freeBuffers(); + closed = true; + } + + /** + * Free the direct buffer manually. + */ + private void freeBuffers() { + sun.misc.Cleaner inBufferCleaner = + ((sun.nio.ch.DirectBuffer) inBuffer).cleaner(); + inBufferCleaner.clean(); + sun.misc.Cleaner outBufferCleaner = + ((sun.nio.ch.DirectBuffer) outBuffer).cleaner(); + outBufferCleaner.clean(); + } + + /** + * To flush, we need to encrypt the data in buffer and write to underlying + * stream, then do the flush. 
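+ * Any data remaining in {@link #inBuffer} is encrypted and written to the + * underlying stream before that stream is flushed.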
+ */ + @Override + public void flush() throws IOException { + checkStream(); + encrypt(); + super.flush(); + } + + @Override + public void write(int b) throws IOException { + oneByteBuf[0] = (byte)(b & 0xff); + write(oneByteBuf, 0, oneByteBuf.length); + } + + private void checkStream() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + } + + @Override + public void setDropBehind(Boolean dropCache) throws IOException, + UnsupportedOperationException { + try { + ((CanSetDropBehind) out).setDropBehind(dropCache); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("This stream does not " + + "support setting the drop-behind caching."); + } + } + + @Override + public void hflush() throws IOException { + flush(); + if (out instanceof Syncable) { + ((Syncable)out).hflush(); + } + } + + @Override + public void hsync() throws IOException { + flush(); + if (out instanceof Syncable) { + ((Syncable)out).hsync(); + } + } + + private static int getBufferSize(Configuration conf) { + return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, + HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java new file mode 100644 index 0000000000..4afb221658 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface Decryptor { + + /** + * Initialize the decryptor, the internal decryption context will be + * reset. + * @param key decryption key. + * @param iv decryption initialization vector + * @throws IOException if initialization fails + */ + public void init(byte[] key, byte[] iv) throws IOException; + + /** + * Indicate whether decryption context is reset. + *
<p/>
+ * It's useful for a mode like CTR, which requires a different IV for + * different parts of the data. Usually the decryptor can maintain the context + * internally, for example by calculating the IV/counter, and can then continue + * a multiple-part decryption operation without being re-initialized with the + * key and a new IV. For a mode like CTR, if the context is reset after each + * decryption, the decryptor must be re-initialized before each operation, + * which is not efficient. + * @return boolean whether context is reset. + */ + public boolean isContextReset(); + + /** + * This exposes a direct interface for record decryption with direct byte + * buffers. + *
<p/>
+ * The decrypt() function need not always consume the buffers provided; + * it may need to be called multiple times to decrypt an entire buffer, + * and the object will hold the decryption context internally. + *
<p/>
+ * Some implementations may need enough space in the destination buffer to + * decrypt an entire input.
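+ * For AES-CTR the ciphertext length equals the plaintext length, so a + * destination with at least inBuffer.remaining() bytes free suffices (see the + * JCE implementations below). + *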
<p/>
+ * The end result will move inBuffer.position() by the number of bytes read + * and outBuffer.position() by the number of bytes written. It should not + * modify inBuffer.limit() or outBuffer.limit(), in order to maintain + * consistency of operation.
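+ * An illustrative calling sequence (a sketch assuming only this interface): + * <pre> + * decryptor.init(key, iv); + * decryptor.decrypt(inBuffer, outBuffer); // repeat until inBuffer is drained + * </pre> + *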
<p/>
+ * @param inBuffer in direct {@link ByteBuffer} for reading from. Requires + * inBuffer != null and inBuffer.remaining() > 0 + * @param outBuffer out direct {@link ByteBuffer} for storing the results + * into. Requires outBuffer != null and outBuffer.remaining() > 0 + * @throws IOException if decryption fails + */ + public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) + throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java new file mode 100644 index 0000000000..398cc2e9ae --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface Encryptor { + + /** + * Initialize the encryptor, the internal encryption context will be + * reset. + * @param key encryption key. + * @param iv encryption initialization vector + * @throws IOException if initialization fails + */ + public void init(byte[] key, byte[] iv) throws IOException; + + /** + * Indicate whether encryption context is reset. + *
<p/>
+ * It's useful for a mode like CTR, which requires a different IV for + * different parts of the data. Usually the encryptor can maintain the context + * internally, for example by calculating the IV/counter, and can then continue + * a multiple-part encryption operation without being re-initialized with the + * key and a new IV. For a mode like CTR, if the context is reset after each + * encryption, the encryptor must be re-initialized before each operation, + * which is not efficient. + * @return boolean whether context is reset. + */ + public boolean isContextReset(); + + /** + * This exposes a direct interface for record encryption with direct byte + * buffers. + *
<p/>
+ * The encrypt() function need not always consume the buffers provided; + * it may need to be called multiple times to encrypt an entire buffer, + * and the object will hold the encryption context internally. + *
<p/>
+ * Some implementations may need enough space in the destination buffer to + * encrypt an entire input.
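+ * For AES-CTR the ciphertext length equals the plaintext length, so a + * destination with at least inBuffer.remaining() bytes free suffices (see the + * JCE implementations below). + *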
<p/>
+ * The end result will move inBuffer.position() by the number of bytes read + * and outBuffer.position() by the number of bytes written. It should not + * modify inBuffer.limit() or outBuffer.limit(), in order to maintain + * consistency of operation.
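+ * An illustrative calling sequence (a sketch assuming only this interface): + * <pre> + * encryptor.init(key, iv); + * encryptor.encrypt(inBuffer, outBuffer); // repeat until inBuffer is drained + * </pre> + *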
<p/>
+ * @param inBuffer in direct {@link ByteBuffer} for reading from. Requires + * inBuffer != null and inBuffer.remaining() > 0 + * @param outBuffer out direct {@link ByteBuffer} for storing the results + * into. Requires outBuffer != null and outBuffer.remaining() > 0 + * @throws IOException if encryption fails + */ + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) + throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java new file mode 100644 index 0000000000..aea9e07ee6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.security.GeneralSecurityException; + +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY; + +/** + * Implement the AES-CTR crypto codec using JCE provider. + */ +public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec { + private Configuration conf; + private String provider; + + public JCEAESCTRCryptoCodec() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + provider = conf.get(HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY); + } + + @Override + public Encryptor getEncryptor() throws GeneralSecurityException { + return new JCEAESCTREncryptor(provider); + } + + @Override + public Decryptor getDecryptor() throws GeneralSecurityException { + return new JCEAESCTRDecryptor(provider); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java new file mode 100644 index 0000000000..a3fb13f629 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; + +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import com.google.common.base.Preconditions; + +public class JCEAESCTRDecryptor implements Decryptor { + private final Cipher cipher; + private boolean contextReset = false; + + public JCEAESCTRDecryptor(String provider) throws GeneralSecurityException { + if (provider == null || provider.isEmpty()) { + cipher = Cipher.getInstance("AES/CTR/NoPadding"); + } else { + cipher = Cipher.getInstance("AES/CTR/NoPadding", provider); + } + } + + @Override + public void init(byte[] key, byte[] iv) throws IOException { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(iv); + contextReset = false; + try { + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), + new IvParameterSpec(iv)); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * For AES-CTR, will consume all input data and needs enough space in the + * destination buffer to decrypt entire input data. + */ + @Override + public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) + throws IOException { + try { + int inputSize = inBuffer.remaining(); + // Cipher#update will maintain decryption context. + int n = cipher.update(inBuffer, outBuffer); + if (n < inputSize) { + /** + * Typically code will not get here. Cipher#update will decrypt all + * input data and put result in outBuffer. + * Cipher#doFinal will reset the decryption context. + */ + contextReset = true; + cipher.doFinal(inBuffer, outBuffer); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public boolean isContextReset() { + return contextReset; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java new file mode 100644 index 0000000000..9ee70dc472 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.crypto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; + +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import com.google.common.base.Preconditions; + +public class JCEAESCTREncryptor implements Encryptor { + private final Cipher cipher; + private boolean contextReset = false; + + public JCEAESCTREncryptor(String provider) throws GeneralSecurityException { + if (provider == null || provider.isEmpty()) { + cipher = Cipher.getInstance("AES/CTR/NoPadding"); + } else { + cipher = Cipher.getInstance("AES/CTR/NoPadding", provider); + } + } + + @Override + public void init(byte[] key, byte[] iv) throws IOException { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(iv); + contextReset = false; + try { + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), + new IvParameterSpec(iv)); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * For AES-CTR, will consume all input data and needs enough space in the + * destination buffer to encrypt entire input data. + */ + @Override + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) + throws IOException { + try { + int inputSize = inBuffer.remaining(); + // Cipher#update will maintain encryption context. + int n = cipher.update(inBuffer, outBuffer); + if (n < inputSize) { + /** + * Typically code will not get here. Cipher#update will encrypt all + * input data and put result in outBuffer. + * Cipher#doFinal will reset the encryption context. + */ + contextReset = true; + cipher.doFinal(inBuffer, outBuffer); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public boolean isContextReset() { + return contextReset; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index b6be29447a..c0853a9ded 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -282,5 +282,16 @@ public class CommonConfigurationKeysPublic { /** Class to override Sasl Properties for a connection */ public static final String HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS = "hadoop.security.saslproperties.resolver.class"; + /** See core-default.xml */ + public static final String HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY = + "hadoop.security.crypto.codec.class"; + /** See core-default.xml */ + public static final String HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY = + "hadoop.security.crypto.jce.provider"; + /** See core-default.xml */ + public static final String HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY = + "hadoop.security.crypto.buffer.size"; + /** Defalt value for HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY */ + public static final int HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT = 8192; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java new file mode 100644 index 0000000000..8758d28f1d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java @@ -0,0 +1,37 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.crypto; + +import java.io.IOException; + +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoInputStream; +import org.apache.hadoop.fs.FSDataInputStream; + +public class CryptoFSDataInputStream extends FSDataInputStream { + + public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv) throws IOException { + super(new CryptoInputStream(in, codec, bufferSize, key, iv)); + } + + public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec, + byte[] key, byte[] iv) throws IOException { + super(new CryptoInputStream(in, codec, key, iv)); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java new file mode 100644 index 0000000000..040fbcb879 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.crypto; + +import java.io.IOException; + +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.CryptoOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; + +public class CryptoFSDataOutputStream extends FSDataOutputStream { + private final FSDataOutputStream fsOut; + + public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec, + int bufferSize, byte[] key, byte[] iv) throws IOException { + super(new CryptoOutputStream(out, codec, bufferSize, key, iv, + out.getPos()), null, out.getPos()); + this.fsOut = out; + } + + public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec, + byte[] key, byte[] iv) throws IOException { + super(new CryptoOutputStream(out, codec, key, iv, out.getPos()), + null, out.getPos()); + this.fsOut = out; + } + + @Override + public long getPos() { + return fsOut.getPos(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ea0808eef7..6073c1a915 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1348,4 +1348,30 @@ true. + + + hadoop.security.crypto.codec.class + + + The default implementation of CryptoCodec which is used for encryption + and decryption. + + + + + hadoop.security.crypto.jce.provider + + + The JCE provider name used in CryptoCodec. + + + + + hadoop.security.crypto.buffer.size + 8192 + + The buffer size used in Crypto InputStream and OutputStream, and default + value is 8192. + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java new file mode 100644 index 0000000000..7f36c2b249 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -0,0 +1,712 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.crypto; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.EnumSet; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.RandomDatum; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class CryptoStreamsTestBase { + protected static final Log LOG= LogFactory.getLog( + CryptoStreamsTestBase.class); + + protected static CryptoCodec codec; + private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16}; + private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; + + protected static final int count = 10000; + protected static int defaultBufferSize = 8192; + protected static int smallBufferSize = 1024; + private byte[] data; + private int dataLen; + + @Before + public void setUp() throws IOException { + // Generate data + int seed = new Random().nextInt(); + DataOutputBuffer dataBuf = new DataOutputBuffer(); + RandomDatum.Generator generator = new RandomDatum.Generator(seed); + for(int i=0; i < count; ++i) { + generator.next(); + RandomDatum key = generator.getKey(); + RandomDatum value = generator.getValue(); + + key.write(dataBuf); + value.write(dataBuf); + } + LOG.info("Generated " + count + " records"); + data = dataBuf.getData(); + dataLen = dataBuf.getLength(); + } + + protected void writeData(OutputStream out) throws Exception { + out.write(data, 0, dataLen); + out.close(); + } + + protected int getDataLen() { + return dataLen; + } + + private int readAll(InputStream in, byte[] b, int off, int len) + throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (total >= len) { + break; + } + n = in.read(b, off + total, len - total); + } + + return total; + } + + protected OutputStream getOutputStream(int bufferSize) throws IOException { + return getOutputStream(bufferSize, key, iv); + } + + protected abstract OutputStream getOutputStream(int bufferSize, byte[] key, + byte[] iv) throws IOException; + + protected InputStream getInputStream(int bufferSize) throws IOException { + return getInputStream(bufferSize, key, iv); + } + + protected abstract InputStream getInputStream(int bufferSize, byte[] key, + byte[] iv) throws IOException; + + /** + * Test crypto reading with different buffer size. 
+ */
+  @Test(timeout=120000)
+  public void testRead() throws Exception {
+    OutputStream out = getOutputStream(defaultBufferSize);
+    writeData(out);
+
+    // Default buffer size
+    InputStream in = getInputStream(defaultBufferSize);
+    readCheck(in);
+    in.close();
+
+    // Small buffer size
+    in = getInputStream(smallBufferSize);
+    readCheck(in);
+    in.close();
+  }
+
+  private void readCheck(InputStream in) throws Exception {
+    byte[] result = new byte[dataLen];
+    int n = readAll(in, result, 0, dataLen);
+
+    Assert.assertEquals(dataLen, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, 0, expectedData, 0, n);
+    Assert.assertArrayEquals(result, expectedData);
+
+    // EOF
+    n = in.read(result, 0, dataLen);
+    Assert.assertEquals(n, -1);
+    in.close();
+  }
+
+  /**
+   * Test crypto with different IVs.
+   */
+  @Test(timeout=120000)
+  public void testCryptoIV() throws Exception {
+    byte[] iv1 = iv.clone();
+
+    // Counter base: Long.MAX_VALUE
+    setCounterBaseForIV(iv1, Long.MAX_VALUE);
+    cryptoCheck(iv1);
+
+    // Counter base: Long.MAX_VALUE - 1
+    setCounterBaseForIV(iv1, Long.MAX_VALUE - 1);
+    cryptoCheck(iv1);
+
+    // Counter base: Integer.MAX_VALUE
+    setCounterBaseForIV(iv1, Integer.MAX_VALUE);
+    cryptoCheck(iv1);
+
+    // Counter base: 0
+    setCounterBaseForIV(iv1, 0);
+    cryptoCheck(iv1);
+
+    // Counter base: -1
+    setCounterBaseForIV(iv1, -1);
+    cryptoCheck(iv1);
+  }
+
+  private void cryptoCheck(byte[] iv) throws Exception {
+    OutputStream out = getOutputStream(defaultBufferSize, key, iv);
+    writeData(out);
+
+    InputStream in = getInputStream(defaultBufferSize, key, iv);
+    readCheck(in);
+    in.close();
+  }
+
+  private void setCounterBaseForIV(byte[] iv, long counterBase) {
+    ByteBuffer buf = ByteBuffer.wrap(iv);
+    buf.order(ByteOrder.BIG_ENDIAN);
+    buf.putLong(iv.length - 8, counterBase);
+  }
+
+  /**
+   * Test hflush/hsync of the crypto output stream, with different buffer
+   * sizes.
+   */
+  @Test(timeout=120000)
+  public void testSyncable() throws IOException {
+    syncableCheck();
+  }
+
+  private void syncableCheck() throws IOException {
+    OutputStream out = getOutputStream(smallBufferSize);
+    try {
+      int bytesWritten = dataLen / 3;
+      out.write(data, 0, bytesWritten);
+      ((Syncable) out).hflush();
+
+      InputStream in = getInputStream(defaultBufferSize);
+      verify(in, bytesWritten, data);
+      in.close();
+
+      out.write(data, bytesWritten, dataLen - bytesWritten);
+      ((Syncable) out).hsync();
+
+      in = getInputStream(defaultBufferSize);
+      verify(in, dataLen, data);
+      in.close();
+    } finally {
+      out.close();
+    }
+  }
+
+  private void verify(InputStream in, int bytesToVerify,
+      byte[] expectedBytes) throws IOException {
+    final byte[] readBuf = new byte[bytesToVerify];
+    readAll(in, readBuf, 0, bytesToVerify);
+    for (int i = 0; i < bytesToVerify; i++) {
+      Assert.assertEquals(expectedBytes[i], readBuf[i]);
+    }
+  }
+
+  private int readAll(InputStream in, long pos, byte[] b, int off,
+      int len) throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (total >= len) {
+        break;
+      }
+      n = ((PositionedReadable) in).read(pos + total, b, off + total,
+          len - total);
+    }
+
+    return total;
+  }
+
+  /**
+   * Test positioned read.
+ */ + @Test(timeout=120000) + public void testPositionedRead() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + InputStream in = getInputStream(defaultBufferSize); + // Pos: 1/3 dataLen + positionedReadCheck(in , dataLen/3); + + // Pos: 1/2 dataLen + positionedReadCheck(in, dataLen/2); + in.close(); + } + + private void positionedReadCheck(InputStream in, int pos) throws Exception { + byte[] result = new byte[dataLen]; + int n = readAll(in, pos, result, 0, dataLen); + + Assert.assertEquals(dataLen, n + pos); + byte[] readData = new byte[n]; + System.arraycopy(result, 0, readData, 0, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, pos, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } + + /** + * Test read fully + */ + @Test(timeout=120000) + public void testReadFully() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + InputStream in = getInputStream(defaultBufferSize); + final int len1 = dataLen/4; + // Read len1 bytes + byte [] readData = new byte[len1]; + readAll(in, readData, 0, len1); + byte[] expectedData = new byte[len1]; + System.arraycopy(data, 0, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/3 dataLen + readFullyCheck(in, dataLen/3); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/2 dataLen + readFullyCheck(in, dataLen/2); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, 2 * len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + in.close(); + } + + private void readFullyCheck(InputStream in, int pos) throws Exception { + byte[] result = new byte[dataLen - pos]; + ((PositionedReadable) in).readFully(pos, result); + + byte[] expectedData = new byte[dataLen - pos]; + System.arraycopy(data, pos, expectedData, 0, dataLen - pos); + Assert.assertArrayEquals(result, expectedData); + + result = new byte[dataLen]; // Exceeds maximum length + try { + ((PositionedReadable) in).readFully(pos, result); + Assert.fail("Read fully exceeds maximum length should fail."); + } catch (IOException e) { + } + } + + /** + * Test seek to different position. 
+ */ + @Test(timeout=120000) + public void testSeek() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + InputStream in = getInputStream(defaultBufferSize); + // Pos: 1/3 dataLen + seekCheck(in, dataLen/3); + + // Pos: 0 + seekCheck(in, 0); + + // Pos: 1/2 dataLen + seekCheck(in, dataLen/2); + + // Pos: -3 + try { + seekCheck(in, -3); + Assert.fail("Seek to negative offset should fail."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("Cannot seek to negative " + + "offset", e); + } + + // Pos: dataLen + 3 + try { + seekCheck(in, dataLen + 3); + Assert.fail("Seek after EOF should fail."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("Cannot seek after EOF", e); + } + + in.close(); + } + + private void seekCheck(InputStream in, int pos) throws Exception { + byte[] result = new byte[dataLen]; + ((Seekable) in).seek(pos); + int n = readAll(in, result, 0, dataLen); + + Assert.assertEquals(dataLen, n + pos); + byte[] readData = new byte[n]; + System.arraycopy(result, 0, readData, 0, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, pos, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } + + /** + * Test get position. + */ + @Test(timeout=120000) + public void testGetPos() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + // Default buffer size + InputStream in = getInputStream(defaultBufferSize); + byte[] result = new byte[dataLen]; + int n1 = readAll(in, result, 0, dataLen/3); + Assert.assertEquals(n1, ((Seekable) in).getPos()); + + int n2 = readAll(in, result, n1, dataLen - n1); + Assert.assertEquals(n1 + n2, ((Seekable) in).getPos()); + in.close(); + } + + @Test(timeout=120000) + public void testAvailable() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + // Default buffer size + InputStream in = getInputStream(defaultBufferSize); + byte[] result = new byte[dataLen]; + int n1 = readAll(in, result, 0, dataLen/3); + Assert.assertEquals(in.available(), dataLen - n1); + + int n2 = readAll(in, result, n1, dataLen - n1); + Assert.assertEquals(in.available(), dataLen - n1 - n2); + in.close(); + } + + /** + * Test skip. 
+ */ + @Test(timeout=120000) + public void testSkip() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + // Default buffer size + InputStream in = getInputStream(defaultBufferSize); + byte[] result = new byte[dataLen]; + int n1 = readAll(in, result, 0, dataLen/3); + Assert.assertEquals(n1, ((Seekable) in).getPos()); + + long skipped = in.skip(dataLen/3); + int n2 = readAll(in, result, 0, dataLen); + + Assert.assertEquals(dataLen, n1 + skipped + n2); + byte[] readData = new byte[n2]; + System.arraycopy(result, 0, readData, 0, n2); + byte[] expectedData = new byte[n2]; + System.arraycopy(data, dataLen - n2, expectedData, 0, n2); + Assert.assertArrayEquals(readData, expectedData); + + try { + skipped = in.skip(-3); + Assert.fail("Skip Negative length should fail."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("Negative skip length", e); + } + + // Skip after EOF + skipped = in.skip(3); + Assert.assertEquals(skipped, 0); + + in.close(); + } + + private void byteBufferReadCheck(InputStream in, ByteBuffer buf, + int bufPos) throws Exception { + buf.position(bufPos); + int n = ((ByteBufferReadable) in).read(buf); + byte[] readData = new byte[n]; + buf.rewind(); + buf.position(bufPos); + buf.get(readData); + byte[] expectedData = new byte[n]; + System.arraycopy(data, 0, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } + + /** + * Test byte buffer read with different buffer size. + */ + @Test(timeout=120000) + public void testByteBufferRead() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + // Default buffer size, initial buffer position is 0 + InputStream in = getInputStream(defaultBufferSize); + ByteBuffer buf = ByteBuffer.allocate(dataLen + 100); + byteBufferReadCheck(in, buf, 0); + in.close(); + + // Default buffer size, initial buffer position is not 0 + in = getInputStream(defaultBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 11); + in.close(); + + // Small buffer size, initial buffer position is 0 + in = getInputStream(smallBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 0); + in.close(); + + // Small buffer size, initial buffer position is not 0 + in = getInputStream(smallBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 11); + in.close(); + + // Direct buffer, default buffer size, initial buffer position is 0 + in = getInputStream(defaultBufferSize); + buf = ByteBuffer.allocateDirect(dataLen + 100); + byteBufferReadCheck(in, buf, 0); + in.close(); + + // Direct buffer, default buffer size, initial buffer position is not 0 + in = getInputStream(defaultBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 11); + in.close(); + + // Direct buffer, small buffer size, initial buffer position is 0 + in = getInputStream(smallBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 0); + in.close(); + + // Direct buffer, small buffer size, initial buffer position is not 0 + in = getInputStream(smallBufferSize); + buf.clear(); + byteBufferReadCheck(in, buf, 11); + in.close(); + } + + @Test(timeout=120000) + public void testCombinedOp() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + final int len1 = dataLen/8; + final int len2 = dataLen/10; + + InputStream in = getInputStream(defaultBufferSize); + // Read len1 data. 
+ byte[] readData = new byte[len1]; + readAll(in, readData, 0, len1); + byte[] expectedData = new byte[len1]; + System.arraycopy(data, 0, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + long pos = ((Seekable) in).getPos(); + Assert.assertEquals(len1, pos); + + // Seek forward len2 + ((Seekable) in).seek(pos + len2); + // Skip forward len2 + long n = in.skip(len2); + Assert.assertEquals(len2, n); + + // Pos: 1/4 dataLen + positionedReadCheck(in , dataLen/4); + + // Pos should be len1 + len2 + len2 + pos = ((Seekable) in).getPos(); + Assert.assertEquals(len1 + len2 + len2, pos); + + // Read forward len1 + ByteBuffer buf = ByteBuffer.allocate(len1); + int nRead = ((ByteBufferReadable) in).read(buf); + readData = new byte[nRead]; + buf.rewind(); + buf.get(readData); + expectedData = new byte[nRead]; + System.arraycopy(data, (int)pos, expectedData, 0, nRead); + Assert.assertArrayEquals(readData, expectedData); + + // Pos should be len1 + 2 * len2 + nRead + pos = ((Seekable) in).getPos(); + Assert.assertEquals(len1 + 2 * len2 + nRead, pos); + + // Pos: 1/3 dataLen + positionedReadCheck(in , dataLen/3); + + // Read forward len1 + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, (int)pos, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos should be 2 * len1 + 2 * len2 + nRead + pos = ((Seekable) in).getPos(); + Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos); + + // Read forward len1 + buf = ByteBuffer.allocate(len1); + nRead = ((ByteBufferReadable) in).read(buf); + readData = new byte[nRead]; + buf.rewind(); + buf.get(readData); + expectedData = new byte[nRead]; + System.arraycopy(data, (int)pos, expectedData, 0, nRead); + Assert.assertArrayEquals(readData, expectedData); + + // ByteBuffer read after EOF + ((Seekable) in).seek(dataLen); + buf.clear(); + n = ((ByteBufferReadable) in).read(buf); + Assert.assertEquals(n, -1); + + in.close(); + } + + @Test(timeout=120000) + public void testSeekToNewSource() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + InputStream in = getInputStream(defaultBufferSize); + + final int len1 = dataLen/8; + byte[] readData = new byte[len1]; + readAll(in, readData, 0, len1); + + // Pos: 1/3 dataLen + seekToNewSourceCheck(in, dataLen/3); + + // Pos: 0 + seekToNewSourceCheck(in, 0); + + // Pos: 1/2 dataLen + seekToNewSourceCheck(in, dataLen/2); + + // Pos: -3 + try { + seekToNewSourceCheck(in, -3); + Assert.fail("Seek to negative offset should fail."); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("Cannot seek to negative " + + "offset", e); + } + + // Pos: dataLen + 3 + try { + seekToNewSourceCheck(in, dataLen + 3); + Assert.fail("Seek after EOF should fail."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("Attempted to read past end of file", e); + } + + in.close(); + } + + private void seekToNewSourceCheck(InputStream in, int targetPos) + throws Exception { + byte[] result = new byte[dataLen]; + ((Seekable) in).seekToNewSource(targetPos); + int n = readAll(in, result, 0, dataLen); + + Assert.assertEquals(dataLen, n + targetPos); + byte[] readData = new byte[n]; + System.arraycopy(result, 0, readData, 0, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, targetPos, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } + + private ByteBufferPool getBufferPool() { + return new 
ByteBufferPool() { + @Override + public ByteBuffer getBuffer(boolean direct, int length) { + return ByteBuffer.allocateDirect(length); + } + + @Override + public void putBuffer(ByteBuffer buffer) { + } + }; + } + + @Test(timeout=120000) + public void testHasEnhancedByteBufferAccess() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + InputStream in = getInputStream(defaultBufferSize); + final int len1 = dataLen/8; + // ByteBuffer size is len1 + ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read( + getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + int n1 = buffer.remaining(); + byte[] readData = new byte[n1]; + buffer.get(readData); + byte[] expectedData = new byte[n1]; + System.arraycopy(data, 0, expectedData, 0, n1); + Assert.assertArrayEquals(readData, expectedData); + ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, n1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // ByteBuffer size is len1 + buffer = ((HasEnhancedByteBufferAccess) in).read( + getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + int n2 = buffer.remaining(); + readData = new byte[n2]; + buffer.get(readData); + expectedData = new byte[n2]; + System.arraycopy(data, n1 + len1, expectedData, 0, n2); + Assert.assertArrayEquals(readData, expectedData); + ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); + + in.close(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java new file mode 100644 index 0000000000..ebe025b0ea --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.crypto; + +import java.io.EOFException; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; +import org.apache.hadoop.fs.HasFileDescriptor; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class TestCryptoStreams extends CryptoStreamsTestBase { + /** + * Data storage. + * {@link #getOutputStream(int)} will write to this buf. + * {@link #getInputStream(int)} will read from this buf. + */ + private byte[] buf; + private int bufLen; + + @BeforeClass + public static void init() throws Exception { + Configuration conf = new Configuration(); + codec = CryptoCodec.getInstance(conf); + } + + @AfterClass + public static void shutdown() throws Exception { + } + + @Override + protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv) + throws IOException { + DataOutputBuffer out = new DataOutputBuffer() { + @Override + public void flush() throws IOException { + buf = getData(); + bufLen = getLength(); + } + @Override + public void close() throws IOException { + buf = getData(); + bufLen = getLength(); + } + }; + return new CryptoOutputStream(new FakeOutputStream(out), + codec, bufferSize, key, iv); + } + + @Override + protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv) + throws IOException { + DataInputBuffer in = new DataInputBuffer(); + in.reset(buf, 0, bufLen); + return new CryptoInputStream(new FakeInputStream(in), codec, bufferSize, + key, iv); + } + + private class FakeOutputStream extends OutputStream + implements Syncable, CanSetDropBehind{ + private final byte[] oneByteBuf = new byte[1]; + private final DataOutputBuffer out; + private boolean closed; + + public FakeOutputStream(DataOutputBuffer out) { + this.out = out; + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + checkStream(); + + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + checkStream(); + out.flush(); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + out.close(); + closed = true; + } + + @Override + public void write(int b) throws IOException { + oneByteBuf[0] = (byte)(b & 0xff); + write(oneByteBuf, 0, oneByteBuf.length); + } + + @Override + public void setDropBehind(Boolean dropCache) throws IOException, + UnsupportedOperationException { + } + + @Override + public void hflush() throws IOException { + checkStream(); + flush(); + } + + @Override + public void hsync() throws IOException { + checkStream(); + flush(); + } + + private void checkStream() throws IOException { + if (closed) { + throw new IOException("Stream is closed!"); + } + } + } + + private 
class FakeInputStream extends InputStream implements + Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, + CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess { + private final byte[] oneByteBuf = new byte[1]; + private int pos = 0; + private final byte[] data; + private final int length; + private boolean closed = false; + + public FakeInputStream(DataInputBuffer in) { + data = in.getData(); + length = in.getLength(); + } + + @Override + public void seek(long pos) throws IOException { + if (pos > length) { + throw new IOException("Cannot seek after EOF."); + } + if (pos < 0) { + throw new IOException("Cannot seek to negative offset."); + } + checkStream(); + this.pos = (int)pos; + } + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public int available() throws IOException { + return length - pos; + } + + @Override + public int read(byte b[], int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + checkStream(); + + if (pos < length) { + int n = (int) Math.min(len, length - pos); + System.arraycopy(data, pos, b, off, n); + pos += n; + return n; + } + + return -1; + } + + private void checkStream() throws IOException { + if (closed) { + throw new IOException("Stream is closed!"); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + checkStream(); + if (pos < length) { + int n = (int) Math.min(buf.remaining(), length - pos); + if (n > 0) { + buf.put(data, pos, n); + } + pos += n; + return n; + } + return -1; + } + + @Override + public long skip(long n) throws IOException { + checkStream(); + if ( n > 0 ) { + if( n + pos > length ) { + n = length - pos; + } + pos += n; + return n; + } + return n < 0 ? 
-1 : 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
+
+    @Override
+    public int read(long position, byte[] b, int off, int len)
+        throws IOException {
+      if (b == null) {
+        throw new NullPointerException();
+      } else if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+
+      if (position < length) {
+        int n = (int) Math.min(len, length - position);
+        System.arraycopy(data, (int)position, b, off, n);
+        return n;
+      }
+
+      return -1;
+    }
+
+    @Override
+    public void readFully(long position, byte[] b, int off, int len)
+        throws IOException {
+      if (b == null) {
+        throw new NullPointerException();
+      } else if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return;
+      }
+
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+
+      if (position + len > length) {
+        throw new EOFException("Reached end of stream.");
+      }
+
+      System.arraycopy(data, (int)position, b, off, len);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+        EnumSet<ReadOption> opts) throws IOException,
+        UnsupportedOperationException {
+      if (bufferPool == null) {
+        throw new IOException("Please specify buffer pool.");
+      }
+      ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
+      int pos = buffer.position();
+      int n = read(buffer);
+      if (n >= 0) {
+        // Bound the buffer to the bytes actually read so that
+        // remaining() reports n rather than the full pool capacity.
+        buffer.limit(pos + n);
+        buffer.position(pos);
+        return buffer;
+      }
+
+      return null;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer buffer) {
+    }
+
+    @Override
+    public void setReadahead(Long readahead) throws IOException,
+        UnsupportedOperationException {
+    }
+
+    @Override
+    public void setDropBehind(Boolean dropCache) throws IOException,
+        UnsupportedOperationException {
+    }
+
+    @Override
+    public FileDescriptor getFileDescriptor() throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      if (targetPos > length) {
+        throw new IOException("Attempted to read past end of file.");
+      }
+      if (targetPos < 0) {
+        throw new IOException("Cannot seek to negative offset.");
+      }
+      checkStream();
+      this.pos = (int)targetPos;
+      return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+      int ret = read(oneByteBuf, 0, 1);
+      return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
new file mode 100644
index 0000000000..286fb6a3d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
+  private static final String TEST_ROOT_DIR =
+      System.getProperty("test.build.data", "build/test/data")
+      + "/work-dir/localfs";
+
+  private final File base = new File(TEST_ROOT_DIR);
+  private final Path file = new Path(TEST_ROOT_DIR, "test-file");
+  private static LocalFileSystem fileSys;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set("fs.file.impl", LocalFileSystem.class.getName());
+    fileSys = FileSystem.getLocal(conf);
+    codec = CryptoCodec.getInstance(conf);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+  }
+
+  @Before
+  @Override
+  public void setUp() throws IOException {
+    fileSys.delete(new Path(TEST_ROOT_DIR), true);
+    super.setUp();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    FileUtil.setWritable(base, true);
+    FileUtil.fullyDelete(base);
+    assertFalse(base.exists());
+  }
+
+  @Override
+  protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
+      throws IOException {
+    return new CryptoOutputStream(fileSys.create(file), codec, bufferSize,
+        key, iv);
+  }
+
+  @Override
+  protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
+      throws IOException {
+    return new CryptoInputStream(fileSys.open(file), codec, bufferSize,
+        key, iv);
+  }
+
+  @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
+  @Override
+  @Test(timeout=1000)
+  public void testByteBufferRead() throws Exception {}
+
+  @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
+  @Override
+  @Test(timeout=1000)
+  public void testSyncable() throws IOException {}
+
+  @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
+  @Override
+  @Test(timeout=1000)
+  public void testCombinedOp() throws Exception {}
+
+  @Ignore("ChecksumFSInputChecker doesn't support enhanced ByteBuffer access")
+  @Override
+  @Test(timeout=1000)
+  public void testHasEnhancedByteBufferAccess() throws Exception {
+  }
+
+  @Ignore("ChecksumFSInputChecker doesn't support seekToNewSource")
+  @Override
+  @Test(timeout=1000)
+  public void testSeekToNewSource() throws Exception {
+  }
+}
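
Note for reviewers (not part of the patch): a minimal round-trip sketch of how the new wrapper streams compose with an ordinary FileSystem. The path and the all-zero 16-byte key/IV are illustrative assumptions only; real callers would obtain key material from a key-management layer. Everything else uses only the constructors and the CryptoCodec.getInstance(conf) factory introduced by this patch, plus standard Hadoop FileSystem calls.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.CryptoCodec;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
    import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;

    public class CryptoRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Optional override of the 8192-byte default from core-default.xml.
        conf.setInt("hadoop.security.crypto.buffer.size", 4096);

        CryptoCodec codec = CryptoCodec.getInstance(conf);
        byte[] key = new byte[16]; // illustration only; not real key material
        byte[] iv = new byte[16];

        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/crypto-round-trip"); // hypothetical path

        // Encrypt transparently on write.
        CryptoFSDataOutputStream out =
            new CryptoFSDataOutputStream(fs.create(path), codec, key, iv);
        byte[] plain = "hello, crypto streams".getBytes("UTF-8");
        out.write(plain);
        out.close();

        // Decrypt transparently on read. A positioned read leaves the
        // sequential offset untouched, as testCombinedOp asserts.
        CryptoFSDataInputStream in =
            new CryptoFSDataInputStream(fs.open(path), codec, key, iv);
        byte[] round = new byte[plain.length];
        in.readFully(0, round);          // pread at absolute offset 0
        System.out.println(in.getPos()); // still 0; pread moved no cursor
        System.out.println(new String(round, "UTF-8"));
        in.close();
      }
    }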
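One design point worth calling out: CryptoOutputStream implements Syncable, so hflush()/hsync() on the wrapper push the already-encrypted bytes down to the underlying stream, and a concurrent reader can decrypt everything written so far; this is exactly the invariant testSyncable checks through verify(). A fragment reusing codec/key/iv/plain from the sketch above, and assuming a hypothetical hdfsOut whose wrapped stream supports Syncable (the local ChecksumFileSystem does not, which is why testSyncable is ignored in TestCryptoStreamsForLocalFS):

    // hdfsOut: an FSDataOutputStream from a Syncable-capable FileSystem
    // such as HDFS; fs.create() on LocalFileSystem would not qualify.
    CryptoFSDataOutputStream cout =
        new CryptoFSDataOutputStream(hdfsOut, codec, key, iv);
    cout.write(plain, 0, plain.length / 3);
    cout.hflush();   // encrypted prefix now visible to new readers
    cout.write(plain, plain.length / 3, plain.length - plain.length / 3);
    cout.hsync();    // like hflush, plus a durability request to the FS
    cout.close();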