diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 82a116ba83..32f175a88f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -121,6 +121,12 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_READ_BUFFER_SIZE)
   private int readBufferSize;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_AHEAD_RANGE,
+      MinValue = MIN_BUFFER_SIZE,
+      MaxValue = MAX_BUFFER_SIZE,
+      DefaultValue = DEFAULT_READ_AHEAD_RANGE)
+  private int readAheadRange;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL,
       DefaultValue = DEFAULT_MIN_BACKOFF_INTERVAL)
   private int minBackoffInterval;
@@ -878,6 +884,10 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
     }
   }
 
+  public int getReadAheadRange() {
+    return this.readAheadRange;
+  }
+
   int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
     String value = get(validator.ConfigurationKey());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index fa7e12bc80..be9a9f265f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -697,6 +697,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
             .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
             .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
+            .withReadAheadRange(abfsConfiguration.getReadAheadRange())
             .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
             .withShouldReadBufferSizeAlways(
                 abfsConfiguration.shouldReadBufferSizeAlways())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 2dbb2b9b08..276203b042 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -68,6 +68,14 @@ public final class ConfigurationKeys {
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
   public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
+
+  /**
+   * Read ahead range parameter that can be set by the user.
+   * Default value is {@link FileSystemConfigurations#DEFAULT_READ_AHEAD_RANGE}.
+   * This can reduce the number of remote calls, as the next requested
+   * data may already be present in the buffer. Value: {@value}.
+   */
+  public static final String AZURE_READ_AHEAD_RANGE = "fs.azure.readahead.range";
   public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
   public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index dc4caa98a5..4194d21449 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -62,6 +62,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
   public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
   public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
+  public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB;  // 64 KB
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L;  // changing default abfs blocksize to 256MB
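The new key is wired through the validator annotation in AbfsConfiguration, so user-supplied values are validated against the [MIN_BUFFER_SIZE, MAX_BUFFER_SIZE] bounds. A minimal sketch of how a client would override it, assuming an ordinary Hadoop Configuration (the value shown is illustrative only):

    Configuration conf = new Configuration();
    // 128 KB readahead for random reads; must lie within the
    // [MIN_BUFFER_SIZE, MAX_BUFFER_SIZE] bounds (16 KB .. 100 MB).
    conf.setInt("fs.azure.readahead.range", 128 * 1024);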
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index c05ba0d947..5dd5eb79ff 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -75,6 +75,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
    * @see #read(long, byte[], int, int)
    */
   private final boolean bufferedPreadDisabled;
+  // User-configured readahead size.
+  private final int readAheadRange;
 
   private boolean firstRead = true;
   // SAS tokens can be re-used until they expire
@@ -103,6 +105,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
   private final AbfsInputStreamContext context;
   private IOStatistics ioStatistics;
+  /**
+   * The actual position within the object, used by
+   * lazy seek to decide whether to seek on the next read or not.
+   */
+  private long nextReadPos;
 
   public AbfsInputStream(
       final AbfsClient client,
@@ -119,6 +126,7 @@ public AbfsInputStream(
     this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
+    this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
     this.readAheadEnabled = true;
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
@@ -207,6 +215,28 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     }
     incrementReadOps();
     do {
+
+      // limit is the maximum amount of data present in the buffer, and
+      // fCursor is the current file pointer, so the farthest we can seek
+      // back and still read from the buffer is fCursor - limit.
+      // There may be a case where we read less than the requested data.
+      long filePosAtStartOfBuffer = fCursor - limit;
+      if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) {
+        // Determine the position in the buffer from which data is to be read.
+        bCursor = (int) (nextReadPos - filePosAtStartOfBuffer);
+
+        // When bCursor == limit, the buffer will be filled again, so in
+        // that case we are not actually reading from the buffer.
+        if (bCursor != limit && streamStatistics != null) {
+          streamStatistics.seekInBuffer();
+        }
+      } else {
+        // Clear the buffer and set the file pointer
+        // based on the previous seek() call.
+        fCursor = nextReadPos;
+        limit = 0;
+        bCursor = 0;
+      }
       if (shouldReadFully()) {
         lastReadBytes = readFileCompletely(b, currentOff, currentLen);
       } else if (shouldReadLastBlock()) {
@@ -265,9 +295,13 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
     } else {
       // Enable readAhead when reading sequentially
       if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+        LOG.debug("Sequential read with read ahead size of {}", bufferSize);
         bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
       } else {
-        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+        // Enable read ahead for random reads as well, to reduce the
+        // number of remote calls.
+        int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
+        LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
+        bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
       }
     }
     if (firstRead) {
@@ -401,6 +435,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
     int bytesToRead = min(len, bytesRemaining);
     System.arraycopy(buffer, bCursor, b, off, bytesToRead);
     bCursor += bytesToRead;
+    nextReadPos += bytesToRead;
     if (statistics != null) {
       statistics.incrementBytesRead(bytesToRead);
     }
@@ -481,13 +516,13 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
     final AbfsRestOperation op;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
+      if (streamStatistics != null) {
+        streamStatistics.remoteReadOperation();
+      }
       LOG.trace("Trigger client.read for path={} position={} offset={} length={}",
           path, position, offset, length);
       op = client.read(path, position, b, offset, length,
           tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
-      if (streamStatistics != null) {
-        streamStatistics.remoteReadOperation();
-      }
       LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
           + "offset = {} length = {}", position, b.length, offset, length);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -545,21 +580,9 @@ public synchronized void seek(long n) throws IOException {
       streamStatistics.seek(n, fCursor);
     }
 
-    if (n>=fCursor-limit && n<=fCursor) { // within buffer
-      bCursor = (int) (n-(fCursor-limit));
-      if (streamStatistics != null) {
-        streamStatistics.seekInBuffer();
-      }
-      return;
-    }
-
-    // next read will read from here
-    fCursor = n;
-    LOG.debug("set fCursor to {}", fCursor);
-
-    //invalidate buffer
-    limit = 0;
-    bCursor = 0;
+    nextReadPos = n;
+    LOG.debug("set nextReadPos to {}", nextReadPos);
   }
 
   @Override
@@ -630,7 +653,7 @@ public synchronized long getPos() throws IOException {
     if (closed) {
       throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
-    return fCursor - limit + bCursor;
+    return nextReadPos < 0 ? 0 : nextReadPos;
   }
 
   /**
@@ -696,6 +719,11 @@ byte[] getBuffer() {
     return buffer;
   }
 
+  @VisibleForTesting
+  public int getReadAheadRange() {
+    return readAheadRange;
+  }
+
   @VisibleForTesting
   protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
     this.cachedSasToken = cachedSasToken;
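The seek()/read() split above is a lazy-seek pattern: seek() only records nextReadPos, and the next read() either repositions bCursor inside the still-valid buffer window [fCursor - limit, fCursor] or invalidates the buffer and refills from the target offset. A compact standalone sketch of that pattern, with the remote fill stubbed out (illustrative code only, not the ABFS implementation):

    /** Illustrative lazy-seek buffer: seek() records intent, read() reconciles. */
    class LazySeekSketch {
      private final byte[] buffer = new byte[8];
      private long fCursor;      // file offset just past the buffered bytes
      private int limit;         // number of valid bytes in the buffer
      private int bCursor;       // next read position within the buffer
      private long nextReadPos;  // target position recorded by seek()

      void seek(long n) {
        nextReadPos = n;  // no IO here; the next read() reconciles
      }

      int read() {
        long bufferStart = fCursor - limit;
        if (nextReadPos >= bufferStart && nextReadPos <= fCursor) {
          bCursor = (int) (nextReadPos - bufferStart);  // still in the window
        } else {
          fCursor = nextReadPos;  // invalidate the buffer, refill from target
          limit = 0;
          bCursor = 0;
        }
        if (bCursor == limit) {
          limit = fillFromRemote(fCursor);  // stand-in for readInternal()
          fCursor += limit;
          bCursor = 0;
        }
        nextReadPos++;
        return buffer[bCursor++] & 0xff;
      }

      private int fillFromRemote(long position) {
        for (int i = 0; i < buffer.length; i++) {
          buffer[i] = (byte) (position + i);  // pretend each byte equals its offset
        }
        return buffer.length;
      }
    }

Seeking backwards within the window therefore costs nothing; only a read whose target falls outside [fCursor - limit, fCursor] pays for a remote fetch.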
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index fe41f22a77..55f01bf15b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -20,6 +20,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * Class to hold extra input stream configs.
@@ -38,6 +39,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private int readAheadBlockSize;
 
+  private int readAheadRange;
+
   private AbfsInputStreamStatistics streamStatistics;
 
   private boolean readSmallFilesCompletely;
@@ -69,6 +72,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
     return this;
   }
 
+  public AbfsInputStreamContext withReadAheadRange(
+      final int readAheadRange) {
+    this.readAheadRange = readAheadRange;
+    return this;
+  }
+
   public AbfsInputStreamContext withStreamStatistics(
       final AbfsInputStreamStatistics streamStatistics) {
     this.streamStatistics = streamStatistics;
@@ -115,6 +124,8 @@ public AbfsInputStreamContext build() {
       readAheadBlockSize = readBufferSize;
     }
     // Validation of parameters to be done here.
+    Preconditions.checkArgument(readAheadRange > 0,
+        "Read ahead range should be greater than 0");
     return this;
   }
 
@@ -130,6 +141,10 @@ public boolean isTolerateOobAppends() {
     return tolerateOobAppends;
   }
 
+  public int getReadAheadRange() {
+    return readAheadRange;
+  }
+
   public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
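Together with populateAbfsInputStreamContext above, the builder is used roughly as follows. This is a sketch only: the constructor argument (the SAS token renew period) and the surrounding wiring are assumed from context rather than shown in this patch:

    AbfsInputStreamContext context =
        new AbfsInputStreamContext(sasTokenRenewPeriodForStreamsInSeconds)  // assumed ctor arg
            .withReadBufferSize(abfsConfiguration.getReadBufferSize())
            .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
            .withReadAheadRange(abfsConfiguration.getReadAheadRange())
            .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
            .build();  // throws IllegalArgumentException if readAheadRange <= 0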
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 4d47d5a6ed..0e1eb769ce 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -127,6 +127,7 @@ public void testSeekStatistics() throws IOException {
      */
     for (int i = 0; i < OPERATIONS; i++) {
       in.seek(0);
+      in.read();
       in.seek(ONE_MB);
     }
 
@@ -155,7 +156,7 @@ public void testSeekStatistics() throws IOException {
      * are in buffer.
      *
      * seekInBuffer - Since all seeks were in buffer, the seekInBuffer
-     * would be equal to 2 * OPERATIONS.
+     * would be equal to OPERATIONS.
      *
      */
     assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
@@ -168,7 +169,7 @@ public void testSeekStatistics() throws IOException {
         OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
     assertEquals("Mismatch in bytesSkippedOnSeek value", 0,
         stats.getBytesSkippedOnSeek());
-    assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
+    assertEquals("Mismatch in seekInBuffer value", OPERATIONS,
         stats.getSeekInBuffer());
 
     in.close();
@@ -260,6 +261,7 @@ public void testWithNullStreamStatistics() throws IOException {
         .withReadBufferSize(getConfiguration().getReadBufferSize())
         .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
        .withStreamStatistics(null)
+        .withReadAheadRange(getConfiguration().getReadAheadRange())
         .build();
 
     AbfsOutputStream out = null;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
index bda845bb45..fe25477beb 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
@@ -141,6 +142,7 @@ public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception {
     assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries());
     assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
     assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost());
+    assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange());
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
index 35a5e1733d..3222a15cd5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java
@@ -18,10 +18,27 @@
 
 package org.apache.hadoop.fs.azurebfs.contract;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
 import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
+
 /**
  * Contract test for seek operation.
  */
@@ -29,6 +46,8 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
   private final boolean isSecure;
   private final ABFSContractTestBinding binding;
 
+  private static final byte[] BLOCK = dataset(100 * 1024, 0, 255);
+
   public ITestAbfsFileSystemContractSeek() throws Exception {
     binding = new ABFSContractTestBinding();
     this.isSecure = binding.isSecureMode();
@@ -47,6 +66,225 @@ protected Configuration createConfiguration() {
 
   @Override
   protected AbstractFSContract createContract(final Configuration conf) {
+    conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
     return new AbfsFileSystemContract(conf, isSecure);
   }
+
+  /**
+   * Verifies that data is read correctly when
+   * {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
+   * This test is deliberately not broken into smaller parts, because we
+   * want to simulate many forward and backward seeks, similar to a real
+   * production use case.
+   */
+  @Test
+  public void testSeekAndReadWithReadAhead() throws IOException {
+    describe("Testing seek and read with read ahead "
+        + "enabled for random reads");
+
+    Path testSeekFile = path(getMethodName() + "bigseekfile.txt");
+    createDataSet(testSeekFile);
+    try (FSDataInputStream in = getFileSystem().open(testSeekFile)) {
+      AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream());
+      AbfsInputStreamStatisticsImpl streamStatistics =
+          (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics();
+      assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE),
+          MIN_BUFFER_SIZE, inStream.getReadAheadRange());
+
+      long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations();
+      Assertions.assertThat(remoteReadOperationsOldVal)
+          .describedAs("Number of remote read ops should be 0 "
+              + "before any read call is made")
+          .isEqualTo(0);
+
+      // Read at the first position. Remote read.
+      Assertions.assertThat(inStream.getPos())
+          .describedAs("First call to getPos() should return 0")
+          .isEqualTo(0);
+      assertDataAtPos(0, (byte) in.read());
+      assertSeekBufferStats(0, streamStatistics.getSeekInBuffer());
+      long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek to just before the read ahead range. Read from buffer.
+      int newSeek = inStream.getReadAheadRange() - 1;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek to the boundary of the read ahead range. Read via the
+      // read buffer manager.
+      newSeek = inStream.getReadAheadRange();
+      inStream.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek to just after the read ahead range. Read from buffer.
+      newSeek = inStream.getReadAheadRange() + 1;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(2, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek 10 more bytes forward, so that data is read from the buffer.
+      newSeek += 10;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek backwards far enough that data is read from remote.
+      newSeek -= 106;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Seek 10 more bytes forward, so that data is read from the buffer.
+      newSeek += 10;
+      in.seek(newSeek);
+      assertGetPosition(newSeek, in.getPos());
+      assertDataAtPos(newSeek, (byte) in.read());
+      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+      remoteReadOperationsOldVal = remoteReadOperationsNewVal;
+
+      // Read multiple bytes across the read ahead range. Remote read.
+      long oldSeek = newSeek;
+      newSeek = 2 * inStream.getReadAheadRange() - 1;
+      byte[] bytes = new byte[5];
+      in.readFully(newSeek, bytes);
+      // With readFully, getPos() should return the old seek position,
+      // plus one, as one byte was already read after the last seek.
+      assertGetPosition(oldSeek + 1, in.getPos());
+      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
+      assertDatasetEquals(newSeek, "Read across read ahead ",
+          bytes, bytes.length);
+      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
+          remoteReadOperationsNewVal);
+    }
+  }
+
+  /**
+   * Validates getPos() when a seek is done after an
+   * {@code AbfsInputStream#unbuffer} call. Also uses the openFile()
+   * builder API to open the file.
+   */
+  @Test
+  public void testSeekAfterUnbuffer() throws IOException {
+    describe("Test to make sure that seeking in AbfsInputStream after "
+        + "unbuffer() call is not doing any IO.");
+    Path testFile = path(getMethodName() + ".txt");
+    createDataSet(testFile);
+    final CompletableFuture<FSDataInputStream> future =
+        getFileSystem().openFile(testFile)
+            .build();
+    try (FSDataInputStream inputStream = awaitFuture(future)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream();
+      AbfsInputStreamStatisticsImpl streamStatistics =
+          (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
+      int readAheadRange = abfsInputStream.getReadAheadRange();
+      long seekPos = readAheadRange;
+      inputStream.seek(seekPos);
+      assertDataAtPos(readAheadRange, (byte) inputStream.read());
+      long currentRemoteReadOps = streamStatistics.getRemoteReadOperations();
+      assertIncrementInRemoteReadOps(0, currentRemoteReadOps);
+      inputStream.unbuffer();
+      seekPos -= 10;
+      inputStream.seek(seekPos);
+      // A backwards seek shouldn't do any IO.
+      assertNoIncrementInRemoteReadOps(currentRemoteReadOps,
+          streamStatistics.getRemoteReadOperations());
+      assertGetPosition(seekPos, inputStream.getPos());
+    }
+  }
+
+  private void createDataSet(Path path) throws IOException {
+    createFile(getFileSystem(), path, true, BLOCK);
+  }
+
+  private void assertGetPosition(long expected, long actual) {
+    final String seekPosErrorMsg = "getPos() should return %s";
+    Assertions.assertThat(actual)
+        .describedAs(seekPosErrorMsg, expected)
+        .isEqualTo(expected);
+  }
+
+  private void assertDataAtPos(int pos, byte actualData) {
+    final String dataErrorMsg = "Mismatch in data@%s";
+    Assertions.assertThat(actualData)
+        .describedAs(dataErrorMsg, pos)
+        .isEqualTo(BLOCK[pos]);
+  }
+
+  private void assertSeekBufferStats(long expected, long actual) {
+    final String statsErrorMsg = "Mismatch in seekInBuffer counts";
+    Assertions.assertThat(actual)
+        .describedAs(statsErrorMsg)
+        .isEqualTo(expected);
+  }
+
+  private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) {
+    final String incrementErrorMsg = "Number of remote read ops shouldn't increase";
+    Assertions.assertThat(newVal)
+        .describedAs(incrementErrorMsg)
+        .isEqualTo(oldVal);
+  }
+
+  private void assertIncrementInRemoteReadOps(long oldVal, long newVal) {
+    final String incrementErrorMsg = "Number of remote read ops should increase";
+    Assertions.assertThat(newVal)
+        .describedAs(incrementErrorMsg)
+        .isGreaterThan(oldVal);
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   */
+  private void assertDatasetEquals(
+      final int readOffset,
+      final String operation,
+      final byte[] data,
+      int length) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      Assertions.assertThat(data[i])
+          .describedAs(operation + "with read offset " + readOffset
+              + ": data[" + i + "] != actualData[" + o + "]")
+          .isEqualTo(BLOCK[o]);
+    }
+  }
 }
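testSeekAfterUnbuffer above opens the file through the asynchronous openFile() builder rather than FileSystem.open(). The pattern in isolation (path illustrative; awaitFuture comes from org.apache.hadoop.fs.impl.FutureIOSupport, as imported earlier):

    CompletableFuture<FSDataInputStream> future =
        fs.openFile(new Path("/data/sample.txt")).build();
    try (FSDataInputStream in = awaitFuture(future)) {
      in.seek(1024);       // recorded lazily by AbfsInputStream
      int b = in.read();   // the actual IO happens here
    }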
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 44b0a362dc..299f085ee1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -247,8 +247,8 @@ private void verifyBeforeSeek(AbfsInputStream abfsInputStream){
     assertEquals(0, abfsInputStream.getBCursor());
   }
 
-  private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){
-    assertEquals(seekPos, abfsInputStream.getFCursor());
+  private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos) throws IOException {
+    assertEquals(seekPos, abfsInputStream.getPos());
     assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
     assertEquals(0, abfsInputStream.getLimit());
     assertEquals(0, abfsInputStream.getBCursor());
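End to end, the change is exercised like this; a hedged sketch, with the account URI and file path purely illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadAheadRangeDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("fs.azure.readahead.range", 16 * 1024);
        FileSystem fs = FileSystem.get(
            URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
        try (FSDataInputStream in = fs.open(new Path("/data/file.bin"))) {
          in.seek(4096);           // lazy: only nextReadPos is updated
          int first = in.read();   // the readahead-extended GET happens here
          in.seek(4100);           // still inside the buffered window
          int second = in.read();  // served from the buffer, no remote call
        }
      }
    }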