HADOOP-10603. Crypto input and output streams implementing Hadoop stream interfaces. Contributed by Yi Liu and Charles Lamb.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1597230 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@ -8,6 +8,9 @@ fs-encryption (Unreleased)
HADOOP-10603. Crypto input and output streams implementing Hadoop stream
interfaces. (Yi Liu and Charles Lamb)
@ -0,0 +1,57 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
public abstract class AESCTRCryptoCodec extends CryptoCodec {
* For AES, the algorithm block is fixed size of 128 bits.
* @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
private static final int AES_BLOCK_SIZE = 16;
public int getAlgorithmBlockSize() {
* IV is produced by combining initial IV and the counter using addition.
* IV length should be the same as {@link #AES_BLOCK_SIZE}
public void calculateIV(byte[] initIV, long counter, byte[] IV) {
Preconditions.checkArgument(initIV.length == AES_BLOCK_SIZE);
Preconditions.checkArgument(IV.length == AES_BLOCK_SIZE);
ByteBuffer buf = ByteBuffer.wrap(IV);
counter += buf.getLong(AES_BLOCK_SIZE - 8);
buf.putLong(AES_BLOCK_SIZE - 8, counter);
@ -0,0 +1,82 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASS_KEY;
* Crypto codec class, encapsulates encryptor/decryptor pair.
public abstract class CryptoCodec implements Configurable {
public static CryptoCodec getInstance(Configuration conf) {
final Class<? extends CryptoCodec> klass = conf.getClass(
return ReflectionUtils.newInstance(klass, conf);
* Get block size of a block cipher.
* For different algorithms, the block size may be different.
* @return int block size
public abstract int getAlgorithmBlockSize();
* Get a {@link #org.apache.hadoop.crypto.Encryptor}.
* @return Encryptor
public abstract Encryptor getEncryptor() throws GeneralSecurityException;
* Get a {@link #org.apache.hadoop.crypto.Decryptor}.
* @return Decryptor
public abstract Decryptor getDecryptor() throws GeneralSecurityException;
* This interface is only for Counter (CTR) mode. Typically calculating
* IV(Initialization Vector) is up to Encryptor or Decryptor, for
* example {@link #javax.crypto.Cipher} will maintain encryption context
* internally when do encryption/decryption continuously using its
* Cipher#update interface.
* <p/>
* In Hadoop, multiple nodes may read splits of a file, so decrypting of
* file is not continuous, even for encrypting may be not continuous. For
* each part, we need to calculate the counter through file position.
* <p/>
* Typically IV for a file position is produced by combining initial IV and
* the counter using any lossless operation (concatenation, addition, or XOR).
* @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29
* @param initIV initial IV
* @param counter counter for input stream position
* @param IV the IV for input stream position
public abstract void calculateIV(byte[] initIV, long counter, byte[] IV);
@ -0,0 +1,613 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.ByteBufferPool;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
import com.google.common.base.Preconditions;
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
* mapping. The decryption is buffer based. The key points of the decryption
* are (1) calculating the counter and (2) padding through stream position:
* <p/>
* counter = base + pos/(algorithm blocksize);
* padding = pos%(algorithm blocksize);
* <p/>
* The underlying stream offset is maintained as state.
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
private static final int MIN_BUFFER_SIZE = 512;
private static final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
* Input data buffer. The data starts at inBuffer.position() and ends at
* to inBuffer.limit().
private ByteBuffer inBuffer;
* The decrypted data buffer. The data starts at outBuffer.position() and
* ends at outBuffer.limit();
private ByteBuffer outBuffer;
private long streamOffset = 0; // Underlying stream offset.
* Whether underlying stream supports
* {@link #org.apache.hadoop.fs.ByteBufferReadable}
private Boolean usingByteBufferRead = null;
* Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer}
* before any other data goes in. The purpose of padding is to put input data
* at proper position.
private byte padding;
private boolean closed;
private final byte[] key;
private final byte[] initIV;
private byte[] iv;
public CryptoInputStream(InputStream in, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE,
"Minimum value of buffer size is 512.");
this.key = key;
this.initIV = iv;
this.iv = iv.clone();
inBuffer = ByteBuffer.allocateDirect(bufferSize);
outBuffer = ByteBuffer.allocateDirect(bufferSize);
this.codec = codec;
try {
decryptor = codec.getDecryptor();
} catch (GeneralSecurityException e) {
throw new IOException(e);
if (in instanceof Seekable) {
streamOffset = ((Seekable) in).getPos();
public CryptoInputStream(InputStream in, CryptoCodec codec,
byte[] key, byte[] iv) throws IOException {
this(in, codec, getBufferSize(codec.getConf()), key, iv);
public InputStream getWrappedStream() {
return in;
* Decryption is buffer based.
* If there is data in {@link #outBuffer}, then read it out of this buffer.
* If there is no data in {@link #outBuffer}, then read more from the
* underlying stream and do the decryption.
* @param b the buffer into which the decrypted data is read.
* @param off the buffer offset.
* @param len the maximum number of decrypted data bytes to read.
* @return int the total number of decrypted data bytes read into the buffer.
* @throws IOException
public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
int remaining = outBuffer.remaining();
if (remaining > 0) {
int n = Math.min(len, remaining);
outBuffer.get(b, off, n);
return n;
} else {
int n = 0;
* Check whether the underlying stream is {@link ByteBufferReadable},
* it can avoid bytes copy.
if (usingByteBufferRead == null) {
if (in instanceof ByteBufferReadable) {
try {
n = ((ByteBufferReadable) in).read(inBuffer);
usingByteBufferRead = Boolean.TRUE;
} catch (UnsupportedOperationException e) {
usingByteBufferRead = Boolean.FALSE;
if (!usingByteBufferRead.booleanValue()) {
n = readFromUnderlyingStream();
} else {
if (usingByteBufferRead.booleanValue()) {
n = ((ByteBufferReadable) in).read(inBuffer);
} else {
n = readFromUnderlyingStream();
if (n <= 0) {
return n;
streamOffset += n; // Read n bytes
n = Math.min(len, outBuffer.remaining());
outBuffer.get(b, off, n);
return n;
// Read data from underlying stream.
private int readFromUnderlyingStream() throws IOException {
int toRead = inBuffer.remaining();
byte[] tmp = getTmpBuf();
int n = in.read(tmp, 0, toRead);
if (n > 0) {
inBuffer.put(tmp, 0, n);
return n;
private byte[] tmpBuf;
private byte[] getTmpBuf() {
if (tmpBuf == null) {
tmpBuf = new byte[inBuffer.capacity()];
return tmpBuf;
* Do the decryption using {@link #inBuffer} as input and {@link #outBuffer}
* as output.
private void decrypt() throws IOException {
Preconditions.checkState(inBuffer.position() >= padding);
if(inBuffer.position() == padding) {
// There is no real data in inBuffer.
decryptor.decrypt(inBuffer, outBuffer);
if (padding > 0) {
* The plain text and cipher text have 1:1 mapping, they start at same
* position.
padding = 0;
if (decryptor.isContextReset()) {
* Typically we will not get here. To improve performance in CTR mode,
* we rely on the decryptor maintaining context, for example calculating
* the counter. Unfortunately, some bad implementations can't maintain
* context so we need to re-init after doing decryption.
* Update the {@link #decryptor}. Calculate the counter and {@link #padding}.
private void updateDecryptor() throws IOException {
long counter = streamOffset / codec.getAlgorithmBlockSize();
padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
inBuffer.position(padding); // Set proper position for input data.
codec.calculateIV(initIV, counter, iv);
decryptor.init(key, iv);
* Reset the underlying stream offset; and clear {@link #inBuffer} and
* {@link #outBuffer}. Typically this happens when doing {@link #seek(long)}
* or {@link #skip(long)}.
private void resetStreamOffset(long offset) throws IOException {
streamOffset = offset;
public void close() throws IOException {
if (closed) {
closed = true;
* Free the direct buffer manually.
private void freeBuffers() {
sun.misc.Cleaner inBufferCleaner =
((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
sun.misc.Cleaner outBufferCleaner =
((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
// Positioned read.
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
try {
int n = ((PositionedReadable) in).read(position, buffer, offset, length);
if (n > 0) {
* Since this operation does not change the current offset of a file,
* streamOffset should be not changed and we need to restore the
* decryptor and outBuffer after decryption.
decrypt(position, buffer, offset, length);
return n;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned read.");
* Decrypt given length of data in buffer: start from offset.
* Output is also buffer and start from same offset. Restore the
* {@link #decryptor} and {@link #outBuffer} after decryption.
private void decrypt(long position, byte[] buffer, int offset, int length)
throws IOException {
byte[] tmp = getTmpBuf();
int unread = outBuffer.remaining();
if (unread > 0) { // Cache outBuffer
outBuffer.get(tmp, 0, unread);
long curOffset = streamOffset;
int n = 0;
while (n < length) {
int toDecrypt = Math.min(length - n, inBuffer.remaining());
inBuffer.put(buffer, offset + n, toDecrypt);
// Do decryption
outBuffer.get(buffer, offset + n, toDecrypt);
n += toDecrypt;
// After decryption
if (unread > 0) { // Restore outBuffer
outBuffer.put(tmp, 0, unread);
// Positioned read fully.
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
try {
((PositionedReadable) in).readFully(position, buffer, offset, length);
if (length > 0) {
* Since this operation does not change the current offset of a file,
* streamOffset should be not changed and we need to restore the decryptor
* and outBuffer after decryption.
decrypt(position, buffer, offset, length);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned readFully.");
public void readFully(long position, byte[] buffer) throws IOException {
readFully(position, buffer, 0, buffer.length);
// Seek to a position.
public void seek(long pos) throws IOException {
Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
try {
// If target pos we have already read and decrypt.
if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
if (forward > 0) {
outBuffer.position(outBuffer.position() + forward);
} else {
((Seekable) in).seek(pos);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
// Skip n bytes
public long skip(long n) throws IOException {
Preconditions.checkArgument(n >= 0, "Negative skip length.");
if (n == 0) {
return 0;
} else if (n <= outBuffer.remaining()) {
int pos = outBuffer.position() + (int) n;
return n;
} else {
* Subtract outBuffer.remaining() to see how many bytes we need to
* skip in underlying stream. We get real skipped bytes number of
* underlying stream then add outBuffer.remaining() to get skipped
* bytes number from user's view.
n -= outBuffer.remaining();
long skipped = in.skip(n);
if (skipped < 0) {
skipped = 0;
long pos = streamOffset + skipped;
skipped += outBuffer.remaining();
return skipped;
// Get underlying stream position.
public long getPos() throws IOException {
// Equals: ((Seekable) in).getPos() - outBuffer.remaining()
return streamOffset - outBuffer.remaining();
// ByteBuffer read.
public int read(ByteBuffer buf) throws IOException {
if (in instanceof ByteBufferReadable) {
int unread = outBuffer.remaining();
if (unread > 0) { // Have unread decrypted data in buffer.
int toRead = buf.remaining();
if (toRead <= unread) {
int limit = outBuffer.limit();
outBuffer.limit(outBuffer.position() + toRead);
return toRead;
} else {
int pos = buf.position();
int n = ((ByteBufferReadable) in).read(buf);
if (n > 0) {
streamOffset += n; // Read n bytes
decrypt(buf, n, pos);
return n;
throw new UnsupportedOperationException("ByteBuffer read unsupported " +
"by input stream.");
* Decrypt all data in buf: total n bytes from given start position.
* Output is also buf and same start position.
* buf.position() and buf.limit() should be unchanged after decryption.
private void decrypt(ByteBuffer buf, int n, int start)
throws IOException {
int pos = buf.position();
int limit = buf.limit();
int len = 0;
while (len < n) {
buf.position(start + len);
buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
// Do decryption
buf.position(start + len);
len += outBuffer.remaining();
public int available() throws IOException {
return in.available() + outBuffer.remaining();
public boolean markSupported() {
return false;
public void mark(int readLimit) {
public void reset() throws IOException {
throw new IOException("Mark/reset not supported");
public boolean seekToNewSource(long targetPos) throws IOException {
Preconditions.checkArgument(targetPos >= 0,
"Cannot seek to negative offset.");
try {
boolean result = ((Seekable) in).seekToNewSource(targetPos);
return result;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
try {
if (outBuffer.remaining() > 0) {
// Have some decrypted data unread, need to reset.
((Seekable) in).seek(getPos());
ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
read(bufferPool, maxLength, opts);
if (buffer != null) {
int n = buffer.remaining();
if (n > 0) {
streamOffset += buffer.remaining(); // Read n bytes
int pos = buffer.position();
decrypt(buffer, n, pos);
return buffer;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
public void releaseBuffer(ByteBuffer buffer) {
try {
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"release buffer.");
public void setReadahead(Long readahead) throws IOException,
UnsupportedOperationException {
try {
((CanSetReadahead) in).setReadahead(readahead);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"setting the readahead caching strategy.");
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
try {
((CanSetDropBehind) in).setDropBehind(dropCache);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not " +
"support setting the drop-behind caching setting.");
public FileDescriptor getFileDescriptor() throws IOException {
if (in instanceof HasFileDescriptor) {
return ((HasFileDescriptor) in).getFileDescriptor();
} else if (in instanceof FileInputStream) {
return ((FileInputStream) in).getFD();
} else {
return null;
public int read() throws IOException {
return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff);
private void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream closed");
private static int getBufferSize(Configuration conf) {
@ -0,0 +1,291 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
import com.google.common.base.Preconditions;
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
* mapping. The encryption is buffer based. The key points of the encryption are
* (1) calculating counter and (2) padding through stream position.
* <p/>
* counter = base + pos/(algorithm blocksize);
* padding = pos%(algorithm blocksize);
* <p/>
* The underlying stream offset is maintained as state.
public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind {
private static final int MIN_BUFFER_SIZE = 512;
private static final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
* Input data buffer. The data starts at inBuffer.position() and ends at
* inBuffer.limit().
private ByteBuffer inBuffer;
* Encrypted data buffer. The data starts at outBuffer.position() and ends at
* outBuffer.limit();
private ByteBuffer outBuffer;
private long streamOffset = 0; // Underlying stream offset.
* Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer}
* before any other data goes in. The purpose of padding is to put input data
* at proper position.
private byte padding;
private boolean closed;
private final byte[] key;
private final byte[] initIV;
private byte[] iv;
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
this(out, codec, bufferSize, key, iv, 0);
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, long streamOffset)
throws IOException {
Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE,
"Minimum value of buffer size is 512.");
this.key = key;
this.initIV = iv;
this.iv = iv.clone();
inBuffer = ByteBuffer.allocateDirect(bufferSize);
outBuffer = ByteBuffer.allocateDirect(bufferSize);
this.streamOffset = streamOffset;
this.codec = codec;
try {
encryptor = codec.getEncryptor();
} catch (GeneralSecurityException e) {
throw new IOException(e);
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
byte[] key, byte[] iv) throws IOException {
this(out, codec, key, iv, 0);
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
byte[] key, byte[] iv, long streamOffset) throws IOException {
this(out, codec, getBufferSize(codec.getConf()), key, iv, streamOffset);
public OutputStream getWrappedStream() {
return out;
* Encryption is buffer based.
* If there is enough room in {@link #inBuffer}, then write to this buffer.
* If {@link #inBuffer} is full, then do encryption and write data to the
* underlying stream.
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @throws IOException
public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || off > b.length ||
len > b.length - off) {
throw new IndexOutOfBoundsException();
while (len > 0) {
int remaining = inBuffer.remaining();
if (len < remaining) {
inBuffer.put(b, off, len);
len = 0;
} else {
inBuffer.put(b, off, remaining);
off += remaining;
len -= remaining;
* Do the encryption, input is {@link #inBuffer} and output is
* {@link #outBuffer}.
private void encrypt() throws IOException {
Preconditions.checkState(inBuffer.position() >= padding);
if (inBuffer.position() == padding) {
// There is no real data in the inBuffer.
encryptor.encrypt(inBuffer, outBuffer);
if (padding > 0) {
* The plain text and cipher text have 1:1 mapping, they start at same
* position.
padding = 0;
int len = outBuffer.remaining();
* If underlying stream supports {@link ByteBuffer} write in future, needs
* refine here.
final byte[] tmp = getTmpBuf();
outBuffer.get(tmp, 0, len);
out.write(tmp, 0, len);
streamOffset += len;
if (encryptor.isContextReset()) {
* We will generally not get here. For CTR mode, to improve
* performance, we rely on the encryptor maintaining context, for
* example to calculate the counter. But some bad implementations
* can't maintain context, and need us to re-init after doing
* encryption.
* Update the {@link #encryptor}: calculate counter and {@link #padding}.
private void updateEncryptor() throws IOException {
long counter = streamOffset / codec.getAlgorithmBlockSize();
padding = (byte)(streamOffset % codec.getAlgorithmBlockSize());
inBuffer.position(padding); // Set proper position for input data.
codec.calculateIV(initIV, counter, iv);
encryptor.init(key, iv);
private byte[] tmpBuf;
private byte[] getTmpBuf() {
if (tmpBuf == null) {
tmpBuf = new byte[outBuffer.capacity()];
return tmpBuf;
public void close() throws IOException {
if (closed) {
closed = true;
* Free the direct buffer manually.
private void freeBuffers() {
sun.misc.Cleaner inBufferCleaner =
((sun.nio.ch.DirectBuffer) inBuffer).cleaner();
sun.misc.Cleaner outBufferCleaner =
((sun.nio.ch.DirectBuffer) outBuffer).cleaner();
* To flush, we need to encrypt the data in buffer and write to underlying
* stream, then do the flush.
public void flush() throws IOException {
public void write(int b) throws IOException {
oneByteBuf[0] = (byte)(b & 0xff);
write(oneByteBuf, 0, oneByteBuf.length);
private void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream closed");
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
try {
((CanSetDropBehind) out).setDropBehind(dropCache);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not " +
"support setting the drop-behind caching.");
public void hflush() throws IOException {
if (out instanceof Syncable) {
public void hsync() throws IOException {
if (out instanceof Syncable) {
private static int getBufferSize(Configuration conf) {
@ -0,0 +1,75 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
public interface Decryptor {
* Initialize the decryptor, the internal decryption context will be
* reset.
* @param key decryption key.
* @param iv decryption initialization vector
* @throws IOException if initialization fails
public void init(byte[] key, byte[] iv) throws IOException;
* Indicate whether decryption context is reset.
* <p/>
* It's useful for some mode like CTR which requires different IV for
* different parts of data. Usually decryptor can maintain the context
* internally such as calculating IV/counter, then continue a multiple-part
* decryption operation without reinit the decryptor using key and the new
* IV. For mode like CTR, if context is reset after each decryption, the
* decryptor should be reinit before each operation, that's not efficient.
* @return boolean whether context is reset.
public boolean isContextReset();
* This exposes a direct interface for record decryption with direct byte
* buffers.
* <p/>
* The decrypt() function need not always consume the buffers provided,
* it will need to be called multiple times to decrypt an entire buffer
* and the object will hold the decryption context internally.
* <p/>
* Some implementation may need enough space in the destination buffer to
* decrypt an entire input.
* <p/>
* The end result will move inBuffer.position() by the bytes-read and
* outBuffer.position() by the bytes-written. It should not modify the
* inBuffer.limit() or outBuffer.limit() to maintain consistency of operation.
* <p/>
* @param inBuffer in direct {@link ByteBuffer} for reading from. Requires
* inBuffer != null and inBuffer.remaining() > 0
* @param outBuffer out direct {@link ByteBuffer} for storing the results
* into. Requires outBuffer != null and outBuffer.remaining() > 0
* @throws IOException if decryption fails
public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
throws IOException;
@ -0,0 +1,75 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
public interface Encryptor {
* Initialize the encryptor, the internal encryption context will be
* reset.
* @param key encryption key.
* @param iv encryption initialization vector
* @throws IOException if initialization fails
public void init(byte[] key, byte[] iv) throws IOException;
* Indicate whether encryption context is reset.
* <p/>
* It's useful for some mode like CTR which requires different IV for
* different parts of data. Usually encryptor can maintain the context
* internally such as calculating IV/counter, then continue a multiple-part
* encryption operation without reinit the encryptor using key and the new
* IV. For mode like CTR, if context is reset after each encryption, the
* encryptor should be reinit before each operation, that's not efficient.
* @return boolean whether context is reset.
public boolean isContextReset();
* This exposes a direct interface for record encryption with direct byte
* buffers.
* <p/>
* The encrypt() function need not always consume the buffers provided,
* it will need to be called multiple times to encrypt an entire buffer
* and the object will hold the encryption context internally.
* <p/>
* Some implementation may need enough space in the destination buffer to
* encrypt an entire input.
* <p/>
* The end result will move inBuffer.position() by the bytes-read and
* outBuffer.position() by the bytes-written. It should not modify the
* inBuffer.limit() or outBuffer.limit() to maintain consistency of operation.
* <p/>
* @param inBuffer in direct {@link ByteBuffer} for reading from. Requires
* inBuffer != null and inBuffer.remaining() > 0
* @param outBuffer out direct {@link ByteBuffer} for storing the results
* into. Requires outBuffer != null and outBuffer.remaining() > 0
* @throws IOException if encryption fails
public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
throws IOException;
@ -0,0 +1,55 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.security.GeneralSecurityException;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY;
* Implement the AES-CTR crypto codec using JCE provider.
public class JCEAESCTRCryptoCodec extends AESCTRCryptoCodec {
private Configuration conf;
private String provider;
public JCEAESCTRCryptoCodec() {
public Configuration getConf() {
return conf;
public void setConf(Configuration conf) {
this.conf = conf;
public Encryptor getEncryptor() throws GeneralSecurityException {
return new JCEAESCTREncryptor(provider);
public Decryptor getDecryptor() throws GeneralSecurityException {
return new JCEAESCTRDecryptor(provider);
@ -0,0 +1,84 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import com.google.common.base.Preconditions;
public class JCEAESCTRDecryptor implements Decryptor {
private final Cipher cipher;
private boolean contextReset = false;
public JCEAESCTRDecryptor(String provider) throws GeneralSecurityException {
if (provider == null || provider.isEmpty()) {
cipher = Cipher.getInstance("AES/CTR/NoPadding");
} else {
cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
public void init(byte[] key, byte[] iv) throws IOException {
contextReset = false;
try {
cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
new IvParameterSpec(iv));
} catch (Exception e) {
throw new IOException(e);
* For AES-CTR, will consume all input data and needs enough space in the
* destination buffer to decrypt entire input data.
public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
throws IOException {
try {
int inputSize = inBuffer.remaining();
// Cipher#update will maintain decryption context.
int n = cipher.update(inBuffer, outBuffer);
if (n < inputSize) {
* Typically code will not get here. Cipher#update will decrypt all
* input data and put result in outBuffer.
* Cipher#doFinal will reset the decryption context.
contextReset = true;
cipher.doFinal(inBuffer, outBuffer);
} catch (Exception e) {
throw new IOException(e);
public boolean isContextReset() {
return contextReset;
@ -0,0 +1,84 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import com.google.common.base.Preconditions;
public class JCEAESCTREncryptor implements Encryptor {
private final Cipher cipher;
private boolean contextReset = false;
public JCEAESCTREncryptor(String provider) throws GeneralSecurityException {
if (provider == null || provider.isEmpty()) {
cipher = Cipher.getInstance("AES/CTR/NoPadding");
} else {
cipher = Cipher.getInstance("AES/CTR/NoPadding", provider);
public void init(byte[] key, byte[] iv) throws IOException {
contextReset = false;
try {
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
new IvParameterSpec(iv));
} catch (Exception e) {
throw new IOException(e);
* For AES-CTR, will consume all input data and needs enough space in the
* destination buffer to encrypt entire input data.
public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
throws IOException {
try {
int inputSize = inBuffer.remaining();
// Cipher#update will maintain encryption context.
int n = cipher.update(inBuffer, outBuffer);
if (n < inputSize) {
* Typically code will not get here. Cipher#update will encrypt all
* input data and put result in outBuffer.
* Cipher#doFinal will reset the encryption context.
contextReset = true;
cipher.doFinal(inBuffer, outBuffer);
} catch (Exception e) {
throw new IOException(e);
public boolean isContextReset() {
return contextReset;
@ -282,5 +282,16 @@ public class CommonConfigurationKeysPublic {
/** Class to override Sasl Properties for a connection */
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final int HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT = 8192;
@ -0,0 +1,37 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.fs.crypto;
import java.io.IOException;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
public class CryptoFSDataInputStream extends FSDataInputStream {
public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
super(new CryptoInputStream(in, codec, bufferSize, key, iv));
public CryptoFSDataInputStream(FSDataInputStream in, CryptoCodec codec,
byte[] key, byte[] iv) throws IOException {
super(new CryptoInputStream(in, codec, key, iv));
@ -0,0 +1,47 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.fs.crypto;
import java.io.IOException;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
public class CryptoFSDataOutputStream extends FSDataOutputStream {
private final FSDataOutputStream fsOut;
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
super(new CryptoOutputStream(out, codec, bufferSize, key, iv,
out.getPos()), null, out.getPos());
this.fsOut = out;
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
byte[] key, byte[] iv) throws IOException {
super(new CryptoOutputStream(out, codec, key, iv, out.getPos()),
null, out.getPos());
this.fsOut = out;
public long getPos() {
return fsOut.getPos();
@ -1348,4 +1348,30 @@
The default implementation of CryptoCodec which is used for encryption
and decryption.
The JCE provider name used in CryptoCodec.
The buffer size used in Crypto InputStream and OutputStream, and default
value is 8192.
@ -0,0 +1,712 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public abstract class CryptoStreamsTestBase {
protected static final Log LOG= LogFactory.getLog(
protected static CryptoCodec codec;
private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
protected static final int count = 10000;
protected static int defaultBufferSize = 8192;
protected static int smallBufferSize = 1024;
private byte[] data;
private int dataLen;
public void setUp() throws IOException {
// Generate data
int seed = new Random().nextInt();
DataOutputBuffer dataBuf = new DataOutputBuffer();
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for(int i=0; i < count; ++i) {
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
LOG.info("Generated " + count + " records");
data = dataBuf.getData();
dataLen = dataBuf.getLength();
protected void writeData(OutputStream out) throws Exception {
out.write(data, 0, dataLen);
protected int getDataLen() {
return dataLen;
private int readAll(InputStream in, byte[] b, int off, int len)
throws IOException {
int n = 0;
int total = 0;
while (n != -1) {
total += n;
if (total >= len) {
n = in.read(b, off + total, len - total);
return total;
protected OutputStream getOutputStream(int bufferSize) throws IOException {
return getOutputStream(bufferSize, key, iv);
protected abstract OutputStream getOutputStream(int bufferSize, byte[] key,
byte[] iv) throws IOException;
protected InputStream getInputStream(int bufferSize) throws IOException {
return getInputStream(bufferSize, key, iv);
protected abstract InputStream getInputStream(int bufferSize, byte[] key,
byte[] iv) throws IOException;
* Test crypto reading with different buffer size.
public void testRead() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
// Default buffer size
InputStream in = getInputStream(defaultBufferSize);
// Small buffer size
in = getInputStream(smallBufferSize);
private void readCheck(InputStream in) throws Exception {
byte[] result = new byte[dataLen];
int n = readAll(in, result, 0, dataLen);
Assert.assertEquals(dataLen, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(result, expectedData);
// EOF
n = in.read(result, 0, dataLen);
Assert.assertEquals(n, -1);
* Test crypto with different IV.
public void testCryptoIV() throws Exception {
byte[] iv1 = iv.clone();
// Counter base: Long.MAX_VALUE
setCounterBaseForIV(iv1, Long.MAX_VALUE);
// Counter base: Long.MAX_VALUE - 1
setCounterBaseForIV(iv1, Long.MAX_VALUE - 1);
// Counter base: Integer.MAX_VALUE
setCounterBaseForIV(iv1, Integer.MAX_VALUE);
// Counter base: 0
setCounterBaseForIV(iv1, 0);
// Counter base: -1
setCounterBaseForIV(iv1, -1);
private void cryptoCheck(byte[] iv) throws Exception {
OutputStream out = getOutputStream(defaultBufferSize, key, iv);
InputStream in = getInputStream(defaultBufferSize, key, iv);
private void setCounterBaseForIV(byte[] iv, long counterBase) {
ByteBuffer buf = ByteBuffer.wrap(iv);
buf.putLong(iv.length - 8, counterBase);
* Test hflush/hsync of crypto output stream, and with different buffer size.
public void testSyncable() throws IOException {
private void syncableCheck() throws IOException {
OutputStream out = getOutputStream(smallBufferSize);
try {
int bytesWritten = dataLen/3;
out.write(data, 0, bytesWritten);
((Syncable) out).hflush();
InputStream in = getInputStream(defaultBufferSize);
verify(in, bytesWritten, data);
out.write(data, bytesWritten, dataLen - bytesWritten);
((Syncable) out).hsync();
in = getInputStream(defaultBufferSize);
verify(in, dataLen, data);
} finally {
private void verify(InputStream in, int bytesToVerify,
byte[] expectedBytes) throws IOException {
byte[] readBuf = new byte[bytesToVerify];
readAll(in, readBuf, 0, bytesToVerify);
for (int i=0; i<bytesToVerify; i++) {
Assert.assertEquals(expectedBytes[i], readBuf[i]);
private int readAll(InputStream in, long pos, byte[] b, int off, int len)
throws IOException {
int n = 0;
int total = 0;
while (n != -1) {
total += n;
if (total >= len) {
n = ((PositionedReadable) in).read(pos + total, b, off + total,
len - total);
return total;
* Test positioned read.
public void testPositionedRead() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
InputStream in = getInputStream(defaultBufferSize);
// Pos: 1/3 dataLen
positionedReadCheck(in , dataLen/3);
// Pos: 1/2 dataLen
positionedReadCheck(in, dataLen/2);
private void positionedReadCheck(InputStream in, int pos) throws Exception {
byte[] result = new byte[dataLen];
int n = readAll(in, pos, result, 0, dataLen);
Assert.assertEquals(dataLen, n + pos);
byte[] readData = new byte[n];
System.arraycopy(result, 0, readData, 0, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, pos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
* Test read fully
public void testReadFully() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
InputStream in = getInputStream(defaultBufferSize);
final int len1 = dataLen/4;
// Read len1 bytes
byte [] readData = new byte[len1];
readAll(in, readData, 0, len1);
byte[] expectedData = new byte[len1];
System.arraycopy(data, 0, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
// Pos: 1/3 dataLen
readFullyCheck(in, dataLen/3);
// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
// Pos: 1/2 dataLen
readFullyCheck(in, dataLen/2);
// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
private void readFullyCheck(InputStream in, int pos) throws Exception {
byte[] result = new byte[dataLen - pos];
((PositionedReadable) in).readFully(pos, result);
byte[] expectedData = new byte[dataLen - pos];
System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
Assert.assertArrayEquals(result, expectedData);
result = new byte[dataLen]; // Exceeds maximum length
try {
((PositionedReadable) in).readFully(pos, result);
Assert.fail("Read fully exceeds maximum length should fail.");
} catch (IOException e) {
* Test seek to different position.
public void testSeek() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
InputStream in = getInputStream(defaultBufferSize);
// Pos: 1/3 dataLen
seekCheck(in, dataLen/3);
// Pos: 0
seekCheck(in, 0);
// Pos: 1/2 dataLen
seekCheck(in, dataLen/2);
// Pos: -3
try {
seekCheck(in, -3);
Assert.fail("Seek to negative offset should fail.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
"offset", e);
// Pos: dataLen + 3
try {
seekCheck(in, dataLen + 3);
Assert.fail("Seek after EOF should fail.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Cannot seek after EOF", e);
private void seekCheck(InputStream in, int pos) throws Exception {
byte[] result = new byte[dataLen];
((Seekable) in).seek(pos);
int n = readAll(in, result, 0, dataLen);
Assert.assertEquals(dataLen, n + pos);
byte[] readData = new byte[n];
System.arraycopy(result, 0, readData, 0, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, pos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
* Test get position.
public void testGetPos() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
// Default buffer size
InputStream in = getInputStream(defaultBufferSize);
byte[] result = new byte[dataLen];
int n1 = readAll(in, result, 0, dataLen/3);
Assert.assertEquals(n1, ((Seekable) in).getPos());
int n2 = readAll(in, result, n1, dataLen - n1);
Assert.assertEquals(n1 + n2, ((Seekable) in).getPos());
public void testAvailable() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
// Default buffer size
InputStream in = getInputStream(defaultBufferSize);
byte[] result = new byte[dataLen];
int n1 = readAll(in, result, 0, dataLen/3);
Assert.assertEquals(in.available(), dataLen - n1);
int n2 = readAll(in, result, n1, dataLen - n1);
Assert.assertEquals(in.available(), dataLen - n1 - n2);
* Test skip.
public void testSkip() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
// Default buffer size
InputStream in = getInputStream(defaultBufferSize);
byte[] result = new byte[dataLen];
int n1 = readAll(in, result, 0, dataLen/3);
Assert.assertEquals(n1, ((Seekable) in).getPos());
long skipped = in.skip(dataLen/3);
int n2 = readAll(in, result, 0, dataLen);
Assert.assertEquals(dataLen, n1 + skipped + n2);
byte[] readData = new byte[n2];
System.arraycopy(result, 0, readData, 0, n2);
byte[] expectedData = new byte[n2];
System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
Assert.assertArrayEquals(readData, expectedData);
try {
skipped = in.skip(-3);
Assert.fail("Skip Negative length should fail.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Negative skip length", e);
// Skip after EOF
skipped = in.skip(3);
Assert.assertEquals(skipped, 0);
private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
int bufPos) throws Exception {
int n = ((ByteBufferReadable) in).read(buf);
byte[] readData = new byte[n];
byte[] expectedData = new byte[n];
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
* Test byte buffer read with different buffer size.
public void testByteBufferRead() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
// Default buffer size, initial buffer position is 0
InputStream in = getInputStream(defaultBufferSize);
ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
byteBufferReadCheck(in, buf, 0);
// Default buffer size, initial buffer position is not 0
in = getInputStream(defaultBufferSize);
byteBufferReadCheck(in, buf, 11);
// Small buffer size, initial buffer position is 0
in = getInputStream(smallBufferSize);
byteBufferReadCheck(in, buf, 0);
// Small buffer size, initial buffer position is not 0
in = getInputStream(smallBufferSize);
byteBufferReadCheck(in, buf, 11);
// Direct buffer, default buffer size, initial buffer position is 0
in = getInputStream(defaultBufferSize);
buf = ByteBuffer.allocateDirect(dataLen + 100);
byteBufferReadCheck(in, buf, 0);
// Direct buffer, default buffer size, initial buffer position is not 0
in = getInputStream(defaultBufferSize);
byteBufferReadCheck(in, buf, 11);
// Direct buffer, small buffer size, initial buffer position is 0
in = getInputStream(smallBufferSize);
byteBufferReadCheck(in, buf, 0);
// Direct buffer, small buffer size, initial buffer position is not 0
in = getInputStream(smallBufferSize);
byteBufferReadCheck(in, buf, 11);
public void testCombinedOp() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
final int len1 = dataLen/8;
final int len2 = dataLen/10;
InputStream in = getInputStream(defaultBufferSize);
// Read len1 data.
byte[] readData = new byte[len1];
readAll(in, readData, 0, len1);
byte[] expectedData = new byte[len1];
System.arraycopy(data, 0, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
long pos = ((Seekable) in).getPos();
Assert.assertEquals(len1, pos);
// Seek forward len2
((Seekable) in).seek(pos + len2);
// Skip forward len2
long n = in.skip(len2);
Assert.assertEquals(len2, n);
// Pos: 1/4 dataLen
positionedReadCheck(in , dataLen/4);
// Pos should be len1 + len2 + len2
pos = ((Seekable) in).getPos();
Assert.assertEquals(len1 + len2 + len2, pos);
// Read forward len1
ByteBuffer buf = ByteBuffer.allocate(len1);
int nRead = ((ByteBufferReadable) in).read(buf);
readData = new byte[nRead];
expectedData = new byte[nRead];
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
// Pos should be len1 + 2 * len2 + nRead
pos = ((Seekable) in).getPos();
Assert.assertEquals(len1 + 2 * len2 + nRead, pos);
// Pos: 1/3 dataLen
positionedReadCheck(in , dataLen/3);
// Read forward len1
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, (int)pos, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
// Pos should be 2 * len1 + 2 * len2 + nRead
pos = ((Seekable) in).getPos();
Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos);
// Read forward len1
buf = ByteBuffer.allocate(len1);
nRead = ((ByteBufferReadable) in).read(buf);
readData = new byte[nRead];
expectedData = new byte[nRead];
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
// ByteBuffer read after EOF
((Seekable) in).seek(dataLen);
n = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(n, -1);
public void testSeekToNewSource() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
InputStream in = getInputStream(defaultBufferSize);
final int len1 = dataLen/8;
byte[] readData = new byte[len1];
readAll(in, readData, 0, len1);
// Pos: 1/3 dataLen
seekToNewSourceCheck(in, dataLen/3);
// Pos: 0
seekToNewSourceCheck(in, 0);
// Pos: 1/2 dataLen
seekToNewSourceCheck(in, dataLen/2);
// Pos: -3
try {
seekToNewSourceCheck(in, -3);
Assert.fail("Seek to negative offset should fail.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
"offset", e);
// Pos: dataLen + 3
try {
seekToNewSourceCheck(in, dataLen + 3);
Assert.fail("Seek after EOF should fail.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Attempted to read past end of file", e);
private void seekToNewSourceCheck(InputStream in, int targetPos)
throws Exception {
byte[] result = new byte[dataLen];
((Seekable) in).seekToNewSource(targetPos);
int n = readAll(in, result, 0, dataLen);
Assert.assertEquals(dataLen, n + targetPos);
byte[] readData = new byte[n];
System.arraycopy(result, 0, readData, 0, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, targetPos, expectedData, 0, n);
Assert.assertArrayEquals(readData, expectedData);
private ByteBufferPool getBufferPool() {
return new ByteBufferPool() {
public ByteBuffer getBuffer(boolean direct, int length) {
return ByteBuffer.allocateDirect(length);
public void putBuffer(ByteBuffer buffer) {
public void testHasEnhancedByteBufferAccess() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
InputStream in = getInputStream(defaultBufferSize);
final int len1 = dataLen/8;
// ByteBuffer size is len1
ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
int n1 = buffer.remaining();
byte[] readData = new byte[n1];
byte[] expectedData = new byte[n1];
System.arraycopy(data, 0, expectedData, 0, n1);
Assert.assertArrayEquals(readData, expectedData);
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, n1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
// ByteBuffer size is len1
buffer = ((HasEnhancedByteBufferAccess) in).read(
getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
int n2 = buffer.remaining();
readData = new byte[n2];
expectedData = new byte[n2];
System.arraycopy(data, n1 + len1, expectedData, 0, n2);
Assert.assertArrayEquals(readData, expectedData);
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
@ -0,0 +1,376 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class TestCryptoStreams extends CryptoStreamsTestBase {
* Data storage.
* {@link #getOutputStream(int)} will write to this buf.
* {@link #getInputStream(int)} will read from this buf.
private byte[] buf;
private int bufLen;
public static void init() throws Exception {
Configuration conf = new Configuration();
codec = CryptoCodec.getInstance(conf);
public static void shutdown() throws Exception {
protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
throws IOException {
DataOutputBuffer out = new DataOutputBuffer() {
public void flush() throws IOException {
buf = getData();
bufLen = getLength();
public void close() throws IOException {
buf = getData();
bufLen = getLength();
return new CryptoOutputStream(new FakeOutputStream(out),
codec, bufferSize, key, iv);
protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
throws IOException {
DataInputBuffer in = new DataInputBuffer();
in.reset(buf, 0, bufLen);
return new CryptoInputStream(new FakeInputStream(in), codec, bufferSize,
key, iv);
private class FakeOutputStream extends OutputStream
implements Syncable, CanSetDropBehind{
private final byte[] oneByteBuf = new byte[1];
private final DataOutputBuffer out;
private boolean closed;
public FakeOutputStream(DataOutputBuffer out) {
this.out = out;
public void write(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
out.write(b, off, len);
public void flush() throws IOException {
public void close() throws IOException {
if (closed) {
closed = true;
public void write(int b) throws IOException {
oneByteBuf[0] = (byte)(b & 0xff);
write(oneByteBuf, 0, oneByteBuf.length);
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
public void hflush() throws IOException {
public void hsync() throws IOException {
private void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream is closed!");
private class FakeInputStream extends InputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
private final byte[] oneByteBuf = new byte[1];
private int pos = 0;
private final byte[] data;
private final int length;
private boolean closed = false;
public FakeInputStream(DataInputBuffer in) {
data = in.getData();
length = in.getLength();
public void seek(long pos) throws IOException {
if (pos > length) {
throw new IOException("Cannot seek after EOF.");
if (pos < 0) {
throw new IOException("Cannot seek to negative offset.");
this.pos = (int)pos;
public long getPos() throws IOException {
return pos;
public int available() throws IOException {
return length - pos;
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
if (pos < length) {
int n = (int) Math.min(len, length - pos);
System.arraycopy(data, pos, b, off, n);
pos += n;
return n;
return -1;
private void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream is closed!");
public int read(ByteBuffer buf) throws IOException {
if (pos < length) {
int n = (int) Math.min(buf.remaining(), length - pos);
if (n > 0) {
buf.put(data, pos, n);
pos += n;
return n;
return -1;
public long skip(long n) throws IOException {
if ( n > 0 ) {
if( n + pos > length ) {
n = length - pos;
pos += n;
return n;
return n < 0 ? -1 : 0;
public void close() throws IOException {
closed = true;
public int read(long position, byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
if (position > length) {
throw new IOException("Cannot read after EOF.");
if (position < 0) {
throw new IOException("Cannot read to negative offset.");
if (position < length) {
int n = (int) Math.min(len, length - position);
System.arraycopy(data, (int)position, b, off, n);
return n;
return -1;
public void readFully(long position, byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
if (position > length) {
throw new IOException("Cannot read after EOF.");
if (position < 0) {
throw new IOException("Cannot read to negative offset.");
if (position + len > length) {
throw new EOFException("Reach the end of stream.");
System.arraycopy(data, (int)position, b, off, len);
public void readFully(long position, byte[] buffer) throws IOException {
readFully(position, buffer, 0, buffer.length);
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
if (bufferPool == null) {
throw new IOException("Please specify buffer pool.");
ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
int pos = buffer.position();
int n = read(buffer);
if (n >= 0) {
return buffer;
return null;
public void releaseBuffer(ByteBuffer buffer) {
public void setReadahead(Long readahead) throws IOException,
UnsupportedOperationException {
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
public FileDescriptor getFileDescriptor() throws IOException {
return null;
public boolean seekToNewSource(long targetPos) throws IOException {
if (targetPos > length) {
throw new IOException("Attempted to read past end of file.");
if (targetPos < 0) {
throw new IOException("Cannot seek after EOF.");
this.pos = (int)targetPos;
return false;
public int read() throws IOException {
int ret = read( oneByteBuf, 0, 1 );
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
@ -0,0 +1,114 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.crypto;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
private static final String TEST_ROOT_DIR
= System.getProperty("test.build.data","build/test/data") + "/work-dir/localfs";
private final File base = new File(TEST_ROOT_DIR);
private final Path file = new Path(TEST_ROOT_DIR, "test-file");
private static LocalFileSystem fileSys;
public static void init() throws Exception {
Configuration conf = new Configuration();
conf = new Configuration(false);
conf.set("fs.file.impl", LocalFileSystem.class.getName());
fileSys = FileSystem.getLocal(conf);
codec = CryptoCodec.getInstance(conf);
public static void shutdown() throws Exception {
public void setUp() throws IOException {
fileSys.delete(new Path(TEST_ROOT_DIR), true);
public void cleanUp() throws IOException {
FileUtil.setWritable(base, true);
protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
throws IOException {
return new CryptoOutputStream(fileSys.create(file), codec, bufferSize,
key, iv);
protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
throws IOException {
return new CryptoInputStream(fileSys.open(file), codec, bufferSize,
key, iv);
@Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
public void testByteBufferRead() throws Exception {}
@Ignore("ChecksumFSOutputSummer doesn't support Syncable")
public void testSyncable() throws IOException {}
@Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
public void testCombinedOp() throws Exception {}
@Ignore("ChecksumFSInputChecker doesn't support enhanced ByteBuffer access")
public void testHasEnhancedByteBufferAccess() throws Exception {
@Ignore("ChecksumFSInputChecker doesn't support seekToNewSource")
public void testSeekToNewSource() throws Exception {
Reference in New Issue
Block a user