From 1bb4101b599255bed6b51a91a280972488f46ae3 Mon Sep 17 00:00:00 2001 From: Anoop Sam John Date: Tue, 16 Feb 2021 22:27:52 +0530 Subject: [PATCH] HADOOP-17038 Support disabling buffered reads in ABFS positional reads. (#2646) - Contributed by @anoopsjohn --- .../fs/azurebfs/AzureBlobFileSystem.java | 28 ++- .../fs/azurebfs/AzureBlobFileSystemStore.java | 21 +- .../azurebfs/constants/ConfigurationKeys.java | 9 +- .../fs/azurebfs/services/AbfsInputStream.java | 45 ++++ .../services/AbfsInputStreamContext.java | 11 + .../hadoop-azure/src/site/markdown/abfs.md | 10 + .../services/ITestAbfsPositionedRead.java | 233 ++++++++++++++++++ 7 files changed, 352 insertions(+), 5 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index ead8566b4c..ed607b38e6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -30,9 +30,12 @@ import java.util.Hashtable; import java.util.List; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -73,6 +76,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -82,6 +87,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.LambdaUtils; import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; @@ -176,11 +182,18 @@ public URI getUri() { @Override public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); + // bufferSize is unused. 
+    return open(path, Optional.empty());
+  }
+
+  private FSDataInputStream open(final Path path,
+      final Optional<Configuration> options) throws IOException {
     statIncrement(CALL_OPEN);
     Path qualifiedPath = makeQualified(path);
 
     try {
-      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, statistics);
+      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
+          options, statistics);
       return new FSDataInputStream(inputStream);
     } catch(AzureBlobFileSystemException ex) {
       checkException(path, ex);
@@ -188,6 +201,19 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
     }
   }
 
+  @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+      final Path path, final OpenFileParameters parameters) throws IOException {
+    LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+        parameters.getMandatoryKeys(),
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () ->
+            open(path, Optional.of(parameters.getOptions())));
+  }
+
   @Override
   public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index f4be159bf9..678f0b4f20 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -46,6 +46,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -125,6 +126,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
 
 /**
@@ -606,7 +608,15 @@ public void createDirectory(final Path path, final FsPermission permission, fina
     }
   }
 
-  public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
+  public AbfsInputStream openFileForRead(final Path path,
+      final FileSystem.Statistics statistics)
+      throws AzureBlobFileSystemException {
+    return openFileForRead(path, Optional.empty(), statistics);
+  }
+
+  public AbfsInputStream openFileForRead(final Path path,
+      final Optional<Configuration> options,
+      final FileSystem.Statistics statistics)
       throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
@@ -635,12 +645,16 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
 
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics, relativePath, contentLength,
-          populateAbfsInputStreamContext(),
+          populateAbfsInputStreamContext(options),
           eTag);
     }
   }
 
-  private AbfsInputStreamContext populateAbfsInputStreamContext() {
+  private AbfsInputStreamContext populateAbfsInputStreamContext(
+      Optional<Configuration> options) {
+    boolean bufferedPreadDisabled = options
+        .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
+        .orElse(false);
     return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
@@ -651,6 +665,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
             .withShouldReadBufferSizeAlways(
                 abfsConfiguration.shouldReadBufferSizeAlways())
             .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
+            .withBufferedPreadDisabled(bufferedPreadDisabled)
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 8a9c63ddbe..5857864912 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * Responsible to keep all the Azure Blob File System configurations keys in Hadoop configuration file.
@@ -181,6 +182,12 @@ public static String accountProperty(String property, String account) {
   public static final String FS_AZURE_LOCAL_USER_SP_MAPPING_FILE_PATH = "fs.azure.identity.transformer.local.service.principal.mapping.file.path";
   /** Key for Local Group to Service Group file location. */
   public static final String FS_AZURE_LOCAL_GROUP_SG_MAPPING_FILE_PATH = "fs.azure.identity.transformer.local.service.group.mapping.file.path";
-
+  /**
+   * Optional config to enable a lock-free pread that bypasses the read buffer
+   * in AbfsInputStream. This is not a config that can be set at cluster level.
+   * It can only be used as an option on FutureDataInputStreamBuilder.
+   * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+   */
+  public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
   private ConfigurationKeys() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index c1de031215..0dd3dcf065 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -70,6 +70,14 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
   private final boolean alwaysReadBufferSize;
+  /*
+   * By default pread performs a seek + read as in FSInputStream, and the data
+   * read is kept in a buffer. When bufferedPreadDisabled is true, pread reads
+   * only the requested number of bytes from the given position and the buffer
+   * is never used at all.
+   * @see #read(long, byte[], int, int)
+   */
+  private final boolean bufferedPreadDisabled;
 
   private boolean firstRead = true;
   // SAS tokens can be re-used until they expire
@@ -117,6 +125,8 @@ public AbfsInputStream(
     this.readAheadEnabled = true;
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
+    this.bufferedPreadDisabled = abfsInputStreamContext
+        .isBufferedPreadDisabled();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
@@ -135,6 +145,41 @@ public String getPath() {
     return path;
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    // When bufferedPreadDisabled is true, this method touches no shared state
+    // (shared buffer, cursor position, etc.), so it is deliberately left
+    // unsynchronized. HBase-style random reads on a shared file input stream
+    // benefit greatly from this. The closed check is therefore strict only
+    // at the start of the method, not across the entire flow.
+    synchronized (this) {
+      if (closed) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
+    LOG.debug("pread requested position = {} len = {} bufferedPreadDisabled = {}",
+        position, length, bufferedPreadDisabled);
+    if (!bufferedPreadDisabled) {
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (streamStatistics != null) {
+      streamStatistics.readOperationStarted();
+    }
+    int bytesRead = readRemote(position, buffer, offset, length);
+    if (statistics != null) {
+      statistics.incrementBytesRead(bytesRead);
+    }
+    if (streamStatistics != null) {
+      streamStatistics.bytesRead(bytesRead);
+    }
+    return bytesRead;
+  }
+
   @Override
   public int read() throws IOException {
     byte[] b = new byte[1];
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index ab3d3b0e76..fe41f22a77 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -44,6 +44,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean optimizeFooterRead;
 
+  private boolean bufferedPreadDisabled;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -97,6 +99,12 @@ public AbfsInputStreamContext withReadAheadBlockSize(
     return this;
   }
 
+  public AbfsInputStreamContext withBufferedPreadDisabled(
+      final boolean bufferedPreadDisabled) {
+    this.bufferedPreadDisabled = bufferedPreadDisabled;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
     if (readBufferSize > readAheadBlockSize) {
       LOG.debug(
@@ -142,4 +150,7 @@ public int getReadAheadBlockSize() {
     return readAheadBlockSize;
   }
 
+  public boolean isBufferedPreadDisabled() {
+    return bufferedPreadDisabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 0777f9b43b..33d4a0fa42 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -812,6 +812,16 @@ aheads.
 Specify the value in bytes. The value should be between 16384 to 104857600
 both inclusive (16 KB to 100 MB). The default value will be 4194304 (4 MB).
 
+`fs.azure.buffered.pread.disable`: By default the positional read API does a
+seek plus a buffered read: the data read fills the buffer in AbfsInputStream
+and the cursor positions are updated. When this option is set to true, the
+buffer is bypassed and each positional read issues a lock-free REST call that
+fetches only the requested range from the blob. This is especially helpful for
+HBase-style short random reads over a shared AbfsInputStream instance.
+Note: This is not a config which can be set at cluster level. It can only be
+used as an option on FutureDataInputStreamBuilder.
+See FileSystem#openFile(Path path).
+
 To run under limited memory situations configure the following. Especially when
 there are too many writes from the same process.
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java
new file mode 100644
index 0000000000..25f33db1ca
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.assertj.core.api.Assertions;
+
+public class ITestAbfsPositionedRead extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_FILE_DATA_SIZE = 100;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  public ITestAbfsPositionedRead() throws Exception {
+  }
+
+  @Test
+  public void testPositionedRead() throws IOException {
+    describe("Testing positioned reads in AbfsInputStream");
+    Path dest = path(methodName.getMethodName());
+
+    byte[] data = ContractTestUtils.dataset(TEST_FILE_DATA_SIZE, 'a', 'z');
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        TEST_FILE_DATA_SIZE, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type "
+              + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int readPos = 0;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // Only 10 bytes were requested from offset 0, but by default pread does
+      // a seek + buffered read, so the entire 100 bytes get read into the
+      // AbfsInputStream buffer.
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream pread did not read more data into its buffer")
+          .containsExactly(data);
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), bytesToRead, 1, 1,
+          TEST_FILE_DATA_SIZE);
+
+      readPos = 50;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, 1,
+          TEST_FILE_DATA_SIZE);
+      // Positioned reads were done at pos 0 and then 50, but the stream
+      // position should remain at 0.
+      Assertions.assertThat(inputStream.getPos())
+          .describedAs("AbfsInputStream positioned reads moved stream position")
+          .isEqualTo(0);
+    }
+  }
+
+  private void assertStatistics(IOStatistics ioStatistics,
+      long expectedBytesRead, long expectedReadOps, long expectedRemoteReadOps,
+      long expectedRemoteReadBytes) {
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.STREAM_READ_BYTES).longValue())
+        .describedAs("Mismatch in bytesRead statistics")
+        .isEqualTo(expectedBytesRead);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.STREAM_READ_OPERATIONS).longValue())
+        .describedAs("Mismatch in readOps statistics")
+        .isEqualTo(expectedReadOps);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.REMOTE_READ_OP).longValue())
+        .describedAs("Mismatch in remoteReadOps statistics")
+        .isEqualTo(expectedRemoteReadOps);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.REMOTE_BYTES_READ).longValue())
+        .describedAs("Mismatch in remoteReadBytes statistics")
+        .isEqualTo(expectedRemoteReadBytes);
+  }
+
+  @Test
+  public void testPositionedReadWithBufferedReadDisabled() throws IOException {
+    describe("Testing positioned reads in AbfsInputStream with buffered read disabled");
+    Path dest = path(methodName.getMethodName());
+    byte[] data = ContractTestUtils.dataset(TEST_FILE_DATA_SIZE, 'a', 'z');
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        TEST_FILE_DATA_SIZE, true);
+    FutureDataInputStreamBuilder builder = getFileSystem().openFile(dest);
+    builder.opt(ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE, true);
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = builder.build().get();
+    } catch (IllegalArgumentException | UnsupportedOperationException
+        | InterruptedException | ExecutionException e) {
+      throw new IOException(
+          "Exception opening " + dest + " with FutureDataInputStreamBuilder",
+          e);
+    }
+    assertNotNull("Null InputStream over " + dest, inputStream);
+    int bytesToRead = 10;
+    try {
+      AbfsInputStream abfsIs = (AbfsInputStream) inputStream.getWrappedStream();
+      byte[] readBuffer = new byte[bytesToRead];
+      int readPos = 10;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // Only 10 bytes were requested from offset 10. This time, as buffered
+      // pread is disabled, exactly the requested bytes are read and no data
+      // gets read into the AbfsInputStream#buffer. In fact the buffer won't
+      // even get initialized.
+      assertNull("AbfsInputStream pread caused the internal buffer creation",
+          abfsIs.getBuffer());
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), bytesToRead, 1, 1,
+          bytesToRead);
+      readPos = 40;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, 2,
+          2 * bytesToRead);
+      // Now do a seek and read so that the internal buffer gets created.
+      inputStream.seek(0);
+      Assertions.assertThat(inputStream.read(readBuffer)).describedAs(
+          "AbfsInputStream seek+read did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      // This read will have fetched all 100 bytes into the internal buffer.
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream seek+read did not read more data into its buffer")
+          .containsExactly(data);
+      assertStatistics(inputStream.getIOStatistics(), 3 * bytesToRead, 3, 3,
+          TEST_FILE_DATA_SIZE + 2 * bytesToRead);
+      resetBuffer(abfsIs.getBuffer());
+      // Do another positioned read and make sure no extra data is fetched.
+      readPos = 0;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream pread read more data into its buffer than expected")
+          .doesNotContain(data);
+      assertStatistics(inputStream.getIOStatistics(), 4 * bytesToRead, 4, 4,
+          TEST_FILE_DATA_SIZE + 3 * bytesToRead);
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  private void resetBuffer(byte[] buf) {
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) 0;
+    }
+  }
+}
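--
For reference, a minimal client-side sketch of the new option in use, mirroring the builder pattern exercised by the test above. The key and the openFile()/opt()/build() flow come straight from this patch; the class name and the ABFS URI are placeholders for illustration only.

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsUnbufferedPreadExample {
  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException {
    // Placeholder URI: substitute a real container, account and file path.
    Path path = new Path(
        "abfs://container@account.dfs.core.windows.net/data/file.bin");
    FileSystem fs = path.getFileSystem(new Configuration());

    byte[] buf = new byte[4096];
    // opt() marks the key as a hint: file systems that do not understand it
    // simply ignore it, whereas a must() key would be rejected by
    // rejectUnknownMandatoryKeys() in openFileWithOptions().
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.azure.buffered.pread.disable", true)
        .build().get()) {
      // With the option set, this pread issues one lock-free remote read for
      // exactly buf.length bytes at offset 4096; the stream's internal buffer
      // and cursor are untouched, so getPos() still reports 0 afterwards.
      int bytesRead = in.read(4096L, buf, 0, buf.length);
      System.out.println("pread returned " + bytesRead + " bytes");
    }
  }
}

Because the option only takes effect per stream, readers that share one AbfsInputStream (the HBase-style use case the patch targets) opt in at open time, while all other opens on the cluster keep the default buffered behaviour.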