From a44890eb63f5320a542d5160f140e11e82256932 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Fri, 27 Nov 2020 19:52:34 +0530 Subject: [PATCH] HADOOP-17296. ABFS: Force reads to be always of buffer size. Contributed by Sneha Vijayarajan. (cherry picked from commit 142941b96e221fc1b4524476ce445714d7f6eec3) --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 18 ++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 3 + .../azurebfs/constants/ConfigurationKeys.java | 2 + .../constants/FileSystemConfigurations.java | 3 + .../fs/azurebfs/services/AbfsInputStream.java | 41 ++- .../services/AbfsInputStreamContext.java | 38 +++ .../azurebfs/services/ReadBufferManager.java | 107 +++++++- .../hadoop-azure/src/site/markdown/abfs.md | 16 ++ .../azurebfs/AbstractAbfsIntegrationTest.java | 8 + .../ITestAzureBlobFileSystemRandomRead.java | 244 ++++++++++++++---- .../services/TestAbfsInputStream.java | 223 +++++++++++++++- 11 files changed, 635 insertions(+), 68 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index c4a2b67649..3d09a806fd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -201,6 +201,16 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH) private int readAheadQueueDepth; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE, + MinValue = MIN_BUFFER_SIZE, + MaxValue = MAX_BUFFER_SIZE, + DefaultValue = DEFAULT_READ_AHEAD_BLOCK_SIZE) + private int readAheadBlockSize; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_BUFFER_SIZE, + DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE) + private boolean alwaysReadBufferSize; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH, DefaultValue = DEFAULT_ENABLE_FLUSH) private boolean enableFlush; @@ -599,6 +609,14 @@ public class AbfsConfiguration{ return this.readAheadQueueDepth; } + public int getReadAheadBlockSize() { + return this.readAheadBlockSize; + } + + public boolean shouldReadBufferSizeAlways() { + return this.alwaysReadBufferSize; + } + public boolean isFlushEnabled() { return this.enableFlush; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e8f355fcfb..a766c62153 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -644,6 +644,9 @@ public class AzureBlobFileSystemStore implements Closeable { .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) + .withShouldReadBufferSizeAlways( + abfsConfiguration.shouldReadBufferSizeAlways()) + .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index c15c470a44..cb9c0de59f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -75,6 +75,8 @@ public final class ConfigurationKeys { * Default is empty. **/ public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; + public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize"; + public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize"; /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index fa0ee6a892..49fc58ba56 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -57,6 +57,8 @@ public final class FileSystemConfigurations { public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; + public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB @@ -74,6 +76,7 @@ public final class FileSystemConfigurations { public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; + public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index c43f910cf5..3682bcbc3a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -47,6 +47,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, StreamCapabilities { private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); + private int readAheadBlockSize; private final AbfsClient client; private final Statistics statistics; private final String path; @@ -56,6 +57,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; + private final boolean alwaysReadBufferSize; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; @@ -89,9 +91,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; + this.alwaysReadBufferSize + = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.cachedSasToken = new CachedSASToken( abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); + readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); + + // Propagate the config values to ReadBufferManager so that the first instance + // to initialize can set the readAheadBlockSize + ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); } public String getPath() { @@ -178,11 +187,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, buffer = new byte[bufferSize]; } - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + if (alwaysReadBufferSize) { bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + // Enable readAhead when reading sequentially + if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); + } else { + bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + } } if (bytesRead == -1) { @@ -223,16 +236,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, // queue read-aheads int numReadAheads = this.readAheadQueueDepth; - long nextSize; long nextOffset = position; + // First read to queue needs to be of readBufferSize and later + // of readAhead Block size + long nextSize = Math.min((long) bufferSize, contentLength - nextOffset); LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); nextOffset = nextOffset + nextSize; numReadAheads--; + // From next round onwards should be of readahead block size. + nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset); } // try reading from buffers first @@ -527,6 +543,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, return bytesFromRemoteRead; } + @VisibleForTesting + public int getBufferSize() { + return bufferSize; + } + + @VisibleForTesting + public int getReadAheadQueueDepth() { + return readAheadQueueDepth; + } + + @VisibleForTesting + public boolean shouldAlwaysReadBufferSize() { + return alwaysReadBufferSize; + } + /** * Get the statistics of the stream. * @return a string value. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index f8d3b2a599..ade05834a2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -18,10 +18,15 @@ package org.apache.hadoop.fs.azurebfs.services; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Class to hold extra input stream configs. */ public class AbfsInputStreamContext extends AbfsStreamContext { + // Retaining logger of AbfsInputStream + private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); private int readBufferSize; @@ -29,6 +34,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private boolean alwaysReadBufferSize; + + private int readAheadBlockSize; + private AbfsInputStreamStatistics streamStatistics; public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { @@ -60,7 +69,27 @@ public class AbfsInputStreamContext extends AbfsStreamContext { return this; } + public AbfsInputStreamContext withShouldReadBufferSizeAlways( + final boolean alwaysReadBufferSize) { + this.alwaysReadBufferSize = alwaysReadBufferSize; + return this; + } + + public AbfsInputStreamContext withReadAheadBlockSize( + final int readAheadBlockSize) { + this.readAheadBlockSize = readAheadBlockSize; + return this; + } + public AbfsInputStreamContext build() { + if (readBufferSize > readAheadBlockSize) { + LOG.debug( + "fs.azure.read.request.size[={}] is configured for higher size than " + + "fs.azure.read.readahead.blocksize[={}]. Auto-align " + + "readAhead block size to be same as readRequestSize.", + readBufferSize, readAheadBlockSize); + readAheadBlockSize = readBufferSize; + } // Validation of parameters to be done here. return this; } @@ -80,4 +109,13 @@ public class AbfsInputStreamContext extends AbfsStreamContext { public AbfsInputStreamStatistics getStreamStatistics() { return streamStatistics; } + + public boolean shouldReadBufferSizeAlways() { + return alwaysReadBufferSize; + } + + public int getReadAheadBlockSize() { + return readAheadBlockSize; + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index f5c6393ccf..f330d790eb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -36,12 +37,14 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest */ final class ReadBufferManager { private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; private static final int NUM_THREADS = 8; private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + private static int blockSize = 4 * ONE_MB; private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read @@ -50,21 +53,37 @@ final class ReadBufferManager { private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } + private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManager(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}", + readAheadBlockSize); + blockSize = readAheadBlockSize; + } } private void init() { buffers = new byte[NUM_BUFFERS][]; for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC freeList.add(i); } for (int i = 0; i < NUM_THREADS; i++) { @@ -124,10 +143,10 @@ final class ReadBufferManager { buffer.setBufferindex(bufferIndex); readAheadQueue.add(buffer); notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } } } @@ -272,6 +291,7 @@ final class ReadBufferManager { return evict(nodeToEvict); } + LOGGER.trace("No buffer eligible for eviction"); // nothing can be evicted return false; } @@ -483,6 +503,67 @@ final class ReadBufferManager { tryEvict(); } + /** + * Test method that can clean up the current state of readAhead buffers and + * the lists. Will also trigger a fresh init. + */ + @VisibleForTesting + void testResetReadBufferManager() { + synchronized (this) { + ArrayList completedBuffers = new ArrayList<>(); + for (ReadBuffer buf : completedReadList) { + if (buf != null) { + completedBuffers.add(buf); + } + } + + for (ReadBuffer buf : completedBuffers) { + evict(buf); + } + + readAheadQueue.clear(); + inProgressList.clear(); + completedReadList.clear(); + freeList.clear(); + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = null; + } + buffers = null; + resetBufferManager(); + } + } + + /** + * Reset buffer manager to null. + */ + @VisibleForTesting + static void resetBufferManager() { + bufferManager = null; + } + + /** + * Reset readAhead buffer to needed readAhead block size and + * thresholdAgeMilliseconds. + * @param readAheadBlockSize + * @param thresholdAgeMilliseconds + */ + @VisibleForTesting + void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) { + setBlockSize(readAheadBlockSize); + setThresholdAgeMilliseconds(thresholdAgeMilliseconds); + testResetReadBufferManager(); + } + + @VisibleForTesting + static void setBlockSize(int readAheadBlockSize) { + blockSize = readAheadBlockSize; + } + + @VisibleForTesting + int getReadAheadBlockSize() { + return blockSize; + } + /** * Test method that can mimic no free buffers scenario and also add a ReadBuffer * into completedReadList. This readBuffer will get picked up by TryEvict() diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 79b897b6bd..a4188111e0 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -789,6 +789,17 @@ to 100 MB). The default value will be 8388608 (8 MB). bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to 100 MB). The default value will be 4194304 (4 MB). +`fs.azure.read.alwaysReadBufferSize`: Read request size configured by +`fs.azure.read.request.size` will be honoured only when the reads done are in +sequential pattern. When the read pattern is detected to be random, read size +will be same as the buffer length provided by the calling process. +This config when set to true will force random reads to also read in same +request sizes as sequential reads. This is a means to have same read patterns +as of ADLS Gen1, as it does not differentiate read patterns and always reads by +the configured read request size. The default value for this config will be +false, where reads for the provided buffer length is done when random read +pattern is detected. + `fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in AbfsInputStream. In case the set value is negative the read ahead queue depth will be set as Runtime.getRuntime().availableProcessors(). By default the value @@ -796,6 +807,11 @@ will be -1. To disable readaheads, set this value to 0. If your workload is doing only random reads (non-sequential) or you are seeing throttling, you may try setting this value to 0. +`fs.azure.read.readahead.blocksize`: To set the read buffer size for the read +aheads. Specify the value in bytes. The value should be between 16384 to +104857600 both inclusive (16 KB to 100 MB). The default value will be +4194304 (4 MB). + To run under limited memory situations configure the following. Especially when there are too many writes from the same process. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 34b3615c1b..7b3b5c1099 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -392,6 +392,14 @@ public abstract class AbstractAbfsIntegrationTest extends return path; } + public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) { + return fs.getAbfsStore(); + } + + public Path makeQualified(Path path) throws java.io.IOException { + return getFileSystem().makeQualified(path); + } + /** * Create a path under the test path provided by * {@link #getTestPath()}. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index f582763128..ef531acb2b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; import java.util.Random; import java.util.concurrent.Callable; +import java.util.UUID; import org.junit.Assume; import org.junit.Ignore; @@ -28,6 +29,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSExceptionMessages; @@ -37,30 +39,43 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream; + import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ETAG; /** * Test random read operation. */ public class ITestAzureBlobFileSystemRandomRead extends AbstractAbfsScaleTest { + private static final int BYTE = 1; + private static final int THREE_BYTES = 3; + private static final int FIVE_BYTES = 5; + private static final int TWENTY_BYTES = 20; + private static final int THIRTY_BYTES = 30; private static final int KILOBYTE = 1024; private static final int MEGABYTE = KILOBYTE * KILOBYTE; + private static final int FOUR_MB = 4 * MEGABYTE; + private static final int NINE_MB = 9 * MEGABYTE; private static final long TEST_FILE_SIZE = 8 * MEGABYTE; private static final int MAX_ELAPSEDTIMEMS = 20; private static final int SEQUENTIAL_READ_BUFFER_SIZE = 16 * KILOBYTE; - private static final int CREATE_BUFFER_SIZE = 26 * KILOBYTE; private static final int SEEK_POSITION_ONE = 2* KILOBYTE; private static final int SEEK_POSITION_TWO = 5 * KILOBYTE; private static final int SEEK_POSITION_THREE = 10 * KILOBYTE; private static final int SEEK_POSITION_FOUR = 4100 * KILOBYTE; - private static final Path TEST_FILE_PATH = new Path( - "/TestRandomRead.txt"); + private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * MEGABYTE; + private static final int DISABLED_READAHEAD_DEPTH = 0; + + private static final String TEST_FILE_PREFIX = "/TestRandomRead"; private static final String WASB = "WASB"; private static final String ABFS = "ABFS"; - private static long testFileLength = 0; private static final Logger LOG = LoggerFactory.getLogger(ITestAzureBlobFileSystemRandomRead.class); @@ -71,9 +86,10 @@ public class ITestAzureBlobFileSystemRandomRead extends @Test public void testBasicRead() throws Exception { - assumeHugeFileExists(); + Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead"); + assumeHugeFileExists(testPath); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { byte[] buffer = new byte[3 * MEGABYTE]; // forward seek and read a kilobyte into first kilobyte of bufferV2 @@ -99,12 +115,14 @@ public class ITestAzureBlobFileSystemRandomRead extends public void testRandomRead() throws Exception { Assume.assumeFalse("This test does not support namespace enabled account", this.getFileSystem().getIsNamespaceEnabled()); - assumeHugeFileExists(); + Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead"); + assumeHugeFileExists(testPath); + try ( FSDataInputStream inputStreamV1 - = this.getFileSystem().open(TEST_FILE_PATH); + = this.getFileSystem().open(testPath); FSDataInputStream inputStreamV2 - = this.getWasbFileSystem().open(TEST_FILE_PATH); + = this.getWasbFileSystem().open(testPath); ) { final int bufferSize = 4 * KILOBYTE; byte[] bufferV1 = new byte[bufferSize]; @@ -156,8 +174,10 @@ public class ITestAzureBlobFileSystemRandomRead extends */ @Test public void testSeekToNewSource() throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource"); + assumeHugeFileExists(testPath); + + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { assertFalse(inputStream.seekToNewSource(0)); } } @@ -169,8 +189,10 @@ public class ITestAzureBlobFileSystemRandomRead extends */ @Test public void testSkipBounds() throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds"); + long testFileLength = assumeHugeFileExists(testPath); + + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); long skipped = inputStream.skip(-1); @@ -208,8 +230,10 @@ public class ITestAzureBlobFileSystemRandomRead extends */ @Test public void testValidateSeekBounds() throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds"); + long testFileLength = assumeHugeFileExists(testPath); + + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); inputStream.seek(0); @@ -257,8 +281,10 @@ public class ITestAzureBlobFileSystemRandomRead extends */ @Test public void testSeekAndAvailableAndPosition() throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition"); + long testFileLength = assumeHugeFileExists(testPath); + + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; @@ -321,8 +347,10 @@ public class ITestAzureBlobFileSystemRandomRead extends */ @Test public void testSkipAndAvailableAndPosition() throws Exception { - assumeHugeFileExists(); - try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) { + Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition"); + long testFileLength = assumeHugeFileExists(testPath); + + try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; @@ -385,15 +413,16 @@ public class ITestAzureBlobFileSystemRandomRead extends @Test public void testSequentialReadAfterReverseSeekPerformance() throws Exception { - assumeHugeFileExists(); + Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance"); + assumeHugeFileExists(testPath); final int maxAttempts = 10; final double maxAcceptableRatio = 1.01; double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0; double ratio = Double.MAX_VALUE; for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { - beforeSeekElapsedMs = sequentialRead(ABFS, + beforeSeekElapsedMs = sequentialRead(ABFS, testPath, this.getFileSystem(), false); - afterSeekElapsedMs = sequentialRead(ABFS, + afterSeekElapsedMs = sequentialRead(ABFS, testPath, this.getFileSystem(), true); ratio = afterSeekElapsedMs / beforeSeekElapsedMs; LOG.info((String.format( @@ -417,8 +446,8 @@ public class ITestAzureBlobFileSystemRandomRead extends public void testRandomReadPerformance() throws Exception { Assume.assumeFalse("This test does not support namespace enabled account", this.getFileSystem().getIsNamespaceEnabled()); - createTestFile(); - assumeHugeFileExists(); + Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance"); + assumeHugeFileExists(testPath); final AzureBlobFileSystem abFs = this.getFileSystem(); final NativeAzureFileSystem wasbFs = this.getWasbFileSystem(); @@ -428,8 +457,8 @@ public class ITestAzureBlobFileSystemRandomRead extends double v1ElapsedMs = 0, v2ElapsedMs = 0; double ratio = Double.MAX_VALUE; for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { - v1ElapsedMs = randomRead(1, wasbFs); - v2ElapsedMs = randomRead(2, abFs); + v1ElapsedMs = randomRead(1, testPath, wasbFs); + v2ElapsedMs = randomRead(2, testPath, abFs); ratio = v2ElapsedMs / v1ElapsedMs; @@ -448,15 +477,112 @@ public class ITestAzureBlobFileSystemRandomRead extends ratio < maxAcceptableRatio); } + /** + * With this test we should see a full buffer read being triggered in case + * alwaysReadBufferSize is on, else only the requested buffer size. + * Hence a seek done few bytes away from last read position will trigger + * a network read when alwaysReadBufferSize is off, whereas it will return + * from the internal buffer when it is on. + * Reading a full buffer size is the Gen1 behaviour. + * @throws Throwable + */ + @Test + public void testAlwaysReadBufferSizeConfig() throws Throwable { + testAlwaysReadBufferSizeConfig(false); + testAlwaysReadBufferSizeConfig(true); + } + + public void testAlwaysReadBufferSizeConfig(boolean alwaysReadBufferSizeConfigValue) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.readaheadqueue.depth", "0"); + config.set("fs.azure.read.alwaysReadBufferSize", + Boolean.toString(alwaysReadBufferSizeConfigValue)); + + final Path testFile = new Path("/FileName_" + + UUID.randomUUID().toString()); + + final AzureBlobFileSystem fs = createTestFile(testFile, 16 * MEGABYTE, + 1 * MEGABYTE, config); + String eTag = fs.getAbfsClient() + .getPathStatus(testFile.toUri().getPath(), false) + .getResult() + .getResponseHeader(ETAG); + + TestAbfsInputStream testInputStream = new TestAbfsInputStream(); + + AbfsInputStream inputStream = testInputStream.getAbfsInputStream( + fs.getAbfsClient(), + testFile.getName(), ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, eTag, + DISABLED_READAHEAD_DEPTH, FOUR_MB, + alwaysReadBufferSizeConfigValue, FOUR_MB); + + long connectionsAtStart = fs.getInstrumentationMap() + .get(GET_RESPONSES.getStatName()); + + long dateSizeReadStatAtStart = fs.getInstrumentationMap() + .get(BYTES_RECEIVED.getStatName()); + + long newReqCount = 0; + long newDataSizeRead = 0; + + byte[] buffer20b = new byte[TWENTY_BYTES]; + byte[] buffer30b = new byte[THIRTY_BYTES]; + byte[] byteBuffer5 = new byte[FIVE_BYTES]; + + // first read + // if alwaysReadBufferSize is off, this is a sequential read + inputStream.read(byteBuffer5, 0, FIVE_BYTES); + newReqCount++; + newDataSizeRead += FOUR_MB; + + assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, + fs.getInstrumentationMap()); + assertAbfsStatistics(BYTES_RECEIVED, + dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap()); + + // second read beyond that the buffer holds + // if alwaysReadBufferSize is off, this is a random read. Reads only + // incoming buffer size + // else, reads a buffer size + inputStream.seek(NINE_MB); + inputStream.read(buffer20b, 0, BYTE); + newReqCount++; + if (alwaysReadBufferSizeConfigValue) { + newDataSizeRead += FOUR_MB; + } else { + newDataSizeRead += TWENTY_BYTES; + } + + assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap()); + assertAbfsStatistics(BYTES_RECEIVED, + dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap()); + + // third read adjacent to second but not exactly sequential. + // if alwaysReadBufferSize is off, this is another random read + // else second read would have read this too. + inputStream.seek(NINE_MB + TWENTY_BYTES + THREE_BYTES); + inputStream.read(buffer30b, 0, THREE_BYTES); + if (!alwaysReadBufferSizeConfigValue) { + newReqCount++; + newDataSizeRead += THIRTY_BYTES; + } + + assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap()); + assertAbfsStatistics(BYTES_RECEIVED, dateSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap()); + } private long sequentialRead(String version, + Path testPath, FileSystem fs, boolean afterReverseSeek) throws IOException { byte[] buffer = new byte[SEQUENTIAL_READ_BUFFER_SIZE]; long totalBytesRead = 0; long bytesRead = 0; - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + long testFileLength = fs.getFileStatus(testPath).getLen(); + try(FSDataInputStream inputStream = fs.open(testPath)) { if (afterReverseSeek) { while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) { bytesRead = inputStream.read(buffer); @@ -487,14 +613,14 @@ public class ITestAzureBlobFileSystemRandomRead extends } } - private long randomRead(int version, FileSystem fs) throws Exception { - assumeHugeFileExists(); + private long randomRead(int version, Path testPath, FileSystem fs) throws Exception { + assumeHugeFileExists(testPath); final long minBytesToRead = 2 * MEGABYTE; Random random = new Random(); byte[] buffer = new byte[8 * KILOBYTE]; long totalBytesRead = 0; long bytesRead = 0; - try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + try(FSDataInputStream inputStream = fs.open(testPath)) { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); do { bytesRead = inputStream.read(buffer); @@ -526,28 +652,48 @@ public class ITestAzureBlobFileSystemRandomRead extends return bytes / 1000.0 * 8 / milliseconds; } - private void createTestFile() throws Exception { - final AzureBlobFileSystem fs = this.getFileSystem(); - if (fs.exists(TEST_FILE_PATH)) { - FileStatus status = fs.getFileStatus(TEST_FILE_PATH); - if (status.getLen() >= TEST_FILE_SIZE) { - return; + private long createTestFile(Path testPath) throws Exception { + createTestFile(testPath, + TEST_FILE_SIZE, + MEGABYTE, + null); + + return TEST_FILE_SIZE; + } + + private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, + int createBufferSize, Configuration config) throws Exception { + AzureBlobFileSystem fs; + + if (config == null) { + config = this.getRawConfiguration(); + } + + final AzureBlobFileSystem currentFs = getFileSystem(); + fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + if (fs.exists(testFilePath)) { + FileStatus status = fs.getFileStatus(testFilePath); + if (status.getLen() == testFileSize) { + return fs; } } - byte[] buffer = new byte[CREATE_BUFFER_SIZE]; + byte[] buffer = new byte[createBufferSize]; char character = 'a'; for (int i = 0; i < buffer.length; i++) { buffer[i] = (byte) character; character = (character == 'z') ? 'a' : (char) ((int) character + 1); } - LOG.info(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE)); + LOG.info(String.format("Creating test file %s of size: %d ", testFilePath, testFileSize)); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { + try (FSDataOutputStream outputStream = fs.create(testFilePath)) { + String bufferContents = new String(buffer); int bytesWritten = 0; - while (bytesWritten < TEST_FILE_SIZE) { + while (bytesWritten < testFileSize) { outputStream.write(buffer); bytesWritten += buffer.length; } @@ -557,18 +703,18 @@ public class ITestAzureBlobFileSystemRandomRead extends outputStream.close(); closeTimer.end("time to close() output stream"); } - timer.end("time to write %d KB", TEST_FILE_SIZE / 1024); - testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen(); - + timer.end("time to write %d KB", testFileSize / 1024); + return fs; } - private void assumeHugeFileExists() throws Exception{ - createTestFile(); + private long assumeHugeFileExists(Path testPath) throws Exception{ + long fileSize = createTestFile(testPath); FileSystem fs = this.getFileSystem(); - ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", TEST_FILE_PATH); - FileStatus status = fs.getFileStatus(TEST_FILE_PATH); - ContractTestUtils.assertIsFile(TEST_FILE_PATH, status); - assertTrue("File " + TEST_FILE_PATH + " is empty", status.getLen() > 0); + ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", testPath); + FileStatus status = fs.getFileStatus(testPath); + ContractTestUtils.assertIsFile(testPath, status); + assertTrue("File " + testPath + " is not of expected size " + fileSize + ":actual=" + status.getLen(), status.getLen() == fileSize); + return fileSize; } private void verifyConsistentReads(FSDataInputStream inputStreamV1, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index ae72c5ae9d..cbf3d6a2a6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -22,10 +22,17 @@ import java.io.IOException; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import org.assertj.core.api.Assertions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; @@ -51,9 +58,17 @@ public class TestAbfsInputStream extends private static final int ONE_KB = 1 * 1024; private static final int TWO_KB = 2 * 1024; private static final int THREE_KB = 3 * 1024; + private static final int SIXTEEN_KB = 16 * ONE_KB; + private static final int FORTY_EIGHT_KB = 48 * ONE_KB; + private static final int ONE_MB = 1 * 1024 * 1024; + private static final int FOUR_MB = 4 * ONE_MB; + private static final int EIGHT_MB = 8 * ONE_MB; + private static final int TEST_READAHEAD_DEPTH_2 = 2; + private static final int TEST_READAHEAD_DEPTH_4 = 4; private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD = REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec + private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; private AbfsRestOperation getMockRestOp() { AbfsRestOperation op = mock(AbfsRestOperation.class); @@ -84,7 +99,7 @@ public class TestAbfsInputStream extends null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10), + inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), "eTag"); inputStream.setCachedSasToken( @@ -93,6 +108,33 @@ public class TestAbfsInputStream extends return inputStream; } + public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, + String fileName, + int fileSize, + String eTag, + int readAheadQueueDepth, + int readBufferSize, + boolean alwaysReadBufferSize, + int readAheadBlockSize) { + AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); + // Create AbfsInputStream with the client instance + AbfsInputStream inputStream = new AbfsInputStream( + abfsClient, + null, + FORWARD_SLASH + fileName, + fileSize, + inputStreamContext.withReadBufferSize(readBufferSize) + .withReadAheadQueueDepth(readAheadQueueDepth) + .withShouldReadBufferSizeAlways(alwaysReadBufferSize) + .withReadAheadBlockSize(readAheadBlockSize), + eTag); + + inputStream.setCachedSasToken( + TestCachedSASToken.getTestCachedSASTokenInstance()); + + return inputStream; + } + private void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead queue requests ReadBufferManager.getBufferManager() @@ -496,4 +538,183 @@ public class TestAbfsInputStream extends checkEvictedStatus(inputStream, 0, true); } + /** + * Test readahead with different config settings for request request size and + * readAhead block size + * @throws Exception + */ + @Test + public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception { + // Set requestRequestSize = 4MB and readAheadBufferSize=8MB + resetReadBufferManager(FOUR_MB, INCREASED_READ_BUFFER_AGE_THRESHOLD); + testReadAheadConfigs(FOUR_MB, TEST_READAHEAD_DEPTH_4, false, EIGHT_MB); + + // Test for requestRequestSize =16KB and readAheadBufferSize=16KB + resetReadBufferManager(SIXTEEN_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD); + AbfsInputStream inputStream = testReadAheadConfigs(SIXTEEN_KB, + TEST_READAHEAD_DEPTH_2, true, SIXTEEN_KB); + testReadAheads(inputStream, SIXTEEN_KB, SIXTEEN_KB); + + // Test for requestRequestSize =16KB and readAheadBufferSize=48KB + resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD); + inputStream = testReadAheadConfigs(SIXTEEN_KB, TEST_READAHEAD_DEPTH_2, true, + FORTY_EIGHT_KB); + testReadAheads(inputStream, SIXTEEN_KB, FORTY_EIGHT_KB); + + // Test for requestRequestSize =48KB and readAheadBufferSize=16KB + resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD); + inputStream = testReadAheadConfigs(FORTY_EIGHT_KB, TEST_READAHEAD_DEPTH_2, + true, + SIXTEEN_KB); + testReadAheads(inputStream, FORTY_EIGHT_KB, SIXTEEN_KB); + } + + + private void testReadAheads(AbfsInputStream inputStream, + int readRequestSize, + int readAheadRequestSize) + throws Exception { + if (readRequestSize > readAheadRequestSize) { + readAheadRequestSize = readRequestSize; + } + + byte[] firstReadBuffer = new byte[readRequestSize]; + byte[] secondReadBuffer = new byte[readAheadRequestSize]; + + // get the expected bytes to compare + byte[] expectedFirstReadAheadBufferContents = new byte[readRequestSize]; + byte[] expectedSecondReadAheadBufferContents = new byte[readAheadRequestSize]; + getExpectedBufferData(0, readRequestSize, expectedFirstReadAheadBufferContents); + getExpectedBufferData(readRequestSize, readAheadRequestSize, + expectedSecondReadAheadBufferContents); + + Assertions.assertThat(inputStream.read(firstReadBuffer, 0, readRequestSize)) + .describedAs("Read should be of exact requested size") + .isEqualTo(readRequestSize); + + assertTrue("Data mismatch found in RAH1", + Arrays.equals(firstReadBuffer, + expectedFirstReadAheadBufferContents)); + + Assertions.assertThat(inputStream.read(secondReadBuffer, 0, readAheadRequestSize)) + .describedAs("Read should be of exact requested size") + .isEqualTo(readAheadRequestSize); + + assertTrue("Data mismatch found in RAH2", + Arrays.equals(secondReadBuffer, + expectedSecondReadAheadBufferContents)); + } + + public AbfsInputStream testReadAheadConfigs(int readRequestSize, + int readAheadQueueDepth, + boolean alwaysReadBufferSizeEnabled, + int readAheadBlockSize) throws Exception { + Configuration + config = new Configuration( + this.getRawConfiguration()); + config.set("fs.azure.read.request.size", Integer.toString(readRequestSize)); + config.set("fs.azure.readaheadqueue.depth", + Integer.toString(readAheadQueueDepth)); + config.set("fs.azure.read.alwaysReadBufferSize", + Boolean.toString(alwaysReadBufferSizeEnabled)); + config.set("fs.azure.read.readahead.blocksize", + Integer.toString(readAheadBlockSize)); + if (readRequestSize > readAheadBlockSize) { + readAheadBlockSize = readRequestSize; + } + + Path testPath = new Path( + "/testReadAheadConfigs"); + final AzureBlobFileSystem fs = createTestFile(testPath, + ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config); + byte[] byteBuffer = new byte[ONE_MB]; + AbfsInputStream inputStream = this.getAbfsStore(fs) + .openFileForRead(testPath, null); + + Assertions.assertThat(inputStream.getBufferSize()) + .describedAs("Unexpected AbfsInputStream buffer size") + .isEqualTo(readRequestSize); + + Assertions.assertThat(inputStream.getReadAheadQueueDepth()) + .describedAs("Unexpected ReadAhead queue depth") + .isEqualTo(readAheadQueueDepth); + + Assertions.assertThat(inputStream.shouldAlwaysReadBufferSize()) + .describedAs("Unexpected AlwaysReadBufferSize settings") + .isEqualTo(alwaysReadBufferSizeEnabled); + + Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize()) + .describedAs("Unexpected readAhead block size") + .isEqualTo(readAheadBlockSize); + + return inputStream; + } + + private void getExpectedBufferData(int offset, int length, byte[] b) { + boolean startFillingIn = false; + int indexIntoBuffer = 0; + char character = 'a'; + + for (int i = 0; i < (offset + length); i++) { + if (i == offset) { + startFillingIn = true; + } + + if ((startFillingIn) && (indexIntoBuffer < length)) { + b[indexIntoBuffer] = (byte) character; + indexIntoBuffer++; + } + + character = (character == 'z') ? 'a' : (char) ((int) character + 1); + } + } + + private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, + Configuration config) throws Exception { + AzureBlobFileSystem fs; + + if (config == null) { + fs = this.getFileSystem(); + } else { + final AzureBlobFileSystem currentFs = getFileSystem(); + fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + } + + if (fs.exists(testFilePath)) { + FileStatus status = fs.getFileStatus(testFilePath); + if (status.getLen() >= testFileSize) { + return fs; + } + } + + byte[] buffer = new byte[EIGHT_MB]; + char character = 'a'; + for (int i = 0; i < buffer.length; i++) { + buffer[i] = (byte) character; + character = (character == 'z') ? 'a' : (char) ((int) character + 1); + } + + try (FSDataOutputStream outputStream = fs.create(testFilePath)) { + int bytesWritten = 0; + while (bytesWritten < testFileSize) { + outputStream.write(buffer); + bytesWritten += buffer.length; + } + } + + Assertions.assertThat(fs.getFileStatus(testFilePath).getLen()) + .describedAs("File not created of expected size") + .isEqualTo(testFileSize); + + return fs; + } + + private void resetReadBufferManager(int bufferSize, int threshold) { + ReadBufferManager.getBufferManager() + .testResetReadBufferManager(bufferSize, threshold); + // Trigger GC as aggressive recreation of ReadBufferManager buffers + // by successive tests can lead to OOM based on the dev VM/machine capacity. + System.gc(); + } } \ No newline at end of file