diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 926c23d7c5..ff3bd63cc7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -70,6 +70,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** Stream statistics. */ private final AbfsInputStreamStatistics streamStatistics; + private long bytesFromReadAhead; // bytes read from readAhead; for testing + private long bytesFromRemoteRead; // bytes read remotely; for testing public AbfsInputStream( final AbfsClient client, @@ -235,6 +237,7 @@ private int readInternal(final long position, final byte[] b, final int offset, // try reading from buffers first receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + bytesFromReadAhead += receivedBytes; if (receivedBytes > 0) { incrementReadOps(); LOG.debug("Received data from read ahead, not doing remote read"); @@ -302,6 +305,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti throw new IOException("Unexpected Content-Length"); } LOG.debug("HTTP request read bytes = {}", bytesRead); + bytesFromRemoteRead += bytesRead; return (int) bytesRead; } @@ -503,6 +507,26 @@ public AbfsInputStreamStatistics getStreamStatistics() { return streamStatistics; } + /** + * Getter for bytes read from readAhead buffer that fills asynchronously. + * + * @return value of the counter in long. + */ + @VisibleForTesting + public long getBytesFromReadAhead() { + return bytesFromReadAhead; + } + + /** + * Getter for bytes read remotely from the data store. + * + * @return value of the counter in long. + */ + @VisibleForTesting + public long getBytesFromRemoteRead() { + return bytesFromRemoteRead; + } + /** * Get the statistics of the stream. * @return a string value. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 8385099a78..52dfdf2a61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -41,9 +41,6 @@ public class ITestAbfsInputStreamStatistics private static final int ONE_MB = 1024 * 1024; private static final int ONE_KB = 1024; private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; - private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE; - private static final int THREAD_SLEEP_10_SECONDS = 10; - private static final int TIMEOUT_30_SECONDS = 30000; private byte[] defBuffer = new byte[ONE_MB]; public ITestAbfsInputStreamStatistics() throws Exception { @@ -295,8 +292,8 @@ public void testWithNullStreamStatistics() throws IOException { /** * Testing readAhead counters in AbfsInputStream with 30 seconds timeout. */ - @Test(timeout = TIMEOUT_30_SECONDS) - public void testReadAheadCounters() throws IOException, InterruptedException { + @Test + public void testReadAheadCounters() throws IOException { describe("Test to check correct values for readAhead counters in " + "AbfsInputStream"); @@ -334,18 +331,6 @@ public void testReadAheadCounters() throws IOException, InterruptedException { AbfsInputStreamStatisticsImpl stats = (AbfsInputStreamStatisticsImpl) in.getStreamStatistics(); - /* - * Since, readAhead is done in background threads. Sometimes, the - * threads aren't finished in the background and could result in - * inaccurate results. So, we wait till we have the accurate values - * with a limit of 30 seconds as that's when the test times out. - * - */ - while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE - || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) { - Thread.sleep(THREAD_SLEEP_10_SECONDS); - } - /* * Verifying the counter values of readAheadBytesRead and remoteBytesRead. * @@ -353,27 +338,28 @@ public void testReadAheadCounters() throws IOException, InterruptedException { * from 0 to 5KB in the file. The bufferSize is set to 4KB, and since * we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB * buffer. Our read is till 5KB, hence readAhead would ideally read 2 - * blocks of 4KB which is equal to 8KB. But, sometimes to get more than - * one block from readAhead buffer we might have to wait for background + * blocks of 4KB which is equal to 8KB. But, sometimes to get blocks + * from readAhead buffer we might have to wait for background * threads to fill the buffer and hence we might do remote read which - * would be faster. Therefore, readAheadBytesRead would be equal to or - * greater than 4KB. + * would be faster. Therefore, readAheadBytesRead would be greater than + * or equal to the value of bytesFromReadAhead at the point we measure it. * * remoteBytesRead : Since, the bufferSize is set to 4KB and the number * of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4 * KB buffer on the first read, which is equal to 32KB. But, if we are not * able to read some bytes that were in the buffer after doing * readAhead, we might use remote read again. Thus, the bytes read - * remotely could also be greater than 32Kb. + * remotely would be greater than or equal to the bytesFromRemoteRead + * value that we measure at some point of the operation. * */ Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs( "Mismatch in readAheadBytesRead counter value") - .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE); + .isGreaterThanOrEqualTo(in.getBytesFromReadAhead()); Assertions.assertThat(stats.getRemoteBytesRead()).describedAs( "Mismatch in remoteBytesRead counter value") - .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE); + .isGreaterThanOrEqualTo(in.getBytesFromRemoteRead()); } finally { IOUtils.cleanupWithLogger(LOG, out, in);