diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java index 7cd3bb3de2..286bdd7ae8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java @@ -116,7 +116,7 @@ public void setData(BufferData bufferData, readOffset, "readOffset", startOffset, - startOffset + bufferData.getBuffer().limit() - 1); + startOffset + bufferData.getBuffer().limit()); data = bufferData; buffer = bufferData.getBuffer().duplicate(); @@ -182,7 +182,7 @@ public int relative() { */ public boolean isWithinCurrentBuffer(long pos) { throwIfInvalidBuffer(); - long bufferEndOffset = bufferStartOffset + buffer.limit() - 1; + long bufferEndOffset = bufferStartOffset + buffer.limit(); return (pos >= bufferStartOffset) && (pos <= bufferEndOffset); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java index e86c4be97b..12ab62556a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java @@ -26,6 +26,7 @@ import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -43,6 +44,7 @@ public void testArgChecks() throws Exception { new FilePosition(10, 5); new FilePosition(5, 10); new FilePosition(10, 5).setData(data, 3, 4); + new FilePosition(10, 10).setData(data, 3, 13); // Verify it throws correctly. 
@@ -94,11 +96,11 @@ public void testArgChecks() throws Exception { "'readOffset' must not be negative", () -> pos.setData(data, 4, -4)); intercept(IllegalArgumentException.class, - "'readOffset' (15) must be within the range [4, 13]", + "'readOffset' (15) must be within the range [4, 14]", () -> pos.setData(data, 4, 15)); intercept(IllegalArgumentException.class, - "'readOffset' (3) must be within the range [4, 13]", + "'readOffset' (3) must be within the range [4, 14]", () -> pos.setData(data, 4, 3)); } @@ -192,4 +194,31 @@ public void testBufferStats() { } assertTrue(pos.bufferFullyRead()); } + + @Test + public void testBounds() { + int bufferSize = 8; + long fileSize = bufferSize; + + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + BufferData data = new BufferData(0, buffer); + FilePosition pos = new FilePosition(fileSize, bufferSize); + + long eofOffset = fileSize; + pos.setData(data, 0, eofOffset); + + assertThat(pos.isWithinCurrentBuffer(eofOffset)) + .describedAs("EOF offset %d should be within the current buffer", eofOffset) + .isTrue(); + assertThat(pos.absolute()) + .describedAs("absolute() should return the EOF offset") + .isEqualTo(eofOffset); + + assertThat(pos.setAbsolute(eofOffset)) + .describedAs("setAbsolute() should return true on the EOF offset %d", eofOffset) + .isTrue(); + assertThat(pos.absolute()) + .describedAs("absolute() should return the EOF offset") + .isEqualTo(eofOffset); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index 0afd071246..f9ee4e412f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.impl.prefetch.BlockManager; import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; +import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -84,46 +85,6 @@ public S3ACachingInputStream( fileSize); } - /** - * Moves the current read position so that the next read will occur at {@code pos}. - * - * @param pos the next read will take place at this position. - * - * @throws IllegalArgumentException if pos is outside of the range [0, file size]. - */ - @Override - public void seek(long pos) throws IOException { - throwIfClosed(); - throwIfInvalidSeek(pos); - - // The call to setAbsolute() returns true if the target position is valid and - // within the current block. Therefore, no additional work is needed when we get back true. - if (!getFilePosition().setAbsolute(pos)) { - LOG.info("seek({})", getOffsetStr(pos)); - // We could be here in two cases: - // -- the target position is invalid: - // We ignore this case here as the next read will return an error. - // -- it is valid but outside of the current block. - if (getFilePosition().isValid()) { - // There are two cases to consider: - // -- the seek was issued after this buffer was fully read. - // In this case, it is very unlikely that this buffer will be needed again; - // therefore we release the buffer without caching. 
- // -- if we are jumping out of the buffer before reading it completely then - // we will likely need this buffer again (as observed empirically for Parquet) - // therefore we issue an async request to cache this buffer. - if (!getFilePosition().bufferFullyRead()) { - blockManager.requestCaching(getFilePosition().data()); - } else { - blockManager.release(getFilePosition().data()); - } - getFilePosition().invalidate(); - blockManager.cancelPrefetches(); - } - setSeekTargetPos(pos); - } - } - @Override public void close() throws IOException { // Close the BlockManager first, cancelling active prefetches, @@ -139,36 +100,45 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } - if (getFilePosition().isValid() && getFilePosition() - .buffer() - .hasRemaining()) { - return true; - } - - long readPos; - int prefetchCount; - - if (getFilePosition().isValid()) { - // A sequential read results in a prefetch. - readPos = getFilePosition().absolute(); - prefetchCount = numBlocksToPrefetch; - } else { - // A seek invalidates the current position. - // We prefetch only 1 block immediately after a seek operation. - readPos = getSeekTargetPos(); - prefetchCount = 1; - } - + long readPos = getNextReadPos(); if (!getBlockData().isValidOffset(readPos)) { return false; } - if (getFilePosition().isValid()) { - if (getFilePosition().bufferFullyRead()) { - blockManager.release(getFilePosition().data()); + // Determine whether this is an out of order read. + FilePosition filePosition = getFilePosition(); + boolean outOfOrderRead = !filePosition.setAbsolute(readPos); + + if (!outOfOrderRead && filePosition.buffer().hasRemaining()) { + // Use the current buffer. + return true; + } + + if (filePosition.isValid()) { + // We are jumping out of the current buffer. There are two cases to consider: + if (filePosition.bufferFullyRead()) { + // This buffer was fully read: + // it is very unlikely that this buffer will be needed again; + // therefore we release the buffer without caching. + blockManager.release(filePosition.data()); } else { - blockManager.requestCaching(getFilePosition().data()); + // We will likely need this buffer again (as observed empirically for Parquet) + // therefore we issue an async request to cache this buffer. + blockManager.requestCaching(filePosition.data()); } + filePosition.invalidate(); + } + + int prefetchCount; + if (outOfOrderRead) { + LOG.debug("lazy-seek({})", getOffsetStr(readPos)); + blockManager.cancelPrefetches(); + + // We prefetch only 1 block immediately after a seek operation. + prefetchCount = 1; + } else { + // A sequential read results in a prefetch. 
+ prefetchCount = numBlocksToPrefetch; } int toBlockNumber = getBlockData().getBlockNumber(readPos); @@ -186,7 +156,7 @@ protected boolean ensureCurrentBuffer() throws IOException { .trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ), () -> blockManager.get(toBlockNumber)); - getFilePosition().setData(data, startOffset, readPos); + filePosition.setData(data, startOffset, readPos); return true; } @@ -197,7 +167,7 @@ public String toString() { } StringBuilder sb = new StringBuilder(); - sb.append(String.format("fpos = (%s)%n", getFilePosition())); + sb.append(String.format("%s%n", super.toString())); sb.append(blockManager.toString()); return sb.toString(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java index 322459a958..e8bfe946f4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.impl.prefetch.BufferData; +import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -86,7 +87,12 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } - if (!getFilePosition().isValid()) { + FilePosition filePosition = getFilePosition(); + if (filePosition.isValid()) { + // Update current position (lazy seek). + filePosition.setAbsolute(getNextReadPos()); + } else { + // Read entire file into buffer. buffer.clear(); int numBytesRead = getReader().read(buffer, 0, buffer.capacity()); @@ -94,9 +100,9 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } BufferData data = new BufferData(0, buffer); - getFilePosition().setData(data, 0, getSeekTargetPos()); + filePosition.setData(data, 0, getNextReadPos()); } - return getFilePosition().buffer().hasRemaining(); + return filePosition.buffer().hasRemaining(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java index 0f46a8ed5e..38d740bd74 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java @@ -77,12 +77,16 @@ public abstract class S3ARemoteInputStream private volatile boolean closed; /** - * Current position within the file. + * Internal position within the file. Updated lazily + * after a seek before a read. */ private FilePosition fpos; - /** The target of the most recent seek operation. */ - private long seekTargetPos; + /** + * This is the actual position within the file, used by + * lazy seek to decide whether to seek on the next read or not. + */ + private long nextReadPos; /** Information about each block of the mapped S3 file. 
*/ private BlockData blockData; @@ -146,7 +150,7 @@ public S3ARemoteInputStream( this.remoteObject = getS3File(); this.reader = new S3ARemoteObjectReader(remoteObject); - this.seekTargetPos = 0; + this.nextReadPos = 0; } /** @@ -212,7 +216,8 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { public int available() throws IOException { throwIfClosed(); - if (!ensureCurrentBuffer()) { + // Update the current position in the current buffer, if possible. + if (!fpos.setAbsolute(nextReadPos)) { return 0; } @@ -228,11 +233,7 @@ public int available() throws IOException { public long getPos() throws IOException { throwIfClosed(); - if (fpos.isValid()) { - return fpos.absolute(); - } else { - return seekTargetPos; - } + return nextReadPos; } /** @@ -247,10 +248,7 @@ public void seek(long pos) throws IOException { throwIfClosed(); throwIfInvalidSeek(pos); - if (!fpos.setAbsolute(pos)) { - fpos.invalidate(); - seekTargetPos = pos; - } + nextReadPos = pos; } /** @@ -268,7 +266,7 @@ public int read() throws IOException { throwIfClosed(); if (remoteObject.size() == 0 - || seekTargetPos >= remoteObject.size()) { + || nextReadPos >= remoteObject.size()) { return -1; } @@ -276,6 +274,7 @@ public int read() throws IOException { return -1; } + nextReadPos++; incrementBytesRead(1); return fpos.buffer().get() & 0xff; @@ -315,7 +314,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException { } if (remoteObject.size() == 0 - || seekTargetPos >= remoteObject.size()) { + || nextReadPos >= remoteObject.size()) { return -1; } @@ -334,6 +333,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException { ByteBuffer buf = fpos.buffer(); int bytesToRead = Math.min(numBytesRemaining, buf.remaining()); buf.get(buffer, offset, bytesToRead); + nextReadPos += bytesToRead; incrementBytesRead(bytesToRead); offset += bytesToRead; numBytesRemaining -= bytesToRead; @@ -367,12 +367,8 @@ protected boolean isClosed() { return closed; } - protected long getSeekTargetPos() { - return seekTargetPos; - } - - protected void setSeekTargetPos(long pos) { - seekTargetPos = pos; + protected long getNextReadPos() { + return nextReadPos; } protected BlockData getBlockData() { @@ -443,6 +439,18 @@ public boolean markSupported() { return false; } + @Override + public String toString() { + if (isClosed()) { + return "closed"; + } + + StringBuilder sb = new StringBuilder(); + sb.append(String.format("nextReadPos = (%d)%n", nextReadPos)); + sb.append(String.format("fpos = (%s)", fpos)); + return sb.toString(); + } + protected void throwIfClosed() throws IOException { if (closed) { throw new IOException( @@ -453,6 +461,8 @@ protected void throwIfClosed() throws IOException { protected void throwIfInvalidSeek(long pos) throws EOFException { if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); + } else if (pos > this.getBlockData().getFileSize()) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md index 9b4e888d60..e966c2dce4 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md @@ -158,7 +158,7 @@ The buffer for the block furthest from the current block is released. 
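+
+For example (an illustrative scenario, not taken from the code): with 8MB blocks and the
+stream currently positioned in block 6, if buffers are held for blocks 4, 5, 7 and 8, then
+block 4 is the furthest from the current block and its buffer is the one released.
+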
Once a buffer has been acquired by `CachingBlockManager`, if the buffer is in a *READY* state, it is returned. This means that data was already read into this buffer asynchronously by a prefetch. -If it's state is *BLANK* then data is read into it using +If its state is *BLANK* then data is read into it using `S3Reader.read(ByteBuffer buffer, long offset, int size).` For the second read call, `in.read(buffer, 0, 8MB)`, since the block sizes are of 8MB and only 5MB @@ -170,7 +170,10 @@ The number of blocks to be prefetched is determined by `fs.s3a.prefetch.block.co ##### Random Reads -If the caller makes the following calls: +The `CachingInputStream` also caches prefetched blocks. This happens when `read()` is issued +after a `seek()` outside the current block, while the current block has not yet been fully read. + +For example, consider the following calls: ``` in.read(buffer, 0, 5MB) @@ -180,13 +183,14 @@ in.seek(2MB) in.read(buffer, 0, 4MB) ``` -The `CachingInputStream` also caches prefetched blocks. -This happens when a `seek()` is issued for outside the current block and the current block still has -not been fully read. +For the above read sequence, after the `seek(10MB)` call is issued, block 0 has not been read +completely, so the subsequent call to `read()` will cache it, on the assumption that the caller +will probably want to read from it again. -When `seek(2MB)` is called, the position is back inside block 0. -The next read can now be satisfied from the locally cached block file, which is typically orders of -magnitude faster than a network based read. \ No newline at end of file +NB: `seek()` is implemented lazily, so it only keeps track of the new position but does not +otherwise affect the internal state of the stream. Only when a `read()` is issued does the stream +call the `ensureCurrentBuffer()` method and fetch a new block if required.
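+
+To illustrate, here is a simplified sketch of the lazy seek pattern as implemented by
+`S3ARemoteInputStream` (abbreviated: the real methods also handle zero-sized files and
+reads past the EOF):
+
+```
+// seek() performs no I/O; it only records the target position.
+public void seek(long pos) throws IOException {
+  throwIfClosed();
+  throwIfInvalidSeek(pos);
+  nextReadPos = pos;
+}
+
+// read() resolves the position lazily: ensureCurrentBuffer() fetches or
+// repositions the block only when data is actually requested.
+public int read() throws IOException {
+  throwIfClosed();
+  if (!ensureCurrentBuffer()) {
+    return -1;
+  }
+  nextReadPos++;
+  incrementBytesRead(1);
+  return fpos.buffer().get() & 0xff;
+}
+```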
\ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 24f74b3a02..d597c05e1d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -32,8 +32,6 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.StoreStatisticNames; -import org.apache.hadoop.fs.statistics.StreamStatisticNames; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; @@ -41,7 +39,13 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -120,20 +124,54 @@ public void testReadLargeFileFully() throws Throwable { in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead)); bytesRead += buffer.length; // Blocks are fully read, no blocks should be cached - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); } // Assert that first block is read synchronously, following blocks are prefetched - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, numBlocks - 1); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks); } // Verify that once stream is closed, all memory is freed - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + } + + @Test + public void testReadLargeFileFullyLazySeek() throws Throwable { + describe("read a large file using 
readFully(position,buffer,offset,length)," + + " uses S3ACachingInputStream"); + IOStatistics ioStats; + openFS(); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + ioStats = in.getIOStatistics(); + + byte[] buffer = new byte[S_1M * 10]; + long bytesRead = 0; + + while (bytesRead < largeFileSize) { + in.readFully(bytesRead, buffer, 0, (int) Math.min(buffer.length, + largeFileSize - bytesRead)); + bytesRead += buffer.length; + // Blocks are fully read, no blocks should be cached + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, + 0); + } + + // Assert that first block is read synchronously, following blocks are prefetched + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, + numBlocks - 1); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks); + } + // Verify that once stream is closed, all memory is freed + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + assertThatStatisticMaximum(ioStats, + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -147,24 +185,31 @@ public void testRandomReadLargeFile() throws Throwable { byte[] buffer = new byte[blockSize]; - // Don't read the block completely so it gets cached on seek + // Don't read block 0 completely so it gets cached on read after seek in.read(buffer, 0, blockSize - S_1K * 10); - in.seek(blockSize + S_1K * 10); - // Backwards seek, will use cached block + + // Seek to block 2 and read all of it + in.seek(blockSize * 2); + in.read(buffer, 0, blockSize); + + // Seek to block 4 but don't read: noop. + in.seek(blockSize * 4); + + // Backwards seek, will use cached block 0 in.seek(S_1K * 5); in.read(); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1); - // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched - // when we seek out of block 0, see cancelPrefetches() - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2); + // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch) + // Blocks 0, 1, 3 were not fully read, so remain in the file cache + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4); + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3); } - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -184,14 +229,14 @@ public void testRandomReadSmallFile() throws Throwable { in.seek(S_1K * 12); in.read(buffer, 0, S_1K * 4); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1); - 
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0); // The buffer pool is not used - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); // no prefetch ops, so no action_executor_acquired assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index 4ab33ef6cd..d449a79a5a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -35,6 +36,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.test.AbstractHadoopTestBase; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; /** @@ -97,7 +99,7 @@ public void testRead0SizedFile() throws Exception { private void testRead0SizedFileHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(-1, inputStream.read()); assertEquals(-1, inputStream.read()); @@ -121,8 +123,8 @@ public void testRead() throws Exception { private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(bufferSize, inputStream.available()); assertEquals(0, inputStream.read()); + assertAvailable(bufferSize - 1, inputStream); assertEquals(1, inputStream.read()); byte[] buffer = new byte[2]; @@ -170,12 +172,14 @@ private void testSeekHelper(S3ARemoteInputStream inputStream, int bufferSize, int fileSize) throws Exception { + assertAvailable(0, inputStream); assertEquals(0, inputStream.getPos()); - inputStream.seek(7); - assertEquals(7, inputStream.getPos()); + inputStream.seek(bufferSize); + assertAvailable(0, inputStream); + assertEquals(bufferSize, inputStream.getPos()); inputStream.seek(0); + assertAvailable(0, inputStream); - assertEquals(bufferSize, inputStream.available()); for (int i = 0; i < fileSize; i++) { assertEquals(i, inputStream.read()); } @@ -187,11 +191,20 @@ private void testSeekHelper(S3ARemoteInputStream inputStream, } } + // Can seek to the EOF: read() will then return -1. + inputStream.seek(fileSize); + assertEquals(-1, inputStream.read()); + // Test invalid seeks. 
ExceptionAsserts.assertThrows( EOFException.class, FSExceptionMessages.NEGATIVE_SEEK, () -> inputStream.seek(-1)); + + ExceptionAsserts.assertThrows( + EOFException.class, + FSExceptionMessages.CANNOT_SEEK_PAST_EOF, + () -> inputStream.seek(fileSize + 1)); } @Test @@ -217,7 +230,7 @@ private void testRandomSeekHelper(S3ARemoteInputStream inputStream, assertEquals(7, inputStream.getPos()); inputStream.seek(0); - assertEquals(bufferSize, inputStream.available()); + assertAvailable(0, inputStream); for (int i = 0; i < fileSize; i++) { assertEquals(i, inputStream.read()); } @@ -251,9 +264,10 @@ public void testClose() throws Exception { private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(bufferSize, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(0, inputStream.read()); assertEquals(1, inputStream.read()); + assertAvailable(bufferSize - 2, inputStream); inputStream.close(); @@ -276,4 +290,11 @@ private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize) // Verify a second close() does not throw. inputStream.close(); } + + private static void assertAvailable(int expected, InputStream inputStream) + throws IOException { + assertThat(inputStream.available()) + .describedAs("Check available bytes on stream %s", inputStream) + .isEqualTo(expected); + } }