diff --git a/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
index fb293606ad..e7bc580ff3 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
@@ -8,6 +8,9 @@ fs-encryption (Unreleased)
IMPROVEMENTS
+ HADOOP-10603. Crypto input and output streams implementing Hadoop stream
+ interfaces. (Yi Liu and Charles Lamb)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java
new file mode 100644
index 0000000000..b76f1bf2f6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AESCTRCryptoCodec.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AESCTRCryptoCodec extends CryptoCodec {
+ /**
+ * For AES, the algorithm block size is fixed at 128 bits.
+ * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
+ */
+ private static final int AES_BLOCK_SIZE = 16;
+
+ @Override
+ public int getAlgorithmBlockSize() {
+ return AES_BLOCK_SIZE;
+ }
+
+ /**
+ * The IV is produced by combining the initial IV and the counter using
+ * addition. The IV length must be the same as {@link #AES_BLOCK_SIZE}.
+ */
+ @Override
+ public void calculateIV(byte[] initIV, long counter, byte[] IV) {
+ Preconditions.checkArgument(initIV.length == AES_BLOCK_SIZE);
+ Preconditions.checkArgument(IV.length == AES_BLOCK_SIZE);
+
+ ByteBuffer buf = ByteBuffer.wrap(IV);
+ buf.put(initIV);
+ buf.order(ByteOrder.BIG_ENDIAN);
+ counter += buf.getLong(AES_BLOCK_SIZE - 8);
+ buf.putLong(AES_BLOCK_SIZE - 8, counter);
+ }
+}
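
A minimal standalone sketch (not part of the patch; the class name CalculateIVDemo is made up) of how a stream offset maps to a per-position IV via the big-endian addition in AESCTRCryptoCodec#calculateIV above:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class CalculateIVDemo {
  private static final int AES_BLOCK_SIZE = 16;

  // Mirrors AESCTRCryptoCodec#calculateIV: add the block counter to the
  // low 8 bytes of the initial IV, interpreted as a big-endian long.
  static void calculateIV(byte[] initIV, long counter, byte[] IV) {
    ByteBuffer buf = ByteBuffer.wrap(IV);
    buf.put(initIV);
    buf.order(ByteOrder.BIG_ENDIAN);
    counter += buf.getLong(AES_BLOCK_SIZE - 8);
    buf.putLong(AES_BLOCK_SIZE - 8, counter);
  }

  public static void main(String[] args) {
    byte[] initIV = new byte[AES_BLOCK_SIZE];     // demo only: all-zero IV
    byte[] iv = new byte[AES_BLOCK_SIZE];
    long streamOffset = 8192;                     // byte position in stream
    long counter = streamOffset / AES_BLOCK_SIZE; // 512 AES blocks
    calculateIV(initIV, counter, iv);
    // The low 8 bytes of iv now hold the counter 512 in big-endian order.
    System.out.println(ByteBuffer.wrap(iv).getLong(AES_BLOCK_SIZE - 8));
  }
}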
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
new file mode 100644
index 0000000000..80d824d0e1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY;
+
+/**
+ * Crypto codec class which encapsulates an encryptor/decryptor pair.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class CryptoCodec implements Configurable {
+
+ public static CryptoCodec getInstance(Configuration conf) {
+ final Class<? extends CryptoCodec> klass = conf.getClass(
+ HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY, JCEAESCTRCryptoCodec.class,
+ CryptoCodec.class);
+ return ReflectionUtils.newInstance(klass, conf);
+ }
+
+ /**
+ * Get the block size of the block cipher.
+ * The block size may differ between algorithms.
+ * @return int block size
+ */
+ public abstract int getAlgorithmBlockSize();
+
+ /**
+ * Get a {@link org.apache.hadoop.crypto.Encryptor}.
+ * @return Encryptor
+ */
+ public abstract Encryptor getEncryptor() throws GeneralSecurityException;
+
+ /**
+ * Get a {@link org.apache.hadoop.crypto.Decryptor}.
+ * @return Decryptor
+ */
+ public abstract Decryptor getDecryptor() throws GeneralSecurityException;
+
+ /**
+ * This method is only used for Counter (CTR) mode. Typically, calculating
+ * the IV (Initialization Vector) is up to the Encryptor or Decryptor; for
+ * example, {@link javax.crypto.Cipher} maintains its encryption context
+ * internally when encrypting/decrypting continuously via its
+ * Cipher#update interface.
+ *
+ * In Hadoop, multiple nodes may read splits of the same file, so
+ * decryption of a file is not necessarily continuous, and the same can be
+ * true of encryption. For each part, we need to calculate the counter
+ * from the file position.
+ *
+ * Typically the IV for a file position is produced by combining the
+ * initial IV and the counter using any lossless operation (concatenation,
+ * addition, or XOR).
+ * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29
+ *
+ * @param initIV initial IV
+ * @param counter counter for input stream position
+ * @param IV the IV for input stream position
+ */
+ public abstract void calculateIV(byte[] initIV, long counter, byte[] IV);
+}
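
A short usage sketch (not part of the patch; assumes a vanilla Configuration, and the class name CodecFactoryDemo is made up) of obtaining a codec through the factory method:

import java.security.GeneralSecurityException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.Encryptor;

public class CodecFactoryDemo {
  public static void main(String[] args) throws GeneralSecurityException {
    // Falls back to JCEAESCTRCryptoCodec when
    // hadoop.security.crypto.codec.class is unset.
    Configuration conf = new Configuration();
    CryptoCodec codec = CryptoCodec.getInstance(conf);
    System.out.println(codec.getAlgorithmBlockSize()); // 16 for AES-CTR
    Encryptor encryptor = codec.getEncryptor();
  }
}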
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
new file mode 100644
index 0000000000..ffcf1846cf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.ByteBufferPool;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
+ * required in order to ensure that the plain text and cipher text have a 1:1
+ * mapping. The decryption is buffer based. The key points of the decryption
+ * are (1) calculating the counter and (2) the padding from the stream
+ * position:
+ *
+ * counter = base + pos/(algorithm blocksize);
+ * padding = pos%(algorithm blocksize);
+ *
+ * The underlying stream offset is maintained as state.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CryptoInputStream extends FilterInputStream implements
+ Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
+ CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+ private static final int MIN_BUFFER_SIZE = 512;
+ private static final byte[] oneByteBuf = new byte[1];
+ private final CryptoCodec codec;
+ private final Decryptor decryptor;
+ /**
+ * Input data buffer. The data starts at inBuffer.position() and ends at
+ * inBuffer.limit().
+ */
+ private ByteBuffer inBuffer;
+ /**
+ * The decrypted data buffer. The data starts at outBuffer.position() and
+ * ends at outBuffer.limit();
+ */
+ private ByteBuffer outBuffer;
+ private long streamOffset = 0; // Underlying stream offset.
+ /**
+ * Whether underlying stream supports
+ * {@link org.apache.hadoop.fs.ByteBufferReadable}
+ */
+ private Boolean usingByteBufferRead = null;
+ /**
+ * Padding = pos%(algorithm blocksize); padding is put into {@link #inBuffer}
+ * before any other data goes in. The purpose of padding is to put the
+ * input data at the proper position.
+ */
+ private byte padding;
+ private boolean closed;
+ private final byte[] key;
+ private final byte[] initIV;
+ private byte[] iv;
+
+ public CryptoInputStream(InputStream in, CryptoCodec codec,
+ int bufferSize, byte[] key, byte[] iv) throws IOException {
+ super(in);
+ Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE,
+ "Minimum value of buffer size is 512.");
+ this.key = key;
+ this.initIV = iv;
+ this.iv = iv.clone();
+ inBuffer = ByteBuffer.allocateDirect(bufferSize);
+ outBuffer = ByteBuffer.allocateDirect(bufferSize);
+ outBuffer.limit(0);
+ this.codec = codec;
+ try {
+ decryptor = codec.getDecryptor();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ if (in instanceof Seekable) {
+ streamOffset = ((Seekable) in).getPos();
+ }
+ updateDecryptor();
+ }
+
+ public CryptoInputStream(InputStream in, CryptoCodec codec,
+ byte[] key, byte[] iv) throws IOException {
+ this(in, codec, getBufferSize(codec.getConf()), key, iv);
+ }
+
+ public InputStream getWrappedStream() {
+ return in;
+ }
+
+ /**
+ * Decryption is buffer based.
+ * If there is data in {@link #outBuffer}, then read it out of this buffer.
+ * If there is no data in {@link #outBuffer}, then read more from the
+ * underlying stream and do the decryption.
+ * @param b the buffer into which the decrypted data is read.
+ * @param off the buffer offset.
+ * @param len the maximum number of decrypted data bytes to read.
+ * @return int the total number of decrypted data bytes read into the buffer.
+ * @throws IOException
+ */
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ checkStream();
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ int remaining = outBuffer.remaining();
+ if (remaining > 0) {
+ int n = Math.min(len, remaining);
+ outBuffer.get(b, off, n);
+ return n;
+ } else {
+ int n = 0;
+ /**
+ * Check whether the underlying stream is {@link ByteBufferReadable};
+ * if so, we can avoid a byte-array copy.
+ */
+ if (usingByteBufferRead == null) {
+ if (in instanceof ByteBufferReadable) {
+ try {
+ n = ((ByteBufferReadable) in).read(inBuffer);
+ usingByteBufferRead = Boolean.TRUE;
+ } catch (UnsupportedOperationException e) {
+ usingByteBufferRead = Boolean.FALSE;
+ }
+ } else {
+ // Not ByteBufferReadable; record that so we don't NPE below.
+ usingByteBufferRead = Boolean.FALSE;
+ }
+ if (!usingByteBufferRead.booleanValue()) {
+ n = readFromUnderlyingStream();
+ }
+ } else {
+ if (usingByteBufferRead.booleanValue()) {
+ n = ((ByteBufferReadable) in).read(inBuffer);
+ } else {
+ n = readFromUnderlyingStream();
+ }
+ }
+ if (n <= 0) {
+ return n;
+ }
+
+ streamOffset += n; // Read n bytes
+ decrypt();
+ n = Math.min(len, outBuffer.remaining());
+ outBuffer.get(b, off, n);
+ return n;
+ }
+ }
+
+ // Read data from underlying stream.
+ private int readFromUnderlyingStream() throws IOException {
+ int toRead = inBuffer.remaining();
+ byte[] tmp = getTmpBuf();
+ int n = in.read(tmp, 0, toRead);
+ if (n > 0) {
+ inBuffer.put(tmp, 0, n);
+ }
+ return n;
+ }
+
+ private byte[] tmpBuf;
+ private byte[] getTmpBuf() {
+ if (tmpBuf == null) {
+ tmpBuf = new byte[inBuffer.capacity()];
+ }
+ return tmpBuf;
+ }
+
+ /**
+ * Do the decryption using {@link #inBuffer} as input and {@link #outBuffer}
+ * as output.
+ */
+ private void decrypt() throws IOException {
+ Preconditions.checkState(inBuffer.position() >= padding);
+ if(inBuffer.position() == padding) {
+ // There is no real data in inBuffer.
+ return;
+ }
+ inBuffer.flip();
+ outBuffer.clear();
+ decryptor.decrypt(inBuffer, outBuffer);
+ inBuffer.clear();
+ outBuffer.flip();
+ if (padding > 0) {
+ /**
+ * The plain text and cipher text have a 1:1 mapping; they start at the
+ * same position.
+ */
+ outBuffer.position(padding);
+ padding = 0;
+ }
+ if (decryptor.isContextReset()) {
+ /**
+ * Typically we will not get here. To improve performance in CTR mode,
+ * we rely on the decryptor maintaining context, for example calculating
+ * the counter. Unfortunately, some bad implementations can't maintain
+ * context so we need to re-init after doing decryption.
+ */
+ updateDecryptor();
+ }
+ }
+
+ /**
+ * Update the {@link #decryptor}. Calculate the counter and {@link #padding}.
+ */
+ private void updateDecryptor() throws IOException {
+ long counter = streamOffset / codec.getAlgorithmBlockSize();
+ padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
+ inBuffer.position(padding); // Set proper position for input data.
+ codec.calculateIV(initIV, counter, iv);
+ decryptor.init(key, iv);
+ }
+
+ /**
+ * Reset the underlying stream offset, and clear {@link #inBuffer} and
+ * {@link #outBuffer}. Typically this happens when doing {@link #seek(long)}
+ * or {@link #skip(long)}.
+ */
+ private void resetStreamOffset(long offset) throws IOException {
+ streamOffset = offset;
+ inBuffer.clear();
+ outBuffer.clear();
+ outBuffer.limit(0);
+ updateDecryptor();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ super.close();
+ freeBuffers();
+ closed = true;
+ }
+
+ /**
+ * Free the direct buffer manually.
+ */
+ private void freeBuffers() {
+ sun.misc.Cleaner inBufferCleaner =
+ ((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
+ inBufferCleaner.clean();
+ sun.misc.Cleaner outBufferCleaner =
+ ((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
+ outBufferCleaner.clean();
+ }
+
+ // Positioned read.
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ checkStream();
+ try {
+ int n = ((PositionedReadable) in).read(position, buffer, offset, length);
+ if (n > 0) {
+ /**
+ * Since this operation does not change the current offset of the file,
+ * streamOffset should not be changed, and we need to restore the
+ * decryptor and outBuffer after decryption.
+ */
+ decrypt(position, buffer, offset, length);
+ }
+
+ return n;
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "positioned read.");
+ }
+ }
+
+ /**
+ * Decrypt length bytes of data in buffer, starting at offset. The output
+ * is also put into buffer, starting at the same offset. Restore the
+ * {@link #decryptor} and {@link #outBuffer} after decryption.
+ */
+ private void decrypt(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+
+ byte[] tmp = getTmpBuf();
+ int unread = outBuffer.remaining();
+ if (unread > 0) { // Cache outBuffer
+ outBuffer.get(tmp, 0, unread);
+ }
+ long curOffset = streamOffset;
+ resetStreamOffset(position);
+
+ int n = 0;
+ while (n < length) {
+ int toDecrypt = Math.min(length - n, inBuffer.remaining());
+ inBuffer.put(buffer, offset + n, toDecrypt);
+ // Do decryption
+ decrypt();
+ outBuffer.get(buffer, offset + n, toDecrypt);
+ n += toDecrypt;
+ }
+
+ // After decryption
+ resetStreamOffset(curOffset);
+ if (unread > 0) { // Restore outBuffer
+ outBuffer.clear();
+ outBuffer.put(tmp, 0, unread);
+ outBuffer.flip();
+ }
+ }
+
+ // Positioned read fully.
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ checkStream();
+ try {
+ ((PositionedReadable) in).readFully(position, buffer, offset, length);
+ if (length > 0) {
+ /**
+ * Since this operation does not change the current offset of the file,
+ * streamOffset should not be changed, and we need to restore the
+ * decryptor and outBuffer after decryption.
+ */
+ decrypt(position, buffer, offset, length);
+ }
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "positioned readFully.");
+ }
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
+
+ // Seek to a position.
+ @Override
+ public void seek(long pos) throws IOException {
+ Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
+ checkStream();
+ try {
+ // If the target pos is within data we have already read and decrypted.
+ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
+ int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
+ if (forward > 0) {
+ outBuffer.position(outBuffer.position() + forward);
+ }
+ } else {
+ ((Seekable) in).seek(pos);
+ resetStreamOffset(pos);
+ }
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "seek.");
+ }
+ }
+
+ // Skip n bytes
+ @Override
+ public long skip(long n) throws IOException {
+ Preconditions.checkArgument(n >= 0, "Negative skip length.");
+ checkStream();
+
+ if (n == 0) {
+ return 0;
+ } else if (n <= outBuffer.remaining()) {
+ int pos = outBuffer.position() + (int) n;
+ outBuffer.position(pos);
+ return n;
+ } else {
+ /**
+ * Subtract outBuffer.remaining() to see how many bytes we need to
+ * skip in the underlying stream. We get the number of bytes actually
+ * skipped in the underlying stream, then add outBuffer.remaining()
+ * back to get the number of bytes skipped from the user's view.
+ */
+ n -= outBuffer.remaining();
+ long skipped = in.skip(n);
+ if (skipped < 0) {
+ skipped = 0;
+ }
+ long pos = streamOffset + skipped;
+ skipped += outBuffer.remaining();
+ resetStreamOffset(pos);
+ return skipped;
+ }
+ }
+
+ // Get underlying stream position.
+ @Override
+ public long getPos() throws IOException {
+ checkStream();
+ // Equals: ((Seekable) in).getPos() - outBuffer.remaining()
+ return streamOffset - outBuffer.remaining();
+ }
+
+ // ByteBuffer read.
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ checkStream();
+ if (in instanceof ByteBufferReadable) {
+ int unread = outBuffer.remaining();
+ if (unread > 0) { // Have unread decrypted data in buffer.
+ int toRead = buf.remaining();
+ if (toRead <= unread) {
+ int limit = outBuffer.limit();
+ outBuffer.limit(outBuffer.position() + toRead);
+ buf.put(outBuffer);
+ outBuffer.limit(limit);
+ return toRead;
+ } else {
+ buf.put(outBuffer);
+ }
+ }
+
+ int pos = buf.position();
+ int n = ((ByteBufferReadable) in).read(buf);
+ if (n > 0) {
+ streamOffset += n; // Read n bytes
+ decrypt(buf, n, pos);
+ }
+ if (n >= 0) {
+ // Account for the unread decrypted bytes already copied into buf.
+ return unread + n;
+ }
+ return (unread == 0) ? -1 : unread;
+ }
+
+ throw new UnsupportedOperationException("ByteBuffer read unsupported " +
+ "by input stream.");
+ }
+
+ /**
+ * Decrypt all data in buf: n bytes in total, starting from the given
+ * start position. The output is also put into buf at the same start
+ * position.
+ * buf.position() and buf.limit() should be unchanged after decryption.
+ */
+ private void decrypt(ByteBuffer buf, int n, int start)
+ throws IOException {
+ int pos = buf.position();
+ int limit = buf.limit();
+ int len = 0;
+ while (len < n) {
+ buf.position(start + len);
+ buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
+ inBuffer.put(buf);
+ // Do decryption
+ decrypt();
+
+ buf.position(start + len);
+ buf.limit(limit);
+ len += outBuffer.remaining();
+ buf.put(outBuffer);
+ }
+ buf.position(pos);
+ }
+
+ @Override
+ public int available() throws IOException {
+ checkStream();
+
+ return in.available() + outBuffer.remaining();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ }
+
+ @Override
+ public void reset() throws IOException {
+ throw new IOException("Mark/reset not supported");
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ Preconditions.checkArgument(targetPos >= 0,
+ "Cannot seek to negative offset.");
+ checkStream();
+ try {
+ boolean result = ((Seekable) in).seekToNewSource(targetPos);
+ resetStreamOffset(targetPos);
+ return result;
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "seekToNewSource.");
+ }
+ }
+
+ @Override
+ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+ EnumSet<ReadOption> opts) throws IOException,
+ UnsupportedOperationException {
+ checkStream();
+ try {
+ if (outBuffer.remaining() > 0) {
+ // Have some decrypted data unread, need to reset.
+ ((Seekable) in).seek(getPos());
+ resetStreamOffset(getPos());
+ }
+ ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
+ read(bufferPool, maxLength, opts);
+ if (buffer != null) {
+ int n = buffer.remaining();
+ if (n > 0) {
+ streamOffset += n; // Read n bytes
+ int pos = buffer.position();
+ decrypt(buffer, n, pos);
+ }
+ }
+ return buffer;
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "enhanced byte buffer access.");
+ }
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ try {
+ ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "release buffer.");
+ }
+ }
+
+ @Override
+ public void setReadahead(Long readahead) throws IOException,
+ UnsupportedOperationException {
+ try {
+ ((CanSetReadahead) in).setReadahead(readahead);
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not support " +
+ "setting the readahead caching strategy.");
+ }
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropCache) throws IOException,
+ UnsupportedOperationException {
+ try {
+ ((CanSetDropBehind) in).setDropBehind(dropCache);
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not " +
+ "support setting the drop-behind caching setting.");
+ }
+ }
+
+ @Override
+ public FileDescriptor getFileDescriptor() throws IOException {
+ if (in instanceof HasFileDescriptor) {
+ return ((HasFileDescriptor) in).getFileDescriptor();
+ } else if (in instanceof FileInputStream) {
+ return ((FileInputStream) in).getFD();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff);
+ }
+
+ private void checkStream() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ }
+
+ private static int getBufferSize(Configuration conf) {
+ return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY,
+ HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+ }
+}
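
A hedged usage sketch (not part of the patch; the all-zero key and IV are for the demo only) that produces AES-CTR ciphertext with plain JCE and reads it back through CryptoInputStream:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;

public class CryptoInputStreamDemo {
  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16]; // demo only; use real key material
    byte[] iv = new byte[16];
    byte[] plain = "attack at dawn".getBytes(StandardCharsets.UTF_8);

    // Encrypt with plain JCE so we have ciphertext to read back;
    // the counter starts at 0, matching stream offset 0 below.
    Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
    c.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
        new IvParameterSpec(iv));
    byte[] cipherText = c.doFinal(plain);

    CryptoCodec codec = CryptoCodec.getInstance(new Configuration());
    try (CryptoInputStream cin = new CryptoInputStream(
        new ByteArrayInputStream(cipherText), codec, key, iv)) {
      byte[] out = new byte[plain.length];
      int off = 0, n;
      // read() may return fewer bytes than requested, so loop.
      while (off < out.length
          && (n = cin.read(out, off, out.length - off)) != -1) {
        off += n;
      }
      System.out.println(new String(out, StandardCharsets.UTF_8));
    }
  }
}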
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
new file mode 100644
index 0000000000..934c447935
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.Syncable;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
+ * required in order to ensure that the plain text and cipher text have a 1:1
+ * mapping. The encryption is buffer based. The key points of the encryption
+ * are (1) calculating the counter and (2) the padding from the stream
+ * position.
+ *
+ * counter = base + pos/(algorithm blocksize);
+ * padding = pos%(algorithm blocksize);
+ *
+ * The underlying stream offset is maintained as state.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CryptoOutputStream extends FilterOutputStream implements
+ Syncable, CanSetDropBehind {
+ private static final int MIN_BUFFER_SIZE = 512;
+ private static final byte[] oneByteBuf = new byte[1];
+ private final CryptoCodec codec;
+ private final Encryptor encryptor;
+ /**
+ * Input data buffer. The data starts at inBuffer.position() and ends at
+ * inBuffer.limit().
+ */
+ private ByteBuffer inBuffer;
+ /**
+ * Encrypted data buffer. The data starts at outBuffer.position() and ends at
+ * outBuffer.limit();
+ */
+ private ByteBuffer outBuffer;
+ private long streamOffset = 0; // Underlying stream offset.
+ /**
+ * Padding = pos%(algorithm blocksize); padding is put into {@link #inBuffer}
+ * before any other data goes in. The purpose of padding is to put the
+ * input data at the proper position.
+ */
+ private byte padding;
+ private boolean closed;
+ private final byte[] key;
+ private final byte[] initIV;
+ private byte[] iv;
+
+ public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+ int bufferSize, byte[] key, byte[] iv) throws IOException {
+ this(out, codec, bufferSize, key, iv, 0);
+ }
+
+ public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+ int bufferSize, byte[] key, byte[] iv, long streamOffset)
+ throws IOException {
+ super(out);
+ Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE,
+ "Minimum value of buffer size is 512.");
+ this.key = key;
+ this.initIV = iv;
+ this.iv = iv.clone();
+ inBuffer = ByteBuffer.allocateDirect(bufferSize);
+ outBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.streamOffset = streamOffset;
+ this.codec = codec;
+ try {
+ encryptor = codec.getEncryptor();
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ updateEncryptor();
+ }
+
+ public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+ byte[] key, byte[] iv) throws IOException {
+ this(out, codec, key, iv, 0);
+ }
+
+ public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+ byte[] key, byte[] iv, long streamOffset) throws IOException {
+ this(out, codec, getBufferSize(codec.getConf()), key, iv, streamOffset);
+ }
+
+ public OutputStream getWrappedStream() {
+ return out;
+ }
+
+ /**
+ * Encryption is buffer based.
+ * If there is enough room in {@link #inBuffer}, then write to this buffer.
+ * If {@link #inBuffer} is full, then do encryption and write data to the
+ * underlying stream.
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @throws IOException
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkStream();
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || off > b.length ||
+ len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ while (len > 0) {
+ int remaining = inBuffer.remaining();
+ if (len < remaining) {
+ inBuffer.put(b, off, len);
+ len = 0;
+ } else {
+ inBuffer.put(b, off, remaining);
+ off += remaining;
+ len -= remaining;
+ encrypt();
+ }
+ }
+ }
+
+ /**
+ * Do the encryption, input is {@link #inBuffer} and output is
+ * {@link #outBuffer}.
+ */
+ private void encrypt() throws IOException {
+ Preconditions.checkState(inBuffer.position() >= padding);
+ if (inBuffer.position() == padding) {
+ // There is no real data in the inBuffer.
+ return;
+ }
+ inBuffer.flip();
+ outBuffer.clear();
+ encryptor.encrypt(inBuffer, outBuffer);
+ inBuffer.clear();
+ outBuffer.flip();
+ if (padding > 0) {
+ /**
+ * The plain text and cipher text have a 1:1 mapping; they start at the
+ * same position.
+ */
+ outBuffer.position(padding);
+ padding = 0;
+ }
+ int len = outBuffer.remaining();
+ /**
+ * If the underlying stream supports {@link ByteBuffer} writes in the
+ * future, this needs to be refined.
+ */
+ final byte[] tmp = getTmpBuf();
+ outBuffer.get(tmp, 0, len);
+ out.write(tmp, 0, len);
+
+ streamOffset += len;
+ if (encryptor.isContextReset()) {
+ /**
+ * We will generally not get here. For CTR mode, to improve
+ * performance, we rely on the encryptor maintaining context, for
+ * example to calculate the counter. But some bad implementations
+ * can't maintain context, and need us to re-init after doing
+ * encryption.
+ */
+ updateEncryptor();
+ }
+ }
+
+ /**
+ * Update the {@link #encryptor}: calculate counter and {@link #padding}.
+ */
+ private void updateEncryptor() throws IOException {
+ long counter = streamOffset / codec.getAlgorithmBlockSize();
+ padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
+ inBuffer.position(padding); // Set proper position for input data.
+ codec.calculateIV(initIV, counter, iv);
+ encryptor.init(key, iv);
+ }
+
+ private byte[] tmpBuf;
+ private byte[] getTmpBuf() {
+ if (tmpBuf == null) {
+ tmpBuf = new byte[outBuffer.capacity()];
+ }
+ return tmpBuf;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ super.close();
+ freeBuffers();
+ closed = true;
+ }
+
+ /**
+ * Free the direct buffer manually.
+ */
+ private void freeBuffers() {
+ sun.misc.Cleaner inBufferCleaner =
+ ((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
+ inBufferCleaner.clean();
+ sun.misc.Cleaner outBufferCleaner =
+ ((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
+ outBufferCleaner.clean();
+ }
+
+ /**
+ * To flush, we need to encrypt the data in the buffer and write it to the
+ * underlying stream, then flush the underlying stream.
+ */
+ @Override
+ public void flush() throws IOException {
+ checkStream();
+ encrypt();
+ super.flush();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ oneByteBuf[0] = (byte)(b & 0xff);
+ write(oneByteBuf, 0, oneByteBuf.length);
+ }
+
+ private void checkStream() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropCache) throws IOException,
+ UnsupportedOperationException {
+ try {
+ ((CanSetDropBehind) out).setDropBehind(dropCache);
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("This stream does not " +
+ "support setting the drop-behind caching.");
+ }
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ flush();
+ if (out instanceof Syncable) {
+ ((Syncable)out).hflush();
+ }
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ flush();
+ if (out instanceof Syncable) {
+ ((Syncable)out).hsync();
+ }
+ }
+
+ private static int getBufferSize(Configuration conf) {
+ return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY,
+ HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+ }
+}
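
A complementary sketch (not part of the patch; demo key/IV) showing the write side: flush() triggers encrypt() on the buffered bytes, and CTR keeps the ciphertext exactly as long as the plaintext:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoOutputStream;

public class CryptoOutputStreamDemo {
  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16]; // demo only; use real key material
    byte[] iv = new byte[16];
    CryptoCodec codec = CryptoCodec.getInstance(new Configuration());

    byte[] plain = "hello, world".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (CryptoOutputStream cout =
        new CryptoOutputStream(sink, codec, key, iv)) {
      cout.write(plain);
      cout.flush(); // encrypts the buffered bytes and writes them through
      // CTR is a stream mode: ciphertext length equals plaintext length.
      System.out.println(sink.size() == plain.length); // true
    }
  }
}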
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
new file mode 100644
index 0000000000..4afb221658
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface Decryptor {
+
+ /**
+ * Initialize the decryptor; the internal decryption context will be
+ * reset.
+ * @param key decryption key.
+ * @param iv decryption initialization vector
+ * @throws IOException if initialization fails
+ */
+ public void init(byte[] key, byte[] iv) throws IOException;
+
+ /**
+ * Indicate whether the decryption context is reset.
+ *
+ * This is useful for a mode like CTR, which requires a different IV for
+ * different parts of the data. Usually the decryptor can maintain the
+ * context internally, such as calculating the IV/counter, and then
+ * continue a multiple-part decryption operation without re-initializing
+ * with the key and a new IV. For a mode like CTR, if the context is reset
+ * after each decryption, the decryptor must be re-initialized before each
+ * operation, which is not efficient.
+ * @return boolean whether context is reset.
+ */
+ public boolean isContextReset();
+
+ /**
+ * This exposes a direct interface for record decryption with direct byte
+ * buffers.
+ *
+ * The decrypt() function need not always consume the buffers provided;
+ * it may need to be called multiple times to decrypt an entire buffer,
+ * and the object will hold the decryption context internally.
+ *
+ * Some implementations may need enough space in the destination buffer
+ * to decrypt the entire input.
+ *
+ * The end result will move inBuffer.position() by the bytes read and
+ * outBuffer.position() by the bytes written. It should not modify
+ * inBuffer.limit() or outBuffer.limit(), to maintain consistency of
+ * operation.
+ *
+ * @param inBuffer in direct {@link ByteBuffer} for reading from. Requires
+ * inBuffer != null and inBuffer.remaining() > 0
+ * @param outBuffer out direct {@link ByteBuffer} for storing the results
+ * into. Requires outBuffer != null and outBuffer.remaining() > 0
+ * @throws IOException if decryption fails
+ */
+ public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+ throws IOException;
+}
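
A hedged sketch of the direct-buffer contract described above (the Encryptor interface below is the mirror image); the helper decryptChunk and its class name are hypothetical:

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.Decryptor;

public class DecryptorContractDemo {
  // Decrypts one chunk, illustrating the position()/limit() contract.
  static byte[] decryptChunk(byte[] key, byte[] iv, byte[] cipherText)
      throws Exception {
    Decryptor decryptor =
        CryptoCodec.getInstance(new Configuration()).getDecryptor();
    decryptor.init(key, iv);

    ByteBuffer inBuf = ByteBuffer.allocateDirect(cipherText.length);
    ByteBuffer outBuf = ByteBuffer.allocateDirect(cipherText.length);
    inBuf.put(cipherText);
    inBuf.flip();                     // data between position() and limit()

    decryptor.decrypt(inBuf, outBuf); // advances both buffers' positions
    outBuf.flip();                    // plain text is now readable

    byte[] plain = new byte[outBuf.remaining()];
    outBuf.get(plain);
    return plain;
  }
}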
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
new file mode 100644
index 0000000000..398cc2e9ae
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface Encryptor {
+
+ /**
+ * Initialize the encryptor; the internal encryption context will be
+ * reset.
+ * @param key encryption key.
+ * @param iv encryption initialization vector
+ * @throws IOException if initialization fails
+ */
+ public void init(byte[] key, byte[] iv) throws IOException;
+
+ /**
+ * Indicate whether the encryption context is reset.
+ *
+ * This is useful for a mode like CTR, which requires a different IV for
+ * different parts of the data. Usually the encryptor can maintain the
+ * context internally, such as calculating the IV/counter, and then
+ * continue a multiple-part encryption operation without re-initializing
+ * with the key and a new IV. For a mode like CTR, if the context is reset
+ * after each encryption, the encryptor must be re-initialized before each
+ * operation, which is not efficient.
+ * @return boolean whether context is reset.
+ */
+ public boolean isContextReset();
+
+ /**
+ * This exposes a direct interface for record encryption with direct byte
+ * buffers.
+ *
+ * The encrypt() function need not always consume the buffers provided;
+ * it may need to be called multiple times to encrypt an entire buffer,
+ * and the object will hold the encryption context internally.
+ *
+ * Some implementations may need enough space in the destination buffer
+ * to encrypt the entire input.
+ *
+ * The end result will move inBuffer.position() by the bytes read and
+ * outBuffer.position() by the bytes written. It should not modify
+ * inBuffer.limit() or outBuffer.limit(), to maintain consistency of
+ * operation.
+ *
+ * @param inBuffer in direct {@link ByteBuffer} for reading from. Requires
+ * inBuffer != null and inBuffer.remaining() > 0
+ * @param outBuffer out direct {@link ByteBuffer} for storing the results
+ * into. Requires outBuffer != null and outBuffer.remaining() > 0
+ * @throws IOException if encryption fails
+ */
+ public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+ throws IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java
new file mode 100644
index 0000000000..aea9e07ee6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRCryptoCodec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY;
+
+/**
+ * Implements the AES-CTR crypto codec using a JCE provider.
+ */
+public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec {
+ private Configuration conf;
+ private String provider;
+
+ public JCEAESCTRCryptoCodec() {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ provider = conf.get(HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY);
+ }
+
+ @Override
+ public Encryptor getEncryptor() throws GeneralSecurityException {
+ return new JCEAESCTREncryptor(provider);
+ }
+
+ @Override
+ public Decryptor getDecryptor() throws GeneralSecurityException {
+ return new JCEAESCTRDecryptor(provider);
+ }
+}
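
A brief sketch of routing the codec through a specific JCE provider; the provider name "BC" (Bouncy Castle) is only an example and must already be registered with the JVM:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;

public class ProviderConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Makes JCEAESCTRCryptoCodec call
    // Cipher.getInstance("AES/CTR/NoPadding", "BC").
    conf.set("hadoop.security.crypto.jce.provider", "BC");
    CryptoCodec codec = CryptoCodec.getInstance(conf);
  }
}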
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java
new file mode 100644
index 0000000000..a3fb13f629
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTRDecryptor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import com.google.common.base.Preconditions;
+
+public class JCEAESCTRDecryptor implements Decryptor {
+ private final Cipher cipher;
+ private boolean contextReset = false;
+
+ public JCEAESCTRDecryptor(String provider) throws GeneralSecurityException {
+ if (provider == null || provider.isEmpty()) {
+ cipher = Cipher.getInstance("AES/CTR/NoPadding");
+ } else {
+ cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
+ }
+ }
+
+ @Override
+ public void init(byte[] key, byte[] iv) throws IOException {
+ Preconditions.checkNotNull(key);
+ Preconditions.checkNotNull(iv);
+ contextReset = false;
+ try {
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
+ new IvParameterSpec(iv));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * For AES-CTR, this consumes all input data and needs enough space in
+ * the destination buffer to decrypt the entire input.
+ */
+ @Override
+ public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+ throws IOException {
+ try {
+ int inputSize = inBuffer.remaining();
+ // Cipher#update will maintain decryption context.
+ int n = cipher.update(inBuffer, outBuffer);
+ if (n < inputSize) {
+ /**
+ * Typically code will not get here. Cipher#update will decrypt all
+ * input data and put result in outBuffer.
+ * Cipher#doFinal will reset the decryption context.
+ */
+ contextReset = true;
+ cipher.doFinal(inBuffer, outBuffer);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean isContextReset() {
+ return contextReset;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java
new file mode 100644
index 0000000000..9ee70dc472
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JCEAESCTREncryptor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import com.google.common.base.Preconditions;
+
+public class JCEAESCTREncryptor implements Encryptor {
+ private final Cipher cipher;
+ private boolean contextReset = false;
+
+ public JCEAESCTREncryptor(String provider) throws GeneralSecurityException {
+ if (provider == null || provider.isEmpty()) {
+ cipher = Cipher.getInstance("AES/CTR/NoPadding");
+ } else {
+ cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
+ }
+ }
+
+ @Override
+ public void init(byte[] key, byte[] iv) throws IOException {
+ Preconditions.checkNotNull(key);
+ Preconditions.checkNotNull(iv);
+ contextReset = false;
+ try {
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
+ new IvParameterSpec(iv));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * For AES-CTR, this consumes all input data and needs enough space in
+ * the destination buffer to encrypt the entire input.
+ */
+ @Override
+ public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+ throws IOException {
+ try {
+ int inputSize = inBuffer.remaining();
+ // Cipher#update will maintain encryption context.
+ int n = cipher.update(inBuffer, outBuffer);
+ if (n < inputSize) {
+ /**
+ * Typically code will not get here. Cipher#update will encrypt all
+ * input data and put result in outBuffer.
+ * Cipher#doFinal will reset the encryption context.
+ */
+ contextReset = true;
+ cipher.doFinal(inBuffer, outBuffer);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean isContextReset() {
+ return contextReset;
+ }
+}
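
A small sketch (demo key/IV) of the JCE behavior the two classes above rely on: successive Cipher#update calls continue the same CTR keystream, so the context rarely needs to be reset:

import java.util.Arrays;

import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class CtrUpdateDemo {
  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16]; // demo only
    byte[] iv = new byte[16];

    Cipher oneShot = Cipher.getInstance("AES/CTR/NoPadding");
    oneShot.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
        new IvParameterSpec(iv));
    byte[] whole = oneShot.update("hello world!".getBytes());

    Cipher twoPart = Cipher.getInstance("AES/CTR/NoPadding");
    twoPart.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
        new IvParameterSpec(iv));
    byte[] part1 = twoPart.update("hello ".getBytes());
    byte[] part2 = twoPart.update("world!".getBytes());

    // update() maintained the counter across calls, so the concatenated
    // two-part ciphertext equals the one-shot ciphertext.
    byte[] joined = Arrays.copyOf(part1, part1.length + part2.length);
    System.arraycopy(part2, 0, joined, part1.length, part2.length);
    System.out.println(Arrays.equals(whole, joined)); // true
  }
}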
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index b6be29447a..c0853a9ded 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -282,5 +282,16 @@ public class CommonConfigurationKeysPublic {
/** Class to override Sasl Properties for a connection */
public static final String HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS =
"hadoop.security.saslproperties.resolver.class";
+ /** See core-default.xml */
+ public static final String HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY =
+ "hadoop.security.crypto.codec.class";
+ /** See core-default.xml */
+ public static final String HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY =
+ "hadoop.security.crypto.jce.provider";
+ /** See core-default.xml */
+ public static final String HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY =
+ "hadoop.security.crypto.buffer.size";
+ /** Default value for HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY */
+ public static final int HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT = 8192;
}
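
A tiny sketch of using the new public keys programmatically instead of hard-coded strings (4096 here is an arbitrary example value):

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;

public class BufferSizeConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Shrink the crypto stream buffers from the 8192-byte default.
    conf.setInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, 4096);
  }
}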
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java
new file mode 100644
index 0000000000..8758d28f1d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataInputStream.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.crypto;
+
+import java.io.IOException;
+
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class CryptoFSDataInputStream extends FSDataInputStream {
+
+ public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec,
+ int bufferSize, byte[] key, byte[] iv) throws IOException {
+ super(new CryptoInputStream(in, codec, bufferSize, key, iv));
+ }
+
+ public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec,
+ byte[] key, byte[] iv) throws IOException {
+ super(new CryptoInputStream(in, codec, key, iv));
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
new file mode 100644
index 0000000000..040fbcb879
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.crypto;
+
+import java.io.IOException;
+
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+public class CryptoFSDataOutputStream extends FSDataOutputStream {
+ private final FSDataOutputStream fsOut;
+
+ public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
+ int bufferSize, byte[] key, byte[] iv) throws IOException {
+ super(new CryptoOutputStream(out, codec, bufferSize, key, iv,
+ out.getPos()), null, out.getPos());
+ this.fsOut = out;
+ }
+
+ public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
+ byte[] key, byte[] iv) throws IOException {
+ super(new CryptoOutputStream(out, codec, key, iv, out.getPos()),
+ null, out.getPos());
+ this.fsOut = out;
+ }
+
+ @Override
+ public long getPos() {
+ return fsOut.getPos();
+ }
+}
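
A hedged end-to-end sketch (not part of the patch; the local filesystem and path are illustrative, and the key/IV are demo-only) of layering the crypto wrappers over FileSystem streams:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;

public class CryptoFSDemo {
  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16], iv = new byte[16]; // demo only
    Configuration conf = new Configuration();
    CryptoCodec codec = CryptoCodec.getInstance(conf);
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/demo.enc");

    try (CryptoFSDataOutputStream out = new CryptoFSDataOutputStream(
        fs.create(file), codec, key, iv)) {
      out.write("secret".getBytes());
    } // close() flushes and encrypts the buffered bytes

    try (CryptoFSDataInputStream in = new CryptoFSDataInputStream(
        fs.open(file), codec, key, iv)) {
      System.out.println((char) in.read()); // 's'
    }
  }
}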
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index ea0808eef7..6073c1a915 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1348,4 +1348,30 @@
true.
</description>
</property>
+
+<property>
+ <name>hadoop.security.crypto.codec.class</name>
+ <value></value>
+ <description>
+ The default implementation of CryptoCodec which is used for encryption
+ and decryption.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.security.crypto.jce.provider</name>
+ <value></value>
+ <description>
+ The JCE provider name used in CryptoCodec.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.security.crypto.buffer.size</name>
+ <value>8192</value>
+ <description>
+ The buffer size used by CryptoInputStream and CryptoOutputStream.
+ The default value is 8192.
+ </description>
+</property>
</configuration>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
new file mode 100644
index 0000000000..7f36c2b249
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class CryptoStreamsTestBase {
+ protected static final Log LOG = LogFactory.getLog(
+ CryptoStreamsTestBase.class);
+
+ protected static CryptoCodec codec;
+ private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
+ 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
+ private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
+ 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+
+ protected static final int count = 10000;
+ protected static int defaultBufferSize = 8192;
+ protected static int smallBufferSize = 1024;
+ private byte[] data;
+ private int dataLen;
+
+ @Before
+ public void setUp() throws IOException {
+ // Generate data
+ int seed = new Random().nextInt();
+ DataOutputBuffer dataBuf = new DataOutputBuffer();
+ RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; ++i) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ key.write(dataBuf);
+ value.write(dataBuf);
+ }
+ LOG.info("Generated " + count + " records");
+ data = dataBuf.getData();
+ dataLen = dataBuf.getLength();
+ }
+
+ protected void writeData(OutputStream out) throws Exception {
+ out.write(data, 0, dataLen);
+ out.close();
+ }
+
+ protected int getDataLen() {
+ return dataLen;
+ }
+
+ private int readAll(InputStream in, byte[] b, int off, int len)
+ throws IOException {
+ int n = 0;
+ int total = 0;
+ while (n != -1) {
+ total += n;
+ if (total >= len) {
+ break;
+ }
+ n = in.read(b, off + total, len - total);
+ }
+
+ return total;
+ }
+
+ protected OutputStream getOutputStream(int bufferSize) throws IOException {
+ return getOutputStream(bufferSize, key, iv);
+ }
+
+ protected abstract OutputStream getOutputStream(int bufferSize, byte[] key,
+ byte[] iv) throws IOException;
+
+ protected InputStream getInputStream(int bufferSize) throws IOException {
+ return getInputStream(bufferSize, key, iv);
+ }
+
+ protected abstract InputStream getInputStream(int bufferSize, byte[] key,
+ byte[] iv) throws IOException;
+
+ /**
+ * Test crypto reading with different buffer sizes.
+ */
+ @Test(timeout=120000)
+ public void testRead() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ // Default buffer size
+ InputStream in = getInputStream(defaultBufferSize);
+ readCheck(in);
+ in.close();
+
+ // Small buffer size
+ in = getInputStream(smallBufferSize);
+ readCheck(in);
+ in.close();
+ }
+
+ private void readCheck(InputStream in) throws Exception {
+ byte[] result = new byte[dataLen];
+ int n = readAll(in, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, 0, expectedData, 0, n);
+ Assert.assertArrayEquals(result, expectedData);
+
+ // EOF
+ n = in.read(result, 0, dataLen);
+ Assert.assertEquals(n, -1);
+ in.close();
+ }
+
+ /**
+ * Test crypto with different IVs.
+ */
+ @Test(timeout=120000)
+ public void testCryptoIV() throws Exception {
+ byte[] iv1 = iv.clone();
+
+ // Counter base: Long.MAX_VALUE
+ setCounterBaseForIV(iv1, Long.MAX_VALUE);
+ cryptoCheck(iv1);
+
+ // Counter base: Long.MAX_VALUE - 1
+ setCounterBaseForIV(iv1, Long.MAX_VALUE - 1);
+ cryptoCheck(iv1);
+
+ // Counter base: Integer.MAX_VALUE
+ setCounterBaseForIV(iv1, Integer.MAX_VALUE);
+ cryptoCheck(iv1);
+
+ // Counter base: 0
+ setCounterBaseForIV(iv1, 0);
+ cryptoCheck(iv1);
+
+ // Counter base: -1
+ setCounterBaseForIV(iv1, -1);
+ cryptoCheck(iv1);
+ }
+
+ private void cryptoCheck(byte[] iv) throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize, key, iv);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize, key, iv);
+ readCheck(in);
+ in.close();
+ }
+
+ private void setCounterBaseForIV(byte[] iv, long counterBase) {
+ ByteBuffer buf = ByteBuffer.wrap(iv);
+ buf.order(ByteOrder.BIG_ENDIAN);
+ buf.putLong(iv.length - 8, counterBase);
+ }
+
+ /**
+ * Test hflush/hsync of the crypto output stream with different buffer sizes.
+ */
+ @Test(timeout=120000)
+ public void testSyncable() throws IOException {
+ syncableCheck();
+ }
+
+ private void syncableCheck() throws IOException {
+ OutputStream out = getOutputStream(smallBufferSize);
+ try {
+ int bytesWritten = dataLen/3;
+ out.write(data, 0, bytesWritten);
+ ((Syncable) out).hflush();
+
+ InputStream in = getInputStream(defaultBufferSize);
+ verify(in, bytesWritten, data);
+ in.close();
+
+ out.write(data, bytesWritten, dataLen - bytesWritten);
+ ((Syncable) out).hsync();
+
+ in = getInputStream(defaultBufferSize);
+ verify(in, dataLen, data);
+ in.close();
+ } finally {
+ out.close();
+ }
+ }
+
+ private void verify(InputStream in, int bytesToVerify,
+ byte[] expectedBytes) throws IOException {
+ byte[] readBuf = new byte[bytesToVerify];
+ readAll(in, readBuf, 0, bytesToVerify);
+ for (int i=0; i < bytesToVerify; i++) {
+ Assert.assertEquals(expectedBytes[i], readBuf[i]);
+ }
+ }
+
+ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
+ throws IOException {
+ int n = 0;
+ int total = 0;
+ while (n != -1) {
+ total += n;
+ if (total >= len) {
+ break;
+ }
+ n = ((PositionedReadable) in).read(pos + total, b, off + total,
+ len - total);
+ }
+
+ return total;
+ }
+
+ /**
+ * Test positioned read.
+ */
+ @Test(timeout=120000)
+ public void testPositionedRead() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize);
+ // Pos: 1/3 dataLen
+ positionedReadCheck(in, dataLen/3);
+
+ // Pos: 1/2 dataLen
+ positionedReadCheck(in, dataLen/2);
+ in.close();
+ }
+
+ private void positionedReadCheck(InputStream in, int pos) throws Exception {
+ byte[] result = new byte[dataLen];
+ int n = readAll(in, pos, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n + pos);
+ byte[] readData = new byte[n];
+ System.arraycopy(result, 0, readData, 0, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, pos, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
+
+ /**
+ * Test readFully.
+ */
+ @Test(timeout=120000)
+ public void testReadFully() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize);
+ final int len1 = dataLen/4;
+ // Read len1 bytes
+ byte [] readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ byte[] expectedData = new byte[len1];
+ System.arraycopy(data, 0, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // Pos: 1/3 dataLen
+ readFullyCheck(in, dataLen/3);
+
+ // Read len1 bytes
+ readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ expectedData = new byte[len1];
+ System.arraycopy(data, len1, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // Pos: 1/2 dataLen
+ readFullyCheck(in, dataLen/2);
+
+ // Read len1 bytes
+ readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ expectedData = new byte[len1];
+ System.arraycopy(data, 2 * len1, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ in.close();
+ }
+
+ private void readFullyCheck(InputStream in, int pos) throws Exception {
+ byte[] result = new byte[dataLen - pos];
+ ((PositionedReadable) in).readFully(pos, result);
+
+ byte[] expectedData = new byte[dataLen - pos];
+ System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
+ Assert.assertArrayEquals(result, expectedData);
+
+ result = new byte[dataLen]; // Exceeds maximum length
+ try {
+ ((PositionedReadable) in).readFully(pos, result);
+ Assert.fail("Read fully exceeds maximum length should fail.");
+ } catch (IOException e) {
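+ // Expected: readFully past the end of the stream must throw.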
+ }
+ }
+
+ /**
+ * Test seeking to different positions.
+ */
+ @Test(timeout=120000)
+ public void testSeek() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize);
+ // Pos: 1/3 dataLen
+ seekCheck(in, dataLen/3);
+
+ // Pos: 0
+ seekCheck(in, 0);
+
+ // Pos: 1/2 dataLen
+ seekCheck(in, dataLen/2);
+
+ // Pos: -3
+ try {
+ seekCheck(in, -3);
+ Assert.fail("Seek to negative offset should fail.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
+ "offset", e);
+ }
+
+ // Pos: dataLen + 3
+ try {
+ seekCheck(in, dataLen + 3);
+ Assert.fail("Seek after EOF should fail.");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains("Cannot seek after EOF", e);
+ }
+
+ in.close();
+ }
+
+ private void seekCheck(InputStream in, int pos) throws Exception {
+ byte[] result = new byte[dataLen];
+ ((Seekable) in).seek(pos);
+ int n = readAll(in, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n + pos);
+ byte[] readData = new byte[n];
+ System.arraycopy(result, 0, readData, 0, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, pos, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
+
+ /**
+ * Test get position.
+ */
+ @Test(timeout=120000)
+ public void testGetPos() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ // Default buffer size
+ InputStream in = getInputStream(defaultBufferSize);
+ byte[] result = new byte[dataLen];
+ int n1 = readAll(in, result, 0, dataLen/3);
+ Assert.assertEquals(n1, ((Seekable) in).getPos());
+
+ int n2 = readAll(in, result, n1, dataLen - n1);
+ Assert.assertEquals(n1 + n2, ((Seekable) in).getPos());
+ in.close();
+ }
+
+ @Test(timeout=120000)
+ public void testAvailable() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ // Default buffer size
+ InputStream in = getInputStream(defaultBufferSize);
+ byte[] result = new byte[dataLen];
+ int n1 = readAll(in, result, 0, dataLen/3);
+ Assert.assertEquals(in.available(), dataLen - n1);
+
+ int n2 = readAll(in, result, n1, dataLen - n1);
+ Assert.assertEquals(in.available(), dataLen - n1 - n2);
+ in.close();
+ }
+
+ /**
+ * Test skip.
+ */
+ @Test(timeout=120000)
+ public void testSkip() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ // Default buffer size
+ InputStream in = getInputStream(defaultBufferSize);
+ byte[] result = new byte[dataLen];
+ int n1 = readAll(in, result, 0, dataLen/3);
+ Assert.assertEquals(n1, ((Seekable) in).getPos());
+
+ long skipped = in.skip(dataLen/3);
+ int n2 = readAll(in, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n1 + skipped + n2);
+ byte[] readData = new byte[n2];
+ System.arraycopy(result, 0, readData, 0, n2);
+ byte[] expectedData = new byte[n2];
+ System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ try {
+ skipped = in.skip(-3);
+ Assert.fail("Skip Negative length should fail.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains("Negative skip length", e);
+ }
+
+ // Skip after EOF
+ skipped = in.skip(3);
+ Assert.assertEquals(skipped, 0);
+
+ in.close();
+ }
+
+ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
+ int bufPos) throws Exception {
+ buf.position(bufPos);
+ int n = ((ByteBufferReadable) in).read(buf);
+ byte[] readData = new byte[n];
+ buf.rewind();
+ buf.position(bufPos);
+ buf.get(readData);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, 0, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
+
+ /**
+ * Test byte buffer read with different buffer sizes.
+ */
+ @Test(timeout=120000)
+ public void testByteBufferRead() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ // Default buffer size, initial buffer position is 0
+ InputStream in = getInputStream(defaultBufferSize);
+ ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+ byteBufferReadCheck(in, buf, 0);
+ in.close();
+
+ // Default buffer size, initial buffer position is not 0
+ in = getInputStream(defaultBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 11);
+ in.close();
+
+ // Small buffer size, initial buffer position is 0
+ in = getInputStream(smallBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 0);
+ in.close();
+
+ // Small buffer size, initial buffer position is not 0
+ in = getInputStream(smallBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 11);
+ in.close();
+
+ // Direct buffer, default buffer size, initial buffer position is 0
+ in = getInputStream(defaultBufferSize);
+ buf = ByteBuffer.allocateDirect(dataLen + 100);
+ byteBufferReadCheck(in, buf, 0);
+ in.close();
+
+ // Direct buffer, default buffer size, initial buffer position is not 0
+ in = getInputStream(defaultBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 11);
+ in.close();
+
+ // Direct buffer, small buffer size, initial buffer position is 0
+ in = getInputStream(smallBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 0);
+ in.close();
+
+ // Direct buffer, small buffer size, initial buffer position is not 0
+ in = getInputStream(smallBufferSize);
+ buf.clear();
+ byteBufferReadCheck(in, buf, 11);
+ in.close();
+ }
+
+ @Test(timeout=120000)
+ public void testCombinedOp() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ final int len1 = dataLen/8;
+ final int len2 = dataLen/10;
+
+ InputStream in = getInputStream(defaultBufferSize);
+ // Read len1 data.
+ byte[] readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ byte[] expectedData = new byte[len1];
+ System.arraycopy(data, 0, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ long pos = ((Seekable) in).getPos();
+ Assert.assertEquals(len1, pos);
+
+ // Seek forward len2
+ ((Seekable) in).seek(pos + len2);
+ // Skip forward len2
+ long n = in.skip(len2);
+ Assert.assertEquals(len2, n);
+
+ // Pos: 1/4 dataLen
+ positionedReadCheck(in, dataLen/4);
+
+ // Pos should be len1 + len2 + len2
+ pos = ((Seekable) in).getPos();
+ Assert.assertEquals(len1 + len2 + len2, pos);
+
+ // Read forward len1
+ ByteBuffer buf = ByteBuffer.allocate(len1);
+ int nRead = ((ByteBufferReadable) in).read(buf);
+ readData = new byte[nRead];
+ buf.rewind();
+ buf.get(readData);
+ expectedData = new byte[nRead];
+ System.arraycopy(data, (int)pos, expectedData, 0, nRead);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // Pos should be len1 + 2 * len2 + nRead
+ pos = ((Seekable) in).getPos();
+ Assert.assertEquals(len1 + 2 * len2 + nRead, pos);
+
+ // Pos: 1/3 dataLen
+ positionedReadCheck(in, dataLen/3);
+
+ // Read forward len1
+ readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ expectedData = new byte[len1];
+ System.arraycopy(data, (int)pos, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // Pos should be 2 * len1 + 2 * len2 + nRead
+ pos = ((Seekable) in).getPos();
+ Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos);
+
+ // Read forward len1
+ buf = ByteBuffer.allocate(len1);
+ nRead = ((ByteBufferReadable) in).read(buf);
+ readData = new byte[nRead];
+ buf.rewind();
+ buf.get(readData);
+ expectedData = new byte[nRead];
+ System.arraycopy(data, (int)pos, expectedData, 0, nRead);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // ByteBuffer read after EOF
+ ((Seekable) in).seek(dataLen);
+ buf.clear();
+ n = ((ByteBufferReadable) in).read(buf);
+ Assert.assertEquals(n, -1);
+
+ in.close();
+ }
+
+ @Test(timeout=120000)
+ public void testSeekToNewSource() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize);
+
+ final int len1 = dataLen/8;
+ byte[] readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+
+ // Pos: 1/3 dataLen
+ seekToNewSourceCheck(in, dataLen/3);
+
+ // Pos: 0
+ seekToNewSourceCheck(in, 0);
+
+ // Pos: 1/2 dataLen
+ seekToNewSourceCheck(in, dataLen/2);
+
+ // Pos: -3
+ try {
+ seekToNewSourceCheck(in, -3);
+ Assert.fail("Seek to negative offset should fail.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
+ "offset", e);
+ }
+
+ // Pos: dataLen + 3
+ try {
+ seekToNewSourceCheck(in, dataLen + 3);
+ Assert.fail("Seek after EOF should fail.");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains("Attempted to read past end of file", e);
+ }
+
+ in.close();
+ }
+
+ private void seekToNewSourceCheck(InputStream in, int targetPos)
+ throws Exception {
+ byte[] result = new byte[dataLen];
+ ((Seekable) in).seekToNewSource(targetPos);
+ int n = readAll(in, result, 0, dataLen);
+
+ Assert.assertEquals(dataLen, n + targetPos);
+ byte[] readData = new byte[n];
+ System.arraycopy(result, 0, readData, 0, n);
+ byte[] expectedData = new byte[n];
+ System.arraycopy(data, targetPos, expectedData, 0, n);
+ Assert.assertArrayEquals(readData, expectedData);
+ }
+
+ private ByteBufferPool getBufferPool() {
+ return new ByteBufferPool() {
+ @Override
+ public ByteBuffer getBuffer(boolean direct, int length) {
+ return ByteBuffer.allocateDirect(length);
+ }
+
+ @Override
+ public void putBuffer(ByteBuffer buffer) {
+ }
+ };
+ }
+
+ @Test(timeout=120000)
+ public void testHasEnhancedByteBufferAccess() throws Exception {
+ OutputStream out = getOutputStream(defaultBufferSize);
+ writeData(out);
+
+ InputStream in = getInputStream(defaultBufferSize);
+ final int len1 = dataLen/8;
+ // ByteBuffer size is len1
+ ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
+ getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ int n1 = buffer.remaining();
+ byte[] readData = new byte[n1];
+ buffer.get(readData);
+ byte[] expectedData = new byte[n1];
+ System.arraycopy(data, 0, expectedData, 0, n1);
+ Assert.assertArrayEquals(readData, expectedData);
+ ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
+
+ // Read len1 bytes
+ readData = new byte[len1];
+ readAll(in, readData, 0, len1);
+ expectedData = new byte[len1];
+ System.arraycopy(data, n1, expectedData, 0, len1);
+ Assert.assertArrayEquals(readData, expectedData);
+
+ // ByteBuffer size is len1
+ buffer = ((HasEnhancedByteBufferAccess) in).read(
+ getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+ int n2 = buffer.remaining();
+ readData = new byte[n2];
+ buffer.get(readData);
+ expectedData = new byte[n2];
+ System.arraycopy(data, n1 + len1, expectedData, 0, n2);
+ Assert.assertArrayEquals(readData, expectedData);
+ ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
+
+ in.close();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
new file mode 100644
index 0000000000..ebe025b0ea
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestCryptoStreams extends CryptoStreamsTestBase {
+ /**
+ * Data storage.
+ * {@link #getOutputStream(int)} will write to this buf.
+ * {@link #getInputStream(int)} will read from this buf.
+ */
+ private byte[] buf;
+ private int bufLen;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ Configuration conf = new Configuration();
+ codec = CryptoCodec.getInstance(conf);
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ }
+
+ @Override
+ protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
+ throws IOException {
+ DataOutputBuffer out = new DataOutputBuffer() {
+ @Override
+ public void flush() throws IOException {
+ buf = getData();
+ bufLen = getLength();
+ }
+ @Override
+ public void close() throws IOException {
+ buf = getData();
+ bufLen = getLength();
+ }
+ };
+ return new CryptoOutputStream(new FakeOutputStream(out),
+ codec, bufferSize, key, iv);
+ }
+
+ @Override
+ protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
+ throws IOException {
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(buf, 0, bufLen);
+ return new CryptoInputStream(new FakeInputStream(in), codec, bufferSize,
+ key, iv);
+ }
+
+ private class FakeOutputStream extends OutputStream
+ implements Syncable, CanSetDropBehind {
+ private final byte[] oneByteBuf = new byte[1];
+ private final DataOutputBuffer out;
+ private boolean closed;
+
+ public FakeOutputStream(DataOutputBuffer out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+
+ checkStream();
+
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkStream();
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ out.close();
+ closed = true;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ oneByteBuf[0] = (byte)(b & 0xff);
+ write(oneByteBuf, 0, oneByteBuf.length);
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropCache) throws IOException,
+ UnsupportedOperationException {
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ checkStream();
+ flush();
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ checkStream();
+ flush();
+ }
+
+ private void checkStream() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed!");
+ }
+ }
+ }
+
+ private class FakeInputStream extends InputStream implements
+ Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
+ CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+ private final byte[] oneByteBuf = new byte[1];
+ private int pos = 0;
+ private final byte[] data;
+ private final int length;
+ private boolean closed = false;
+
+ public FakeInputStream(DataInputBuffer in) {
+ data = in.getData();
+ length = in.getLength();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ if (pos > length) {
+ throw new IOException("Cannot seek after EOF.");
+ }
+ if (pos < 0) {
+ throw new IOException("Cannot seek to negative offset.");
+ }
+ checkStream();
+ this.pos = (int)pos;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return length - pos;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ checkStream();
+
+ if (pos < length) {
+ int n = (int) Math.min(len, length - pos);
+ System.arraycopy(data, pos, b, off, n);
+ pos += n;
+ return n;
+ }
+
+ return -1;
+ }
+
+ private void checkStream() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed!");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ checkStream();
+ if (pos < length) {
+ int n = (int) Math.min(buf.remaining(), length - pos);
+ if (n > 0) {
+ buf.put(data, pos, n);
+ }
+ pos += n;
+ return n;
+ }
+ return -1;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ checkStream();
+ if (n > 0) {
+ if (n + pos > length) {
+ n = length - pos;
+ }
+ pos += n;
+ return n;
+ }
+ return n < 0 ? -1 : 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+ @Override
+ public int read(long position, byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (position > length) {
+ throw new IOException("Cannot read after EOF.");
+ }
+ if (position < 0) {
+ throw new IOException("Cannot read to negative offset.");
+ }
+
+ checkStream();
+
+ if (position < length) {
+ int n = (int) Math.min(len, length - position);
+ System.arraycopy(data, (int)position, b, off, n);
+ return n;
+ }
+
+ return -1;
+ }
+
+ @Override
+ public void readFully(long position, byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+
+ if (position > length) {
+ throw new IOException("Cannot read after EOF.");
+ }
+ if (position < 0) {
+ throw new IOException("Cannot read to negative offset.");
+ }
+
+ checkStream();
+
+ if (position + len > length) {
+ throw new EOFException("Reach the end of stream.");
+ }
+
+ System.arraycopy(data, (int)position, b, off, len);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
+
+ @Override
+ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+ EnumSet<ReadOption> opts) throws IOException,
+ UnsupportedOperationException {
+ if (bufferPool == null) {
+ throw new IOException("Please specify buffer pool.");
+ }
+ ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
+ int pos = buffer.position();
+ int n = read(buffer);
+ if (n >= 0) {
+ buffer.position(pos);
+ return buffer;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+
+ }
+
+ @Override
+ public void setReadahead(Long readahead) throws IOException,
+ UnsupportedOperationException {
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropCache) throws IOException,
+ UnsupportedOperationException {
+ }
+
+ @Override
+ public FileDescriptor getFileDescriptor() throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ if (targetPos > length) {
+ throw new IOException("Attempted to read past end of file.");
+ }
+ if (targetPos < 0) {
+ throw new IOException("Cannot seek after EOF.");
+ }
+ checkStream();
+ this.pos = (int)targetPos;
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int ret = read(oneByteBuf, 0, 1);
+ return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
new file mode 100644
index 0000000000..286fb6a3d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
+ private static final String TEST_ROOT_DIR
+ = System.getProperty("test.build.data","build/test/data") + "/work-dir/localfs";
+
+ private final File base = new File(TEST_ROOT_DIR);
+ private final Path file = new Path(TEST_ROOT_DIR, "test-file");
+ private static LocalFileSystem fileSys;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set("fs.file.impl", LocalFileSystem.class.getName());
+ fileSys = FileSystem.getLocal(conf);
+ codec = CryptoCodec.getInstance(conf);
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ }
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ fileSys.delete(new Path(TEST_ROOT_DIR), true);
+ super.setUp();
+ }
+
+ @After
+ public void cleanUp() throws IOException {
+ FileUtil.setWritable(base, true);
+ FileUtil.fullyDelete(base);
+ assertTrue(!base.exists());
+ }
+
+ @Override
+ protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
+ throws IOException {
+ return new CryptoOutputStream(fileSys.create(file), codec, bufferSize,
+ key, iv);
+ }
+
+ @Override
+ protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
+ throws IOException {
+ return new CryptoInputStream(fileSys.open(file), codec, bufferSize,
+ key, iv);
+ }
+
+ @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
+ @Override
+ @Test(timeout=1000)
+ public void testByteBufferRead() throws Exception {}
+
+ @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
+ @Override
+ @Test(timeout=1000)
+ public void testSyncable() throws IOException {}
+
+ @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
+ @Override
+ @Test(timeout=1000)
+ public void testCombinedOp() throws Exception {}
+
+ @Ignore("ChecksumFSInputChecker doesn't support enhanced ByteBuffer access")
+ @Override
+ @Test(timeout=1000)
+ public void testHasEnhancedByteBufferAccess() throws Exception {
+ }
+
+ @Ignore("ChecksumFSInputChecker doesn't support seekToNewSource")
+ @Override
+ @Test(timeout=1000)
+ public void testSeekToNewSource() throws Exception {
+ }
+}