diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 67e8690456..80364ce7fe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -33,6 +33,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; @@ -64,7 +65,8 @@ public class CryptoInputStream extends FilterInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, - ReadableByteChannel, CanUnbuffer, StreamCapabilities { + ReadableByteChannel, CanUnbuffer, StreamCapabilities, + ByteBufferPositionedReadable { private final byte[] oneByteBuf = new byte[1]; private final CryptoCodec codec; private final Decryptor decryptor; @@ -327,19 +329,39 @@ public void close() throws IOException { public int read(long position, byte[] buffer, int offset, int length) throws IOException { checkStream(); - try { - final int n = ((PositionedReadable) in).read(position, buffer, offset, - length); - if (n > 0) { - // This operation does not change the current offset of the file - decrypt(position, buffer, offset, n); - } - - return n; - } catch (ClassCastException e) { + if (!(in instanceof PositionedReadable)) { throw new UnsupportedOperationException("This stream does not support " + "positioned read."); } + final int n = ((PositionedReadable) in).read(position, buffer, offset, + length); + if (n > 0) { + // This operation does not change the current offset of the file + decrypt(position, buffer, offset, n); + } + + return n; + } + + /** + * Positioned read using {@link ByteBuffer}s. This method is thread-safe. + */ + @Override + public int read(long position, final ByteBuffer buf) + throws IOException { + checkStream(); + if (!(in instanceof ByteBufferPositionedReadable)) { + throw new UnsupportedOperationException("This stream does not support " + + "positioned reads with byte buffers."); + } + int bufPos = buf.position(); + final int n = ((ByteBufferPositionedReadable) in).read(position, buf); + if (n > 0) { + // This operation does not change the current offset of the file + decrypt(position, buf, n, bufPos); + } + + return n; } /** @@ -348,49 +370,124 @@ public int read(long position, byte[] buffer, int offset, int length) */ private void decrypt(long position, byte[] buffer, int offset, int length) throws IOException { - ByteBuffer inBuffer = getBuffer(); - ByteBuffer outBuffer = getBuffer(); + ByteBuffer localInBuffer = null; + ByteBuffer localOutBuffer = null; Decryptor decryptor = null; try { + localInBuffer = getBuffer(); + localOutBuffer = getBuffer(); decryptor = getDecryptor(); byte[] iv = initIV.clone(); updateDecryptor(decryptor, position, iv); byte padding = getPadding(position); - inBuffer.position(padding); // Set proper position for input data. + localInBuffer.position(padding); // Set proper position for input data. 
int n = 0; while (n < length) { - int toDecrypt = Math.min(length - n, inBuffer.remaining()); - inBuffer.put(buffer, offset + n, toDecrypt); + int toDecrypt = Math.min(length - n, localInBuffer.remaining()); + localInBuffer.put(buffer, offset + n, toDecrypt); // Do decryption - decrypt(decryptor, inBuffer, outBuffer, padding); + decrypt(decryptor, localInBuffer, localOutBuffer, padding); - outBuffer.get(buffer, offset + n, toDecrypt); + localOutBuffer.get(buffer, offset + n, toDecrypt); n += toDecrypt; - padding = afterDecryption(decryptor, inBuffer, position + n, iv); + padding = afterDecryption(decryptor, localInBuffer, position + n, iv); } } finally { - returnBuffer(inBuffer); - returnBuffer(outBuffer); + returnBuffer(localInBuffer); + returnBuffer(localOutBuffer); returnDecryptor(decryptor); } } - + + /** + * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are + * decrypted from {@code buf} starting at {@code start}. + * {@code buf.position()} and {@code buf.limit()} are unchanged after this + * method returns. This method is thread-safe. + * + *
+ * This method decrypts the input buf chunk-by-chunk and writes the + * decrypted output back into the input buf. It uses two local buffers + * taken from the {@link #bufferPool} to assist in this process: one is + * designated as the input buffer and it stores a single chunk of the + * given buf, the other is designated as the output buffer, which stores + * the output of decrypting the input buffer. Both buffers are of size + * {@link #bufferSize}. + *
+ * + *
+ * Decryption is done by using a {@link Decryptor} and the + * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once + * the decrypted data is written into the output buffer, it is copied back + * into buf. Both buffers are returned to the pool once the entire + * buf is decrypted. + *

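A minimal sketch of the chunking and position/limit bookkeeping described above, not part of the patch: the class and method names are invented, and a simple XOR stands in for the real Decryptor, padding and IV handling.

import java.nio.ByteBuffer;

public class ChunkedInPlaceDemo {
  /**
   * Transforms buf[start, start + length) in place, chunkSize bytes at a
   * time, staging each chunk through a small scratch buffer -- the same
   * bookkeeping the decrypt loop above performs with its pooled buffers.
   */
  static void transformInPlace(ByteBuffer buf, int start, int length,
      int chunkSize) {
    ByteBuffer view = buf.duplicate();   // caller's position/limit stay untouched
    ByteBuffer chunk = ByteBuffer.allocate(chunkSize);
    int done = 0;
    while (done < length) {
      int n = Math.min(length - done, chunk.remaining());
      view.position(start + done);
      view.limit(start + done + n);
      chunk.put(view);                   // copy one chunk out of the input
      chunk.flip();
      for (int i = 0; i < chunk.limit(); i++) {
        chunk.put(i, (byte) (chunk.get(i) ^ 0x5A));   // stand-in for decryption
      }
      view.position(start + done);
      view.limit(start + length);
      done += chunk.remaining();
      view.put(chunk);                   // write the transformed chunk back in place
      chunk.clear();
    }
  }

  public static void main(String[] args) {
    ByteBuffer b = ByteBuffer.wrap("chunked in-place demo".getBytes());
    transformInPlace(b, 0, b.capacity(), 8);
    transformInPlace(b, 0, b.capacity(), 8);   // applying the XOR twice restores the bytes
    System.out.println(new String(b.array()));
  }
}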
+ * + * @param filePosition the current position of the file being read + * @param buf the {@link ByteBuffer} to decrypt + * @param length the number of bytes in {@code buf} to decrypt + * @param start the position in {@code buf} to start decrypting data from + */ + private void decrypt(long filePosition, ByteBuffer buf, int length, int start) + throws IOException { + ByteBuffer localInBuffer = null; + ByteBuffer localOutBuffer = null; + + // Duplicate the buffer so we don't have to worry about resetting the + // original position and limit at the end of the method + buf = buf.duplicate(); + + int decryptedBytes = 0; + Decryptor localDecryptor = null; + try { + localInBuffer = getBuffer(); + localOutBuffer = getBuffer(); + localDecryptor = getDecryptor(); + byte[] localIV = initIV.clone(); + updateDecryptor(localDecryptor, filePosition, localIV); + byte localPadding = getPadding(filePosition); + // Set proper filePosition for inputdata. + localInBuffer.position(localPadding); + + while (decryptedBytes < length) { + buf.position(start + decryptedBytes); + buf.limit(start + decryptedBytes + + Math.min(length - decryptedBytes, localInBuffer.remaining())); + localInBuffer.put(buf); + // Do decryption + try { + decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding); + buf.position(start + decryptedBytes); + buf.limit(start + length); + decryptedBytes += localOutBuffer.remaining(); + buf.put(localOutBuffer); + } finally { + localPadding = afterDecryption(localDecryptor, localInBuffer, + filePosition + length, localIV); + } + } + } finally { + returnBuffer(localInBuffer); + returnBuffer(localOutBuffer); + returnDecryptor(localDecryptor); + } + } + /** Positioned read fully. It is thread-safe */ @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { checkStream(); - try { - ((PositionedReadable) in).readFully(position, buffer, offset, length); - if (length > 0) { - // This operation does not change the current offset of the file - decrypt(position, buffer, offset, length); - } - } catch (ClassCastException e) { + if (!(in instanceof PositionedReadable)) { throw new UnsupportedOperationException("This stream does not support " + "positioned readFully."); } + ((PositionedReadable) in).readFully(position, buffer, offset, length); + if (length > 0) { + // This operation does not change the current offset of the file + decrypt(position, buffer, offset, length); + } } @Override @@ -405,23 +502,22 @@ public void seek(long pos) throws IOException { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } checkStream(); - try { - /* - * If data of target pos in the underlying stream has already been read - * and decrypted in outBuffer, we just need to re-position outBuffer. - */ - if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { - int forward = (int) (pos - (streamOffset - outBuffer.remaining())); - if (forward > 0) { - outBuffer.position(outBuffer.position() + forward); - } - } else { - ((Seekable) in).seek(pos); - resetStreamOffset(pos); + /* + * If data of target pos in the underlying stream has already been read + * and decrypted in outBuffer, we just need to re-position outBuffer. 
+ */ + if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { + int forward = (int) (pos - (streamOffset - outBuffer.remaining())); + if (forward > 0) { + outBuffer.position(outBuffer.position() + forward); } - } catch (ClassCastException e) { - throw new UnsupportedOperationException("This stream does not support " + - "seek."); + } else { + if (!(in instanceof Seekable)) { + throw new UnsupportedOperationException("This stream does not " + + "support seek."); + } + ((Seekable) in).seek(pos); + resetStreamOffset(pos); } } @@ -519,31 +615,34 @@ public int read(ByteBuffer buf) throws IOException { } /** - * Decrypt all data in buf: total n bytes from given start position. - * Output is also buf and same start position. - * buf.position() and buf.limit() should be unchanged after decryption. + * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are + * decrypted from {@code buf} starting at {@code start}. + * {@code buf.position()} and {@code buf.limit()} are unchanged after this + * method returns. + * + * @see #decrypt(long, ByteBuffer, int, int) */ - private void decrypt(ByteBuffer buf, int n, int start) + private void decrypt(ByteBuffer buf, int length, int start) throws IOException { - final int pos = buf.position(); - final int limit = buf.limit(); - int len = 0; - while (len < n) { - buf.position(start + len); - buf.limit(start + len + Math.min(n - len, inBuffer.remaining())); + buf = buf.duplicate(); + int decryptedBytes = 0; + while (decryptedBytes < length) { + buf.position(start + decryptedBytes); + buf.limit(start + decryptedBytes + + Math.min(length - decryptedBytes, inBuffer.remaining())); inBuffer.put(buf); // Do decryption try { decrypt(decryptor, inBuffer, outBuffer, padding); - buf.position(start + len); - buf.limit(limit); - len += outBuffer.remaining(); + buf.position(start + decryptedBytes); + buf.limit(start + length); + decryptedBytes += outBuffer.remaining(); buf.put(outBuffer); } finally { - padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv); + padding = afterDecryption(decryptor, inBuffer, + streamOffset - (length - decryptedBytes), iv); } } - buf.position(pos); } @Override @@ -572,14 +671,13 @@ public boolean seekToNewSource(long targetPos) throws IOException { Preconditions.checkArgument(targetPos >= 0, "Cannot seek to negative offset."); checkStream(); - try { - boolean result = ((Seekable) in).seekToNewSource(targetPos); - resetStreamOffset(targetPos); - return result; - } catch (ClassCastException e) { + if (!(in instanceof Seekable)) { throw new UnsupportedOperationException("This stream does not support " + "seekToNewSource."); } + boolean result = ((Seekable) in).seekToNewSource(targetPos); + resetStreamOffset(targetPos); + return result; } @Override @@ -587,59 +685,59 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet opts) throws IOException, UnsupportedOperationException { checkStream(); - try { - if (outBuffer.remaining() > 0) { - // Have some decrypted data unread, need to reset. - ((Seekable) in).seek(getPos()); - resetStreamOffset(getPos()); + if (outBuffer.remaining() > 0) { + if (!(in instanceof Seekable)) { + throw new UnsupportedOperationException("This stream does not " + + "support seek."); } - final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). 
- read(bufferPool, maxLength, opts); - if (buffer != null) { - final int n = buffer.remaining(); - if (n > 0) { - streamOffset += buffer.remaining(); // Read n bytes - final int pos = buffer.position(); - decrypt(buffer, n, pos); - } - } - return buffer; - } catch (ClassCastException e) { - throw new UnsupportedOperationException("This stream does not support " + + // Have some decrypted data unread, need to reset. + ((Seekable) in).seek(getPos()); + resetStreamOffset(getPos()); + } + if (!(in instanceof HasEnhancedByteBufferAccess)) { + throw new UnsupportedOperationException("This stream does not support " + "enhanced byte buffer access."); } + final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). + read(bufferPool, maxLength, opts); + if (buffer != null) { + final int n = buffer.remaining(); + if (n > 0) { + streamOffset += buffer.remaining(); // Read n bytes + final int pos = buffer.position(); + decrypt(buffer, n, pos); + } + } + return buffer; } @Override public void releaseBuffer(ByteBuffer buffer) { - try { - ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); - } catch (ClassCastException e) { + if (!(in instanceof HasEnhancedByteBufferAccess)) { throw new UnsupportedOperationException("This stream does not support " + "release buffer."); } + ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); } @Override public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException { - try { - ((CanSetReadahead) in).setReadahead(readahead); - } catch (ClassCastException e) { + if (!(in instanceof CanSetReadahead)) { throw new UnsupportedOperationException("This stream does not support " + "setting the readahead caching strategy."); } + ((CanSetReadahead) in).setReadahead(readahead); } @Override public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException { - try { - ((CanSetDropBehind) in).setDropBehind(dropCache); - } catch (ClassCastException e) { + if (!(in instanceof CanSetDropBehind)) { throw new UnsupportedOperationException("This stream does not " + "support setting the drop-behind caching setting."); } + ((CanSetDropBehind) in).setDropBehind(dropCache); } @Override @@ -737,11 +835,17 @@ public void unbuffer() { @Override public boolean hasCapability(String capability) { switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.UNBUFFER: + return true; case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: - case StreamCapabilities.UNBUFFER: case StreamCapabilities.READBYTEBUFFER: - return true; + case StreamCapabilities.PREADBYTEBUFFER: + if (!(in instanceof StreamCapabilities)) { + throw new UnsupportedOperationException("This stream does not expose " + + "its stream capabilities."); + } + return ((StreamCapabilities) in).hasCapability(capability); default: return false; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java new file mode 100644 index 0000000000..d99ee1624e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementers of this interface provide a positioned read API that writes to a + * {@link ByteBuffer} rather than a {@code byte[]}. + * + * @see PositionedReadable + * @see ByteBufferReadable + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ByteBufferPositionedReadable { + /** + * Reads up to {@code buf.remaining()} bytes into buf from a given position + * in the file and returns the number of bytes read. Callers should use + * {@code buf.limit(...)} to control the size of the desired read and + * {@code buf.position(...)} to control the offset into the buffer the data + * should be written to. + *
+ * After a successful call, {@code buf.position()} will be advanced by the + * number of bytes read and {@code buf.limit()} will be unchanged. + *
+ * In the case of an exception, the state of the buffer (the contents of the + * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is + * undefined, and callers should be prepared to recover from this + * eventuality. + *
+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with + * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying + * stream supports this interface, otherwise they might get a + * {@link UnsupportedOperationException}. + *

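A caller-side sketch of the capability probe this paragraph recommends, not part of the patch: PreadByteBufferExample, preadAt and the fs/path/offset/len inputs are placeholder names; only hasCapability, read(long, ByteBuffer) and readFully come from the Hadoop API.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class PreadByteBufferExample {
  /**
   * Reads up to len bytes starting at offset from the given file, using the
   * positioned ByteBuffer read when the stream advertises it and falling
   * back to the byte[] pread otherwise.
   */
  static ByteBuffer preadAt(FileSystem fs, Path path, long offset, int len)
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(len);
    try (FSDataInputStream in = fs.open(path)) {
      if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
        long pos = offset;
        int n;
        // Each successful call advances only buf.position(); the stream's own
        // offset is untouched, so the loop tracks its file position in pos.
        while (buf.hasRemaining() && (n = in.read(pos, buf)) > 0) {
          pos += n;
        }
      } else {
        byte[] b = new byte[len];
        in.readFully(offset, b, 0, len);
        buf.put(b);
      }
    }
    buf.flip();
    return buf;
  }
}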
+ * Implementations should treat 0-length requests as legitimate, and must not + * signal an error upon their receipt. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @return the number of bytes read, possibly zero, or -1 if reached + * end-of-stream + * @throws IOException if there is some error performing the read + */ + int read(long position, ByteBuffer buf) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java index 926b554f42..5a4ae04bea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -33,16 +34,18 @@ public interface ByteBufferReadable { * Reads up to buf.remaining() bytes into buf. Callers should use * buf.limit(..) to control the size of the desired read. *
- * After a successful call, buf.position() will be advanced by the number - * of bytes read and buf.limit() should be unchanged. + * After a successful call, {@code buf.position()} will be advanced by the + * number of bytes read and {@code buf.limit()} will be unchanged. *
- * In the case of an exception, the values of buf.position() and buf.limit() - * are undefined, and callers should be prepared to recover from this + * In the case of an exception, the state of the buffer (the contents of the + * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is + * undefined, and callers should be prepared to recover from this * eventuality. *

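Because the buffer state is undefined after a failed read, a cautious caller can at least restore its own position and limit before rethrowing. A minimal sketch, not part of the patch (GuardedRead and readGuarded are invented names; bytes already copied into the buffer cannot be undone):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;

public final class GuardedRead {
  /**
   * Wraps a single read(ByteBuffer) call, putting position and limit back
   * where the caller left them if the read throws.
   */
  static int readGuarded(ByteBufferReadable in, ByteBuffer buf)
      throws IOException {
    int pos = buf.position();
    int lim = buf.limit();
    try {
      return in.read(buf);
    } catch (IOException | RuntimeException e) {
      buf.limit(lim);      // restore limit first so position <= limit always holds
      buf.position(pos);
      throw e;
    }
  }
}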
- * Many implementations will throw {@link UnsupportedOperationException}, so - * callers that are not confident in support for this method from the - * underlying filesystem should be prepared to handle that exception. + * Callers should use {@link StreamCapabilities#hasCapability(String)} with + * {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying + * stream supports this interface, otherwise they might get a + * {@link UnsupportedOperationException}. *
* Implementations should treat 0-length requests as legitimate, and must not * signal an error upon their receipt. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 62c45f183a..066cc3d8b1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -39,7 +39,8 @@ public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities, + ByteBufferPositionedReadable { /** * Map ByteBuffers that we have handed out to readers to ByteBufferPool * objects @@ -148,7 +149,8 @@ public int read(ByteBuffer buf) throws IOException { return ((ByteBufferReadable)in).read(buf); } - throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); + throw new UnsupportedOperationException("Byte-buffer read unsupported " + + "by input stream"); } @Override @@ -247,4 +249,13 @@ public boolean hasCapability(String capability) { public String toString() { return super.toString() + ": " + in; } + + @Override + public int read(long position, ByteBuffer buf) throws IOException { + if (in instanceof ByteBufferPositionedReadable) { + return ((ByteBufferPositionedReadable) in).read(position, buf); + } + throw new UnsupportedOperationException("Byte-buffer pread unsupported " + + "by input stream"); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index c52d30762f..e68e7b351e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -65,6 +65,12 @@ public interface StreamCapabilities { */ String READBYTEBUFFER = "in:readbytebuffer"; + /** + * Stream read(long, ByteBuffer) capability implemented by + * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}. + */ + String PREADBYTEBUFFER = "in:preadbytebuffer"; + /** * Capabilities that a stream can support and be queried for. 
*/ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java index a0eb105833..7463d6c3bc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.Random; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataOutputStream; @@ -129,6 +130,32 @@ private void preadCheck(PositionedReadable in) throws Exception { Assert.assertArrayEquals(result, expectedData); } + private int byteBufferPreadAll(ByteBufferPositionedReadable in, + ByteBuffer buf) throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (!buf.hasRemaining()) { + break; + } + n = in.read(total, buf); + } + + return total; + } + + private void byteBufferPreadCheck(ByteBufferPositionedReadable in) + throws Exception { + ByteBuffer result = ByteBuffer.allocate(dataLen); + int n = byteBufferPreadAll(in, result); + + Assert.assertEquals(dataLen, n); + ByteBuffer expectedData = ByteBuffer.allocate(n); + expectedData.put(data, 0, n); + Assert.assertArrayEquals(result.array(), expectedData.array()); + } + protected OutputStream getOutputStream(int bufferSize) throws IOException { return getOutputStream(bufferSize, key, iv); } @@ -288,20 +315,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len) return total; } + + private int readAll(InputStream in, long pos, ByteBuffer buf) + throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (!buf.hasRemaining()) { + break; + } + n = ((ByteBufferPositionedReadable) in).read(pos + total, buf); + } + + return total; + } /** Test positioned read. */ @Test(timeout=120000) public void testPositionedRead() throws Exception { - OutputStream out = getOutputStream(defaultBufferSize); - writeData(out); + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } - InputStream in = getInputStream(defaultBufferSize); - // Pos: 1/3 dataLen - positionedReadCheck(in , dataLen / 3); + try (InputStream in = getInputStream(defaultBufferSize)) { + // Pos: 1/3 dataLen + positionedReadCheck(in, dataLen / 3); - // Pos: 1/2 dataLen - positionedReadCheck(in, dataLen / 2); - in.close(); + // Pos: 1/2 dataLen + positionedReadCheck(in, dataLen / 2); + } } private void positionedReadCheck(InputStream in, int pos) throws Exception { @@ -315,6 +358,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception { System.arraycopy(data, pos, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); } + + /** Test positioned read with ByteBuffers. 
*/ + @Test(timeout=120000) + public void testPositionedReadWithByteBuffer() throws Exception { + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } + + try (InputStream in = getInputStream(defaultBufferSize)) { + // Pos: 1/3 dataLen + positionedReadCheckWithByteBuffer(in, dataLen / 3); + + // Pos: 1/2 dataLen + positionedReadCheckWithByteBuffer(in, dataLen / 2); + } + } + + private void positionedReadCheckWithByteBuffer(InputStream in, int pos) + throws Exception { + ByteBuffer result = ByteBuffer.allocate(dataLen); + int n = readAll(in, pos, result); + + Assert.assertEquals(dataLen, n + pos); + byte[] readData = new byte[n]; + System.arraycopy(result.array(), 0, readData, 0, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, pos, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } /** Test read fully */ @Test(timeout=120000) @@ -505,12 +577,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf, System.arraycopy(data, 0, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); } + + private void byteBufferPreadCheck(InputStream in, ByteBuffer buf, + int bufPos) throws Exception { + // Test reading from position 0 + buf.position(bufPos); + int n = ((ByteBufferPositionedReadable) in).read(0, buf); + Assert.assertEquals(bufPos + n, buf.position()); + byte[] readData = new byte[n]; + buf.rewind(); + buf.position(bufPos); + buf.get(readData); + byte[] expectedData = new byte[n]; + System.arraycopy(data, 0, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + + // Test reading from half way through the data + buf.position(bufPos); + n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf); + Assert.assertEquals(bufPos + n, buf.position()); + readData = new byte[n]; + buf.rewind(); + buf.position(bufPos); + buf.get(readData); + expectedData = new byte[n]; + System.arraycopy(data, dataLen / 2, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } /** Test byte buffer read with different buffer size. */ @Test(timeout=120000) public void testByteBufferRead() throws Exception { - OutputStream out = getOutputStream(defaultBufferSize); - writeData(out); + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } // Default buffer size, initial buffer position is 0 InputStream in = getInputStream(defaultBufferSize); @@ -560,6 +660,53 @@ public void testByteBufferRead() throws Exception { byteBufferReadCheck(in, buf, 11); in.close(); } + + /** Test byte buffer pread with different buffer size. 
*/ + @Test(timeout=120000) + public void testByteBufferPread() throws Exception { + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } + + try (InputStream defaultBuf = getInputStream(defaultBufferSize); + InputStream smallBuf = getInputStream(smallBufferSize)) { + + ByteBuffer buf = ByteBuffer.allocate(dataLen + 100); + + // Default buffer size, initial buffer position is 0 + byteBufferPreadCheck(defaultBuf, buf, 0); + + // Default buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(defaultBuf, buf, 11); + + // Small buffer size, initial buffer position is 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 0); + + // Small buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 11); + + // Test with direct ByteBuffer + buf = ByteBuffer.allocateDirect(dataLen + 100); + + // Direct buffer, default buffer size, initial buffer position is 0 + byteBufferPreadCheck(defaultBuf, buf, 0); + + // Direct buffer, default buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(defaultBuf, buf, 11); + + // Direct buffer, small buffer size, initial buffer position is 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 0); + + // Direct buffer, small buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 11); + } + } @Test(timeout=120000) public void testCombinedOp() throws Exception { @@ -797,5 +944,23 @@ public void testUnbuffer() throws Exception { // The close will be called when exiting this try-with-resource block } } + + // Test ByteBuffer pread + try (InputStream in = getInputStream(smallBufferSize)) { + if (in instanceof ByteBufferPositionedReadable) { + ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in; + + // Test unbuffer after pread + byteBufferPreadCheck(bbpin); + ((CanUnbuffer) in).unbuffer(); + + // Test pread again after unbuffer + byteBufferPreadCheck(bbpin); + + // Test close after unbuffer + ((CanUnbuffer) in).unbuffer(); + // The close will be called when exiting this try-with-resource block + } + } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java index cd7391a02c..8bcf46eacf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; @@ -180,7 +181,7 @@ static class FakeInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, - StreamCapabilities { + StreamCapabilities, ByteBufferPositionedReadable { private final byte[] oneByteBuf = new byte[1]; private int pos = 0; private final byte[] data; @@ -303,6 +304,32 @@ public int read(long position, byte[] b, int off, int len) return -1; } + @Override + public int read(long position, ByteBuffer buf) throws IOException { + if (buf == null) { + throw new NullPointerException(); + } else if 
(!buf.hasRemaining()) { + return 0; + } + + if (position > length) { + throw new IOException("Cannot read after EOF."); + } + if (position < 0) { + throw new IOException("Cannot read to negative offset."); + } + + checkStream(); + + if (position < length) { + int n = (int) Math.min(buf.remaining(), length - position); + buf.put(data, (int) position, n); + return n; + } + + return -1; + } + @Override public void readFully(long position, byte[] b, int off, int len) throws IOException { @@ -378,6 +405,8 @@ public boolean hasCapability(String capability) { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; @@ -439,7 +468,9 @@ public void testHasCapability() throws Exception { new String[] { StreamCapabilities.DROPBEHIND, StreamCapabilities.READAHEAD, - StreamCapabilities.UNBUFFER + StreamCapabilities.UNBUFFER, + StreamCapabilities.READBYTEBUFFER, + StreamCapabilities.PREADBYTEBUFFER }, new String[] { StreamCapabilities.HFLUSH, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java index bb3fd7a68d..e7d922e78a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java @@ -90,11 +90,21 @@ protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv) @Override @Test(timeout=10000) public void testByteBufferRead() throws Exception {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testPositionedReadWithByteBuffer() throws IOException {} @Ignore("ChecksumFSOutputSummer doesn't support Syncable") @Override @Test(timeout=10000) public void testSyncable() throws IOException {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferPread() throws IOException {} @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read") @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java index 7e300777a3..036706f435 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java @@ -91,6 +91,11 @@ public void testSyncable() throws IOException {} @Test(timeout=10000) public void testPositionedRead() throws IOException {} + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testPositionedReadWithByteBuffer() throws IOException {} + @Ignore("Wrapped stream doesn't support ReadFully") @Override @Test(timeout=10000) @@ -105,6 +110,11 @@ public void testSeek() throws IOException {} @Override @Test(timeout=10000) public void testByteBufferRead() throws IOException {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferPread() throws IOException 
{} @Ignore("Wrapped stream doesn't support ByteBufferRead, Seek") @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index a3e2ad5afc..22622956cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -46,6 +46,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferUtil; import org.apache.hadoop.fs.CanSetDropBehind; @@ -84,7 +85,6 @@ import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StringUtils; -import org.apache.htrace.core.SpanId; import com.google.common.annotations.VisibleForTesting; @@ -99,7 +99,8 @@ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities, + ByteBufferPositionedReadable { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; @@ -1561,6 +1562,14 @@ public void reset() throws IOException { throw new IOException("Mark/reset not supported"); } + @Override + public int read(long position, final ByteBuffer buf) throws IOException { + if (!buf.hasRemaining()) { + return 0; + } + return pread(position, buf); + } + /** Utility class to encapsulate data node info and its address. */ static final class DNAddrPair { final DatanodeInfo info; @@ -1780,6 +1789,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h index 0eab9a68ae..f00326317f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h @@ -49,6 +49,24 @@ extern "C" { */ void hdfsFileDisableDirectRead(struct hdfsFile_internal *file); + /** + * Determine if a file is using the "direct pread" optimization. + * + * @param file The HDFS file + * @return 1 if the file is using the direct pread optimization, + * 0 otherwise. + */ + int hdfsFileUsesDirectPread(struct hdfsFile_internal *file); + + /** + * Disable the direct pread optimization for a file. + * + * This is mainly provided for unit testing purposes. + * + * @param file The HDFS file + */ + void hdfsFileDisableDirectPread(struct hdfsFile_internal *file); + /** * Disable domain socket security checks. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c index 1cd497b112..ebf0dd7e1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c @@ -88,9 +88,9 @@ int main(int argc, char **argv) { const char *userPath = "/tmp/usertestfile.txt"; char buffer[32], buffer2[256], rdbuffer[32]; - tSize num_written_bytes, num_read_bytes; + tSize num_written_bytes, num_read_bytes, num_pread_bytes; hdfsFS fs, lfs; - hdfsFile writeFile, readFile, localFile, appendFile, userFile; + hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile; tOffset currentPos, seekPos; int exists, totalResult, result, numEntries, i, j; const char *resp; @@ -145,7 +145,7 @@ int main(int argc, char **argv) { } { - //Write tests + // Write tests writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if(!writeFile) { @@ -188,7 +188,7 @@ int main(int argc, char **argv) { } { - //Read tests + // Read tests exists = hdfsExists(fs, readPath); @@ -250,6 +250,7 @@ int main(int argc, char **argv) { } fprintf(stderr, "Read (direct) following %d bytes:\n%s\n", num_read_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); if (hdfsSeek(fs, readFile, 0L)) { fprintf(stderr, "Failed to seek to file start!\n"); shutdown_and_exit(cl, -1); @@ -259,18 +260,28 @@ int main(int argc, char **argv) { // read path hdfsFileDisableDirectRead(readFile); - num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, - sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); + if (hdfsFileUsesDirectRead(readFile)) { + fprintf(stderr, "Disabled direct reads, but it is still enabled"); + shutdown_and_exit(cl, -1); + } + if (!hdfsFileUsesDirectPread(readFile)) { + fprintf(stderr, "Disabled direct reads, but direct preads was " + "disabled as well"); + shutdown_and_exit(cl, -1); + } + + num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, + sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to read. 
Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "Read following %d bytes:\n%s\n", + num_read_bytes, buffer); memset(buffer, 0, strlen(fileContents + 1)); - num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, - sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - hdfsCloseFile(fs, readFile); // Test correct behaviour for unsupported filesystems @@ -295,6 +306,115 @@ int main(int argc, char **argv) { hdfsCloseFile(lfs, localFile); } + { + // Pread tests + + exists = hdfsExists(fs, readPath); + + if (exists) { + fprintf(stderr, "Failed to validate existence of %s\n", readPath); + shutdown_and_exit(cl, -1); + } + + preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0); + if (!preadFile) { + fprintf(stderr, "Failed to open %s for reading!\n", readPath); + shutdown_and_exit(cl, -1); + } + + if (!hdfsFileIsOpenForRead(preadFile)) { + fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file " + "with O_RDONLY, and it did not show up as 'open for " + "read'\n"); + shutdown_and_exit(cl, -1); + } + + fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile)); + + num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", + num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + shutdown_and_exit(cl, -1); + } + + // Test pread midway through the file rather than at the beginning + const char *fileContentsChunk = "World!"; + num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContentsChunk, buffer, num_read_bytes); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + shutdown_and_exit(cl, -1); + } + + // Disable the direct pread path so that we really go through the slow + // read path + hdfsFileDisableDirectPread(preadFile); + + if (hdfsFileUsesDirectPread(preadFile)) { + fprintf(stderr, "Disabled direct preads, but it is still enabled"); + shutdown_and_exit(cl, -1); + } + + if (!hdfsFileUsesDirectRead(preadFile)) { + fprintf(stderr, "Disabled direct preads, but direct read was " + "disabled as well"); + shutdown_and_exit(cl, -1); + } + + num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to pread. 
Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_pread_bytes); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + shutdown_and_exit(cl, -1); + } + + num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContentsChunk, buffer, num_read_bytes); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + shutdown_and_exit(cl, -1); + } + + hdfsCloseFile(fs, preadFile); + + // Test correct behaviour for unsupported filesystems + localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0); + + if (hdfsFileUsesDirectPread(localFile)) { + fprintf(stderr, "Direct pread support incorrectly detected for local " + "filesystem\n"); + shutdown_and_exit(cl, -1); + } + + hdfsCloseFile(lfs, localFile); + } + totalResult = 0; result = 0; { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c index e212f2198f..2fd05929ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c @@ -40,8 +40,23 @@ // Bit fields for hdfsFile_internal flags #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) +#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1) +/** + * Reads bytes using the read(ByteBuffer) API. By using Java + * DirectByteBuffers we can avoid copying the bytes onto the Java heap. + * Instead the data will be directly copied from kernel space to the C heap. + */ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); + +/** + * Reads bytes using the read(long, ByteBuffer) API. By using Java + * DirectByteBuffers we can avoid copying the bytes onto the Java heap. + * Instead the data will be directly copied from kernel space to the C heap. + */ +tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, + tSize length); + static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); /** @@ -285,7 +300,7 @@ int hdfsFileIsOpenForWrite(hdfsFile file) int hdfsFileUsesDirectRead(hdfsFile file) { - return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ); + return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) != 0; } void hdfsFileDisableDirectRead(hdfsFile file) @@ -293,6 +308,17 @@ void hdfsFileDisableDirectRead(hdfsFile file) file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; } +int hdfsFileUsesDirectPread(hdfsFile file) +{ + return (file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) != 0; +} + +void hdfsFileDisableDirectPread(hdfsFile file) +{ + file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD; +} + + int hdfsDisableDomainSocketSecurity(void) { jthrowable jthr; @@ -985,6 +1011,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld, return 0; } +/** + * Delegates to FsDataInputStream#hasCapability(String). Used to check if a + * given input stream supports certain methods, such as + * ByteBufferReadable#read(ByteBuffer). 
+ * + * @param jFile the FsDataInputStream to call hasCapability on + * @param capability the name of the capability to query; for a full list of + * possible values see StreamCapabilities + * + * @return true if the given jFile has the given capability, false otherwise + * + * @see org.apache.hadoop.fs.StreamCapabilities + */ +static int hdfsHasStreamCapability(jobject jFile, + const char *capability) { + int ret = 0; + jthrowable jthr = NULL; + jvalue jVal; + jstring jCapabilityString = NULL; + + /* Get the JNIEnv* corresponding to current thread */ + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return 0; + } + + jthr = newJavaStr(env, capability, &jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsHasStreamCapability(%s): newJavaStr", capability); + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, jFile, + JC_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z", + jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability", + capability); + goto done; + } + +done: + destroyLocalReference(env, jthr); + destroyLocalReference(env, jCapabilityString); + if (ret) { + errno = ret; + return 0; + } + if (jVal.z == JNI_TRUE) { + return 1; + } + return 0; +} + static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, int32_t bufferSize, int16_t replication, int64_t blockSize) { @@ -995,7 +1077,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, return f{is|os}; */ int accmode = flags & O_ACCMODE; - jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL; + jstring jStrBufferSize = NULL, jStrReplication = NULL; jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; jobject jFS = (jobject)fs; jthrowable jthr; @@ -1024,13 +1106,15 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, errno = ENOTSUP; return NULL; } else { - fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode); + fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", + accmode); errno = EINVAL; return NULL; } if ((flags & O_CREAT) && (flags & O_EXCL)) { - fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); + fprintf(stderr, + "WARN: hdfs does not truly support O_CREATE && O_EXCL\n"); } if (accmode == O_RDONLY) { @@ -1153,34 +1237,26 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, file->flags = 0; if ((flags & O_WRONLY) == 0) { - // Check the StreamCapabilities of jFile to see if we can do direct reads - jthr = newJavaStr(env, "in:readbytebuffer", &jCapabilityString); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsOpenFile(%s): newJavaStr", path); - goto done; - } - jthr = invokeMethod(env, &jVal, INSTANCE, jFile, - JC_FS_DATA_INPUT_STREAM, "hasCapability", - "(Ljava/lang/String;)Z", jCapabilityString); - if (jthr) { - ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, - "hdfsOpenFile(%s): FSDataInputStream#hasCapability", path); - goto done; - } - if (jVal.z) { + // Check the StreamCapabilities of jFile to see if we can do direct + // reads + if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) { file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; } + + // Check the StreamCapabilities of jFile to see if we can do direct + // preads + if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) { + file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD; + } } 
ret = 0; done: destroyLocalReference(env, jStrBufferSize); destroyLocalReference(env, jStrReplication); - destroyLocalReference(env, jConfiguration); - destroyLocalReference(env, jPath); + destroyLocalReference(env, jConfiguration); + destroyLocalReference(env, jPath); destroyLocalReference(env, jFile); - destroyLocalReference(env, jCapabilityString); if (ret) { if (file) { if (file->file) { @@ -1385,6 +1461,13 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, return 0; } +/** + * If the underlying stream supports the ByteBufferReadable interface then + * this method will transparently use read(ByteBuffer). This can help + * improve performance as it avoids unnecessarily copying data on to the Java + * heap. Instead the data will be directly copied from kernel space to the C + * heap. + */ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { jobject jInputStream; @@ -1459,12 +1542,11 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) return jVal.i; } -// Reads using the read(ByteBuffer) API, which does fewer copies tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { // JAVA EQUIVALENT: - // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer - // fis.read(bbuffer); + // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.read(buf); jobject jInputStream; jvalue jVal; @@ -1499,9 +1581,25 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) "readDirect: FSDataInputStream#read"); return -1; } - return (jVal.i < 0) ? 0 : jVal.i; + // Reached EOF, return 0 + if (jVal.i < 0) { + return 0; + } + // 0 bytes read, return error + if (jVal.i == 0) { + errno = EINTR; + return -1; + } + return jVal.i; } +/** + * If the underlying stream supports the ByteBufferPositionedReadable + * interface then this method will transparently use read(long, ByteBuffer). + * This can help improve performance as it avoids unnecessarily copying data + * on to the Java heap. Instead the data will be directly copied from kernel + * space to the C heap. + */ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length) { @@ -1521,6 +1619,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, return -1; } + if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) { + return preadDirect(fs, f, position, buffer, length); + } + env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; @@ -1571,6 +1673,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, return jVal.i; } +tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, + tSize length) +{ + // JAVA EQUIVALENT: + // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.read(position, buf); + + jvalue jVal; + jthrowable jthr; + jobject bb; + + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + //Error checking... 
make sure that this file is 'readable' + if (f->type != HDFS_STREAM_INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + //Read the requisite bytes + bb = (*env)->NewDirectByteBuffer(env, buffer, length); + if (bb == NULL) { + errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "readDirect: NewDirectByteBuffer"); + return -1; + } + + jthr = invokeMethod(env, &jVal, INSTANCE, f->file, + JC_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I", + position, bb); + destroyLocalReference(env, bb); + if (jthr) { + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "preadDirect: FSDataInputStream#read"); + return -1; + } + // Reached EOF, return 0 + if (jVal.i < 0) { + return 0; + } + // 0 bytes read, return error + if (jVal.i == 0) { + errno = EINTR; + return -1; + } + return jVal.i; +} + tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java new file mode 100644 index 0000000000..4547db1c98 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the DFS positional read functionality on a single node + * mini-cluster. These tests are inspired from {@link TestPread}. The tests + * are much less comprehensive than other pread tests because pread already + * internally uses {@link ByteBuffer}s. 
+ */
+public class TestByteBufferPread {
+
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static byte[] fileContents;
+  private static Path testFile;
+  private static Random rand;
+
+  private static final long SEED = 0xDEADBEEFL;
+  private static final int BLOCK_SIZE = 4096;
+  private static final int FILE_SIZE = 12 * BLOCK_SIZE;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // Setup the cluster with a small block size so we can create small files
+    // that span multiple blocks
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    fs = cluster.getFileSystem();
+
+    // Create a test file that spans 12 blocks, and contains a bunch of random
+    // bytes
+    fileContents = new byte[FILE_SIZE];
+    rand = new Random(SEED);
+    rand.nextBytes(fileContents);
+    testFile = new Path("/byte-buffer-pread-test.dat");
+    try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
+      out.write(fileContents);
+    }
+  }
+
+  /**
+   * Test preads with {@link java.nio.HeapByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithHeapByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+  }
+
+  /**
+   * Test preads with {@link java.nio.DirectByteBuffer}s.
+   */
+  @Test
+  public void testPreadWithDirectByteBuffer() throws IOException {
+    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+  }
+
+  /**
+   * Reads the entire testFile using the pread API and validates that its
+   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Make sure the buffer is full
+      assertFalse(buffer.hasRemaining());
+      // Make sure the contents of the read buffer equal the contents of the
+      // file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents, fileContents);
+      buffer.position(buffer.limit());
+    }
+  }
+
+  /**
+   * Attempts to read the testFile into a {@link ByteBuffer} that is already
+   * full, and validates that doing so does not change the contents of the
+   * supplied {@link ByteBuffer}.
+   */
+  private void testPreadWithFullByteBuffer(ByteBuffer buffer)
+      throws IOException {
+    // Load some dummy data into the buffer
+    byte[] existingBufferBytes = new byte[FILE_SIZE];
+    rand.nextBytes(existingBufferBytes);
+    buffer.put(existingBufferBytes);
+    // Make sure the buffer is full
+    assertFalse(buffer.hasRemaining());
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Attempt to read into the buffer, 0 bytes should be read since the
+      // buffer is full
+      assertEquals(0, in.read(buffer));
+
+      // Double check the buffer is still full and its contents have not
+      // changed
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents, existingBufferBytes);
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting a
+   * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
+   * testFile is loaded into the buffer.
+   */
+  private void testPreadWithLimitedByteBuffer(
+      ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer limit to half the size of the file
+    buffer.limit(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we set the buffer limit to half the size of the file, we should
+      // have only read half of the file into the buffer
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      // Check that the buffer is full and the contents equal the first half of
+      // the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+          Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by setting the
+   * {@link ByteBuffer#position} to half the size of the file. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPreadWithPositionedByteBuffer(
+      ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+    // Set the buffer position to half the size of the file
+    buffer.position(FILE_SIZE / 2);
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+      }
+
+      // Since we set the buffer position to half the size of the file, we
+      // should have only read half of the file into the buffer
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      // Check that the buffer is full and the contents equal the first half of
+      // the file
+      assertFalse(buffer.hasRemaining());
+      buffer.position(FILE_SIZE / 2);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+          Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+    }
+  }
+
+  /**
+   * Reads half of the testFile into the {@link ByteBuffer} by specifying a
+   * position for the pread API that is half of the file size. Validates that
+   * only half of the testFile is loaded into the buffer.
+   */
+  private void testPositionedPreadWithByteBuffer(
+      ByteBuffer buffer) throws IOException {
+    int bytesRead;
+    int totalBytesRead = 0;
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      // Start reading from halfway through the file
+      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+          buffer)) > 0) {
+        totalBytesRead += bytesRead;
+        // Check that each call to read changes the position of the ByteBuffer
+        // correctly
+        assertEquals(totalBytesRead, buffer.position());
+      }
+
+      // Since we started reading halfway through the file, the buffer should
+      // only be half full
+      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals(buffer.position(), FILE_SIZE / 2);
+      assertTrue(buffer.hasRemaining());
+      // Check that the buffer contents equal the second half of the file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE / 2];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents,
+          Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      fs.delete(testFile, false);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+}
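Usage note (not part of the patch): the libhdfs changes above are transparent to C clients; an existing hdfsPread() call automatically takes the preadDirect() path whenever the opened stream implements ByteBufferPositionedReadable, so no caller changes are required. The sketch below illustrates this with the public libhdfs API only; the connection target, file path, offset, and buffer size are illustrative placeholders, not values taken from the patch.

    #include <fcntl.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include "hdfs.h"   /* public libhdfs API */

    int main(void) {
        /* Connect to the default file system configured in core-site.xml. */
        hdfsFS fs = hdfsConnect("default", 0);
        if (!fs) {
            fprintf(stderr, "hdfsConnect failed\n");
            return EXIT_FAILURE;
        }
        /* Open an existing file for reading; zeros keep the configured
         * buffer size, replication, and block size defaults. */
        hdfsFile f = hdfsOpenFile(fs, "/byte-buffer-pread-test.dat",
                                  O_RDONLY, 0, 0, 0);
        if (!f) {
            fprintf(stderr, "hdfsOpenFile failed\n");
            hdfsDisconnect(fs);
            return EXIT_FAILURE;
        }
        /* Positioned read of 4096 bytes starting at offset 8192. When the
         * stream supports ByteBufferPositionedReadable, the patched
         * hdfsPread() dispatches to preadDirect() and the bytes land in buf
         * without an extra copy through the Java heap. */
        char buf[4096];
        tSize n = hdfsPread(fs, f, 8192, buf, sizeof(buf));
        if (n < 0) {
            fprintf(stderr, "hdfsPread failed\n");
        } else {
            printf("read %d bytes\n", (int) n);
        }
        hdfsCloseFile(fs, f);
        hdfsDisconnect(fs);
        return EXIT_SUCCESS;
    }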