diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 80364ce7fe..9e601e26cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -330,8 +330,8 @@ public class CryptoInputStream extends FilterInputStream implements throws IOException { checkStream(); if (!(in instanceof PositionedReadable)) { - throw new UnsupportedOperationException("This stream does not support " + - "positioned read."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support positioned read."); } final int n = ((PositionedReadable) in).read(position, buffer, offset, length); @@ -351,8 +351,8 @@ public class CryptoInputStream extends FilterInputStream implements throws IOException { checkStream(); if (!(in instanceof ByteBufferPositionedReadable)) { - throw new UnsupportedOperationException("This stream does not support " + - "positioned reads with byte buffers."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support positioned reads with byte buffers."); } int bufPos = buf.position(); final int n = ((ByteBufferPositionedReadable) in).read(position, buf); @@ -363,7 +363,27 @@ public class CryptoInputStream extends FilterInputStream implements return n; } - + + /** + * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe. + */ + @Override + public void readFully(long position, final ByteBuffer buf) + throws IOException { + checkStream(); + if (!(in instanceof ByteBufferPositionedReadable)) { + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support positioned reads with byte buffers."); + } + int bufPos = buf.position(); + ((ByteBufferPositionedReadable) in).readFully(position, buf); + final int n = buf.position() - bufPos; + if (n > 0) { + // This operation does not change the current offset of the file + decrypt(position, buf, n, bufPos); + } + } + /** * Decrypt length bytes in buffer starting at offset. Output is also put * into buffer starting at offset. It is thread-safe. 
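For illustration, the caller-visible behavior the readFully hunk above adds: a positioned read that fills the whole buffer, decrypts in place, and leaves the stream offset untouched. A minimal sketch, assuming fs.open() returns a stream backed by a ByteBufferPositionedReadable source (the path, offset, and buffer size are hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void readBlockAt(FileSystem fs) throws java.io.IOException {
      try (FSDataInputStream in = fs.open(new Path("/enc/zone/file"))) {
        ByteBuffer buf = ByteBuffer.allocate(4096);
        // Fills buf from file offset 100, decrypting in place when the file
        // sits in an encryption zone; throws EOFException on short data.
        in.readFully(100L, buf);
        // The stream's own offset is unchanged, so this is safe to call
        // from multiple threads against a single open stream.
      }
    }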
@@ -480,8 +500,8 @@ public class CryptoInputStream extends FilterInputStream implements throws IOException { checkStream(); if (!(in instanceof PositionedReadable)) { - throw new UnsupportedOperationException("This stream does not support " + - "positioned readFully."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support positioned readFully."); } ((PositionedReadable) in).readFully(position, buffer, offset, length); if (length > 0) { @@ -513,8 +533,8 @@ public class CryptoInputStream extends FilterInputStream implements } } else { if (!(in instanceof Seekable)) { - throw new UnsupportedOperationException("This stream does not " + - "support seek."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support seek."); } ((Seekable) in).seek(pos); resetStreamOffset(pos); @@ -672,8 +692,8 @@ public class CryptoInputStream extends FilterInputStream implements "Cannot seek to negative offset."); checkStream(); if (!(in instanceof Seekable)) { - throw new UnsupportedOperationException("This stream does not support " + - "seekToNewSource."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support seekToNewSource."); } boolean result = ((Seekable) in).seekToNewSource(targetPos); resetStreamOffset(targetPos); @@ -687,16 +707,16 @@ public class CryptoInputStream extends FilterInputStream implements checkStream(); if (outBuffer.remaining() > 0) { if (!(in instanceof Seekable)) { - throw new UnsupportedOperationException("This stream does not " + - "support seek."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support seek."); } // Have some decrypted data unread, need to reset. ((Seekable) in).seek(getPos()); resetStreamOffset(getPos()); } if (!(in instanceof HasEnhancedByteBufferAccess)) { - throw new UnsupportedOperationException("This stream does not support " + - "enhanced byte buffer access."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support enhanced byte buffer access."); } final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). 
read(bufferPool, maxLength, opts); @@ -714,8 +734,8 @@ public class CryptoInputStream extends FilterInputStream implements @Override public void releaseBuffer(ByteBuffer buffer) { if (!(in instanceof HasEnhancedByteBufferAccess)) { - throw new UnsupportedOperationException("This stream does not support " + - "release buffer."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support release buffer."); } ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); } @@ -724,8 +744,8 @@ public class CryptoInputStream extends FilterInputStream implements public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException { if (!(in instanceof CanSetReadahead)) { - throw new UnsupportedOperationException("This stream does not support " + - "setting the readahead caching strategy."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support setting the readahead caching strategy."); } ((CanSetReadahead) in).setReadahead(readahead); } @@ -734,8 +754,9 @@ public class CryptoInputStream extends FilterInputStream implements public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException { if (!(in instanceof CanSetReadahead)) { - throw new UnsupportedOperationException("This stream does not " + - "support setting the drop-behind caching setting."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support setting the drop-behind caching" + + " setting."); } ((CanSetDropBehind) in).setDropBehind(dropCache); } @@ -842,8 +863,8 @@ public class CryptoInputStream extends FilterInputStream implements case StreamCapabilities.READBYTEBUFFER: case StreamCapabilities.PREADBYTEBUFFER: if (!(in instanceof StreamCapabilities)) { - throw new UnsupportedOperationException("This stream does not expose " + - "its stream capabilities."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not expose its stream capabilities."); } return ((StreamCapabilities) in).hasCapability(capability); default: diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java index d99ee1624e..f8282d88c4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -55,6 +56,8 @@ public interface ByteBufferPositionedReadable { * <p>
* Implementations should treat 0-length requests as legitimate, and must not * signal an error upon their receipt. + * <p>
+ * This does not change the current offset of a file, and is thread-safe. * * @param position position within file * @param buf the ByteBuffer to receive the results of the read operation. @@ -63,4 +66,25 @@ public interface ByteBufferPositionedReadable { * @throws IOException if there is some error performing the read */ int read(long position, ByteBuffer buf) throws IOException; + + /** + * Reads {@code buf.remaining()} bytes into buf from a given position in + * the file, or until the end of the data is reached before the read + * operation completes. Callers should use {@code buf.limit(...)} to + * control the size of the desired read and {@code buf.position(...)} to + * control the offset into the buffer the data should be written to. + * <p>
+ * This operation provides similar semantics to + * {@link #read(long, ByteBuffer)}; the difference is that this method is + * guaranteed to read data until the {@link ByteBuffer} is full, or until + * the end of the data stream is reached. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @throws IOException if there is some error performing the read + * @throws EOFException if the end of the data is reached before + * the read operation completes + * @see #read(long, ByteBuffer) + */ + void readFully(long position, ByteBuffer buf) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 066cc3d8b1..31f8297589 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -52,8 +52,8 @@ public class FSDataInputStream extends DataInputStream public FSDataInputStream(InputStream in) { super(in); if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { - throw new IllegalArgumentException( - "In is not an instance of Seekable or PositionedReadable"); + throw new IllegalArgumentException(in.getClass().getCanonicalName() + + " is not an instance of Seekable or PositionedReadable"); } } @@ -150,7 +150,7 @@ public class FSDataInputStream extends DataInputStream } throw new UnsupportedOperationException("Byte-buffer read unsupported " + - "by input stream"); + "by " + in.getClass().getCanonicalName()); } @Override @@ -170,9 +170,8 @@ public class FSDataInputStream extends DataInputStream try { ((CanSetReadahead)in).setReadahead(readahead); } catch (ClassCastException e) { - throw new UnsupportedOperationException( - "this stream does not support setting the readahead " + - "caching strategy."); + throw new UnsupportedOperationException(in.getClass().getCanonicalName() + + " does not support setting the readahead caching strategy."); } } @@ -256,6 +255,16 @@ public class FSDataInputStream extends DataInputStream return ((ByteBufferPositionedReadable) in).read(position, buf); } throw new UnsupportedOperationException("Byte-buffer pread unsupported " + - "by input stream"); + "by " + in.getClass().getCanonicalName()); + } + + @Override + public void readFully(long position, ByteBuffer buf) throws IOException { + if (in instanceof ByteBufferPositionedReadable) { + ((ByteBufferPositionedReadable) in).readFully(position, buf); + } else { + throw new UnsupportedOperationException("Byte-buffer pread " + + "unsupported by " + in.getClass().getCanonicalName()); + } } }
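Since FSDataInputStream only forwards readFully when the wrapped stream implements ByteBufferPositionedReadable, callers can probe for support first. A minimal sketch of the contract (the stream, offset, and sizes are hypothetical; StreamCapabilities.PREADBYTEBUFFER is the same capability string CryptoInputStream checks above):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    static void preadExactly512(FSDataInputStream in) throws java.io.IOException {
      if (!in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
        throw new UnsupportedOperationException("ByteBuffer pread not supported");
      }
      ByteBuffer buf = ByteBuffer.allocate(1024);
      buf.limit(512);           // request exactly 512 bytes
      // read(long, ByteBuffer) may return fewer bytes than requested;
      // readFully(long, ByteBuffer) fills buf completely or throws EOFException.
      in.readFully(100L, buf);
      assert !buf.hasRemaining();
    }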
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java index 7463d6c3bc..64bb966b15 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -388,42 +388,41 @@ public abstract class CryptoStreamsTestBase { Assert.assertArrayEquals(readData, expectedData); } - /** Test read fully */ + /** Test read fully. */ @Test(timeout=120000) public void testReadFully() throws Exception { OutputStream out = getOutputStream(defaultBufferSize); writeData(out); - InputStream in = getInputStream(defaultBufferSize); - final int len1 = dataLen / 4; - // Read len1 bytes - byte[] readData = new byte[len1]; - readAll(in, readData, 0, len1); - byte[] expectedData = new byte[len1]; - System.arraycopy(data, 0, expectedData, 0, len1); - Assert.assertArrayEquals(readData, expectedData); - - // Pos: 1/3 dataLen - readFullyCheck(in, dataLen / 3); - - // Read len1 bytes - readData = new byte[len1]; - readAll(in, readData, 0, len1); - expectedData = new byte[len1]; - System.arraycopy(data, len1, expectedData, 0, len1); - Assert.assertArrayEquals(readData, expectedData); - - // Pos: 1/2 dataLen - readFullyCheck(in, dataLen / 2); - - // Read len1 bytes - readData = new byte[len1]; - readAll(in, readData, 0, len1); - expectedData = new byte[len1]; - System.arraycopy(data, 2 * len1, expectedData, 0, len1); - Assert.assertArrayEquals(readData, expectedData); - - in.close(); + try (InputStream in = getInputStream(defaultBufferSize)) { + final int len1 = dataLen / 4; + // Read len1 bytes + byte[] readData = new byte[len1]; + readAll(in, readData, 0, len1); + byte[] expectedData = new byte[len1]; + System.arraycopy(data, 0, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/3 dataLen + readFullyCheck(in, dataLen / 3); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/2 dataLen + readFullyCheck(in, dataLen / 2); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, 2 * len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + } } private void readFullyCheck(InputStream in, int pos) throws Exception { @@ -441,6 +440,60 @@ public abstract class CryptoStreamsTestBase { } catch (EOFException e) { } } + + /** Test byte buffer read fully. */
+ @Test(timeout=120000) + public void testByteBufferReadFully() throws Exception { + OutputStream out = getOutputStream(defaultBufferSize); + writeData(out); + + try (InputStream in = getInputStream(defaultBufferSize)) { + final int len1 = dataLen / 4; + // Read len1 bytes + byte[] readData = new byte[len1]; + readAll(in, readData, 0, len1); + byte[] expectedData = new byte[len1]; + System.arraycopy(data, 0, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/3 dataLen + byteBufferReadFullyCheck(in, dataLen / 3); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + + // Pos: 1/2 dataLen + byteBufferReadFullyCheck(in, dataLen / 2); + + // Read len1 bytes + readData = new byte[len1]; + readAll(in, readData, 0, len1); + expectedData = new byte[len1]; + System.arraycopy(data, 2 * len1, expectedData, 0, len1); + Assert.assertArrayEquals(readData, expectedData); + } + } + + private void byteBufferReadFullyCheck(InputStream in, int pos) + throws Exception { + ByteBuffer result = ByteBuffer.allocate(dataLen - pos); + ((ByteBufferPositionedReadable) in).readFully(pos, result); + + byte[] expectedData = new byte[dataLen - pos]; + System.arraycopy(data, pos, expectedData, 0, dataLen - pos); + Assert.assertArrayEquals(result.array(), expectedData); + + result = ByteBuffer.allocate(dataLen); // Exceeds maximum length + try { + ((ByteBufferPositionedReadable) in).readFully(pos, result); + Assert.fail("Read fully that exceeds the maximum length should fail."); + } catch (EOFException e) { + } + } /** Test seek to different position. */ @Test(timeout=120000) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java index 8bcf46eacf..73c6249612 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java @@ -330,6 +330,30 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { return -1; } + @Override + public void readFully(long position, ByteBuffer buf) throws IOException { + if (buf == null) { + throw new NullPointerException(); + } else if (!buf.hasRemaining()) { + return; + } + + if (position > length) { + throw new IOException("Cannot read after EOF."); + } + if (position < 0) { + throw new IOException("Cannot read to negative offset."); + } + + checkStream(); + + if (position + buf.remaining() > length) { + throw new EOFException("Reached the end of stream."); + } + + buf.put(data, (int) position, buf.remaining()); + } + @Override public void readFully(long position, byte[] b, int off, int len) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java index e7d922e78a..8453889b53 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java @@ -95,6 +95,11 @@ public class TestCryptoStreamsForLocalFS extends
CryptoStreamsTestBase { @Override @Test(timeout=10000) public void testPositionedReadWithByteBuffer() throws IOException {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferReadFully() throws Exception {} @Ignore("ChecksumFSOutputSummer doesn't support Syncable") @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java index 036706f435..1bf1dd3e0d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java @@ -96,6 +96,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase { @Test(timeout=10000) public void testPositionedReadWithByteBuffer() throws IOException {} + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferReadFully() throws Exception {} + @Ignore("Wrapped stream doesn't support ReadFully") @Override @Test(timeout=10000) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 22622956cc..f9e7d6ec70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; @@ -1570,6 +1571,19 @@ public class DFSInputStream extends FSInputStream return pread(position, buf); } + @Override + public void readFully(long position, final ByteBuffer buf) + throws IOException { + int nread = 0; + while (buf.hasRemaining()) { + int nbytes = read(position + nread, buf); + if (nbytes < 0) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + nread += nbytes; + } + } + /** Utility class to encapsulate data node info and its address. */ static final class DNAddrPair { final DatanodeInfo info; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c index ebf0dd7e1e..23fa2e5112 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c @@ -360,6 +360,25 @@ int main(int argc, char **argv) { shutdown_and_exit(cl, -1); } + // hdfsPreadFully (direct) test + if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer, + (tSize)(strlen(fileContents) + 1))) { + fprintf(stderr, "Failed to preadFully (direct)."); + shutdown_and_exit(cl, -1); + } + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to preadFully (direct). 
Expected %s but " + "got %s\n", fileContents, buffer); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "PreadFully (direct) following %d bytes:\n%s\n", + num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents) + 1); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "PreadFully changed position of file\n"); + shutdown_and_exit(cl, -1); + } + // Disable the direct pread path so that we really go through the slow // read path hdfsFileDisableDirectPread(preadFile); @@ -388,19 +407,39 @@ int main(int argc, char **argv) { shutdown_and_exit(cl, -1); } + // Test pread midway through the file rather than at the beginning num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer)); if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) { - fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n", fileContentsChunk, buffer, num_read_bytes); shutdown_and_exit(cl, -1); } - fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer); + fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer); memset(buffer, 0, strlen(fileContents + 1)); if (hdfsTell(fs, preadFile) != 0) { fprintf(stderr, "Pread changed position of file\n"); shutdown_and_exit(cl, -1); } + // hdfsPreadFully test + if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer, + (tSize)(strlen(fileContents) + 1))) { + fprintf(stderr, "Failed to preadFully."); + shutdown_and_exit(cl, -1); + } + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to preadFully. Expected %s but got %s\n", + fileContents, buffer); + shutdown_and_exit(cl, -1); + } + fprintf(stderr, "PreadFully following %d bytes:\n%s\n", + num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents) + 1); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "PreadFully changed position of file\n"); + shutdown_and_exit(cl, -1); + } + hdfsCloseFile(fs, preadFile); // Test correct behaviour for unsupported filesystems
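The hdfs.c changes below reach FSDataInputStream through JNI using two readFully signatures: "(J[BII)V" for the byte[] copy path and "(JLjava/nio/ByteBuffer;)V" for the zero-copy direct-buffer path. For reference, a sketch of the same two calls written in Java (the stream and lengths are hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    static void jniEquivalents(FSDataInputStream fis) throws java.io.IOException {
      // hdfsPreadFully's slow path: readFully(long, byte[], int, int), "(J[BII)V",
      // followed by GetByteArrayRegion to copy into the caller's buffer.
      byte[] bR = new byte[100];
      fis.readFully(0L, bR, 0, bR.length);

      // preadFullyDirect: NewDirectByteBuffer wraps the C buffer, so
      // readFully(long, ByteBuffer), "(JLjava/nio/ByteBuffer;)V", avoids the copy.
      ByteBuffer bb = ByteBuffer.allocateDirect(100);
      fis.readFully(0L, bb);
    }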
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c index e6b20109b4..220208676e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c @@ -57,6 +57,9 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); +int preadFullyDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, + tSize length); + static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); /** @@ -1645,6 +1648,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, "hdfsPread: NewByteArray"); return -1; } + jthr = invokeMethod(env, &jVal, INSTANCE, f->file, JC_FS_DATA_INPUT_STREAM, "read", "(J[BII)I", position, jbRarray, 0, length); @@ -1727,6 +1731,119 @@ tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, return jVal.i; } +/** + * Like hdfsPread, if the underlying stream supports the + * ByteBufferPositionedReadable interface then this method will transparently + * use readFully(long, ByteBuffer). + */ +int hdfsPreadFully(hdfsFS fs, hdfsFile f, tOffset position, + void* buffer, tSize length) { + JNIEnv* env; + jbyteArray jbRarray; + jthrowable jthr; + + if (length == 0) { + return 0; + } else if (length < 0) { + errno = EINVAL; + return -1; + } + if (!f || f->type == HDFS_STREAM_UNINITIALIZED) { + errno = EBADF; + return -1; + } + + if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) { + return preadFullyDirect(fs, f, position, buffer, length); + } + + env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + //Error checking... make sure that this file is 'readable' + if (f->type != HDFS_STREAM_INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + // JAVA EQUIVALENT: + // byte [] bR = new byte[length]; + // fis.readFully(pos, bR, 0, length); + jbRarray = (*env)->NewByteArray(env, length); + if (!jbRarray) { + errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "hdfsPreadFully: NewByteArray"); + return -1; + } + + jthr = invokeMethod(env, NULL, INSTANCE, f->file, + JC_FS_DATA_INPUT_STREAM, "readFully", "(J[BII)V", + position, jbRarray, 0, length); + if (jthr) { + destroyLocalReference(env, jbRarray); + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsPreadFully: FSDataInputStream#readFully"); + return -1; + } + + (*env)->GetByteArrayRegion(env, jbRarray, 0, length, buffer); + destroyLocalReference(env, jbRarray); + if ((*env)->ExceptionCheck(env)) { + errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "hdfsPreadFully: GetByteArrayRegion"); + return -1; + } + return 0; +} + +int preadFullyDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, + tSize length) +{ + // JAVA EQUIVALENT: + // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.readFully(position, buf); + + jthrowable jthr; + jobject bb; + + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + //Error checking... make sure that this file is 'readable' + if (f->type != HDFS_STREAM_INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + //Read the requisite bytes + bb = (*env)->NewDirectByteBuffer(env, buffer, length); + if (bb == NULL) { + errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "preadFullyDirect: NewDirectByteBuffer"); + return -1; + } + + jthr = invokeMethod(env, NULL, INSTANCE, f->file, + JC_FS_DATA_INPUT_STREAM, "readFully", + "(JLjava/nio/ByteBuffer;)V", position, bb); + destroyLocalReference(env, bb); + if (jthr) { + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "preadFullyDirect: FSDataInputStream#readFully"); + return -1; + } + return 0; +} + tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h index 7e45634d4e..e58a6232d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h @@ -600,7 +600,8 @@ extern "C" { tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); /** - * hdfsPread - Positional read of data from an open file. + * hdfsPread - Positional read of data from an open file. Reads up to the + * number of bytes specified by length.
* @param fs The configured filesystem handle. * @param file The file handle. * @param position Position from which to read @@ -612,6 +613,24 @@ extern "C" { tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + /** + * hdfsPreadFully - Positional read of data from an open file. Reads the + * number of specified bytes in length, or until the end of the data is + * reached. Unlike hdfsRead and hdfsPread, this method does not return + * the number of bytes read because either (1) the entire length of the + * buffer is filled, or (2) the end of the file is reached. If the eof is + * reached, an exception is thrown and errno is set to EINTR. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsPreadFully(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length); + /** * hdfsWrite - Write data into an open file. diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index 54d4cf651e..bda27b9a43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -317,6 +317,12 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, return ret; } +int hdfsPreadFully(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length) { + return libhdfs_hdfsPreadFully(fs->libhdfsRep, file->libhdfsRep, position, + buffer, length); +} + tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) { return libhdfs_hdfsWrite(fs->libhdfsRep, file->libhdfsRep, buffer, length); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h index b90776893f..0d014341b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h @@ -47,6 +47,7 @@ #define hdfsTell libhdfs_hdfsTell #define hdfsRead libhdfs_hdfsRead #define hdfsPread libhdfs_hdfsPread +#define hdfsPreadFully libhdfs_hdfsPreadFully #define hdfsWrite libhdfs_hdfsWrite #define hdfsFlush libhdfs_hdfsFlush #define hdfsHFlush libhdfs_hdfsHFlush diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h index fce0e823dd..d46768c02a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h @@ -47,6 +47,7 @@ #undef hdfsTell #undef hdfsRead #undef hdfsPread +#undef hdfsPreadFully #undef hdfsWrite #undef hdfsFlush #undef hdfsHFlush diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h index d0411c2126..4b08d0556c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h @@ -47,6 +47,7 @@ #define hdfsTell libhdfspp_hdfsTell #define hdfsRead libhdfspp_hdfsRead #define hdfsPread libhdfspp_hdfsPread +#define hdfsPreadFully libhdfspp_hdfsPreadFully #define hdfsWrite libhdfspp_hdfsWrite #define hdfsFlush libhdfspp_hdfsFlush #define hdfsHFlush libhdfspp_hdfsHFlush diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java index 4547db1c98..1c7f1500f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java @@ -85,6 +85,7 @@ public class TestByteBufferPread { testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadFullyWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); } /** @@ -97,6 +98,7 @@ public class TestByteBufferPread { testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadFullyWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); } /** @@ -122,7 +124,6 @@ public class TestByteBufferPread { byte[] bufferContents = new byte[FILE_SIZE]; buffer.get(bufferContents); assertArrayEquals(bufferContents, fileContents); - buffer.position(buffer.limit()); } } @@ -157,7 +158,7 @@ public class TestByteBufferPread { /** * Reads half of the testFile into the {@link ByteBuffer} by setting a - * {@link ByteBuffer#limit} on the buffer. Validates that only half of the + * {@link ByteBuffer#limit()} on the buffer. Validates that only half of the * testFile is loaded into the buffer. */ private void testPreadWithLimitedByteBuffer( @@ -191,7 +192,7 @@ public class TestByteBufferPread { /** * Reads half of the testFile into the {@link ByteBuffer} by setting the - * {@link ByteBuffer#position} the half the size of the file. Validates that + * {@link ByteBuffer#position()} to half the size of the file. Validates that * only half of the testFile is loaded into the buffer. */ private void testPreadWithPositionedByteBuffer( @@ -257,6 +258,26 @@ public class TestByteBufferPread { } } + /** + * Reads the entire testFile using the preadFully API and validates that its + * contents are properly loaded into the supplied {@link ByteBuffer}. + */ + private void testPreadFullyWithByteBuffer(ByteBuffer buffer) + throws IOException { + int totalBytesRead = 0; + try (FSDataInputStream in = fs.open(testFile)) { + in.readFully(totalBytesRead, buffer); + // Make sure the buffer is full + assertFalse(buffer.hasRemaining()); + // Make sure the contents of the read buffer equal the contents of the + // file + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, fileContents); + } + } + @AfterClass public static void shutdown() throws IOException { try {