diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e1bc8aa57b..eff8c08605 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -118,6 +118,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ) private boolean optimizeFooterRead; + @IntegerConfigurationValidatorAnnotation( + ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE, + DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE) + private int footerReadBufferSize; + @BooleanConfigurationValidatorAnnotation( ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED) @@ -648,6 +653,10 @@ public boolean optimizeFooterRead() { return this.optimizeFooterRead; } + public int getFooterReadBufferSize() { + return this.footerReadBufferSize; + } + public int getReadBufferSize() { return this.readBufferSize; } @@ -1182,6 +1191,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) { this.optimizeFooterRead = optimizeFooterRead; } + @VisibleForTesting + public void setFooterReadBufferSize(int footerReadBufferSize) { + this.footerReadBufferSize = footerReadBufferSize; + } + @VisibleForTesting public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 2b7141e6cb..d9693dd7e1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -155,6 +155,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; @@ -895,6 +896,9 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( boolean bufferedPreadDisabled = options .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) .orElse(false); + int footerReadBufferSize = options.map(c -> c.getInt( + AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize())) + .orElse(abfsConfiguration.getFooterReadBufferSize()); return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) @@ -902,6 +906,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) 
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) + .withFooterReadBufferSize(footerReadBufferSize) .withReadAheadRange(abfsConfiguration.getReadAheadRange()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 91f9eff532..a27c757026 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -104,7 +104,22 @@ public final class ConfigurationKeys { public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely"; + /** + * When parquet files are read, the first few reads are metadata reads that + * happen before the actual data is read. First the last 8 bytes of the parquet + * file are read to get the position of the metadata, and the next read fetches + * that metadata. With this optimization these two reads can be combined into one. + * Value: {@value} + */ public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread"; + /** + * For footer reads it is not necessary to read the full read buffer size. + * Most of the required metadata lies within 256 KB, and reading less is + * more performant; 512 KB is a sweet spot. + * This config defines how much of the footer the user wants to read. + * Value: {@value} + */ + public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size"; /** * Read ahead range parameter which can be set by user.
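The two new keys above can be set either globally in the Hadoop configuration or per stream through the openFile() builder option, which is what the new tests later in this patch exercise. The following standalone sketch is illustrative only and not part of the patch; the abfs URI, account, and file path are placeholders, and it simply shows both ways of overriding the 512 KB default footer read buffer size.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FooterReadConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Global settings: keep the footer read optimization on and shrink the footer buffer to 256 KB.
    conf.setBoolean("fs.azure.read.optimizefooterread", true);
    conf.setInt("fs.azure.footer.read.request.size", 256 * 1024);

    // Placeholder path; any abfs:// file works.
    Path path = new Path("abfs://container@account.dfs.core.windows.net/data/part-00000.parquet");
    FileSystem fs = path.getFileSystem(conf);
    long len = fs.getFileStatus(path).getLen();

    // Per-stream override: the openFile() builder option takes precedence over the config value.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.azure.footer.read.request.size", 512 * 1024)
        .build().get()) {
      byte[] footer = new byte[8];
      in.readFully(len - footer.length, footer); // served from the single footer read
    }
  }
}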
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 00fc4a6a3d..b3825b4c53 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -59,7 +59,8 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false; public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false; - public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false; + public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true; + public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 00b48e25b5..af82c3f128 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -71,6 +71,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final String path; private final long contentLength; private final int bufferSize; // default buffer size + private final int footerReadSize; // default buffer size to read when reading footer private final int readAheadQueueDepth; // initialized in constructor private final String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends @@ -140,6 +141,7 @@ public AbfsInputStream( this.path = path; this.contentLength = contentLength; this.bufferSize = abfsInputStreamContext.getReadBufferSize(); + this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize(); this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; @@ -361,6 +363,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len) return optimisedRead(b, off, len, 0, contentLength); } + // To do footer read of files when enabled. private int readLastBlock(final byte[] b, final int off, final int len) throws IOException { if (len == 0) { @@ -373,10 +376,10 @@ private int readLastBlock(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, // AbfsInutStream buffer is going to contain data from last block start. 
In // that case bCursor will be set to fCursor - lastBlockStart - long lastBlockStart = max(0, contentLength - bufferSize); + long lastBlockStart = max(0, contentLength - footerReadSize); bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize - long actualLenToRead = min(bufferSize, contentLength); + long actualLenToRead = min(footerReadSize, contentLength); return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); } @@ -819,6 +822,11 @@ public int getBufferSize() { return bufferSize; } + @VisibleForTesting + protected int getFooterReadBufferSize() { + return footerReadSize; + } + @VisibleForTesting public int getReadAheadQueueDepth() { return readAheadQueueDepth; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index 59352d174f..fdcad5ac3a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -53,6 +53,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean optimizeFooterRead; + private int footerReadBufferSize; + private boolean bufferedPreadDisabled; /** A BackReference to the FS instance that created this OutputStream. */ @@ -113,6 +115,11 @@ public AbfsInputStreamContext withOptimizeFooterRead( return this; } + public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) { + this.footerReadBufferSize = footerReadBufferSize; + return this; + } + public AbfsInputStreamContext withShouldReadBufferSizeAlways( final boolean alwaysReadBufferSize) { this.alwaysReadBufferSize = alwaysReadBufferSize; @@ -190,6 +197,10 @@ public boolean optimizeFooterRead() { return this.optimizeFooterRead; } + public int getFooterReadBufferSize() { + return footerReadBufferSize; + } + public boolean shouldReadBufferSizeAlways() { return alwaysReadBufferSize; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index beada775ae..bc420c6a1f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -84,6 +84,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { abfsConfiguration.setWriteBufferSize(bufferSize); abfsConfiguration.setReadBufferSize(bufferSize); abfsConfiguration.setReadAheadEnabled(readaheadEnabled); + abfsConfiguration.setOptimizeFooterRead(false); final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 2ac58fbcb1..11b14162eb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -169,6 +169,8 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely) final 
AzureBlobFileSystem fs = getFileSystem(); getAbfsStore(fs).getAbfsConfiguration() .setReadSmallFilesCompletely(readSmallFilesCompletely); + getAbfsStore(fs).getAbfsConfiguration() + .setOptimizeFooterRead(false); return fs; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index cb3eaffe02..bf205879cb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -21,6 +21,11 @@ import java.io.IOException; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; + +import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; @@ -33,6 +38,8 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -41,7 +48,6 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream { @@ -64,38 +70,46 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse() private void testNumBackendCalls(boolean optimizeFooterRead) throws Exception { - for (int i = 1; i <= 4; i++) { - int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - int length = AbfsInputStream.FOOTER_SIZE; - try (FSDataInputStream iStream = fs.open(testFilePath)) { - byte[] buffer = new byte[length]; + int fileIdx = 0; + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem( + optimizeFooterRead, fileSize); + Path testFilePath = createPathAndFileWithContent( + fs, fileIdx++, fileSize); + int length = AbfsInputStream.FOOTER_SIZE; + FutureDataInputStreamBuilder builder = getParameterizedBuilder( + testFilePath, fs, footerReadBufferSize); + try (FSDataInputStream iStream = builder.build().get()) { + verifyConfigValueInStream(iStream, footerReadBufferSize); + byte[] buffer = new byte[length]; - Map metricMap = getInstrumentationMap(fs); - long requestsMadeBeforeTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); + Map metricMap = getInstrumentationMap(fs); + long requestsMadeBeforeTest = metricMap + .get(CONNECTIONS_MADE.getStatName()); - iStream.seek(fileSize - 8); - iStream.read(buffer, 0, length); + iStream.seek(fileSize - 8); + 
iStream.read(buffer, 0, length); - iStream.seek(fileSize - (TEN * ONE_KB)); - iStream.read(buffer, 0, length); + iStream.seek(fileSize - (TEN * ONE_KB)); + iStream.read(buffer, 0, length); - iStream.seek(fileSize - (TWENTY * ONE_KB)); - iStream.read(buffer, 0, length); + iStream.seek(fileSize - (TWENTY * ONE_KB)); + iStream.read(buffer, 0, length); - metricMap = getInstrumentationMap(fs); - long requestsMadeAfterTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); + metricMap = getInstrumentationMap(fs); + long requestsMadeAfterTest = metricMap + .get(CONNECTIONS_MADE.getStatName()); - if (optimizeFooterRead) { - assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest); - } else { - assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest); + if (optimizeFooterRead) { + assertEquals(1, + requestsMadeAfterTest - requestsMadeBeforeTest); + } else { + assertEquals(3, + requestsMadeAfterTest - requestsMadeBeforeTest); + } } } } @@ -153,15 +167,24 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception { private void testSeekAndReadWithConf(boolean optimizeFooterRead, SeekTo seekTo) throws Exception { - for (int i = 2; i <= 6; i++) { - int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, - fileContent); + // Running the test for file sizes ranging from 256 KB to 4 MB with + // Footer Read Buffer size ranging from 256 KB to 1 MB + // This will cover files less than footer read buffer size, + // Files between footer read buffer and read buffer size + // Files bigger than read buffer size + int fileIdx = 0; + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem( + optimizeFooterRead, fileSize); + String fileName = methodName.getMethodName() + fileIdx++; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, + fileContent, footerReadBufferSize); + } } } @@ -182,15 +205,17 @@ private int seekPos(SeekTo seekTo, int fileSize) { return fileSize - AbfsInputStream.FOOTER_SIZE + 1; } - private void seekReadAndTest(final FileSystem fs, final Path testFilePath, - final int seekPos, final int length, final byte[] fileContent) - throws IOException, NoSuchFieldException, IllegalAccessException { + private void seekReadAndTest(final AzureBlobFileSystem fs, + final Path testFilePath, final int seekPos, final int length, + final byte[] fileContent, int footerReadBufferSize) throws Exception { AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration(); long actualContentLength = fileContent.length; - try (FSDataInputStream iStream = fs.open(testFilePath)) { - AbfsInputStream abfsInputStream = (AbfsInputStream) iStream - .getWrappedStream(); - long bufferSize = abfsInputStream.getBufferSize(); + FutureDataInputStreamBuilder builder = getParameterizedBuilder( + testFilePath, fs, footerReadBufferSize); + try (FSDataInputStream iStream = builder.build().get()) { + AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream(); + 
verifyConfigValueInStream(iStream, footerReadBufferSize); + long readBufferSize = abfsInputStream.getBufferSize(); seek(iStream, seekPos); byte[] buffer = new byte[length]; long bytesRead = iStream.read(buffer, 0, length); @@ -206,40 +231,40 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath, actualLength = length - delta; } long expectedLimit; - long expectedBCurson; + long expectedBCursor; long expectedFCursor; if (optimizationOn) { - if (actualContentLength <= bufferSize) { + if (actualContentLength <= footerReadBufferSize) { expectedLimit = actualContentLength; - expectedBCurson = seekPos + actualLength; + expectedBCursor = seekPos + actualLength; } else { - expectedLimit = bufferSize; - long lastBlockStart = max(0, actualContentLength - bufferSize); - expectedBCurson = seekPos - lastBlockStart + actualLength; + expectedLimit = footerReadBufferSize; + long lastBlockStart = max(0, actualContentLength - footerReadBufferSize); + expectedBCursor = seekPos - lastBlockStart + actualLength; } expectedFCursor = actualContentLength; } else { - if (seekPos + bufferSize < actualContentLength) { - expectedLimit = bufferSize; - expectedFCursor = bufferSize; + if (seekPos + readBufferSize < actualContentLength) { + expectedLimit = readBufferSize; + expectedFCursor = readBufferSize; } else { expectedLimit = actualContentLength - seekPos; - expectedFCursor = min(seekPos + bufferSize, actualContentLength); + expectedFCursor = min(seekPos + readBufferSize, actualContentLength); } - expectedBCurson = actualLength; + expectedBCursor = actualLength; } assertEquals(expectedFCursor, abfsInputStream.getFCursor()); assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead()); assertEquals(expectedLimit, abfsInputStream.getLimit()); - assertEquals(expectedBCurson, abfsInputStream.getBCursor()); + assertEquals(expectedBCursor, abfsInputStream.getBCursor()); assertEquals(actualLength, bytesRead); // Verify user-content read assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath); // Verify data read to AbfsInputStream buffer int from = seekPos; if (optimizationOn) { - from = (int) max(0, actualContentLength - bufferSize); + from = (int) max(0, actualContentLength - footerReadBufferSize); } assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(), abfsInputStream.getBuffer(), testFilePath); @@ -247,28 +272,34 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath, } @Test - public void testPartialReadWithNoData() - throws Exception { - for (int i = 2; i <= 6; i++) { - int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(true, fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithNoData(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + public void testPartialReadWithNoData() throws Exception { + int fileIdx = 0; + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem( + true, fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + fileIdx++; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + 
testPartialReadWithNoData(fs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, + fileContent, footerReadBufferSize); + } } } private void testPartialReadWithNoData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent) - throws IOException, NoSuchFieldException, IllegalAccessException { + final byte[] fileContent, int footerReadBufferSize) throws IOException { FSDataInputStream iStream = fs.open(testFilePath); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); + Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) + .describedAs("Footer Read Buffer Size Should be same as what set in builder") + .isEqualTo(footerReadBufferSize); abfsInputStream = spy(abfsInputStream); doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream) .readRemote(anyLong(), any(), anyInt(), anyInt(), @@ -290,34 +321,36 @@ private void testPartialReadWithNoData(final FileSystem fs, } @Test - public void testPartialReadWithSomeDat() - throws Exception { - for (int i = 3; i <= 6; i++) { - int fileSize = i * ONE_MB; - final AzureBlobFileSystem fs = getFileSystem(true, fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithSomeDat(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + public void testPartialReadWithSomeData() throws Exception { + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem(true, + fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + testPartialReadWithSomeData(fs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, + fileContent, footerReadBufferSize); + } } } - private void testPartialReadWithSomeDat(final FileSystem fs, + private void testPartialReadWithSomeData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent) - throws IOException, NoSuchFieldException, IllegalAccessException { + final byte[] fileContent, final int footerReadBufferSize) throws IOException { FSDataInputStream iStream = fs.open(testFilePath); try { - AbfsInputStream abfsInputStream = (AbfsInputStream) iStream - .getWrappedStream(); - abfsInputStream = spy(abfsInputStream); + verifyConfigValueInStream(iStream, footerReadBufferSize); + AbfsInputStream abfsInputStream = spy((AbfsInputStream) iStream + .getWrappedStream()); // first readRemote, will return first 10 bytes // second readRemote returns data till the last 2 bytes int someDataLength = 2; int secondReturnSize = - min(fileContent.length, abfsInputStream.getBufferSize()) - 10 + min(fileContent.length, abfsInputStream.getFooterReadBufferSize()) - 10 - someDataLength; doReturn(10).doReturn(secondReturnSize).doCallRealMethod() .when(abfsInputStream) @@ -342,15 +375,93 @@ private void testPartialReadWithSomeDat(final FileSystem fs, } } - private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, - int fileSize) throws IOException { - final AzureBlobFileSystem fs = getFileSystem(); - 
getAbfsStore(fs).getAbfsConfiguration() - .setOptimizeFooterRead(optimizeFooterRead); - if (fileSize <= getAbfsStore(fs).getAbfsConfiguration() - .getReadBufferSize()) { + @Test + public void testFooterReadBufferSizeConfiguration() throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + config.unset(AZURE_FOOTER_READ_BUFFER_SIZE); + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config)){ + Path testFilePath = createPathAndFileWithContent(fs, 0, ONE_KB); + final int footerReadBufferSizeConfig = 4 * ONE_KB; + final int footerReadBufferSizeBuilder = 5 * ONE_KB; + + // Verify that default value is used if nothing is set explicitly + FSDataInputStream iStream = fs.open(testFilePath); + verifyConfigValueInStream(iStream, DEFAULT_FOOTER_READ_BUFFER_SIZE); + + // Verify that value set in config is used if builder is not used getAbfsStore(fs).getAbfsConfiguration() - .setReadSmallFilesCompletely(false); + .setFooterReadBufferSize(footerReadBufferSizeConfig); + iStream = fs.open(testFilePath); + verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); + + // Verify that when builder is used value set in parameters is used + getAbfsStore(fs).getAbfsConfiguration().unset(AZURE_FOOTER_READ_BUFFER_SIZE); + FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); + builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE, + footerReadBufferSizeBuilder); + iStream = builder.build().get(); + verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder); + + // Verify that when builder is used value set in parameters is used + // even if config is set + getAbfsStore(fs).getAbfsConfiguration() + .setFooterReadBufferSize(footerReadBufferSizeConfig); + iStream = builder.build().get(); + verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder); + + // Verify that when the builder is used and parameter in builder is not set, + // the value set in configuration is used + getAbfsStore(fs).getAbfsConfiguration() + .setFooterReadBufferSize(footerReadBufferSizeConfig); + builder = fs.openFile(testFilePath); + iStream = builder.build().get(); + verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); + } + } + + private void verifyConfigValueInStream(final FSDataInputStream inputStream, + final int expectedValue) { + AbfsInputStream stream = (AbfsInputStream) inputStream.getWrappedStream(); + Assertions.assertThat(stream.getFooterReadBufferSize()) + .describedAs( + "Footer Read Buffer Size Value Is Not As Expected") + .isEqualTo(expectedValue); + } + + private Path createPathAndFileWithContent(final AzureBlobFileSystem fs, + final int fileIdx, final int fileSize) throws Exception { + String fileName = methodName.getMethodName() + fileIdx; + byte[] fileContent = getRandomBytesArray(fileSize); + return createFileWithContent(fs, fileName, fileContent); + } + + private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path, + final AzureBlobFileSystem fs, int footerReadBufferSize) throws Exception { + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE, + footerReadBufferSize); + return builder; + } + + private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead, + final int fileSize) throws IOException { + final AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore store = getAbfsStore(fs); + store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); + if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) { + 
store.getAbfsConfiguration().setReadSmallFilesCompletely(false); + } + return fs; + } + + private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead, + final int fileSize, final int footerReadBufferSize) throws IOException { + final AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore store = getAbfsStore(fs); + store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); + store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize); + if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) { + store.getAbfsConfiguration().setReadSmallFilesCompletely(false); } return fs; }
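For reviewers, an illustrative summary of what the new knob changes inside readLastBlock(): the single remote read now starts at max(0, contentLength - footerReadBufferSize) and spans min(footerReadBufferSize, contentLength) bytes, so files smaller than the footer buffer are fetched in one go. The sketch below is not part of the patch; the class and method names are made up and it only restates that arithmetic.

import static java.lang.Math.max;
import static java.lang.Math.min;

final class FooterWindowSketch {
  // Returns {start, length} of the single remote read issued when a footer is accessed.
  static long[] footerReadRange(long contentLength, int footerReadBufferSize) {
    long lastBlockStart = max(0, contentLength - footerReadBufferSize);
    long actualLenToRead = min(footerReadBufferSize, contentLength);
    return new long[] {lastBlockStart, actualLenToRead};
  }

  public static void main(String[] args) {
    // 1 MB file with the default 512 KB footer buffer: one read of the last 512 KB.
    long[] withDefault = footerReadRange(1024 * 1024, 512 * 1024);
    System.out.println("start=" + withDefault[0] + ", len=" + withDefault[1]); // start=524288, len=524288

    // 100 KB file: the whole file is covered by the footer read.
    long[] smallFile = footerReadRange(100 * 1024, 512 * 1024);
    System.out.println("start=" + smallFile[0] + ", len=" + smallFile[1]); // start=0, len=102400
  }
}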