HADOOP-18971. [ABFS] Read and cache file footer with fs.azure.footer.read.request.size (#6270)
The option fs.azure.footer.read.request.size sets the size of the footer to read and cache; the default value of 524288 has been measured to be good for most workloads running on parquet, ORC and similar file formats. Contributed by Anuj Modi
This commit is contained in:
parent
556fbcf025
commit
e3c135b0b3
@ -118,6 +118,11 @@ public class AbfsConfiguration{
|
||||
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE,
|
||||
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
|
||||
private int footerReadBufferSize;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
|
||||
@ -648,6 +653,10 @@ public boolean optimizeFooterRead() {
|
||||
return this.optimizeFooterRead;
|
||||
}
|
||||
|
||||
public int getFooterReadBufferSize() {
|
||||
return this.footerReadBufferSize;
|
||||
}
|
||||
|
||||
public int getReadBufferSize() {
|
||||
return this.readBufferSize;
|
||||
}
|
||||
@ -1182,6 +1191,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
|
||||
this.optimizeFooterRead = optimizeFooterRead;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setFooterReadBufferSize(int footerReadBufferSize) {
|
||||
this.footerReadBufferSize = footerReadBufferSize;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
|
||||
this.enableAbfsListIterator = enableAbfsListIterator;
|
||||
|
@ -155,6 +155,7 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
|
||||
@ -895,6 +896,9 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
|
||||
boolean bufferedPreadDisabled = options
|
||||
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
|
||||
.orElse(false);
|
||||
int footerReadBufferSize = options.map(c -> c.getInt(
|
||||
AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize()))
|
||||
.orElse(abfsConfiguration.getFooterReadBufferSize());
|
||||
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
|
||||
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
|
||||
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
|
||||
@ -902,6 +906,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
|
||||
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
|
||||
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
|
||||
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
|
||||
.withFooterReadBufferSize(footerReadBufferSize)
|
||||
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
|
||||
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
|
||||
.withShouldReadBufferSizeAlways(
|
||||
|
@ -104,7 +104,22 @@ public final class ConfigurationKeys {
|
||||
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
|
||||
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
||||
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
|
||||
/**
|
||||
* When parquet files are read, first few read are metadata reads before
|
||||
* reading the actual data. First the read is done of last 8 bytes of parquet
|
||||
* file to get the postion of metadta and next read is done for reading that
|
||||
* metadata. With this optimization these two reads can be combined into 1.
|
||||
* Value: {@value}
|
||||
*/
|
||||
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
|
||||
/**
|
||||
* In case of footer reads it was not required to read full buffer size.
|
||||
* Most of the metadata information required was within 256 KB and it will be
|
||||
* more performant to read less. 512 KB is a sweet spot.
|
||||
* This config is used to define how much footer length the user wants to read.
|
||||
* Value: {@value}
|
||||
*/
|
||||
public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size";
|
||||
|
||||
/**
|
||||
* Read ahead range parameter which can be set by user.
|
||||
|
@ -59,7 +59,8 @@ public final class FileSystemConfigurations {
|
||||
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
|
||||
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
||||
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
|
||||
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
|
||||
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true;
|
||||
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
|
||||
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
|
||||
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
|
||||
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
|
||||
|
@ -71,6 +71,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||
private final String path;
|
||||
private final long contentLength;
|
||||
private final int bufferSize; // default buffer size
|
||||
private final int footerReadSize; // default buffer size to read when reading footer
|
||||
private final int readAheadQueueDepth; // initialized in constructor
|
||||
private final String eTag; // eTag of the path when InputStream are created
|
||||
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
|
||||
@ -140,6 +141,7 @@ public AbfsInputStream(
|
||||
this.path = path;
|
||||
this.contentLength = contentLength;
|
||||
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
|
||||
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
|
||||
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
|
||||
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
|
||||
this.eTag = eTag;
|
||||
@ -361,6 +363,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
|
||||
return optimisedRead(b, off, len, 0, contentLength);
|
||||
}
|
||||
|
||||
// To do footer read of files when enabled.
|
||||
private int readLastBlock(final byte[] b, final int off, final int len)
|
||||
throws IOException {
|
||||
if (len == 0) {
|
||||
@ -373,10 +376,10 @@ private int readLastBlock(final byte[] b, final int off, final int len)
|
||||
// data need to be copied to user buffer from index bCursor,
|
||||
// AbfsInutStream buffer is going to contain data from last block start. In
|
||||
// that case bCursor will be set to fCursor - lastBlockStart
|
||||
long lastBlockStart = max(0, contentLength - bufferSize);
|
||||
long lastBlockStart = max(0, contentLength - footerReadSize);
|
||||
bCursor = (int) (fCursor - lastBlockStart);
|
||||
// 0 if contentlength is < buffersize
|
||||
long actualLenToRead = min(bufferSize, contentLength);
|
||||
long actualLenToRead = min(footerReadSize, contentLength);
|
||||
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
|
||||
}
|
||||
|
||||
@ -819,6 +822,11 @@ public int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int getFooterReadBufferSize() {
|
||||
return footerReadSize;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getReadAheadQueueDepth() {
|
||||
return readAheadQueueDepth;
|
||||
|
@ -53,6 +53,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
||||
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
private int footerReadBufferSize;
|
||||
|
||||
private boolean bufferedPreadDisabled;
|
||||
|
||||
/** A BackReference to the FS instance that created this OutputStream. */
|
||||
@ -113,6 +115,11 @@ public AbfsInputStreamContext withOptimizeFooterRead(
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
|
||||
this.footerReadBufferSize = footerReadBufferSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
|
||||
final boolean alwaysReadBufferSize) {
|
||||
this.alwaysReadBufferSize = alwaysReadBufferSize;
|
||||
@ -190,6 +197,10 @@ public boolean optimizeFooterRead() {
|
||||
return this.optimizeFooterRead;
|
||||
}
|
||||
|
||||
public int getFooterReadBufferSize() {
|
||||
return footerReadBufferSize;
|
||||
}
|
||||
|
||||
public boolean shouldReadBufferSizeAlways() {
|
||||
return alwaysReadBufferSize;
|
||||
}
|
||||
|
@ -84,6 +84,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
|
||||
abfsConfiguration.setWriteBufferSize(bufferSize);
|
||||
abfsConfiguration.setReadBufferSize(bufferSize);
|
||||
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
|
||||
abfsConfiguration.setOptimizeFooterRead(false);
|
||||
|
||||
final byte[] b = new byte[2 * bufferSize];
|
||||
new Random().nextBytes(b);
|
||||
|
@ -169,6 +169,8 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setReadSmallFilesCompletely(readSmallFilesCompletely);
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setOptimizeFooterRead(false);
|
||||
return fs;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,11 @@
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@ -33,6 +38,8 @@
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
@ -41,7 +48,6 @@
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||
|
||||
public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
||||
|
||||
@ -64,38 +70,46 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
|
||||
|
||||
private void testNumBackendCalls(boolean optimizeFooterRead)
|
||||
throws Exception {
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
|
||||
fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
int length = AbfsInputStream.FOOTER_SIZE;
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
byte[] buffer = new byte[length];
|
||||
int fileIdx = 0;
|
||||
for (int i = 0; i <= 4; i++) {
|
||||
for (int j = 0; j <= 2; j++) {
|
||||
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
||||
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(
|
||||
optimizeFooterRead, fileSize);
|
||||
Path testFilePath = createPathAndFileWithContent(
|
||||
fs, fileIdx++, fileSize);
|
||||
int length = AbfsInputStream.FOOTER_SIZE;
|
||||
FutureDataInputStreamBuilder builder = getParameterizedBuilder(
|
||||
testFilePath, fs, footerReadBufferSize);
|
||||
try (FSDataInputStream iStream = builder.build().get()) {
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
||||
byte[] buffer = new byte[length];
|
||||
|
||||
Map<String, Long> metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeBeforeTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
Map<String, Long> metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeBeforeTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
iStream.seek(fileSize - 8);
|
||||
iStream.read(buffer, 0, length);
|
||||
iStream.seek(fileSize - 8);
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(fileSize - (TEN * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
iStream.seek(fileSize - (TEN * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(fileSize - (TWENTY * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
iStream.seek(fileSize - (TWENTY * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeAfterTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeAfterTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
if (optimizeFooterRead) {
|
||||
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
} else {
|
||||
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
if (optimizeFooterRead) {
|
||||
assertEquals(1,
|
||||
requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
} else {
|
||||
assertEquals(3,
|
||||
requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -153,15 +167,24 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception {
|
||||
|
||||
private void testSeekAndReadWithConf(boolean optimizeFooterRead,
|
||||
SeekTo seekTo) throws Exception {
|
||||
for (int i = 2; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
|
||||
fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
|
||||
fileContent);
|
||||
// Running the test for file sizes ranging from 256 KB to 4 MB with
|
||||
// Footer Read Buffer size ranging from 256 KB to 1 MB
|
||||
// This will cover files less than footer read buffer size,
|
||||
// Files between footer read buffer and read buffer size
|
||||
// Files bigger than read buffer size
|
||||
int fileIdx = 0;
|
||||
for (int i = 0; i <= 4; i++) {
|
||||
for (int j = 0; j <= 2; j++) {
|
||||
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
||||
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(
|
||||
optimizeFooterRead, fileSize);
|
||||
String fileName = methodName.getMethodName() + fileIdx++;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
|
||||
fileContent, footerReadBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -182,15 +205,17 @@ private int seekPos(SeekTo seekTo, int fileSize) {
|
||||
return fileSize - AbfsInputStream.FOOTER_SIZE + 1;
|
||||
}
|
||||
|
||||
private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
|
||||
final int seekPos, final int length, final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
private void seekReadAndTest(final AzureBlobFileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final int length,
|
||||
final byte[] fileContent, int footerReadBufferSize) throws Exception {
|
||||
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
|
||||
long actualContentLength = fileContent.length;
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
long bufferSize = abfsInputStream.getBufferSize();
|
||||
FutureDataInputStreamBuilder builder = getParameterizedBuilder(
|
||||
testFilePath, fs, footerReadBufferSize);
|
||||
try (FSDataInputStream iStream = builder.build().get()) {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
||||
long readBufferSize = abfsInputStream.getBufferSize();
|
||||
seek(iStream, seekPos);
|
||||
byte[] buffer = new byte[length];
|
||||
long bytesRead = iStream.read(buffer, 0, length);
|
||||
@ -206,40 +231,40 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
|
||||
actualLength = length - delta;
|
||||
}
|
||||
long expectedLimit;
|
||||
long expectedBCurson;
|
||||
long expectedBCursor;
|
||||
long expectedFCursor;
|
||||
if (optimizationOn) {
|
||||
if (actualContentLength <= bufferSize) {
|
||||
if (actualContentLength <= footerReadBufferSize) {
|
||||
expectedLimit = actualContentLength;
|
||||
expectedBCurson = seekPos + actualLength;
|
||||
expectedBCursor = seekPos + actualLength;
|
||||
} else {
|
||||
expectedLimit = bufferSize;
|
||||
long lastBlockStart = max(0, actualContentLength - bufferSize);
|
||||
expectedBCurson = seekPos - lastBlockStart + actualLength;
|
||||
expectedLimit = footerReadBufferSize;
|
||||
long lastBlockStart = max(0, actualContentLength - footerReadBufferSize);
|
||||
expectedBCursor = seekPos - lastBlockStart + actualLength;
|
||||
}
|
||||
expectedFCursor = actualContentLength;
|
||||
} else {
|
||||
if (seekPos + bufferSize < actualContentLength) {
|
||||
expectedLimit = bufferSize;
|
||||
expectedFCursor = bufferSize;
|
||||
if (seekPos + readBufferSize < actualContentLength) {
|
||||
expectedLimit = readBufferSize;
|
||||
expectedFCursor = readBufferSize;
|
||||
} else {
|
||||
expectedLimit = actualContentLength - seekPos;
|
||||
expectedFCursor = min(seekPos + bufferSize, actualContentLength);
|
||||
expectedFCursor = min(seekPos + readBufferSize, actualContentLength);
|
||||
}
|
||||
expectedBCurson = actualLength;
|
||||
expectedBCursor = actualLength;
|
||||
}
|
||||
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursor());
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(expectedLimit, abfsInputStream.getLimit());
|
||||
assertEquals(expectedBCurson, abfsInputStream.getBCursor());
|
||||
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
|
||||
assertEquals(actualLength, bytesRead);
|
||||
// Verify user-content read
|
||||
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath);
|
||||
// Verify data read to AbfsInputStream buffer
|
||||
int from = seekPos;
|
||||
if (optimizationOn) {
|
||||
from = (int) max(0, actualContentLength - bufferSize);
|
||||
from = (int) max(0, actualContentLength - footerReadBufferSize);
|
||||
}
|
||||
assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
|
||||
abfsInputStream.getBuffer(), testFilePath);
|
||||
@ -247,28 +272,34 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithNoData()
|
||||
throws Exception {
|
||||
for (int i = 2; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithNoData(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent);
|
||||
public void testPartialReadWithNoData() throws Exception {
|
||||
int fileIdx = 0;
|
||||
for (int i = 0; i <= 4; i++) {
|
||||
for (int j = 0; j <= 2; j++) {
|
||||
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
||||
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(
|
||||
true, fileSize, footerReadBufferSize);
|
||||
String fileName = methodName.getMethodName() + fileIdx++;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithNoData(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent, footerReadBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testPartialReadWithNoData(final FileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final int length,
|
||||
final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
final byte[] fileContent, int footerReadBufferSize) throws IOException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
|
||||
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
|
||||
.isEqualTo(footerReadBufferSize);
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt(),
|
||||
@ -290,34 +321,36 @@ private void testPartialReadWithNoData(final FileSystem fs,
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithSomeDat()
|
||||
throws Exception {
|
||||
for (int i = 3; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithSomeDat(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent);
|
||||
public void testPartialReadWithSomeData() throws Exception {
|
||||
for (int i = 0; i <= 4; i++) {
|
||||
for (int j = 0; j <= 2; j++) {
|
||||
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
|
||||
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true,
|
||||
fileSize, footerReadBufferSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithSomeData(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent, footerReadBufferSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testPartialReadWithSomeDat(final FileSystem fs,
|
||||
private void testPartialReadWithSomeData(final FileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final int length,
|
||||
final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
final byte[] fileContent, final int footerReadBufferSize) throws IOException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSize);
|
||||
AbfsInputStream abfsInputStream = spy((AbfsInputStream) iStream
|
||||
.getWrappedStream());
|
||||
// first readRemote, will return first 10 bytes
|
||||
// second readRemote returns data till the last 2 bytes
|
||||
int someDataLength = 2;
|
||||
int secondReturnSize =
|
||||
min(fileContent.length, abfsInputStream.getBufferSize()) - 10
|
||||
min(fileContent.length, abfsInputStream.getFooterReadBufferSize()) - 10
|
||||
- someDataLength;
|
||||
doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
|
||||
.when(abfsInputStream)
|
||||
@ -342,15 +375,93 @@ private void testPartialReadWithSomeDat(final FileSystem fs,
|
||||
}
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
|
||||
int fileSize) throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setOptimizeFooterRead(optimizeFooterRead);
|
||||
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
|
||||
.getReadBufferSize()) {
|
||||
@Test
|
||||
public void testFooterReadBufferSizeConfiguration() throws Exception {
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
config.unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
||||
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(config)){
|
||||
Path testFilePath = createPathAndFileWithContent(fs, 0, ONE_KB);
|
||||
final int footerReadBufferSizeConfig = 4 * ONE_KB;
|
||||
final int footerReadBufferSizeBuilder = 5 * ONE_KB;
|
||||
|
||||
// Verify that default value is used if nothing is set explicitly
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
verifyConfigValueInStream(iStream, DEFAULT_FOOTER_READ_BUFFER_SIZE);
|
||||
|
||||
// Verify that value set in config is used if builder is not used
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setReadSmallFilesCompletely(false);
|
||||
.setFooterReadBufferSize(footerReadBufferSizeConfig);
|
||||
iStream = fs.open(testFilePath);
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
||||
|
||||
// Verify that when builder is used value set in parameters is used
|
||||
getAbfsStore(fs).getAbfsConfiguration().unset(AZURE_FOOTER_READ_BUFFER_SIZE);
|
||||
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
|
||||
builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE,
|
||||
footerReadBufferSizeBuilder);
|
||||
iStream = builder.build().get();
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder);
|
||||
|
||||
// Verify that when builder is used value set in parameters is used
|
||||
// even if config is set
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setFooterReadBufferSize(footerReadBufferSizeConfig);
|
||||
iStream = builder.build().get();
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder);
|
||||
|
||||
// Verify that when the builder is used and parameter in builder is not set,
|
||||
// the value set in configuration is used
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setFooterReadBufferSize(footerReadBufferSizeConfig);
|
||||
builder = fs.openFile(testFilePath);
|
||||
iStream = builder.build().get();
|
||||
verifyConfigValueInStream(iStream, footerReadBufferSizeConfig);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyConfigValueInStream(final FSDataInputStream inputStream,
|
||||
final int expectedValue) {
|
||||
AbfsInputStream stream = (AbfsInputStream) inputStream.getWrappedStream();
|
||||
Assertions.assertThat(stream.getFooterReadBufferSize())
|
||||
.describedAs(
|
||||
"Footer Read Buffer Size Value Is Not As Expected")
|
||||
.isEqualTo(expectedValue);
|
||||
}
|
||||
|
||||
private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
|
||||
final int fileIdx, final int fileSize) throws Exception {
|
||||
String fileName = methodName.getMethodName() + fileIdx;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
return createFileWithContent(fs, fileName, fileContent);
|
||||
}
|
||||
|
||||
private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
|
||||
final AzureBlobFileSystem fs, int footerReadBufferSize) throws Exception {
|
||||
FutureDataInputStreamBuilder builder = fs.openFile(path);
|
||||
builder.opt(AZURE_FOOTER_READ_BUFFER_SIZE,
|
||||
footerReadBufferSize);
|
||||
return builder;
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead,
|
||||
final int fileSize) throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore store = getAbfsStore(fs);
|
||||
store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
|
||||
if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
|
||||
store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getFileSystem(final boolean optimizeFooterRead,
|
||||
final int fileSize, final int footerReadBufferSize) throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore store = getAbfsStore(fs);
|
||||
store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
|
||||
store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize);
|
||||
if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
|
||||
store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user