HADOOP-17347. ABFS: Read optimizations

- Contributed by Bilahari T H
This commit is contained in:
bilaharith 2021-01-03 00:07:10 +05:30 committed by GitHub
parent 5ca1ea89b3
commit 1448add08f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1176 additions and 22 deletions

View File

@ -100,6 +100,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE) DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
private int writeBufferSize; private int writeBufferSize;
@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
private boolean readSmallFilesCompletely;
@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_READ_OPTIMIZE_FOOTER_READ,
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE,
@ -527,6 +537,14 @@ public int getWriteBufferSize() {
return this.writeBufferSize; return this.writeBufferSize;
} }
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}
public int getReadBufferSize() { public int getReadBufferSize() {
return this.readBufferSize; return this.readBufferSize;
} }
@ -925,4 +943,14 @@ private String appendSlashIfNeeded(String authority) {
return authority; return authority;
} }
@VisibleForTesting
public void setReadSmallFilesCompletely(boolean readSmallFilesCompletely) {
this.readSmallFilesCompletely = readSmallFilesCompletely;
}
@VisibleForTesting
public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}
} }

View File

@ -643,6 +643,8 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
.withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways( .withShouldReadBufferSizeAlways(
abfsConfiguration.shouldReadBufferSizeAlways()) abfsConfiguration.shouldReadBufferSizeAlways())

View File

