diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java index 554241500d..c37b2bec6e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java @@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable { private InputStream blobInputStream = null; private int minimumReadSizeInBytes = 0; private long streamPositionAfterLastRead = -1; + // position of next network read within stream private long streamPosition = 0; + // length of stream private long streamLength = 0; private boolean closed = false; + // internal buffer, re-used for performance optimization private byte[] streamBuffer; + // zero-based offset within streamBuffer of current read position private int streamBufferPosition; + // length of data written to streamBuffer, streamBuffer may be larger private int streamBufferLength; /** @@ -81,6 +86,16 @@ private void checkState() throws IOException { } } + /** + * Reset the internal stream buffer but do not release the memory. + * The buffer can be reused to avoid frequent memory allocations of + * a large buffer. + */ + private void resetStreamBuffer() { + streamBufferPosition = 0; + streamBufferLength = 0; + } + /** * Gets the read position of the stream. * @return the zero-based byte offset of the read position. @@ -89,7 +104,9 @@ private void checkState() throws IOException { @Override public synchronized long getPos() throws IOException { checkState(); - return streamPosition; + return (streamBuffer != null) + ? 
streamPosition - streamBufferLength + streamBufferPosition + : streamPosition; } /** @@ -107,21 +124,39 @@ public synchronized void seek(long pos) throws IOException { throw new EOFException( FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } - if (pos == getPos()) { + + // calculate offset between the target and current position in the stream + long offset = pos - getPos(); + + if (offset == 0) { // no-op, no state change return; } - if (streamBuffer != null) { - long offset = streamPosition - pos; - if (offset > 0 && offset < streamBufferLength) { - streamBufferPosition = streamBufferLength - (int) offset; - } else { - streamBufferPosition = streamBufferLength; + if (offset > 0) { + // forward seek, data can be skipped as an optimization + if (skip(offset) != offset) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); } + return; + } + + // reverse seek, offset is negative + if (streamBuffer != null) { + if (streamBufferPosition + offset >= 0) { + // target position is inside the stream buffer, + // only need to move backwards within the stream buffer + streamBufferPosition += offset; + } else { + // target position is outside the stream buffer, + // need to reset stream buffer and move position for next network read + resetStreamBuffer(); + streamPosition = pos; + } + } else { + streamPosition = pos; } - streamPosition = pos; // close BlobInputStream after seek is invoked because BlobInputStream // does not support seek closeBlobInputStream(); @@ -189,8 +224,7 @@ private int doNetworkRead(byte[] buffer, int offset, int len) streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes, streamLength)]; } - streamBufferPosition = 0; - streamBufferLength = 0; + resetStreamBuffer(); outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition, streamBuffer.length); needToCopy = true; @@ -295,27 +329,44 @@ public int read() throws IOException { * @param n the number of bytes to be skipped. * @return the actual number of bytes skipped. 
* @throws IOException IO failure + * @throws IndexOutOfBoundsException if n is negative or if the sum of n + * and the current value of getPos() is greater than the length of the stream. */ @Override public synchronized long skip(long n) throws IOException { checkState(); if (blobInputStream != null) { - return blobInputStream.skip(n); - } else { - if (n < 0 || streamPosition + n > streamLength) { - throw new IndexOutOfBoundsException("skip range"); - } - - if (streamBuffer != null) { - streamBufferPosition = (n < streamBufferLength - streamBufferPosition) - ? streamBufferPosition + (int) n - : streamBufferLength; - } - - streamPosition += n; - return n; + // blobInput stream is open; delegate the work to it + long skipped = blobInputStream.skip(n); + // update position to the actual skip value + streamPosition += skipped; + return skipped; } + + // no blob stream; implement the skip logic directly + if (n < 0 || n > streamLength - getPos()) { + throw new IndexOutOfBoundsException("skip range"); + } + + if (streamBuffer != null) { + // there's a buffer, so seek with it + if (n < streamBufferLength - streamBufferPosition) { + // new range is in the buffer, so just update the buffer position + // skip within the buffer. 
+ streamBufferPosition += (int) n; + } else { + // skip is out of range, so move position to new value and reset + // the buffer ready for the next read() + streamPosition = getPos() + n; + resetStreamBuffer(); + } + } else { + // no stream buffer; increment the stream position ready for + // the next triggered connection & read + streamPosition += n; + } + return n; } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java index 24535840b2..0ae4012847 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java @@ -155,7 +155,7 @@ private void createTestFileAndSetLength() throws IOException { } LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH, - TEST_FILE_SIZE ); + TEST_FILE_SIZE); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { @@ -198,7 +198,7 @@ public void test_0100_CreateHugeFile() throws IOException { } @Test - public void test_0200_BasicReadTestV2() throws Exception { + public void test_0200_BasicReadTest() throws Exception { assumeHugeFileExists(); try ( @@ -214,12 +214,12 @@ public void test_0200_BasicReadTestV2() throws Exception { // v1 forward seek and read a kilobyte into first kilobyte of bufferV1 inputStreamV1.seek(5 * MEGABYTE); int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE); - assertEquals(numBytesReadV1, KILOBYTE); + assertEquals(KILOBYTE, numBytesReadV1); // v2 forward seek and read a kilobyte into first kilobyte of bufferV2 inputStreamV2.seek(5 * MEGABYTE); int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE); - assertEquals(numBytesReadV2, KILOBYTE); + assertEquals(KILOBYTE, numBytesReadV2); 
assertArrayEquals(bufferV1, bufferV2); @@ -229,17 +229,90 @@ public void test_0200_BasicReadTestV2() throws Exception { // v1 reverse seek and read a megabyte into last megabyte of bufferV1 inputStreamV1.seek(3 * MEGABYTE); numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len); - assertEquals(numBytesReadV1, len); + assertEquals(len, numBytesReadV1); // v2 reverse seek and read a megabyte into last megabyte of bufferV2 inputStreamV2.seek(3 * MEGABYTE); numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len); - assertEquals(numBytesReadV2, len); + assertEquals(len, numBytesReadV2); assertArrayEquals(bufferV1, bufferV2); } } + @Test + public void test_0201_RandomReadTest() throws Exception { + assumeHugeFileExists(); + + try ( + FSDataInputStream inputStreamV1 + = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH); + + FSDataInputStream inputStreamV2 + = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH); + ) { + final int bufferSize = 4 * KILOBYTE; + byte[] bufferV1 = new byte[bufferSize]; + byte[] bufferV2 = new byte[bufferV1.length]; + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + inputStreamV1.seek(0); + inputStreamV2.seek(0); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + int seekPosition = 2 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + inputStreamV1.seek(0); + inputStreamV2.seek(0); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 5 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 10 * KILOBYTE; + 
inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + + seekPosition = 4100 * KILOBYTE; + inputStreamV1.seek(seekPosition); + inputStreamV2.seek(seekPosition); + + verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2); + } + } + + private void verifyConsistentReads(FSDataInputStream inputStreamV1, + FSDataInputStream inputStreamV2, + byte[] bufferV1, + byte[] bufferV2) throws IOException { + int size = bufferV1.length; + final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size); + assertEquals("Bytes read from V1 stream", size, numBytesReadV1); + + final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size); + assertEquals("Bytes read from V2 stream", size, numBytesReadV2); + + assertArrayEquals("Mismatch in read data", bufferV1, bufferV2); + } + /** * Validates the implementation of InputStream.markSupported. * @throws IOException