@ -56,6 +56,8 @@ public final class ConfigurationKeys {
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";

View File

@ -50,13 +50,15 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;
private static final int ONE_KB = 1024; public static final int ONE_KB = 1024;
private static final int ONE_MB = ONE_KB * ONE_KB; public static final int ONE_MB = ONE_KB * ONE_KB;
// Default upload and download buffer size // Default upload and download buffer size
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB

View File

@ -38,6 +38,10 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.StringUtils.toLowerCase;
/** /**
@ -46,6 +50,9 @@
public class AbfsInputStream extends FSInputStream implements CanUnbuffer, public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities { StreamCapabilities {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
// Footer size is set to qualify for both ORC and parquet files
public static final int FOOTER_SIZE = 16 * ONE_KB;
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
private int readAheadBlockSize; private int readAheadBlockSize;
private final AbfsClient client; private final AbfsClient client;
@ -59,6 +66,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final boolean readAheadEnabled; // whether enable readAhead; private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean alwaysReadBufferSize; private final boolean alwaysReadBufferSize;
private boolean firstRead = true;
// SAS tokens can be re-used until they expire // SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken; private CachedSASToken cachedSasToken;
private byte[] buffer = null; // will be initialized on first use private byte[] buffer = null; // will be initialized on first use
@ -70,11 +78,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// of valid bytes in buffer) // of valid bytes in buffer)
private boolean closed = false; private boolean closed = false;
// Optimisations modify the pointer fields.
// For better resilience the following fields are used to save the
// existing state before optimization flows.
private int limitBkp;
private int bCursorBkp;
private long fCursorBkp;
private long fCursorAfterLastReadBkp;
/** Stream statistics. */ /** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics; private final AbfsInputStreamStatistics streamStatistics;
private long bytesFromReadAhead; // bytes read from readAhead; for testing private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing private long bytesFromRemoteRead; // bytes read remotely; for testing
private final AbfsInputStreamContext context;
public AbfsInputStream( public AbfsInputStream(
final AbfsClient client, final AbfsClient client,
final Statistics statistics, final Statistics statistics,
@ -96,6 +114,7 @@ public AbfsInputStream(
this.cachedSasToken = new CachedSASToken( this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
this.context = abfsInputStreamContext;
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
// Propagate the config values to ReadBufferManager so that the first instance // Propagate the config values to ReadBufferManager so that the first instance
@ -137,7 +156,13 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
} }
incrementReadOps(); incrementReadOps();
do { do {
lastReadBytes = readOneBlock(b, currentOff, currentLen); if (shouldReadFully()) {
lastReadBytes = readFileCompletely(b, currentOff, currentLen);
} else if (shouldReadLastBlock()) {
lastReadBytes = readLastBlock(b, currentOff, currentLen);
} else {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
}
if (lastReadBytes > 0) { if (lastReadBytes > 0) {
currentOff += lastReadBytes; currentOff += lastReadBytes;
currentLen -= lastReadBytes; currentLen -= lastReadBytes;
@ -150,27 +175,24 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
} }
private boolean shouldReadFully() {
return this.firstRead && this.context.readSmallFilesCompletely()
&& this.contentLength <= this.bufferSize;
}
private boolean shouldReadLastBlock() {
long footerStart = max(0, this.contentLength - FOOTER_SIZE);
return this.firstRead && this.context.optimizeFooterRead()
&& this.fCursor >= footerStart;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);
if (len == 0) { if (len == 0) {
return 0; return 0;
} }
if (!validate(b, off, len)) {
if (this.available() == 0) {
return -1; return -1;
} }
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer. //If buffer is empty, then fill the buffer.
if (bCursor == limit) { if (bCursor == limit) {
//If EOF, then return -1 //If EOF, then return -1
@ -197,6 +219,9 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
bytesRead = readInternal(fCursor, buffer, 0, b.length, true); bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
} }
} }
if (firstRead) {
firstRead = false;
}
if (bytesRead == -1) { if (bytesRead == -1) {
return -1; return -1;
@ -206,11 +231,123 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
fCursor += bytesRead; fCursor += bytesRead;
fCursorAfterLastRead = fCursor; fCursorAfterLastRead = fCursor;
} }
return copyToUserBuffer(b, off, len);
}
private int readFileCompletely(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
return 0;
}
if (!validate(b, off, len)) {
return -1;
}
savePointerState();
// data need to be copied to user buffer from index bCursor, bCursor has
// to be the current fCusor
bCursor = (int) fCursor;
return optimisedRead(b, off, len, 0, contentLength);
}
private int readLastBlock(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
return 0;
}
if (!validate(b, off, len)) {
return -1;
}
savePointerState();
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
long lastBlockStart = max(0, contentLength - bufferSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(bufferSize, contentLength);
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}
private int optimisedRead(final byte[] b, final int off, final int len,
final long readFrom, final long actualLen) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
for (int i = 0;
i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
}
}
} catch (IOException e) {
LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e);
restorePointerState();
return readOneBlock(b, off, len);
} finally {
firstRead = false;
}
if (totalBytesRead < 1) {
restorePointerState();
return -1;
}
// If the read was partial and the user requested part of data has
// not read then fallback to readoneblock. When limit is smaller than
// bCursor that means the user requested data has not been read.
if (fCursor < contentLength && bCursor > limit) {
restorePointerState();
return readOneBlock(b, off, len);
}
return copyToUserBuffer(b, off, len);
}
private void savePointerState() {
// Saving the current state for fall back ifn case optimization fails
this.limitBkp = this.limit;
this.fCursorBkp = this.fCursor;
this.fCursorAfterLastReadBkp = this.fCursorAfterLastRead;
this.bCursorBkp = this.bCursor;
}
private void restorePointerState() {
// Saving the current state for fall back ifn case optimization fails
this.limit = this.limitBkp;
this.fCursor = this.fCursorBkp;
this.fCursorAfterLastRead = this.fCursorAfterLastReadBkp;
this.bCursor = this.bCursorBkp;
}
private boolean validate(final byte[] b, final int off, final int len)
throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);
if (this.available() == 0) {
return false;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
return true;
}
private int copyToUserBuffer(byte[] b, int off, int len){
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested) //(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor; int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining); int bytesToRead = min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead); System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead; bCursor += bytesToRead;
if (statistics != null) { if (statistics != null) {
@ -224,7 +361,6 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
return bytesToRead; return bytesToRead;
} }
private int readInternal(final long position, final byte[] b, final int offset, final int length, private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException { final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) { if (readAheadEnabled && !bypassReadAhead) {
@ -239,7 +375,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
long nextOffset = position; long nextOffset = position;
// First read to queue needs to be of readBufferSize and later // First read to queue needs to be of readBufferSize and later
// of readAhead Block size // of readAhead Block size
long nextSize = Math.min((long) bufferSize, contentLength - nextOffset); long nextSize = min((long) bufferSize, contentLength - nextOffset);
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
while (numReadAheads > 0 && nextOffset < contentLength) { while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}", LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
@ -248,7 +384,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
nextOffset = nextOffset + nextSize; nextOffset = nextOffset + nextSize;
numReadAheads--; numReadAheads--;
// From next round onwards should be of readahead block size. // From next round onwards should be of readahead block size.
nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset); nextSize = min((long) readAheadBlockSize, contentLength - nextOffset);
} }
// try reading from buffers first // try reading from buffers first
@ -572,4 +708,24 @@ public String toString() {
} }
return sb.toString(); return sb.toString();
} }
@VisibleForTesting
int getBCursor() {
return this.bCursor;
}
@VisibleForTesting
long getFCursor() {
return this.fCursor;
}
@VisibleForTesting
long getFCursorAfterLastRead() {
return this.fCursorAfterLastRead;
}
@VisibleForTesting
long getLimit() {
return this.limit;
}
} }

View File

@ -40,6 +40,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private AbfsInputStreamStatistics streamStatistics; private AbfsInputStreamStatistics streamStatistics;
private boolean readSmallFilesCompletely;
private boolean optimizeFooterRead;
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds); super(sasTokenRenewPeriodForStreamsInSeconds);
} }
@ -69,6 +73,18 @@ public AbfsInputStreamContext withStreamStatistics(
return this; return this;
} }
public AbfsInputStreamContext withReadSmallFilesCompletely(
final boolean readSmallFilesCompletely) {
this.readSmallFilesCompletely = readSmallFilesCompletely;
return this;
}
public AbfsInputStreamContext withOptimizeFooterRead(
final boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
return this;
}
public AbfsInputStreamContext withShouldReadBufferSizeAlways( public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) { final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize; this.alwaysReadBufferSize = alwaysReadBufferSize;
@ -110,6 +126,14 @@ public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics; return streamStatistics;
} }
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}
public boolean shouldReadBufferSizeAlways() { public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize; return alwaysReadBufferSize;
} }

View File

@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
protected static final int HUNDRED = 100;
public ITestAbfsInputStream() throws Exception {
}
@Test
public void testWithNoOptimization() throws Exception {
for (int i = 2; i <= 7; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
}
}
protected void testWithNoOptimization(final FileSystem fs,
final Path testFilePath, final int seekPos, final byte[] fileContent)
throws IOException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
iStream = new FSDataInputStream(abfsInputStream);
seek(iStream, seekPos);
long totalBytesRead = 0;
int length = HUNDRED * HUNDRED;
do {
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
totalBytesRead += bytesRead;
if ((totalBytesRead + seekPos) >= fileContent.length) {
length = (fileContent.length - seekPos) % length;
}
assertEquals(length, bytesRead);
assertContentReadCorrectly(fileContent,
(int) (seekPos + totalBytesRead - length), length, buffer);
assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead);
assertTrue(abfsInputStream.getFCursorAfterLastRead() >= seekPos + totalBytesRead);
assertTrue(abfsInputStream.getBCursor() >= totalBytesRead % abfsInputStream.getBufferSize());
assertTrue(abfsInputStream.getLimit() >= totalBytesRead % abfsInputStream.getBufferSize());
} while (totalBytesRead + seekPos < fileContent.length);
} finally {
iStream.close();
}
}
@Test
public void testExceptionInOptimization() throws Exception {
for (int i = 2; i <= 7; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
fileSize / 4, fileContent);
}
}
private void testExceptionInOptimization(final FileSystem fs,
final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
throws IOException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
abfsInputStream = spy(abfsInputStream);
doThrow(new IOException())
.doCallRealMethod()
.when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt());
iStream = new FSDataInputStream(abfsInputStream);
verifyBeforeSeek(abfsInputStream);
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
long actualLength = length;
if (seekPos + length > fileContent.length) {
long delta = seekPos + length - fileContent.length;
actualLength = length - delta;
}
assertEquals(bytesRead, actualLength);
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
assertEquals(fileContent.length, abfsInputStream.getFCursor());
assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead());
assertEquals(actualLength, abfsInputStream.getBCursor());
assertTrue(abfsInputStream.getLimit() >= actualLength);
} finally {
iStream.close();
}
}
protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFilesCompletely);
return fs;
}
private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
boolean readSmallFileCompletely, int fileSize) throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(optimizeFooterRead);
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
.getReadBufferSize()) {
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFileCompletely);
}
return fs;
}
protected byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}
protected Path createFileWithContent(FileSystem fs, String fileName,
byte[] fileContent) throws IOException {
Path testFilePath = path(fileName);
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
oStream.write(fileContent);
oStream.flush();
}
return testFilePath;
}
protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
throws NoSuchFieldException, IllegalAccessException {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
Field abfsStoreField = AzureBlobFileSystem.class
.getDeclaredField("abfsStore");
abfsStoreField.setAccessible(true);
return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
}
protected Map<String, Long> getInstrumentationMap(FileSystem fs)
throws NoSuchFieldException, IllegalAccessException {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
Field abfsCountersField = AzureBlobFileSystem.class
.getDeclaredField("abfsCounters");
abfsCountersField.setAccessible(true);
AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
return abfsCounters.toMap();
}
protected void assertContentReadCorrectly(byte[] actualFileContent, int from,
int len, byte[] contentRead) {
for (int i = 0; i < len; i++) {
assertEquals(contentRead[i], actualFileContent[i + from]);
}
}
protected void assertBuffersAreNotEqual(byte[] actualContent,
byte[] contentRead, AbfsConfiguration conf) {
assertBufferEquality(actualContent, contentRead, conf, false);
}
protected void assertBuffersAreEqual(byte[] actualContent, byte[] contentRead,
AbfsConfiguration conf) {
assertBufferEquality(actualContent, contentRead, conf, true);
}
private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
AbfsConfiguration conf, boolean assertEqual) {
int bufferSize = conf.getReadBufferSize();
int actualContentSize = actualContent.length;
int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize;
int matches = 0;
for (int i = 0; i < n; i++) {
if (actualContent[i] == contentRead[i]) {
matches++;
}
}
if (assertEqual) {
assertEquals(n, matches);
} else {
assertNotEquals(n, matches);
}
}
protected void seek(FSDataInputStream iStream, long seekPos)
throws IOException {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
verifyBeforeSeek(abfsInputStream);
iStream.seek(seekPos);
verifyAfterSeek(abfsInputStream, seekPos);
}
private void verifyBeforeSeek(AbfsInputStream abfsInputStream){
assertEquals(0, abfsInputStream.getFCursor());
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
assertEquals(0, abfsInputStream.getLimit());
assertEquals(0, abfsInputStream.getBCursor());
}
private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){
assertEquals(seekPos, abfsInputStream.getFCursor());
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
assertEquals(0, abfsInputStream.getLimit());
assertEquals(0, abfsInputStream.getBCursor());
}
}

View File

@ -0,0 +1,358 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
private static final int TEN = 10;
private static final int TWENTY = 20;
public ITestAbfsInputStreamReadFooter() throws Exception {
}
@Test
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
testNumBackendCalls(true);
}
@Test
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
throws Exception {
testNumBackendCalls(false);
}
private void testNumBackendCalls(boolean optimizeFooterRead)
throws Exception {
for (int i = 1; i <= 4; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = AbfsInputStream.FOOTER_SIZE;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
byte[] buffer = new byte[length];
Map<String, Long> metricMap = getInstrumentationMap(fs);
long requestsMadeBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
iStream.seek(fileSize - 8);
iStream.read(buffer, 0, length);
iStream.seek(fileSize - (TEN * ONE_KB));
iStream.read(buffer, 0, length);
iStream.seek(fileSize - (TWENTY * ONE_KB));
iStream.read(buffer, 0, length);
metricMap = getInstrumentationMap(fs);
long requestsMadeAfterTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
if (optimizeFooterRead) {
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
} else {
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
}
}
}
}
@Test
public void testSeekToBeginAndReadWithConfTrue() throws Exception {
testSeekAndReadWithConf(true, SeekTo.BEGIN);
}
@Test
public void testSeekToBeginAndReadWithConfFalse() throws Exception {
testSeekAndReadWithConf(false, SeekTo.BEGIN);
}
@Test
public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
}
@Test
public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
}
@Test
public void testSeekToFooterAndReadWithConfTrue() throws Exception {
testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
}
@Test
public void testSeekToFooterAndReadWithConfFalse() throws Exception {
testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
}
@Test
public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
}
@Test
public void testSeekToToAfterFooterAndReadWithConfFalse() throws Exception {
testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
}
@Test
public void testSeekToEndAndReadWithConfTrue() throws Exception {
testSeekAndReadWithConf(true, SeekTo.END);
}
@Test
public void testSeekToEndAndReadWithConfFalse() throws Exception {
testSeekAndReadWithConf(false, SeekTo.END);
}
private void testSeekAndReadWithConf(boolean optimizeFooterRead,
SeekTo seekTo) throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
fileContent);
}
}
private int seekPos(SeekTo seekTo, int fileSize) {
if (seekTo == SeekTo.BEGIN) {
return 0;
}
if (seekTo == SeekTo.BEFORE_FOOTER_START) {
return fileSize - AbfsInputStream.FOOTER_SIZE - 1;
}
if (seekTo == SeekTo.AT_FOOTER_START) {
return fileSize - AbfsInputStream.FOOTER_SIZE;
}
if (seekTo == SeekTo.END) {
return fileSize - 1;
}
//seekTo == SeekTo.AFTER_FOOTER_START
return fileSize - AbfsInputStream.FOOTER_SIZE + 1;
}
private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
long actualContentLength = fileContent.length;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
long bufferSize = abfsInputStream.getBufferSize();
seek(iStream, seekPos);
byte[] buffer = new byte[length];
long bytesRead = iStream.read(buffer, 0, length);
long footerStart = max(0,
actualContentLength - AbfsInputStream.FOOTER_SIZE);
boolean optimizationOn =
conf.optimizeFooterRead() && seekPos >= footerStart;
long actualLength = length;
if (seekPos + length > actualContentLength) {
long delta = seekPos + length - actualContentLength;
actualLength = length - delta;
}
long expectedLimit;
long expectedBCurson;
long expectedFCursor;
if (optimizationOn) {
if (actualContentLength <= bufferSize) {
expectedLimit = actualContentLength;
expectedBCurson = seekPos + actualLength;
} else {
expectedLimit = bufferSize;
long lastBlockStart = max(0, actualContentLength - bufferSize);
expectedBCurson = seekPos - lastBlockStart + actualLength;
}
expectedFCursor = actualContentLength;
} else {
if (seekPos + bufferSize < actualContentLength) {
expectedLimit = bufferSize;
expectedFCursor = bufferSize;
} else {
expectedLimit = actualContentLength - seekPos;
expectedFCursor = min(seekPos + bufferSize, actualContentLength);
}
expectedBCurson = actualLength;
}
assertEquals(expectedFCursor, abfsInputStream.getFCursor());
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
assertEquals(expectedLimit, abfsInputStream.getLimit());
assertEquals(expectedBCurson, abfsInputStream.getBCursor());
assertEquals(actualLength, bytesRead);
// Verify user-content read
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
// Verify data read to AbfsInputStream buffer
int from = seekPos;
if (optimizationOn) {
from = (int) max(0, actualContentLength - bufferSize);
}
assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
abfsInputStream.getBuffer());
}
}
@Test
public void testPartialReadWithNoData()
throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithNoData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
}
}
private void testPartialReadWithNoData(final FileSystem fs,
final Path testFilePath, final int seekPos, final int length,
final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
abfsInputStream = spy(abfsInputStream);
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt());
iStream = new FSDataInputStream(abfsInputStream);
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
assertEquals(length, bytesRead);
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
assertEquals(fileContent.length, abfsInputStream.getFCursor());
assertEquals(length, abfsInputStream.getBCursor());
assertTrue(abfsInputStream.getLimit() >= length);
} finally {
iStream.close();
}
}
@Test
public void testPartialReadWithSomeDat()
throws Exception {
for (int i = 3; i <= 6; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithSomeDat(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
}
}
private void testPartialReadWithSomeDat(final FileSystem fs,
final Path testFilePath, final int seekPos, final int length,
final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
abfsInputStream = spy(abfsInputStream);
// first readRemote, will return first 10 bytes
// second readRemote returns data till the last 2 bytes
int someDataLength = 2;
int secondReturnSize =
min(fileContent.length, abfsInputStream.getBufferSize()) - 10
- someDataLength;
doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
.when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt());
iStream = new FSDataInputStream(abfsInputStream);
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
assertEquals(length, bytesRead);
assertEquals(fileContent.length, abfsInputStream.getFCursor());
// someDataLength(2), because in the do-while loop in read, the 2nd loop
// will go to readoneblock and that resets the bCursor to 0 as
// bCursor == limit finally when the 2 bytes are read bCursor and limit
// will be at someDataLength(2)
assertEquals(someDataLength, abfsInputStream.getBCursor());
assertEquals(someDataLength, abfsInputStream.getLimit());
} finally {
iStream.close();
}
}
private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
int fileSize) throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(optimizeFooterRead);
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
.getReadBufferSize()) {
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(false);
}
return fs;
}
private enum SeekTo {
BEGIN, AT_FOOTER_START, BEFORE_FOOTER_START, AFTER_FOOTER_START, END
}
}

View File

@ -0,0 +1,326 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream {
public ITestAbfsInputStreamSmallFileReads() throws Exception {
}
@Test
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
testNumBackendCalls(true);
}
@Test
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
throws Exception {
testNumBackendCalls(false);
}
private void testNumBackendCalls(boolean readSmallFilesCompletely)
throws Exception {
final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
for (int i = 1; i <= 4; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
byte[] buffer = new byte[length];
Map<String, Long> metricMap = getInstrumentationMap(fs);
long requestsMadeBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
iStream.seek(seekPos(SeekTo.END, fileSize, length));
iStream.read(buffer, 0, length);
iStream.seek(seekPos(SeekTo.MIDDLE, fileSize, length));
iStream.read(buffer, 0, length);
iStream.seek(seekPos(SeekTo.BEGIN, fileSize, length));
iStream.read(buffer, 0, length);
metricMap = getInstrumentationMap(fs);
long requestsMadeAfterTest = metricMap
.get(CONNECTIONS_MADE.getStatName());
if (readSmallFilesCompletely) {
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
} else {
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
}
}
}
}
@Test
public void testSeekToBeginingAndReadSmallFileWithConfTrue()
throws Exception {
testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, true);
}
@Test
public void testSeekToBeginingAndReadSmallFileWithConfFalse()
throws Exception {
testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, false);
}
@Test
public void testSeekToBeginingAndReadBigFileWithConfTrue() throws Exception {
testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, true);
}
@Test
public void testSeekToBeginingAndReadBigFileWithConfFalse() throws Exception {
testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, false);
}
@Test
public void testSeekToEndAndReadSmallFileWithConfTrue() throws Exception {
testSeekAndReadWithConf(SeekTo.END, 2, 4, true);
}
@Test
public void testSeekToEndAndReadSmallFileWithConfFalse() throws Exception {
testSeekAndReadWithConf(SeekTo.END, 2, 4, false);
}
@Test
public void testSeekToEndAndReadBigFileWithConfTrue() throws Exception {
testSeekAndReadWithConf(SeekTo.END, 5, 6, true);
}
@Test
public void testSeekToEndAndReaBigFiledWithConfFalse() throws Exception {
testSeekAndReadWithConf(SeekTo.END, 5, 6, false);
}
@Test
public void testSeekToMiddleAndReadSmallFileWithConfTrue() throws Exception {
testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, true);
}
@Test
public void testSeekToMiddleAndReadSmallFileWithConfFalse() throws Exception {
testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, false);
}
@Test
public void testSeekToMiddleAndReaBigFileWithConfTrue() throws Exception {
testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, true);
}
@Test
public void testSeekToMiddleAndReadBigFileWithConfFalse() throws Exception {
testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, false);
}
private void testSeekAndReadWithConf(SeekTo seekTo, int startFileSizeInMB,
int endFileSizeInMB, boolean readSmallFilesCompletely) throws Exception {
final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
int seekPos = seekPos(seekTo, fileSize, length);
seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
}
}
private int seekPos(SeekTo seekTo, int fileSize, int length) {
if (seekTo == SeekTo.BEGIN) {
return 0;
}
if (seekTo == SeekTo.END) {
return fileSize - length;
}
return fileSize / 2;
}
private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos,
int length, byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
try (FSDataInputStream iStream = fs.open(testFilePath)) {
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
assertEquals(bytesRead, length);
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
final int readBufferSize = conf.getReadBufferSize();
final int fileContentLength = fileContent.length;
final boolean smallFile = fileContentLength <= readBufferSize;
int expectedLimit, expectedFCursor;
int expectedBCursor;
if (conf.readSmallFilesCompletely() && smallFile) {
assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
expectedFCursor = fileContentLength;
expectedLimit = fileContentLength;
expectedBCursor = seekPos + length;
} else {
if ((seekPos == 0)) {
assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
} else {
assertBuffersAreNotEqual(fileContent, abfsInputStream.getBuffer(),
conf);
}
expectedBCursor = length;
expectedFCursor = (fileContentLength < (seekPos + readBufferSize))
? fileContentLength
: (seekPos + readBufferSize);
expectedLimit = (fileContentLength < (seekPos + readBufferSize))
? (fileContentLength - seekPos)
: readBufferSize;
}
assertEquals(expectedFCursor, abfsInputStream.getFCursor());
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
assertEquals(expectedLimit, abfsInputStream.getLimit());
}
}
@Test
public void testPartialReadWithNoData() throws Exception {
for (int i = 2; i <= 4; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
fileContent);
}
}
private void partialReadWithNoData(final FileSystem fs,
final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
throws IOException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
abfsInputStream = spy(abfsInputStream);
doReturn(10)
.doReturn(10)
.doCallRealMethod()
.when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt());
iStream = new FSDataInputStream(abfsInputStream);
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
assertEquals(bytesRead, length);
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
assertEquals(fileContent.length, abfsInputStream.getFCursor());
assertEquals(fileContent.length,
abfsInputStream.getFCursorAfterLastRead());
assertEquals(length, abfsInputStream.getBCursor());
assertTrue(abfsInputStream.getLimit() >= length);
} finally {
iStream.close();
}
}
@Test
public void testPartialReadWithSomeData() throws Exception {
for (int i = 2; i <= 4; i++) {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
partialReadWithSomeData(fs, testFilePath, fileSize / 2,
fileSize / 4, fileContent);
}
}
private void partialReadWithSomeData(final FileSystem fs,
final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
abfsInputStream = spy(abfsInputStream);
// first readRemote, will return first 10 bytes
// second readRemote, seekPos - someDataLength(10) will reach the
// seekPos as 10 bytes are already read in the first call. Plus
// someDataLength(10)
int someDataLength = 10;
int secondReturnSize = seekPos - 10 + someDataLength;
doReturn(10)
.doReturn(secondReturnSize)
.doCallRealMethod()
.when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt());
iStream = new FSDataInputStream(abfsInputStream);
seek(iStream, seekPos);
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
assertEquals(length, bytesRead);
assertTrue(abfsInputStream.getFCursor() > seekPos + length);
assertTrue(abfsInputStream.getFCursorAfterLastRead() > seekPos + length);
// Optimized read was no complete but it got some user requested data
// from server. So obviously the buffer will contain data more than
// seekPos + len
assertEquals(length - someDataLength, abfsInputStream.getBCursor());
assertTrue(abfsInputStream.getLimit() > length - someDataLength);
} finally {
iStream.close();
}
}
private enum SeekTo {BEGIN, MIDDLE, END}
}