From b4f9d8e6fa47ee422a31765240504e70d3ed2781 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 3 Apr 2024 13:15:05 +0100 Subject: [PATCH] Revert "HADOOP-19098. Vector IO: Specify and validate ranges consistently." This reverts commit ba7faf90c80476c79e6bfc7c02749dfc031337eb. --- .../apache/hadoop/fs/ChecksumFileSystem.java | 37 +- .../apache/hadoop/fs/PositionedReadable.java | 1 - .../apache/hadoop/fs/RawLocalFileSystem.java | 7 +- .../apache/hadoop/fs/VectoredReadUtils.java | 185 ++-- .../hadoop/fs/impl/CombinedFileRange.java | 43 +- .../apache/hadoop/fs/impl/FileRangeImpl.java | 3 +- .../markdown/filesystem/fsdatainputstream.md | 145 +--- .../hadoop/fs/TestVectoredReadUtils.java | 487 +++++++++++ .../AbstractContractVectoredReadTest.java | 395 ++++----- .../hadoop/fs/contract/ContractOptions.java | 5 - .../hadoop/fs/contract/ContractTestUtils.java | 68 +- .../TestLocalFSContractVectoredRead.java | 22 +- .../hadoop/fs/impl/TestVectoredReadUtils.java | 804 ------------------ .../src/test/resources/contract/localfs.xml | 5 - .../hdfs/TestHDFSContractVectoredRead.java | 54 -- .../apache/hadoop/fs/s3a/S3AInputStream.java | 196 ++--- .../hadoop/fs/s3a/impl/SDKStreamDrainer.java | 7 +- .../s3a/ITestS3AContractVectoredRead.java | 127 ++- .../auth/delegation/ITestDelegatedMRJob.java | 2 +- .../fs/s3a/performance/ITestS3AOpenCost.java | 3 +- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 110 +-- .../scale/ITestS3AHugeFilesDiskBlocks.java | 6 - .../src/test/resources/contract/s3a.xml | 5 - ...estAbfsFileSystemContractVectoredRead.java | 54 -- 24 files changed, 940 insertions(+), 1831 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java delete mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractVectoredRead.java delete mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractVectoredRead.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 716c6c5004..4c7569d6ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.IntFunction; @@ -53,9 +52,9 @@ import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; -import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; /**************************************************************** * Abstract Checksumed FileSystem. @@ -426,31 +425,41 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, } /** - * Vectored read. 
- * If the file has no checksums: delegate to the underlying stream. - * If the file is checksummed: calculate the checksum ranges as - * well as the data ranges, read both, and validate the checksums - * as well as returning the data. - * @param ranges the byte ranges to read - * @param allocate the function to allocate ByteBuffer - * @throws IOException + * Validates range parameters. + * In case of CheckSum FS, we already have calculated + * fileLength so failing fast here. + * @param ranges requested ranges. + * @param fileLength length of file. + * @throws EOFException end of file exception. */ + private void validateRangeRequest(List ranges, + final long fileLength) throws EOFException { + for (FileRange range : ranges) { + VectoredReadUtils.validateRangeRequest(range); + if (range.getOffset() + range.getLength() > fileLength) { + final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", + range.getOffset(), range.getLength(), file); + LOG.warn(errMsg); + throw new EOFException(errMsg); + } + } + } + @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + final long length = getFileLength(); + validateRangeRequest(ranges, length); // If the stream doesn't have checksums, just delegate. if (sums == null) { datas.readVectored(ranges, allocate); return; } - final long length = getFileLength(); - final List sorted = validateAndSortRanges(ranges, - Optional.of(length)); int minSeek = minSeekForVectorReads(); int maxSize = maxReadSizeForVectorReads(); List dataRanges = - VectoredReadUtils.mergeSortedRanges(sorted, bytesPerSum, + VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum, minSeek, maxReadSizeForVectorReads()); // While merging the ranges above, they are rounded up based on the value of bytesPerSum // which leads to some ranges crossing the EOF thus they need to be fixed else it will diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 90009ecb61..7380402eb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -127,7 +127,6 @@ default int maxReadSizeForVectorReads() { * @param ranges the byte ranges to read * @param allocate the function to allocate ByteBuffer * @throws IOException any IOE. - * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. 
*/ default void readVectored(List ranges, IntFunction allocate) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 083d2752b6..2f4f93099b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -68,8 +68,8 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; -import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; @@ -319,11 +319,10 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { public void readVectored(List ranges, IntFunction allocate) throws IOException { - // Validate, but do not pass in a file length as it may change. - List sortedRanges = validateAndSortRanges(ranges, - Optional.empty()); + List sortedRanges = Arrays.asList(sortRanges(ranges)); // Set up all of the futures, so that we can use them if things fail for(FileRange range: sortedRanges) { + VectoredReadUtils.validateRangeRequest(range); range.setData(new CompletableFuture<>()); } try { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 493b8c3a33..cf1b1ef969 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -22,56 +22,36 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.IntFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.impl.CombinedFileRange; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.functional.Function4RaisingIOE; -import static java.util.Objects.requireNonNull; -import static org.apache.hadoop.util.Preconditions.checkArgument; - /** * Utility class which implements helper methods used * in vectored IO implementation. */ -@InterfaceAudience.LimitedPrivate("Filesystems") -@InterfaceStability.Unstable public final class VectoredReadUtils { private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; - private static final Logger LOG = - LoggerFactory.getLogger(VectoredReadUtils.class); - /** * Validate a single range. - * @param range range to validate. - * @return the range. - * @param range type - * @throws IllegalArgumentException the range length is negative or other invalid condition - * is met other than the those which raise EOFException or NullPointerException. 
- * @throws EOFException the range offset is negative - * @throws NullPointerException if the range is null. + * @param range file range. + * @throws EOFException any EOF Exception. */ - public static T validateRangeRequest(T range) + public static void validateRangeRequest(FileRange range) throws EOFException { - requireNonNull(range, "range is null"); - - checkArgument(range.getLength() >= 0, "length is negative in %s", range); + Preconditions.checkArgument(range.getLength() >= 0, "length is negative"); if (range.getOffset() < 0) { - throw new EOFException("position is negative in range " + range); + throw new EOFException("position is negative"); } - return range; } /** @@ -81,9 +61,13 @@ public static T validateRangeRequest(T range) */ public static void validateVectoredReadRanges(List ranges) throws EOFException { - validateAndSortRanges(ranges, Optional.empty()); + for (FileRange range : ranges) { + validateRangeRequest(range); + } } + + /** * This is the default implementation which iterates through the ranges * to read each synchronously, but the intent is that subclasses @@ -92,13 +76,11 @@ public static void validateVectoredReadRanges(List ranges) * @param stream the stream to read the data from * @param ranges the byte ranges to read * @param allocate the byte buffer allocation - * @throws IllegalArgumentException if there are overlapping ranges or a range is invalid - * @throws EOFException the range offset is negative */ public static void readVectored(PositionedReadable stream, List ranges, - IntFunction allocate) throws EOFException { - for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) { + IntFunction allocate) { + for (FileRange range: ranges) { range.setData(readRangeFrom(stream, range, allocate)); } } @@ -109,52 +91,33 @@ public static void readVectored(PositionedReadable stream, * @param stream the stream to read from * @param range the range to read * @param allocate the function to allocate ByteBuffers - * @return the CompletableFuture that contains the read data or an exception. - * @throws IllegalArgumentException the range is invalid other than by offset or being null. - * @throws EOFException the range offset is negative - * @throws NullPointerException if the range is null. + * @return the CompletableFuture that contains the read data */ - public static CompletableFuture readRangeFrom( - PositionedReadable stream, - FileRange range, - IntFunction allocate) throws EOFException { - - validateRangeRequest(range); + public static CompletableFuture readRangeFrom(PositionedReadable stream, + FileRange range, + IntFunction allocate) { CompletableFuture result = new CompletableFuture<>(); try { ByteBuffer buffer = allocate.apply(range.getLength()); if (stream instanceof ByteBufferPositionedReadable) { - LOG.debug("ByteBufferPositionedReadable.readFully of {}", range); ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), buffer); buffer.flip(); } else { - // no positioned readable support; fall back to - // PositionedReadable methods readNonByteBufferPositionedReadable(stream, range, buffer); } result.complete(buffer); } catch (IOException ioe) { - LOG.debug("Failed to read {}", range, ioe); result.completeExceptionally(ioe); } return result; } - /** - * Read into a direct tor indirect buffer using {@code PositionedReadable.readFully()}. - * @param stream stream - * @param range file range - * @param buffer destination buffer - * @throws IOException IO problems. 
- */ - private static void readNonByteBufferPositionedReadable( - PositionedReadable stream, - FileRange range, - ByteBuffer buffer) throws IOException { + private static void readNonByteBufferPositionedReadable(PositionedReadable stream, + FileRange range, + ByteBuffer buffer) throws IOException { if (buffer.isDirect()) { - LOG.debug("Reading {} into a direct byte buffer from {}", range, stream); - readInDirectBuffer(range, + readInDirectBuffer(range.getLength(), buffer, (position, buffer1, offset, length) -> { stream.readFully(position, buffer1, offset, length); @@ -162,8 +125,6 @@ private static void readNonByteBufferPositionedReadable( }); buffer.flip(); } else { - // not a direct buffer, so read straight into the array - LOG.debug("Reading {} into a byte buffer from {}", range, stream); stream.readFully(range.getOffset(), buffer.array(), buffer.arrayOffset(), range.getLength()); } @@ -172,42 +133,26 @@ private static void readNonByteBufferPositionedReadable( /** * Read bytes from stream into a byte buffer using an * intermediate byte array. - *
-   *     (position, buffer, buffer-offset, length): Void
-   *     position:= the position within the file to read data.
-   *     buffer := a buffer to read fully `length` bytes into.
-   *     buffer-offset := the offset within the buffer to write data
-   *     length := the number of bytes to read.
-   *   
- * The passed in function MUST block until the required length of - * data is read, or an exception is thrown. - * @param range range to read + * @param length number of bytes to read. * @param buffer buffer to fill. * @param operation operation to use for reading data. * @throws IOException any IOE. */ - public static void readInDirectBuffer(FileRange range, - ByteBuffer buffer, - Function4RaisingIOE operation) - throws IOException { - - LOG.debug("Reading {} into a direct buffer", range); - validateRangeRequest(range); - int length = range.getLength(); + public static void readInDirectBuffer(int length, + ByteBuffer buffer, + Function4RaisingIOE operation) throws IOException { if (length == 0) { - // no-op return; } int readBytes = 0; - long position = range.getOffset(); + int position = 0; int tmpBufferMaxSize = Math.min(TMP_BUFFER_MAX_SIZE, length); byte[] tmp = new byte[tmpBufferMaxSize]; while (readBytes < length) { int currentLength = (readBytes + tmpBufferMaxSize) < length ? tmpBufferMaxSize : (length - readBytes); - LOG.debug("Reading {} bytes from position {} (bytes read={}", - currentLength, position, readBytes); operation.apply(position, tmp, 0, currentLength); buffer.put(tmp, 0, currentLength); position = position + currentLength; @@ -260,7 +205,7 @@ public static long roundDown(long offset, int chunkSize) { } /** - * Calculates the ceiling value of offset based on chunk size. + * Calculates the ceil value of offset based on chunk size. * @param offset file offset. * @param chunkSize file chunk size. * @return ceil value. @@ -275,69 +220,39 @@ public static long roundUp(long offset, int chunkSize) { } /** - * Validate a list of ranges (including overlapping checks) and - * return the sorted list. - *

- * Two ranges overlap when the start offset + * Check if the input ranges are overlapping in nature. + * We call two ranges to be overlapping when start offset * of second is less than the end offset of first. * End offset is calculated as start offset + length. - * @param input input list - * @param fileLength file length if known - * @return a new sorted list. - * @throws IllegalArgumentException if there are overlapping ranges or - * a range element is invalid (other than with negative offset) - * @throws EOFException if the last range extends beyond the end of the file supplied - * or a range offset is negative + * @param input list if input ranges. + * @return true/false based on logic explained above. */ - public static List validateAndSortRanges( - final List input, - final Optional fileLength) throws EOFException { + public static List validateNonOverlappingAndReturnSortedRanges( + List input) { - requireNonNull(input, "Null input list"); - checkArgument(!input.isEmpty(), "Empty input list"); - final List sortedRanges; - - if (input.size() == 1) { - validateRangeRequest(input.get(0)); - sortedRanges = input; - } else { - sortedRanges = sortRanges(input); - FileRange prev = null; - for (final FileRange current : sortedRanges) { - validateRangeRequest(current); - if (prev != null) { - checkArgument(current.getOffset() >= prev.getOffset() + prev.getLength(), - "Overlapping ranges %s and %s", prev, current); - } - prev = current; - } + if (input.size() <= 1) { + return input; } - // at this point the final element in the list is the last range - // so make sure it is not beyond the end of the file, if passed in. - // where invalid is: starts at or after the end of the file - if (fileLength.isPresent()) { - final FileRange last = sortedRanges.get(sortedRanges.size() - 1); - final Long l = fileLength.get(); - // this check is superfluous, but it allows for different exception message. - if (last.getOffset() >= l) { - throw new EOFException("Range starts beyond the file length (" + l + "): " + last); - } - if (last.getOffset() + last.getLength() > l) { - throw new EOFException("Range extends beyond the file length (" + l + "): " + last); + FileRange[] sortedRanges = sortRanges(input); + FileRange prev = sortedRanges[0]; + for (int i=1; i sortRanges(List input) { - final List l = new ArrayList<>(input); - l.sort(Comparator.comparingLong(FileRange::getOffset)); - return l; + public static FileRange[] sortRanges(List input) { + FileRange[] sortedRanges = input.toArray(new FileRange[0]); + Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset)); + return sortedRanges; } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java index b0fae1305e..c9555a1e54 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.impl; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileRange; import java.util.ArrayList; @@ -28,32 +27,13 @@ * A file range that represents a set of underlying file ranges. * This is used when we combine the user's FileRange objects * together into a single read for efficiency. - *

- * This class is not part of the public API; it MAY BE used as a parameter - * to vector IO operations in FileSystem implementation code (and is) */ -@InterfaceAudience.Private public class CombinedFileRange extends FileRangeImpl { - private final List underlying = new ArrayList<>(); - - /** - * Total size of the data in the underlying ranges. - */ - private long dataSize; + private List underlying = new ArrayList<>(); public CombinedFileRange(long offset, long end, FileRange original) { super(offset, (int) (end - offset), null); - append(original); - } - - /** - * Add a range to the underlying list; update - * the {@link #dataSize} field in the process. - * @param range range. - */ - private void append(final FileRange range) { - this.underlying.add(range); - dataSize += range.getLength(); + this.underlying.add(original); } /** @@ -84,24 +64,7 @@ public boolean merge(long otherOffset, long otherEnd, FileRange other, return false; } this.setLength((int) (newEnd - this.getOffset())); - append(other); + underlying.add(other); return true; } - - @Override - public String toString() { - return super.toString() - + String.format("; range count=%d, data size=%,d", - underlying.size(), dataSize); - } - - /** - * Get the total amount of data which is actually useful; - * the difference between this and {@link #getLength()} records - * how much data which will be discarded. - * @return a number greater than 0 and less than or equal to {@link #getLength()}. - */ - public long getDataSize() { - return dataSize; - } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index ee541f6e7c..1239be764b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -53,8 +53,7 @@ public FileRangeImpl(long offset, int length, Object reference) { @Override public String toString() { - return String.format("range [%d-%d], length=%,d, reference=%s", - getOffset(), getOffset() + getLength(), getLength(), getReference()); + return "range[" + offset + "," + (offset + length) + ")"; } @Override diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 6cbb54ea70..3820d0b8af 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -441,9 +441,9 @@ The semantics of this are exactly equivalent to readFully(position, buffer, 0, len(buffer)) That is, the buffer is filled entirely with the contents of the input source -from position `position`. +from position `position` -### `void readVectored(List ranges, IntFunction allocate)` +### `default void readVectored(List ranges, IntFunction allocate)` Read fully data for a list of ranges asynchronously. The default implementation iterates through the ranges, tries to coalesce the ranges based on values of @@ -459,119 +459,51 @@ The position returned by `getPos()` after `readVectored()` is undefined. If a file is changed while the `readVectored()` operation is in progress, the output is undefined. Some ranges may have old data, some may have new, and some may have both. 
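A minimal usage sketch of the calling pattern described above. The path, offsets, and class name are illustrative assumptions; only APIs that appear elsewhere in this document (`FileRange.createFileRange()`, `FSDataInputStream.readVectored()`, `FutureIO.awaitFuture()`) are used, and the buffers here come from the plain heap allocator.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public class VectoredReadSketch {

  /** Read two disjoint ranges of a file asynchronously and wait for both. */
  public static void readTwoRanges(FileSystem fs, Path path) throws Exception {
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0, 100));        // first 100 bytes
    ranges.add(FileRange.createFileRange(200_000, 100));  // 100 bytes at offset 200_000
    try (FSDataInputStream in = fs.open(path)) {
      // submit the asynchronous read; buffers come from the supplied allocator
      in.readVectored(ranges, ByteBuffer::allocate);
      // each range delivers its data through a CompletableFuture<ByteBuffer>
      for (FileRange range : ranges) {
        ByteBuffer buffer = FutureIO.awaitFuture(range.getData());
        // ... consume buffer ...
      }
    }
  }
}
```

Each future completes independently, so callers that need all ranges before proceeding should await every `getData()` future, as above.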
-While a `readVectored()` operation is in progress, normal read API calls MAY block; -the value of `getPos(`) is also undefined. Applications SHOULD NOT make such requests -while waiting for the results of a vectored read. +While a `readVectored()` operation is in progress, normal read api calls may block. + +Note: Don't use direct buffers for reading from ChecksumFileSystem as that may +lead to memory fragmentation explained in HADOOP-18296. -Note: Don't use direct buffers for reading from `ChecksumFileSystem` as that may -lead to memory fragmentation explained in -[HADOOP-18296](https://issues.apache.org/jira/browse/HADOOP-18296) -_Memory fragmentation in ChecksumFileSystem Vectored IO implementation_ #### Preconditions -No empty lists. +For each requested range: -```python -if ranges = null raise NullPointerException -if ranges.len() = 0 raise IllegalArgumentException -if allocate = null raise NullPointerException -``` - -For each requested range `range[i]` in the list of ranges `range[0..n]` sorted -on `getOffset()` ascending such that - -for all `i where i > 0`: - - range[i].getOffset() > range[i-1].getOffset() - -For all ranges `0..i` the preconditions are: - -```python -ranges[i] != null else raise IllegalArgumentException -ranges[i].getOffset() >= 0 else raise EOFException -ranges[i].getLength() >= 0 else raise IllegalArgumentException -if i > 0 and ranges[i].getOffset() < (ranges[i-1].getOffset() + ranges[i-1].getLength) : - raise IllegalArgumentException -``` -If the length of the file is known during the validation phase: - -```python -if range[i].getOffset + range[i].getLength >= data.length() raise EOFException -``` + range.getOffset >= 0 else raise IllegalArgumentException + range.getLength >= 0 else raise EOFException #### Postconditions -For each requested range `range[i]` in the list of ranges `range[0..n]` +For each requested range: -``` -ranges[i]'.getData() = CompletableFuture -``` + range.getData() returns CompletableFuture which will have data + from range.getOffset to range.getLength. - and when `getData().get()` completes: -``` -let buffer = `getData().get() -let len = ranges[i].getLength() -let data = new byte[len] -(buffer.position() - buffer.limit) = len -buffer.get(data, 0, len) = readFully(ranges[i].getOffset(), data, 0, len) -``` - -That is: the result of every ranged read is the result of the (possibly asynchronous) -call to `PositionedReadable.readFully()` for the same offset and length - -#### `minSeekForVectorReads()` +### `minSeekForVectorReads()` The smallest reasonable seek. Two ranges won't be merged together if the difference between end of first and start of next range is more than this value. -#### `maxReadSizeForVectorReads()` +### `maxReadSizeForVectorReads()` Maximum number of bytes which can be read in one go after merging the ranges. -Two ranges won't be merged if the combined data to be read It's okay we have a look at what we do right now for readOkayis more than this value. +Two ranges won't be merged if the combined data to be read is more than this value. Essentially setting this to 0 will disable the merging of ranges. -#### Concurrency +## Consistency -* When calling `readVectored()` while a separate thread is trying - to read data through `read()`/`readFully()`, all operations MUST - complete successfully. -* Invoking a vector read while an existing set of pending vector reads - are in progress MUST be supported. The order of which ranges across - the multiple requests complete is undefined. 
-* Invoking `read()`/`readFully()` while a vector API call is in progress - MUST be supported. The order of which calls return data is undefined. - -The S3A connector closes any open stream when its `synchronized readVectored()` -method is invoked; -It will then switch the read policy from normal to random -so that any future invocations will be for limited ranges. -This is because the expectation is that vector IO and large sequential -reads are not mixed and that holding on to any open HTTP connection is wasteful. - -#### Handling of zero-length ranges - -Implementations MAY short-circuit reads for any range where `range.getLength() = 0` -and return an empty buffer. - -In such circumstances, other validation checks MAY be omitted. - -There are no guarantees that such optimizations take place; callers SHOULD NOT -include empty ranges for this reason. - -#### Consistency - -* All readers, local and remote, of a data stream `FSDIS` provided from a `FileSystem.open(p)` +* All readers, local and remote, of a data stream FSDIS provided from a `FileSystem.open(p)` are expected to receive access to the data of `FS.Files[p]` at the time of opening. * If the underlying data is changed during the read process, these changes MAY or MAY NOT be visible. * Such changes that are visible MAY be partially visible. -At time `t0` + +At time t0 FSDIS0 = FS'read(p) = (0, data0[]) -At time `t1` +At time t1 FS' = FS' where FS'.Files[p] = data1 @@ -612,41 +544,6 @@ While at time `t3 > t2`: It may be that `r3 != r2`. (That is, some of the data my be cached or replicated, and on a subsequent read, a different version of the file's contents are returned). + Similarly, if the data at the path `p`, is deleted, this change MAY or MAY not be visible during read operations performed on `FSDIS0`. - -#### API Stabilization Notes - -The `readVectored()` API was shipped in Hadoop 3.3.5, with explicit local, raw local and S3A -support -and fallback everywhere else. - -*Overlapping ranges* - -The restriction "no overlapping ranges" was only initially enforced in -the S3A connector, which would raise `UnsupportedOperationException`. -Adding the range check as a precondition for all implementations guarantees -consistent behavior everywhere. -For reliable use with older hadoop releases with the API: sort the list of ranges -and check for overlaps before calling `readVectored()`. - -*Direct Buffer Reads* - -Releases without [HADOOP-19101](https://issues.apache.org/jira/browse/HADOOP-19101) -_Vectored Read into off-heap buffer broken in fallback implementation_ can read data -from the wrong offset with the default "fallback" implementation if the buffer allocator -function returns off heap "direct" buffers. - -The custom implementations in local filesystem and S3A's non-prefetching stream are safe. - -Anyone implementing support for the API, unless confident they only run -against releases with the fixed implementation, SHOULD NOT use the API -if the allocator is direct and the input stream does not explicitly declare -support through an explicit `hasCapability()` probe: - -```java -Stream.hasCapability("in:readvectored") -``` - -Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, across all releases, -it is best to avoid using this API in production with direct buffers. 
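A hedged sketch of the defensive check suggested above: only hand out direct buffers when the stream explicitly declares vectored-read support via the `in:readvectored` capability mentioned in the notes. The class name and the policy of falling back to on-heap buffers are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;

public final class VectorBufferAllocatorChooser {

  private VectorBufferAllocatorChooser() {
  }

  /**
   * Pick a ByteBuffer allocator for readVectored().
   * Direct buffers only when the stream declares native vectored IO support;
   * otherwise stay on-heap to avoid the fallback-implementation issue above.
   */
  public static IntFunction<ByteBuffer> chooseAllocator(FSDataInputStream in) {
    return in.hasCapability("in:readvectored")
        ? ByteBuffer::allocateDirect
        : ByteBuffer::allocate;
  }
}
```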
- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java new file mode 100644 index 0000000000..e964d23f4b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import org.apache.hadoop.fs.impl.CombinedFileRange; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; +import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; + +/** + * Test behavior of {@link VectoredReadUtils}. 
+ */ +public class TestVectoredReadUtils extends HadoopTestBase { + + @Test + public void testSliceTo() { + final int size = 64 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(size); + // fill the buffer with data + IntBuffer intBuffer = buffer.asIntBuffer(); + for(int i=0; i < size / Integer.BYTES; ++i) { + intBuffer.put(i); + } + // ensure we don't make unnecessary slices + ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, + FileRange.createFileRange(100, size)); + Assertions.assertThat(buffer) + .describedAs("Slicing on the same offset shouldn't " + + "create a new buffer") + .isEqualTo(slice); + Assertions.assertThat(slice.position()) + .describedAs("Slicing should return buffers starting from position 0") + .isEqualTo(0); + + // try slicing a range + final int offset = 100; + final int sliceStart = 1024; + final int sliceLength = 16 * 1024; + slice = VectoredReadUtils.sliceTo(buffer, offset, + FileRange.createFileRange(offset + sliceStart, sliceLength)); + // make sure they aren't the same, but use the same backing data + Assertions.assertThat(buffer) + .describedAs("Slicing on new offset should " + + "create a new buffer") + .isNotEqualTo(slice); + Assertions.assertThat(buffer.array()) + .describedAs("Slicing should use the same underlying " + + "data") + .isEqualTo(slice.array()); + Assertions.assertThat(slice.position()) + .describedAs("Slicing should return buffers starting from position 0") + .isEqualTo(0); + // test the contents of the slice + intBuffer = slice.asIntBuffer(); + for(int i=0; i < sliceLength / Integer.BYTES; ++i) { + assertEquals("i = " + i, i + sliceStart / Integer.BYTES, intBuffer.get()); + } + } + + @Test + public void testRounding() { + for(int i=5; i < 10; ++i) { + assertEquals("i = "+ i, 5, VectoredReadUtils.roundDown(i, 5)); + assertEquals("i = "+ i, 10, VectoredReadUtils.roundUp(i+1, 5)); + } + assertEquals("Error while roundDown", 13, VectoredReadUtils.roundDown(13, 1)); + assertEquals("Error while roundUp", 13, VectoredReadUtils.roundUp(13, 1)); + } + + @Test + public void testMerge() { + // a reference to use for tracking + Object tracker1 = "one"; + Object tracker2 = "two"; + FileRange base = FileRange.createFileRange(2000, 1000, tracker1); + CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); + + // test when the gap between is too big + assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, + FileRange.createFileRange(5000, 1000), 2000, 4000)); + assertEquals("Number of ranges in merged range shouldn't increase", + 1, mergeBase.getUnderlying().size()); + assertFileRange(mergeBase, 2000, 1000); + + // test when the total size gets exceeded + assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, + FileRange.createFileRange(5000, 1000), 2001, 3999)); + assertEquals("Number of ranges in merged range shouldn't increase", + 1, mergeBase.getUnderlying().size()); + assertFileRange(mergeBase, 2000, 1000); + + // test when the merge works + assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, + FileRange.createFileRange(5000, 1000, tracker2), + 2001, 4000)); + assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); + assertFileRange(mergeBase, 2000, 4000); + + Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference()) + .describedAs("reference of range %s", mergeBase.getUnderlying().get(0)) + .isSameAs(tracker1); + Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference()) + .describedAs("reference of range %s", 
mergeBase.getUnderlying().get(1)) + .isSameAs(tracker2); + + // reset the mergeBase and test with a 10:1 reduction + mergeBase = new CombinedFileRange(200, 300, base); + assertFileRange(mergeBase, 200, 100); + + assertTrue("ranges should get merged ", mergeBase.merge(500, 600, + FileRange.createFileRange(5000, 1000), 201, 400)); + assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); + assertFileRange(mergeBase, 200, 400); + } + + @Test + public void testSortAndMerge() { + List input = Arrays.asList( + FileRange.createFileRange(3000, 100, "1"), + FileRange.createFileRange(2100, 100, null), + FileRange.createFileRange(1000, 100, "3") + ); + assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + final List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); + CombinedFileRange output = outputList.get(0); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(3); + // range[1000,3100) + assertFileRange(output, 1000, 2100); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + + // the minSeek doesn't allow the first two to merge + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + final List list2 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), + 100, 1000, 2100); + Assertions.assertThat(list2) + .describedAs("merged range size") + .hasSize(2); + assertFileRange(list2.get(0), 1000, 100); + + // range[2100,3100) + assertFileRange(list2.get(1), 2100, 1000); + + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000)); + + // the maxSize doesn't allow the third range to merge + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + final List list3 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), + 100, 1001, 2099); + Assertions.assertThat(list3) + .describedAs("merged range size") + .hasSize(2); + // range[1000,2200) + CombinedFileRange range0 = list3.get(0); + assertFileRange(range0, 1000, 1200); + assertFileRange(range0.getUnderlying().get(0), + 1000, 100, "3"); + assertFileRange(range0.getUnderlying().get(1), + 2100, 100, null); + CombinedFileRange range1 = list3.get(1); + // range[3000,3100) + assertFileRange(range1, 3000, 100); + assertFileRange(range1.getUnderlying().get(0), + 3000, 100, "1"); + + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(list3, 100, 800)); + + // test the round up and round down (the maxSize doesn't allow any merges) + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + final List list4 = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), + 16, 1001, 100); + Assertions.assertThat(list4) + .describedAs("merged range size") + .hasSize(3); + // range[992,1104) + assertFileRange(list4.get(0), 992, 112); + // range[2096,2208) + assertFileRange(list4.get(1), 2096, 112); + // range[2992,3104) + assertFileRange(list4.get(2), 2992, 112); + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(list4, 16, 700)); + } + + /** + * Assert that a file range satisfies the conditions. 
+ * @param range range to validate + * @param offset offset of range + * @param length range length + */ + private void assertFileRange(FileRange range, long offset, int length) { + Assertions.assertThat(range) + .describedAs("file range %s", range) + .isNotNull(); + Assertions.assertThat(range.getOffset()) + .describedAs("offset of %s", range) + .isEqualTo(offset); + Assertions.assertThat(range.getLength()) + .describedAs("length of %s", range) + .isEqualTo(length); + } + + /** + * Assert that a file range satisfies the conditions. + * @param range range to validate + * @param offset offset of range + * @param length range length + * @param reference reference; may be null. + */ + private void assertFileRange(FileRange range, long offset, int length, Object reference) { + assertFileRange(range, offset, length); + Assertions.assertThat(range.getReference()) + .describedAs("reference field of file range %s", range) + .isEqualTo(reference); + } + + + @Test + public void testSortAndMergeMoreCases() throws Exception { + List input = Arrays.asList( + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) + ); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 1, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); + CombinedFileRange output = outputList.get(0); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); + + assertFileRange(output, 1000, 2110); + + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); + output = outputList.get(0); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); + assertFileRange(output, 1000, 2200); + + assertTrue("merged output ranges are disjoint", + VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + } + + @Test + public void testValidateOverlappingRanges() throws Exception { + List input = Arrays.asList( + FileRange.createFileRange(100, 100), + FileRange.createFileRange(200, 100), + FileRange.createFileRange(250, 100) + ); + + intercept(UnsupportedOperationException.class, + () -> validateNonOverlappingAndReturnSortedRanges(input)); + + List input1 = Arrays.asList( + FileRange.createFileRange(100, 100), + FileRange.createFileRange(500, 100), + FileRange.createFileRange(1000, 100), + FileRange.createFileRange(1000, 100) + ); + + intercept(UnsupportedOperationException.class, + () -> validateNonOverlappingAndReturnSortedRanges(input1)); + + List input2 = Arrays.asList( + FileRange.createFileRange(100, 100), + FileRange.createFileRange(200, 100), + FileRange.createFileRange(300, 100) + ); + // consecutive ranges should pass. 
+ validateNonOverlappingAndReturnSortedRanges(input2); + } + + @Test + public void testMaxSizeZeroDisablesMering() throws Exception { + List randomRanges = Arrays.asList( + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100) + ); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); + assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0); + } + + private void assertEqualRangeCountsAfterMerging(List inputRanges, + int chunkSize, + int minimumSeek, + int maxSize) { + List combinedFileRanges = VectoredReadUtils + .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize); + Assertions.assertThat(combinedFileRanges) + .describedAs("Mismatch in number of ranges post merging") + .hasSize(inputRanges.size()); + } + + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { + // nothing + } + + static void fillBuffer(ByteBuffer buffer) { + byte b = 0; + while (buffer.remaining() > 0) { + buffer.put(b++); + } + } + + @Test + public void testReadRangeFromByteBufferPositionedReadable() throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), + ByteBuffer::allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals("Size of result buffer", 100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + result = + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionally(result); + } + + static void runReadRangeFromPositionedReadable(IntFunction allocate) + throws Exception { + PositionedReadable stream = Mockito.mock(PositionedReadable.class); + Mockito.doAnswer(invocation -> { + byte b=0; + byte[] buffer = invocation.getArgument(1); + for(int i=0; i < buffer.length; ++i) { + buffer[i] = b++; + } + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), + allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals("Size of result buffer", 100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + result = + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionally(result); + } + + @Test + public void testReadRangeArray() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocate); + } + 
+ @Test + public void testReadRangeDirect() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect); + } + + static void validateBuffer(String message, ByteBuffer buffer, int start) { + byte expected = (byte) start; + while (buffer.remaining() > 0) { + assertEquals(message + " remain: " + buffer.remaining(), expected++, + buffer.get()); + } + } + + @Test + public void testReadVectored() throws Exception { + List input = Arrays.asList(FileRange.createFileRange(0, 100), + FileRange.createFileRange(100_000, 100), + FileRange.createFileRange(200_000, 100)); + runAndValidateVectoredRead(input); + } + + @Test + public void testReadVectoredZeroBytes() throws Exception { + List input = Arrays.asList(FileRange.createFileRange(0, 0), + FileRange.createFileRange(100_000, 100), + FileRange.createFileRange(200_000, 0)); + runAndValidateVectoredRead(input); + } + + + private void runAndValidateVectoredRead(List input) + throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + // should not merge the ranges + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); + Mockito.verify(stream, Mockito.times(3)) + .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); + for (int b = 0; b < input.size(); ++b) { + validateBuffer("buffer " + b, input.get(b).getData().get(), 0); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index d6a1fb1f0b..a39201df24 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -42,54 +42,39 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.functional.FutureIO; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR; import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; -import static org.apache.hadoop.fs.contract.ContractTestUtils.range; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static 
org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; -import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @RunWith(Parameterized.class) public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase { - private static final Logger LOG = - LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); public static final int DATASET_LEN = 64 * 1024; protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; - /** - * Buffer allocator for vector IO. - */ private final IntFunction allocate; - /** - * Buffer pool for vector IO. - */ - private final ElasticByteBufferPool pool = + private final WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); private final String bufferType; - /** - * Path to the vector file. - */ - private Path vectorPath; - @Parameterized.Parameters(name = "Buffer type : {0}") public static List params() { return Arrays.asList("direct", "array"); @@ -97,73 +82,52 @@ public static List params() { public AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - final boolean isDirect = !"array".equals(bufferType); - this.allocate = size -> pool.getBuffer(isDirect, size); + this.allocate = value -> { + boolean isDirect = !"array".equals(bufferType); + return pool.getBuffer(isDirect, value); + }; } - /** - * Get the buffer allocator. - * @return allocator function for vector IO. - */ - protected IntFunction getAllocate() { + public IntFunction getAllocate() { return allocate; } - /** - * Get the vector IO buffer pool. - * @return a pool. - */ - - protected ElasticByteBufferPool getPool() { + public WeakReferencedElasticByteBufferPool getPool() { return pool; } @Override public void setup() throws Exception { super.setup(); - vectorPath = path(VECTORED_READ_FILE_NAME); + Path path = path(VECTORED_READ_FILE_NAME); FileSystem fs = getFileSystem(); - createFile(fs, vectorPath, true, DATASET); + createFile(fs, path, true, DATASET); } @Override public void teardown() throws Exception { - pool.release(); super.teardown(); + pool.release(); } - /** - * Open the vector file. - * @return the input stream. - * @throws IOException failure. - */ - protected FSDataInputStream openVectorFile() throws IOException { - return openVectorFile(getFileSystem()); - } - - /** - * Open the vector file. - * @param fs filesystem to use - * @return the input stream. - * @throws IOException failure. 
- */ - protected FSDataInputStream openVectorFile(final FileSystem fs) throws IOException { - return awaitFuture( - fs.openFile(vectorPath) - .opt(FS_OPTION_OPENFILE_LENGTH, DATASET_LEN) - .opt(FS_OPTION_OPENFILE_READ_POLICY, - FS_OPTION_OPENFILE_READ_POLICY_VECTOR) - .build()); + @Test + public void testVectoredReadCapability() throws Exception { + FileSystem fs = getFileSystem(); + String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO}; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + assertCapabilities(in, vectoredReadCapability, null); + } } @Test public void testVectoredReadMultipleRanges() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); for (int i = 0; i < 10; i++) { FileRange fileRange = FileRange.createFileRange(i * 100, 100); fileRanges.add(fileRange); } - try (FSDataInputStream in = openVectorFile()) { + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; int i = 0; @@ -173,20 +137,21 @@ public void testVectoredReadMultipleRanges() throws Exception { CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); combinedFuture.get(); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testVectoredReadAndReadFully() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - range(fileRanges, 100, 100); - try (FSDataInputStream in = openVectorFile()) { + fileRanges.add(FileRange.createFileRange(100, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); byte[] readFullRes = new byte[100]; in.readFully(100, readFullRes); - ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData()); + ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData()); Assertions.assertThat(vecRes) .describedAs("Result from vectored read and readFully must match") .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); @@ -194,34 +159,20 @@ public void testVectoredReadAndReadFully() throws Exception { } } - @Test - public void testVectoredReadWholeFile() throws Exception { - describe("Read the whole file in one single vectored read"); - List fileRanges = new ArrayList<>(); - range(fileRanges, 0, DATASET_LEN); - try (FSDataInputStream in = openVectorFile()) { - in.readVectored(fileRanges, allocate); - ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData()); - Assertions.assertThat(vecRes) - .describedAs("Result from vectored read and readFully must match") - .isEqualByComparingTo(ByteBuffer.wrap(DATASET)); - returnBuffersToPoolPostRead(fileRanges, pool); - } - } - /** * As the minimum seek value is 4*1024,none of the below ranges * will get merged. 
*/ @Test public void testDisjointRanges() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - range(fileRanges, 0, 100); - range(fileRanges, 4_000 + 101, 100); - range(fileRanges, 16_000 + 101, 100); - try (FSDataInputStream in = openVectorFile()) { + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 + 101, 100)); + fileRanges.add(FileRange.createFileRange(16_000 + 101, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -232,14 +183,14 @@ public void testDisjointRanges() throws Exception { */ @Test public void testAllRangesMergedIntoOne() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - final int length = 100; - range(fileRanges, 0, length); - range(fileRanges, 4_000 - length - 1, length); - range(fileRanges, 8_000 - length - 1, length); - try (FSDataInputStream in = openVectorFile()) { + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 - 101, 100)); + fileRanges.add(FileRange.createFileRange(8_000 - 101, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -252,11 +203,11 @@ public void testAllRangesMergedIntoOne() throws Exception { public void testSomeRangesMergedSomeUnmerged() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - range(fileRanges, 8 * 1024, 100); - range(fileRanges, 14 * 1024, 100); - range(fileRanges, 10 * 1024, 100); - range(fileRanges, 2 * 1024 - 101, 100); - range(fileRanges, 40 * 1024, 1024); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) @@ -264,185 +215,158 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception { .build(); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } - /** - * Vectored IO doesn't support overlapping ranges. 
- */ @Test public void testOverlappingRanges() throws Exception { - verifyExceptionalVectoredRead( - getSampleOverlappingRanges(), - IllegalArgumentException.class); + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } } - /** - * Same ranges are special case of overlapping. - */ @Test public void testSameRanges() throws Exception { - verifyExceptionalVectoredRead( - getSampleSameRanges(), - IllegalArgumentException.class); - } - - /** - * A null range is not permitted. - */ - @Test - public void testNullRange() throws Exception { - List fileRanges = new ArrayList<>(); - range(fileRanges, 500, 100); - fileRanges.add(null); - verifyExceptionalVectoredRead( - fileRanges, - NullPointerException.class); - } - /** - * A null range is not permitted. - */ - @Test - public void testNullRangeList() throws Exception { - verifyExceptionalVectoredRead( - null, - NullPointerException.class); + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } } @Test public void testSomeRandomNonOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - range(fileRanges, 500, 100); - range(fileRanges, 1000, 200); - range(fileRanges, 50, 10); - range(fileRanges, 10, 5); - try (FSDataInputStream in = openVectorFile()) { + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(1000, 200)); + fileRanges.add(FileRange.createFileRange(50, 10)); + fileRanges.add(FileRange.createFileRange(10, 5)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testConsecutiveRanges() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - final int offset = 500; - final int length = 100; - range(fileRanges, offset, length); - range(fileRanges, 600, 200); - range(fileRanges, 800, 100); - try (FSDataInputStream in = openVectorFile()) { + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(600, 200)); + fileRanges.add(FileRange.createFileRange(800, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } /** - * Test to validate EOF ranges. - *

- * Default implementation fails with EOFException + * Test to validate EOF ranges. Default implementation fails with EOFException * while reading the ranges. Some implementation like s3, checksum fs fail fast * as they already have the file length calculated. - * The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used - * to determine which check to perform. */ @Test public void testEOFRanges() throws Exception { - describe("Testing reading with an offset past the end of the file"); - List fileRanges = range(DATASET_LEN + 1, 100); - - if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) { - LOG.info("Expecting early EOF failure"); - verifyExceptionalVectoredRead(fileRanges, EOFException.class); - } else { - expectEOFinRead(fileRanges); - } - } - - - @Test - public void testVectoredReadWholeFilePlusOne() throws Exception { - describe("Try to read whole file plus 1 byte"); - List fileRanges = range(0, DATASET_LEN + 1); - - if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) { - LOG.info("Expecting early EOF failure"); - verifyExceptionalVectoredRead(fileRanges, EOFException.class); - } else { - expectEOFinRead(fileRanges); - } - } - - private void expectEOFinRead(final List fileRanges) throws Exception { - LOG.info("Expecting late EOF failure"); - try (FSDataInputStream in = openVectorFile()) { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); for (FileRange res : fileRanges) { CompletableFuture data = res.getData(); interceptFuture(EOFException.class, - "", - ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, - TimeUnit.SECONDS, - data); + "", + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); } } } @Test public void testNegativeLengthRange() throws Exception { - - verifyExceptionalVectoredRead(range(0, -50), IllegalArgumentException.class); + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(0, -50)); + verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class); } @Test public void testNegativeOffsetRange() throws Exception { - verifyExceptionalVectoredRead(range(-1, 50), EOFException.class); + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(-1, 50)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test public void testNormalReadAfterVectoredRead() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()) { + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); // read starting 200 bytes - final int len = 200; - byte[] res = new byte[len]; - in.readFully(res, 0, len); + byte[] res = new byte[200]; + in.read(res, 0, 200); ByteBuffer buffer = ByteBuffer.wrap(res); - assertDatasetEquals(0, "normal_read", buffer, len, DATASET); - validateVectoredReadResult(fileRanges, DATASET, 0); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testVectoredReadAfterNormalRead() 
throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()) { + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { // read starting 200 bytes - final int len = 200; - byte[] res = new byte[len]; - in.readFully(res, 0, len); + byte[] res = new byte[200]; + in.read(res, 0, 200); ByteBuffer buffer = ByteBuffer.wrap(res); - assertDatasetEquals(0, "normal_read", buffer, len, DATASET); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testMultipleVectoredReads() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges1 = createSampleNonOverlappingRanges(); List fileRanges2 = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()) { + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges2, allocate); - validateVectoredReadResult(fileRanges2, DATASET, 0); - validateVectoredReadResult(fileRanges1, DATASET, 0); + validateVectoredReadResult(fileRanges2, DATASET); + validateVectoredReadResult(fileRanges1, DATASET); returnBuffersToPoolPostRead(fileRanges1, pool); returnBuffersToPoolPostRead(fileRanges2, pool); } @@ -455,18 +379,19 @@ public void testMultipleVectoredReads() throws Exception { */ @Test public void testVectoredIOEndToEnd() throws Exception { + FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - range(fileRanges, 8 * 1024, 100); - range(fileRanges, 14 * 1024, 100); - range(fileRanges, 10 * 1024, 100); - range(fileRanges, 2 * 1024 - 101, 100); - range(fileRanges, 40 * 1024, 1024); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); ExecutorService dataProcessor = Executors.newFixedThreadPool(5); CountDownLatch countDown = new CountDownLatch(fileRanges.size()); - try (FSDataInputStream in = openVectorFile()) { - in.readVectored(fileRanges, this.allocate); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, value -> pool.getBuffer(true, value)); for (FileRange res : fileRanges) { dataProcessor.submit(() -> { try { @@ -491,70 +416,70 @@ public void testVectoredIOEndToEnd() throws Exception { private void readBufferValidateDataAndReturnToPool(FileRange res, CountDownLatch countDownLatch) throws IOException, TimeoutException { - try { - CompletableFuture data = res.getData(); - // Read the data and perform custom operation. Here we are just - // validating it with original data. - FutureIO.awaitFuture(data.thenAccept(buffer -> { - assertDatasetEquals((int) res.getOffset(), - "vecRead", buffer, res.getLength(), DATASET); - // return buffer to the pool once read. - // If the read failed, this doesn't get invoked. - pool.putBuffer(buffer); - }), - VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } finally { - // countdown to notify main thread that processing has been done. 
- countDownLatch.countDown(); - } + CompletableFuture data = res.getData(); + // Read the data and perform custom operation. Here we are just + // validating it with original data. + FutureIO.awaitFuture(data.thenAccept(buffer -> { + assertDatasetEquals((int) res.getOffset(), + "vecRead", buffer, res.getLength(), DATASET); + // return buffer to the pool once read. + pool.putBuffer(buffer); + }), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + // countdown to notify main thread that processing has been done. + countDownLatch.countDown(); } protected List createSampleNonOverlappingRanges() { List fileRanges = new ArrayList<>(); - range(fileRanges, 0, 100); - range(fileRanges, 110, 50); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(110, 50)); return fileRanges; } protected List getSampleSameRanges() { List fileRanges = new ArrayList<>(); - range(fileRanges, 8_000, 1000); - range(fileRanges, 8_000, 1000); - range(fileRanges, 8_000, 1000); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); return fileRanges; } protected List getSampleOverlappingRanges() { List fileRanges = new ArrayList<>(); - range(fileRanges, 100, 500); - range(fileRanges, 400, 500); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(400, 500)); return fileRanges; } protected List getConsecutiveRanges() { List fileRanges = new ArrayList<>(); - range(fileRanges, 100, 500); - range(fileRanges, 600, 500); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(600, 500)); return fileRanges; } /** * Validate that exceptions must be thrown during a vectored * read operation with specific input ranges. + * @param fs FileSystem instance. * @param fileRanges input file ranges. * @param clazz type of exception expected. - * @throws Exception any other exception. + * @throws Exception any other IOE. */ protected void verifyExceptionalVectoredRead( + FileSystem fs, List fileRanges, Class clazz) throws Exception { - try (FSDataInputStream in = openVectorFile()) { - intercept(clazz, () -> { - in.readVectored(fileRanges, allocate); - return "triggered read of " + fileRanges.size() + " ranges" + " against " + in; - }); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + intercept(clazz, + () -> in.readVectored(fileRanges, allocate)); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java index f7cf27fb69..29cd29dfaf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java @@ -256,9 +256,4 @@ public interface ContractOptions { * HDFS does not do this. */ String METADATA_UPDATED_ON_HSYNC = "metadata_updated_on_hsync"; - - /** - * Does vector read check file length on open rather than in the read call? 
- */ - String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check"; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 66b1057f7b..70a5e2de53 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1117,14 +1117,11 @@ public static void validateFileContent(byte[] concat, byte[][] bytes) { * Utility to validate vectored read results. * @param fileRanges input ranges. * @param originalData original data. - * @param baseOffset base offset of the original data * @throws IOException any ioe. */ - public static void validateVectoredReadResult( - final List fileRanges, - final byte[] originalData, - final long baseOffset) - throws IOException, TimeoutException { + public static void validateVectoredReadResult(List fileRanges, + byte[] originalData) + throws IOException, TimeoutException { CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; int i = 0; for (FileRange res : fileRanges) { @@ -1140,8 +1137,8 @@ public static void validateVectoredReadResult( ByteBuffer buffer = FutureIO.awaitFuture(data, VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertDatasetEquals((int) (res.getOffset() - baseOffset), "vecRead", - buffer, res.getLength(), originalData); + assertDatasetEquals((int) res.getOffset(), "vecRead", + buffer, res.getLength(), originalData); } } @@ -1176,19 +1173,15 @@ public static void returnBuffersToPoolPostRead(List fileRanges, * @param originalData original data. */ public static void assertDatasetEquals( - final int readOffset, - final String operation, - final ByteBuffer data, - final int length, - final byte[] originalData) { + final int readOffset, + final String operation, + final ByteBuffer data, + int length, byte[] originalData) { for (int i = 0; i < length; i++) { int o = readOffset + i; - final byte orig = originalData[o]; - final byte current = data.get(); - Assertions.assertThat(current) - .describedAs("%s with read offset %d: data[0x%02X] != DATASET[0x%02X]", - operation, o, i, current) - .isEqualTo(orig); + assertEquals(operation + " with read offset " + readOffset + + ": data[" + i + "] != DATASET[" + o + "]", + originalData[o], data.get()); } } @@ -1769,43 +1762,6 @@ public static long readStream(InputStream in) { } } - /** - * Create a range list with a single range within it. - * @param offset offset - * @param length length - * @return the list. - */ - public static List range( - final long offset, - final int length) { - return range(new ArrayList<>(), offset, length); - } - - /** - * Create a range and add it to the supplied list. - * @param fileRanges list of ranges - * @param offset offset - * @param length length - * @return the list. - */ - public static List range( - final List fileRanges, - final long offset, - final int length) { - fileRanges.add(FileRange.createFileRange(offset, length)); - return fileRanges; - } - - /** - * Given a list of ranges, calculate the total size. - * @param fileRanges range list. - * @return total size of all reads. - */ - public static long totalReadSize(final List fileRanges) { - return fileRanges.stream() - .mapToLong(FileRange::getLength) - .sum(); - } /** * Results of recursive directory creation/scan operations. 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 23cfcce75a..5ee8880153 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.localfs; +import java.io.EOFException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; @@ -55,7 +57,7 @@ public void testChecksumValidationDuringVectoredRead() throws Exception { Path testPath = path("big_range_checksum_file"); List someRandomRanges = new ArrayList<>(); someRandomRanges.add(FileRange.createFileRange(10, 1024)); - someRandomRanges.add(FileRange.createFileRange(1040, 1024)); + someRandomRanges.add(FileRange.createFileRange(1025, 1024)); validateCheckReadException(testPath, DATASET_LEN, someRandomRanges); } @@ -89,7 +91,7 @@ private void validateCheckReadException(Path testPath, CompletableFuture fis = localFs.openFile(testPath).build(); try (FSDataInputStream in = fis.get()){ in.readVectored(ranges, getAllocate()); - validateVectoredReadResult(ranges, datasetCorrect, 0); + validateVectoredReadResult(ranges, datasetCorrect); } final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64); try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ @@ -101,7 +103,7 @@ private void validateCheckReadException(Path testPath, // Expect checksum exception when data is updated directly through // raw local fs instance. intercept(ChecksumException.class, - () -> validateVectoredReadResult(ranges, datasetCorrupted, 0)); + () -> validateVectoredReadResult(ranges, datasetCorrupted)); } } @Test @@ -122,8 +124,20 @@ public void tesChecksumVectoredReadBoundaries() throws Exception { smallRange.add(FileRange.createFileRange(1000, 71)); try (FSDataInputStream in = fis.get()){ in.readVectored(smallRange, getAllocate()); - validateVectoredReadResult(smallRange, datasetCorrect, 0); + validateVectoredReadResult(smallRange, datasetCorrect); } } + + /** + * Overriding in checksum fs as vectored read api fails fast + * in case of EOF requested range. 
+ */ + @Override + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java deleted file mode 100644 index 2a290058ca..0000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ /dev/null @@ -1,804 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.impl; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.IntBuffer; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.IntFunction; - -import org.assertj.core.api.Assertions; -import org.assertj.core.api.ListAssert; -import org.assertj.core.api.ObjectAssert; -import org.junit.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mockito; - -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.VectoredReadUtils; -import org.apache.hadoop.test.HadoopTestBase; - -import static java.util.Arrays.asList; -import static org.apache.hadoop.fs.FileRange.createFileRange; -import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; -import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; -import static org.apache.hadoop.fs.VectoredReadUtils.readRangeFrom; -import static org.apache.hadoop.fs.VectoredReadUtils.readVectored; -import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; -import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; -import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; -import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; - -/** - * Test behavior of {@link VectoredReadUtils}. - */ -public class TestVectoredReadUtils extends HadoopTestBase { - - /** - * Test {@link VectoredReadUtils#sliceTo(ByteBuffer, long, FileRange)}. 
- */ - @Test - public void testSliceTo() { - final int size = 64 * 1024; - ByteBuffer buffer = ByteBuffer.allocate(size); - // fill the buffer with data - IntBuffer intBuffer = buffer.asIntBuffer(); - for(int i=0; i < size / Integer.BYTES; ++i) { - intBuffer.put(i); - } - // ensure we don't make unnecessary slices - ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, - createFileRange(100, size)); - Assertions.assertThat(buffer) - .describedAs("Slicing on the same offset shouldn't " + - "create a new buffer") - .isEqualTo(slice); - Assertions.assertThat(slice.position()) - .describedAs("Slicing should return buffers starting from position 0") - .isEqualTo(0); - - // try slicing a range - final int offset = 100; - final int sliceStart = 1024; - final int sliceLength = 16 * 1024; - slice = VectoredReadUtils.sliceTo(buffer, offset, - createFileRange(offset + sliceStart, sliceLength)); - // make sure they aren't the same, but use the same backing data - Assertions.assertThat(buffer) - .describedAs("Slicing on new offset should create a new buffer") - .isNotEqualTo(slice); - Assertions.assertThat(buffer.array()) - .describedAs("Slicing should use the same underlying data") - .isEqualTo(slice.array()); - Assertions.assertThat(slice.position()) - .describedAs("Slicing should return buffers starting from position 0") - .isEqualTo(0); - // test the contents of the slice - intBuffer = slice.asIntBuffer(); - for(int i=0; i < sliceLength / Integer.BYTES; ++i) { - assertEquals("i = " + i, i + sliceStart / Integer.BYTES, intBuffer.get()); - } - } - - /** - * Test {@link VectoredReadUtils#roundUp(long, int)} - * and {@link VectoredReadUtils#roundDown(long, int)}. - */ - @Test - public void testRounding() { - for (int i = 5; i < 10; ++i) { - assertEquals("i = " + i, 5, VectoredReadUtils.roundDown(i, 5)); - assertEquals("i = " + i, 10, VectoredReadUtils.roundUp(i + 1, 5)); - } - assertEquals("Error while roundDown", 13, VectoredReadUtils.roundDown(13, 1)); - assertEquals("Error while roundUp", 13, VectoredReadUtils.roundUp(13, 1)); - } - - /** - * Test {@link CombinedFileRange#merge(long, long, FileRange, int, int)}. 
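Editorial aside: testSliceTo above is easier to follow with its intended use spelled out. A sketch of how a child range is served from an already-read merged buffer, reusing the offsets from the test; the variable names are placeholders and the buffer stands in for data actually read from the file.

    ByteBuffer merged = ByteBuffer.allocate(64 * 1024);                  // stands in for file bytes [100, 100 + 64K)
    FileRange child = FileRange.createFileRange(100 + 1024, 16 * 1024);  // child range inside that span
    ByteBuffer slice = VectoredReadUtils.sliceTo(merged, 100, child);
    // slice starts at position 0, covers the 16 KB beginning 1024 bytes into
    // the merged buffer, and shares the merged buffer's backing array.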
- */ - @Test - public void testMerge() { - // a reference to use for tracking - Object tracker1 = "one"; - Object tracker2 = "two"; - FileRange base = createFileRange(2000, 1000, tracker1); - CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); - - // test when the gap between is too big - assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, - createFileRange(5000, 1000), 2000, 4000)); - assertUnderlyingSize(mergeBase, - "Number of ranges in merged range shouldn't increase", - 1); - assertFileRange(mergeBase, 2000, 1000); - - // test when the total size gets exceeded - assertFalse("Large size ranges shouldn't get merged", - mergeBase.merge(5000, 6000, - createFileRange(5000, 1000), 2001, 3999)); - assertEquals("Number of ranges in merged range shouldn't increase", - 1, mergeBase.getUnderlying().size()); - assertFileRange(mergeBase, 2000, 1000); - - // test when the merge works - assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, - createFileRange(5000, 1000, tracker2), - 2001, 4000)); - assertUnderlyingSize(mergeBase, "merge list after merge", 2); - assertFileRange(mergeBase, 2000, 4000); - - Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference()) - .describedAs("reference of range %s", mergeBase.getUnderlying().get(0)) - .isSameAs(tracker1); - Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference()) - .describedAs("reference of range %s", mergeBase.getUnderlying().get(1)) - .isSameAs(tracker2); - - // reset the mergeBase and test with a 10:1 reduction - mergeBase = new CombinedFileRange(200, 300, base); - assertFileRange(mergeBase, 200, 100); - - assertTrue("ranges should get merged ", mergeBase.merge(500, 600, - createFileRange(5000, 1000), 201, 400)); - assertUnderlyingSize(mergeBase, "merge list after merge", 2); - assertFileRange(mergeBase, 200, 400); - } - - /** - * Assert that a combined file range has a specific number of underlying ranges. - * @param combinedFileRange file range - * @param description text for errors - * @param expected expected value. - */ - private static ListAssert assertUnderlyingSize( - final CombinedFileRange combinedFileRange, - final String description, - final int expected) { - return Assertions.assertThat(combinedFileRange.getUnderlying()) - .describedAs(description) - .hasSize(expected); - } - - /** - * Test sort and merge logic. 
- */ - @Test - public void testSortAndMerge() { - List input = asList( - createFileRange(3000, 100, "1"), - createFileRange(2100, 100, null), - createFileRange(1000, 100, "3") - ); - assertIsNotOrderedDisjoint(input, 100, 800); - final List outputList = mergeSortedRanges( - sortRanges(input), 100, 1001, 2500); - - assertRangeListSize(outputList, 1); - CombinedFileRange output = outputList.get(0); - assertUnderlyingSize(output, "merged range underlying size", 3); - // range[1000,3100) - assertFileRange(output, 1000, 2100); - assertOrderedDisjoint(outputList, 100, 800); - - // the minSeek doesn't allow the first two to merge - assertIsNotOrderedDisjoint(input, 100, 100); - final List list2 = mergeSortedRanges( - sortRanges(input), - 100, 1000, 2100); - assertRangeListSize(list2, 2); - assertRangeElement(list2, 0, 1000, 100); - assertRangeElement(list2, 1, 2100, 1000); - - assertOrderedDisjoint(list2, 100, 1000); - - // the maxSize doesn't allow the third range to merge - assertIsNotOrderedDisjoint(input, 100, 800); - final List list3 = mergeSortedRanges( - sortRanges(input), - 100, 1001, 2099); - assertRangeListSize(list3, 2); - CombinedFileRange range0 = list3.get(0); - assertFileRange(range0, 1000, 1200); - final List underlying = range0.getUnderlying(); - assertFileRange(underlying.get(0), - 1000, 100, "3"); - assertFileRange(underlying.get(1), - 2100, 100, null); - CombinedFileRange range1 = list3.get(1); - // range[3000,3100) - assertFileRange(range1, 3000, 100); - assertFileRange(range1.getUnderlying().get(0), - 3000, 100, "1"); - - assertOrderedDisjoint(list3, 100, 800); - - // test the round up and round down (the maxSize doesn't allow any merges) - assertIsNotOrderedDisjoint(input, 16, 700); - final List list4 = mergeSortedRanges( - sortRanges(input), - 16, 1001, 100); - assertRangeListSize(list4, 3); - // range[992,1104) - assertRangeElement(list4, 0, 992, 112); - // range[2096,2208) - assertRangeElement(list4, 1, 2096, 112); - // range[2992,3104) - assertRangeElement(list4, 2, 2992, 112); - assertOrderedDisjoint(list4, 16, 700); - } - - /** - * Assert that a file range has the specified start position and length. - * @param range range to validate - * @param start offset of range - * @param length range length - * @param type of range - */ - private static void assertFileRange( - ELEMENT range, long start, int length) { - - Assertions.assertThat(range) - .describedAs("file range %s", range) - .isNotNull(); - Assertions.assertThat(range.getOffset()) - .describedAs("offset of %s", range) - .isEqualTo(start); - Assertions.assertThat(range.getLength()) - .describedAs("length of %s", range) - .isEqualTo(length); - } - - /** - * Assert that a file range satisfies the conditions. - * @param range range to validate - * @param offset offset of range - * @param length range length - * @param reference reference; may be null. - * @param type of range - */ - private static void assertFileRange( - ELEMENT range, long offset, int length, Object reference) { - - assertFileRange(range, offset, length); - Assertions.assertThat(range.getReference()) - .describedAs("reference field of file range %s", range) - .isEqualTo(reference); - } - - /** - * Assert that a range list has a single element with the given start and length. - * @param ranges range list - * @param start start position - * @param length length of range - * @param type of range - * @return the ongoing assertion. 
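Editorial aside: the assertions in testSortAndMerge above follow from a simple rule. The helper below is a hypothetical simplification, not a Hadoop API; it captures the decision made for each pair of adjacent, chunk-aligned ranges.

    static boolean shouldMerge(long currentStart, long currentEnd,
        long nextStart, long nextEnd, int minSeek, int maxSize) {
      // merge only if the seek gap is below minSeek and the combined span
      // does not exceed maxSize
      return (nextStart - currentEnd) < minSeek
          && (nextEnd - currentStart) <= maxSize;
    }
    // Applied to the test's ranges, aligned to chunkSize=100 as [1000,1100),
    // [2100,2200) and [3000,3100), with minSeek=1001 and maxSize=2500:
    //   gap 2100-1100 = 1000 < 1001, span 2200-1000 = 1200 <= 2500  -> merge
    //   gap 3000-2200 =  800 < 1001, span 3100-1000 = 2100 <= 2500  -> merge
    // which yields the single combined range [1000, 3100) the test asserts.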
- */ - private static ObjectAssert assertIsSingleRange( - final List ranges, - final long start, - final int length) { - assertRangeListSize(ranges, 1); - return assertRangeElement(ranges, 0, start, length); - } - - /** - * Assert that a range list has the exact size specified. - * @param ranges range list - * @param size expected size - * @param type of range - * @return the ongoing assertion. - */ - private static ListAssert assertRangeListSize( - final List ranges, - final int size) { - return Assertions.assertThat(ranges) - .describedAs("coalesced ranges") - .hasSize(size); - } - - /** - * Assert that a range list has at least the size specified. - * @param ranges range list - * @param size expected size - * @param type of range - * @return the ongoing assertion. - */ - private static ListAssert assertRangesCountAtLeast( - final List ranges, - final int size) { - return Assertions.assertThat(ranges) - .describedAs("coalesced ranges") - .hasSizeGreaterThanOrEqualTo(size); - } - - /** - * Assert that a range element has the given start offset and length. - * @param ranges range list - * @param index index of range - * @param start position - * @param length length of range - * @param type of range - * @return the ongoing assertion. - */ - private static ObjectAssert assertRangeElement( - final List ranges, - final int index, - final long start, - final int length) { - return assertRangesCountAtLeast(ranges, index + 1) - .element(index) - .describedAs("range") - .satisfies(r -> assertFileRange(r, start, length)); - } - - /** - * Assert that a file range is ordered and disjoint. - * @param input the list of input ranges. - * @param chunkSize the size of the chunks that the offset and end must align to. - * @param minimumSeek the minimum distance between ranges. - */ - private static void assertOrderedDisjoint( - List input, - int chunkSize, - int minimumSeek) { - Assertions.assertThat(isOrderedDisjoint(input, chunkSize, minimumSeek)) - .describedAs("ranges are ordered and disjoint") - .isTrue(); - } - - /** - * Assert that a file range is not ordered or not disjoint. - * @param input the list of input ranges. - * @param chunkSize the size of the chunks that the offset and end must align to. - * @param minimumSeek the minimum distance between ranges. - */ - private static void assertIsNotOrderedDisjoint( - List input, - int chunkSize, - int minimumSeek) { - Assertions.assertThat(isOrderedDisjoint(input, chunkSize, minimumSeek)) - .describedAs("Ranges are non disjoint/ordered") - .isFalse(); - } - - /** - * Test sort and merge. 
- */ - @Test - public void testSortAndMergeMoreCases() throws Exception { - List input = asList( - createFileRange(3000, 110), - createFileRange(3000, 100), - createFileRange(2100, 100), - createFileRange(1000, 100) - ); - assertIsNotOrderedDisjoint(input, 100, 800); - List outputList = mergeSortedRanges( - sortRanges(input), 1, 1001, 2500); - Assertions.assertThat(outputList) - .describedAs("merged range size") - .hasSize(1); - CombinedFileRange output = outputList.get(0); - assertUnderlyingSize(output, "merged range underlying size", 4); - - assertFileRange(output, 1000, 2110); - - assertOrderedDisjoint(outputList, 1, 800); - - outputList = mergeSortedRanges( - sortRanges(input), 100, 1001, 2500); - assertRangeListSize(outputList, 1); - - output = outputList.get(0); - assertUnderlyingSize(output, "merged range underlying size", 4); - assertFileRange(output, 1000, 2200); - - assertOrderedDisjoint(outputList, 1, 800); - } - - @Test - public void testRejectOverlappingRanges() throws Exception { - List input = asList( - createFileRange(100, 100), - createFileRange(200, 100), - createFileRange(250, 100) - ); - - intercept(IllegalArgumentException.class, - () -> validateAndSortRanges(input, Optional.empty())); - } - - /** - * Special case of overlap: the ranges are equal. - */ - @Test - public void testDuplicateRangesRaisesIllegalArgument() throws Exception { - - List input1 = asList( - createFileRange(100, 100), - createFileRange(500, 100), - createFileRange(1000, 100), - createFileRange(1000, 100) - ); - - intercept(IllegalArgumentException.class, - () -> validateAndSortRanges(input1, Optional.empty())); - } - - /** - * Consecutive ranges MUST pass. - */ - @Test - public void testConsecutiveRangesAreValid() throws Throwable { - - validateAndSortRanges( - asList( - createFileRange(100, 100), - createFileRange(200, 100), - createFileRange(300, 100)), - Optional.empty()); - } - - /** - * If the maximum zie for merging is zero, ranges do not get merged. - */ - @Test - public void testMaxSizeZeroDisablesMerging() { - List randomRanges = asList( - createFileRange(3000, 110), - createFileRange(3000, 100), - createFileRange(2100, 100) - ); - assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); - assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); - assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0); - } - - /** - * Assert that the range count is the same after merging. - * @param inputRanges input ranges - * @param chunkSize chunk size for merge - * @param minimumSeek minimum seek for merge - * @param maxSize max size for merge - */ - private static void assertEqualRangeCountsAfterMerging(List inputRanges, - int chunkSize, - int minimumSeek, - int maxSize) { - List combinedFileRanges = mergeSortedRanges( - inputRanges, chunkSize, minimumSeek, maxSize); - assertRangeListSize(combinedFileRanges, inputRanges.size()); - } - - /** - * Stream to read from. - */ - interface Stream extends PositionedReadable, ByteBufferPositionedReadable { - // nothing - } - - /** - * Fill a buffer with bytes incremented from 0. - * @param buffer target buffer. - */ - private static void fillBuffer(ByteBuffer buffer) { - byte b = 0; - while (buffer.remaining() > 0) { - buffer.put(b++); - } - } - - /** - * Read a single range, verify the future completed and validate the buffer - * returned. 
- */ - @Test - public void testReadSingleRange() throws Exception { - final Stream stream = mockStreamWithReadFully(); - CompletableFuture result = - readRangeFrom(stream, createFileRange(1000, 100), - ByteBuffer::allocate); - assertFutureCompletedSuccessfully(result); - ByteBuffer buffer = result.get(); - assertEquals("Size of result buffer", 100, buffer.remaining()); - byte b = 0; - while (buffer.remaining() > 0) { - assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); - } - } - - /** - * Read a single range with IOE fault injection; verify the failure - * is reported. - */ - @Test - public void testReadWithIOE() throws Exception { - final Stream stream = mockStreamWithReadFully(); - - Mockito.doThrow(new IOException("foo")) - .when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(ByteBuffer.class)); - CompletableFuture result = - readRangeFrom(stream, createFileRange(1000, 100), ByteBuffer::allocate); - assertFutureFailedExceptionally(result); - } - - /** - * Read a range, first successfully, then with an IOE. - * the output of the first read is validated. - * @param allocate allocator to use - */ - private static void runReadRangeFromPositionedReadable(IntFunction allocate) - throws Exception { - PositionedReadable stream = Mockito.mock(PositionedReadable.class); - Mockito.doAnswer(invocation -> { - byte b=0; - byte[] buffer = invocation.getArgument(1); - for(int i=0; i < buffer.length; ++i) { - buffer[i] = b++; - } - return null; - }).when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(), ArgumentMatchers.anyInt(), - ArgumentMatchers.anyInt()); - CompletableFuture result = - readRangeFrom(stream, createFileRange(1000, 100), - allocate); - assertFutureCompletedSuccessfully(result); - ByteBuffer buffer = result.get(); - assertEquals("Size of result buffer", 100, buffer.remaining()); - validateBuffer("buffer", buffer, 0); - - - // test an IOException - Mockito.reset(stream); - Mockito.doThrow(new IOException("foo")) - .when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(), ArgumentMatchers.anyInt(), - ArgumentMatchers.anyInt()); - result = readRangeFrom(stream, createFileRange(1000, 100), - ByteBuffer::allocate); - assertFutureFailedExceptionally(result); - } - - /** - * Read into an on heap buffer. - */ - @Test - public void testReadRangeArray() throws Exception { - runReadRangeFromPositionedReadable(ByteBuffer::allocate); - } - - /** - * Read into an off-heap buffer. - */ - @Test - public void testReadRangeDirect() throws Exception { - runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect); - } - - /** - * Validate a buffer where the first byte value is {@code start} - * and the subsequent bytes are from that value incremented by one, wrapping - * at 256. - * @param message error message. - * @param buffer buffer - * @param start first byte of the buffer. - */ - private static void validateBuffer(String message, ByteBuffer buffer, int start) { - byte expected = (byte) start; - while (buffer.remaining() > 0) { - assertEquals(message + " remain: " + buffer.remaining(), expected, - buffer.get()); - // increment with wrapping. - expected = (byte) (expected + 1); - } - } - - /** - * Validate basic read vectored works as expected. 
- */ - @Test - public void testReadVectored() throws Exception { - List input = asList(createFileRange(0, 100), - createFileRange(100_000, 100, "this"), - createFileRange(200_000, 100, "that")); - runAndValidateVectoredRead(input); - } - - /** - * Verify a read with length 0 completes with a buffer of size 0. - */ - @Test - public void testReadVectoredZeroBytes() throws Exception { - List input = asList(createFileRange(0, 0, "1"), - createFileRange(100_000, 100, "2"), - createFileRange(200_000, 0, "3")); - runAndValidateVectoredRead(input); - // look up by name and validate. - final FileRange r1 = retrieve(input, "1"); - Assertions.assertThat(r1.getData().get().limit()) - .describedAs("Data limit of %s", r1) - .isEqualTo(0); - } - - /** - * Retrieve a range from a list of ranges by its (string) reference. - * @param input input list - * @param key key to look up - * @return the range - * @throws IllegalArgumentException if the range is not found. - */ - private static FileRange retrieve(List input, String key) { - return input.stream() - .filter(r -> key.equals(r.getReference())) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("No range with key " + key)); - } - - /** - * Mock run a vectored read and validate the results with the assertions. - *

- * <ol>
- *   <li>{@code ByteBufferPositionedReadable.readFully()} is invoked once per range.</li>
- *   <li>The buffers are filled with data</li>
- * </ol>
- * @param input input ranges - * @throws Exception failure - */ - private void runAndValidateVectoredRead(List input) - throws Exception { - final Stream stream = mockStreamWithReadFully(); - // should not merge the ranges - readVectored(stream, input, ByteBuffer::allocate); - // readFully is invoked once per range - Mockito.verify(stream, Mockito.times(input.size())) - .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); - - // validate each buffer - for (int b = 0; b < input.size(); ++b) { - validateBuffer("buffer " + b, input.get(b).getData().get(), 0); - } - } - - /** - * Mock a stream with {@link Stream#readFully(long, ByteBuffer)}. - * Filling in each byte buffer. - * @return the stream - * @throws IOException (side effect of the mocking; - */ - private static Stream mockStreamWithReadFully() throws IOException { - Stream stream = Mockito.mock(Stream.class); - Mockito.doAnswer(invocation -> { - fillBuffer(invocation.getArgument(1)); - return null; - }).when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(ByteBuffer.class)); - return stream; - } - - /** - * Empty ranges cannot be sorted. - */ - @Test - public void testEmptyRangesRaisesIllegalArgument() throws Throwable { - intercept(IllegalArgumentException.class, - () -> validateAndSortRanges(Collections.emptyList(), Optional.empty())); - } - - /** - * Reject negative offsets. - */ - @Test - public void testNegativeOffsetRaisesEOF() throws Throwable { - intercept(EOFException.class, () -> - validateAndSortRanges(asList( - createFileRange(1000, 100), - createFileRange(-1000, 100)), - Optional.empty())); - } - - /** - * Reject negative lengths. - */ - @Test - public void testNegativePositionRaisesIllegalArgument() throws Throwable { - intercept(IllegalArgumentException.class, () -> - validateAndSortRanges(asList( - createFileRange(1000, 100), - createFileRange(1000, -100)), - Optional.empty())); - } - - /** - * A read for a whole file is valid. - */ - @Test - public void testReadWholeFile() throws Exception { - final int length = 1000; - - // Read whole file as one element - final List ranges = validateAndSortRanges( - asList(createFileRange(0, length)), - Optional.of((long) length)); - - assertIsSingleRange(ranges, 0, length); - } - - /** - * A read from start of file to past EOF is rejected. - */ - @Test - public void testReadPastEOFRejected() throws Exception { - final int length = 1000; - intercept(EOFException.class, () -> - validateAndSortRanges( - asList(createFileRange(0, length + 1)), - Optional.of((long) length))); - } - - /** - * If the start offset is at the end of the file: an EOFException. - */ - @Test - public void testReadStartingPastEOFRejected() throws Exception { - final int length = 1000; - intercept(EOFException.class, () -> - validateAndSortRanges( - asList(createFileRange(length, 0)), - Optional.of((long) length))); - } - - /** - * A read from just below the EOF to the end of the file is valid. - */ - @Test - public void testReadUpToEOF() throws Exception { - final int length = 1000; - - final int p = length - 1; - assertIsSingleRange( - validateAndSortRanges( - asList(createFileRange(p, 1)), - Optional.of((long) length)), - p, 1); - } - - /** - * A read from just below the EOF to the just past the end of the file is rejected - * with EOFException. 
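Editorial aside: the EOF tests around here reduce to a few boundary cases. A sketch of the same checks expressed directly against validateAndSortRanges, invoked exactly as the deleted test does (the length and ranges are example values):

    long length = 1000;
    // whole file [0, 1000): valid
    validateAndSortRanges(asList(createFileRange(0, 1000)), Optional.of(length));
    // final byte [999, 1000): valid
    validateAndSortRanges(asList(createFileRange(999, 1)), Optional.of(length));
    // [0, 1001), [1000, 1000) and [999, 1001) all start at or extend past EOF
    // and are rejected with EOFException.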
- */ - @Test - public void testReadOverEOFRejected() throws Exception { - final long length = 1000; - - intercept(EOFException.class, () -> - validateAndSortRanges( - asList(createFileRange(length - 1, 2)), - Optional.of(length))); - } -} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml index ad291272a9..03bb3e800f 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml @@ -131,9 +131,4 @@ case sensitivity and permission options are determined at run time from OS type true - - fs.contract.vector-io-early-eof-check - true - - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractVectoredRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractVectoredRead.java deleted file mode 100644 index 374dcedcbd..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractVectoredRead.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.contract.hdfs; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Contract test for vectored reads through HDFS connector. 
- */ -public class TestHDFSContractVectoredRead - extends AbstractContractVectoredReadTest { - - public TestHDFSContractVectoredRead(final String bufferType) { - super(bufferType); - } - - @BeforeClass - public static void createCluster() throws IOException { - HDFSContract.createCluster(); - } - - @AfterClass - public static void teardownCluster() throws IOException { - HDFSContract.destroyCluster(); - } - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new HDFSContract(conf); - } -} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index cfdc361234..9f04e11d94 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -27,7 +27,6 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,7 +66,7 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; -import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -148,16 +147,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private final String bucket; private final String key; private final String pathStr; - - /** - * Content length from HEAD or openFile option. - */ private final long contentLength; - /** - * Content length in format for vector IO. - */ - private final Optional fileLength; - private final String uri; private static final Logger LOG = LoggerFactory.getLogger(S3AInputStream.class); @@ -227,7 +217,6 @@ public S3AInputStream(S3AReadOpContext ctx, this.key = s3Attributes.getKey(); this.pathStr = s3Attributes.getPath().toString(); this.contentLength = l; - this.fileLength = Optional.of(contentLength); this.client = client; this.uri = "s3a://" + this.bucket + "/" + this.key; this.streamStatistics = streamStatistics; @@ -250,7 +239,6 @@ public S3AInputStream(S3AReadOpContext ctx, * @param inputPolicy new input policy. */ private void setInputPolicy(S3AInputPolicy inputPolicy) { - LOG.debug("Switching to input policy {}", inputPolicy); this.inputPolicy = inputPolicy; streamStatistics.inputPolicySet(inputPolicy.ordinal()); } @@ -264,16 +252,6 @@ public S3AInputPolicy getInputPolicy() { return inputPolicy; } - /** - * If the stream is in Adaptive mode, switch to random IO at this - * point. Unsynchronized. - */ - private void maybeSwitchToRandomIO() { - if (inputPolicy.isAdaptive()) { - setInputPolicy(S3AInputPolicy.Random); - } - } - /** * Opens up the stream at specified target position and for given length. 
* @@ -410,7 +388,10 @@ private void seekInStream(long targetPos, long length) throws IOException { streamStatistics.seekBackwards(diff); // if the stream is in "Normal" mode, switch to random IO at this // point, as it is indicative of columnar format IO - maybeSwitchToRandomIO(); + if (inputPolicy.isAdaptive()) { + LOG.info("Switching to Random IO seek policy"); + setInputPolicy(S3AInputPolicy.Random); + } } else { // targetPos == pos if (remainingInCurrentRequest() > 0) { @@ -904,26 +885,19 @@ public int maxReadSizeForVectorReads() { * @throws IOException IOE if any. */ @Override - public synchronized void readVectored(List ranges, + public void readVectored(List ranges, IntFunction allocate) throws IOException { LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); if (stopVectoredIOOperations.getAndSet(false)) { LOG.debug("Reinstating vectored read operation for path {} ", pathStr); } - - // prepare to read - List sortedRanges = validateAndSortRanges(ranges, - fileLength); + List sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges); for (FileRange range : ranges) { + validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); range.setData(result); } - // switch to random IO and close any open stream. - // what happens if a read is in progress? bad things. - // ...which is why this method is synchronized - closeStream("readVectored()", false, false); - maybeSwitchToRandomIO(); if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); @@ -957,7 +931,7 @@ public synchronized void readVectored(List ranges, */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, IntFunction allocate) { - LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); + LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { rangeContent = getS3ObjectInputStream("readCombinedFileRange", @@ -965,29 +939,22 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa combinedFileRange.getLength()); populateChildBuffers(combinedFileRange, rangeContent, allocate); } catch (Exception ex) { - LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); - // complete exception all the underlying ranges which have not already - // finished. + LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex); for(FileRange child : combinedFileRange.getUnderlying()) { - if (!child.getData().isDone()) { - child.getData().completeExceptionally(ex); - } + child.getData().completeExceptionally(ex); } } finally { IOUtils.cleanupWithLogger(LOG, rangeContent); } - LOG.debug("Finished reading {} from path {} ", combinedFileRange, pathStr); + LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr); } /** * Populate underlying buffers of the child ranges. - * There is no attempt to recover from any read failures. * @param combinedFileRange big combined file range. * @param objectContent data from s3. * @param allocate method to allocate child byte buffers. * @throws IOException any IOE. - * @throws EOFException if EOF if read() call returns -1 - * @throws InterruptedIOException if vectored IO operation is stopped. 
*/ private void populateChildBuffers(CombinedFileRange combinedFileRange, InputStream objectContent, @@ -999,24 +966,17 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, if (combinedFileRange.getUnderlying().size() == 1) { FileRange child = combinedFileRange.getUnderlying().get(0); ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + populateBuffer(child.getLength(), buffer, objectContent); child.getData().complete(buffer); } else { FileRange prev = null; for (FileRange child : combinedFileRange.getUnderlying()) { - checkIfVectoredIOStopped(); - if (prev != null) { - final long position = prev.getOffset() + prev.getLength(); - if (position < child.getOffset()) { - // there's data to drain between the requests. - // work out how much - long drainQuantity = child.getOffset() - position; - // and drain it. - drainUnnecessaryData(objectContent, position, drainQuantity); - } + if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) { + long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength(); + drainUnnecessaryData(objectContent, drainQuantity); } ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + populateBuffer(child.getLength(), buffer, objectContent); child.getData().complete(buffer); prev = child; } @@ -1025,47 +985,42 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, /** * Drain unnecessary data in between ranges. - * There's no attempt at recovery here; it should be done at a higher level. * @param objectContent s3 data stream. - * @param position position in file, for logging * @param drainQuantity how many bytes to drain. * @throws IOException any IOE. - * @throws EOFException if the end of stream was reached during the draining */ - @Retries.OnceTranslated - private void drainUnnecessaryData( - final InputStream objectContent, - final long position, - long drainQuantity) throws IOException { - + private void drainUnnecessaryData(InputStream objectContent, long drainQuantity) + throws IOException { int drainBytes = 0; int readCount; - byte[] drainBuffer; - int size = (int)Math.min(InternalConstants.DRAIN_BUFFER_SIZE, drainQuantity); - drainBuffer = new byte[size]; - LOG.debug("Draining {} bytes from stream from offset {}; buffer size={}", - drainQuantity, position, size); - try { - long remaining = drainQuantity; - while (remaining > 0) { - checkIfVectoredIOStopped(); - readCount = objectContent.read(drainBuffer, 0, (int)Math.min(size, remaining)); - LOG.debug("Drained {} bytes from stream", readCount); - if (readCount < 0) { - // read request failed; often network issues. - // no attempt is made to recover at this point. 
- final String s = String.format( - "End of stream reached draining data between ranges; expected %,d bytes;" - + " only drained %,d bytes before -1 returned (position=%,d)", - drainQuantity, drainBytes, position + drainBytes); - throw new EOFException(s); - } - drainBytes += readCount; - remaining -= readCount; + while (drainBytes < drainQuantity) { + if (drainBytes + InternalConstants.DRAIN_BUFFER_SIZE <= drainQuantity) { + byte[] drainBuffer = new byte[InternalConstants.DRAIN_BUFFER_SIZE]; + readCount = objectContent.read(drainBuffer); + } else { + byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)]; + readCount = objectContent.read(drainBuffer); } - } finally { - streamStatistics.readVectoredBytesDiscarded(drainBytes); - LOG.debug("{} bytes drained from stream ", drainBytes); + drainBytes += readCount; + } + streamStatistics.readVectoredBytesDiscarded(drainBytes); + LOG.debug("{} bytes drained from stream ", drainBytes); + } + + /** + * Validates range parameters. + * In case of S3 we already have contentLength from the first GET request + * during an open file operation so failing fast here. + * @param range requested range. + * @throws EOFException end of file exception. + */ + private void validateRangeRequest(FileRange range) throws EOFException { + VectoredReadUtils.validateRangeRequest(range); + if(range.getOffset() + range.getLength() > contentLength) { + final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", + range.getOffset(), range.getLength(), pathStr); + LOG.warn(errMsg); + throw new RangeNotSatisfiableEOFException(errMsg, null); } } @@ -1075,19 +1030,13 @@ private void drainUnnecessaryData( * @param buffer buffer to fill. */ private void readSingleRange(FileRange range, ByteBuffer buffer) { - LOG.debug("Start reading {} from {} ", range, pathStr); - if (range.getLength() == 0) { - // a zero byte read. - buffer.flip(); - range.getData().complete(buffer); - return; - } + LOG.debug("Start reading range {} from path {} ", range, pathStr); ResponseInputStream objectRange = null; try { long position = range.getOffset(); int length = range.getLength(); objectRange = getS3ObjectInputStream("readSingleRange", position, length); - populateBuffer(range, buffer, objectRange); + populateBuffer(length, buffer, objectRange); range.getData().complete(buffer); } catch (Exception ex) { LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex); @@ -1107,9 +1056,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { * @param length length from position of the object to be read from S3. * @return result s3 object. * @throws IOException exception if any. - * @throws InterruptedIOException if vectored io operation is stopped. */ - @Retries.RetryTranslated private ResponseInputStream getS3ObjectInputStream( final String operationName, final long position, final int length) throws IOException { checkIfVectoredIOStopped(); @@ -1122,77 +1069,56 @@ private ResponseInputStream getS3ObjectInputStream( /** * Populates the buffer with data from objectContent * till length. Handles both direct and heap byte buffers. - * calls {@code buffer.flip()} on the buffer afterwards. - * @param range vector range to populate. + * @param length length of data to populate. * @param buffer buffer to fill. * @param objectContent result retrieved from S3 store. * @throws IOException any IOE. - * @throws EOFException if EOF if read() call returns -1 - * @throws InterruptedIOException if vectored IO operation is stopped. 
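The restored validateRangeRequest() above fails fast when a requested range runs past the object's known contentLength, before any data is fetched. A rough sketch of that check follows; the Range record and path parameter are hypothetical, and the real code throws Hadoop's RangeNotSatisfiableEOFException rather than a plain EOFException.

```java
import java.io.EOFException;
import java.util.List;

public final class RangeValidationSketch {

  /** Hypothetical stand-in for Hadoop's FileRange. */
  record Range(long offset, int length) { }

  /** Fail fast if any requested range is malformed or extends beyond the known length. */
  static void validate(List<Range> ranges, long contentLength, String path)
      throws EOFException {
    for (Range r : ranges) {
      if (r.offset() < 0 || r.length() < 0) {
        throw new IllegalArgumentException("Invalid range " + r + " for " + path);
      }
      if (r.offset() + r.length() > contentLength) {
        throw new EOFException(String.format(
            "Requested range [%d, %d) is beyond EOF for path %s",
            r.offset(), r.offset() + r.length(), path));
      }
    }
  }
}
```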
*/ - private void populateBuffer(FileRange range, + private void populateBuffer(int length, ByteBuffer buffer, InputStream objectContent) throws IOException { - int length = range.getLength(); if (buffer.isDirect()) { - VectoredReadUtils.readInDirectBuffer(range, buffer, + VectoredReadUtils.readInDirectBuffer(length, buffer, (position, tmp, offset, currentLength) -> { - readByteArray(objectContent, range, tmp, offset, currentLength); + readByteArray(objectContent, tmp, offset, currentLength); return null; }); buffer.flip(); } else { - // there is no use of a temp byte buffer, or buffer.put() calls, - // so flip() is not needed. - readByteArray(objectContent, range, buffer.array(), 0, length); + readByteArray(objectContent, buffer.array(), 0, length); } + // update io stats. + incrementBytesRead(length); } + /** * Read data into destination buffer from s3 object content. - * Calls {@link #incrementBytesRead(long)} to update statistics - * incrementally. * @param objectContent result from S3. - * @param range range being read into * @param dest destination buffer. * @param offset start offset of dest buffer. * @param length number of bytes to fill in dest. * @throws IOException any IOE. - * @throws EOFException if EOF if read() call returns -1 - * @throws InterruptedIOException if vectored IO operation is stopped. */ private void readByteArray(InputStream objectContent, - final FileRange range, byte[] dest, int offset, int length) throws IOException { - LOG.debug("Reading {} bytes", length); int readBytes = 0; - long position = range.getOffset(); while (readBytes < length) { - checkIfVectoredIOStopped(); int readBytesCurr = objectContent.read(dest, offset + readBytes, length - readBytes); - LOG.debug("read {} bytes from stream", readBytesCurr); + readBytes +=readBytesCurr; if (readBytesCurr < 0) { - throw new EOFException( - String.format("HTTP stream closed before all bytes were read." - + " Expected %,d bytes but only read %,d bytes. Current position %,d" - + " (%s)", - length, readBytes, position, range)); + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); } - readBytes += readBytesCurr; - position += readBytesCurr; - - // update io stats incrementally - incrementBytesRead(readBytesCurr); } } /** - * Read data from S3 with retries for the GET request + * Read data from S3 using a http request with retries. * This also handles if file has been changed while the * http call is getting executed. If the file has been * changed RemoteFileChangedException is thrown. @@ -1201,10 +1127,7 @@ private void readByteArray(InputStream objectContent, * @param length length from position of the object to be read from S3. * @return S3Object result s3 object. * @throws IOException exception if any. - * @throws InterruptedIOException if vectored io operation is stopped. - * @throws RemoteFileChangedException if file has changed on the store. 
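The restored readByteArray() above relies on the classic pattern of looping on InputStream.read() until the requested count is reached and treating -1 as a premature EOF; the drain path uses the same shape with a bounded scratch buffer. A standalone sketch of both, using only JDK types (the 16 KB scratch size is an assumption, not the S3A constant):

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public final class ReadFullySketch {

  /** Fill dest[offset..offset+length) from the stream, or fail with EOFException. */
  static void readFully(InputStream in, byte[] dest, int offset, int length)
      throws IOException {
    int readBytes = 0;
    while (readBytes < length) {
      int n = in.read(dest, offset + readBytes, length - readBytes);
      if (n < 0) {
        throw new EOFException(
            "Stream ended after " + readBytes + " of " + length + " bytes");
      }
      readBytes += n;
    }
  }

  /** Discard exactly n bytes, reusing a bounded scratch buffer between ranges. */
  static long drain(InputStream in, long n) throws IOException {
    byte[] scratch = new byte[(int) Math.min(16 * 1024, n)]; // assumed scratch size
    long drained = 0;
    while (drained < n) {
      int count = in.read(scratch, 0, (int) Math.min(scratch.length, n - drained));
      if (count < 0) {
        throw new EOFException("Stream ended while draining; got " + drained + " of " + n);
      }
      drained += count;
    }
    return drained;
  }
}
```

The drained byte count is what the stream statistics record as readVectoredBytesDiscarded in the hunks above.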
*/ - @Retries.RetryTranslated private ResponseInputStream getS3Object(String operationName, long position, int length) @@ -1347,6 +1270,7 @@ public synchronized void unbuffer() { streamStatistics.unbuffered(); if (inputPolicy.isAdaptive()) { S3AInputPolicy policy = S3AInputPolicy.Random; + LOG.debug("Switching to seek policy {} after unbuffer() invoked", policy); setInputPolicy(policy); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java index a8aa532ac0..49c2fb8947 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java @@ -171,11 +171,8 @@ private boolean drainOrAbortHttpStream() { "duplicate invocation of drain operation"); } boolean executeAbort = shouldAbort; - if (remaining > 0 || executeAbort) { - // only log if there is a drain or an abort - LOG.debug("drain or abort reason {} remaining={} abort={}", - reason, remaining, executeAbort); - } + LOG.debug("drain or abort reason {} remaining={} abort={}", + reason, remaining, executeAbort); if (!executeAbort) { try { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index e7ce783c2c..9966393d41 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -20,7 +20,6 @@ import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -29,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,19 +43,14 @@ import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException; import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; -import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR; -import static org.apache.hadoop.fs.contract.ContractTestUtils.range; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; @@ -65,11 +58,6 @@ import 
static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.test.MoreAsserts.assertEqual; -/** - * S3A contract tests for vectored reads. - * This is a complex suite as it really is testing the store, so measurements of - * what IO took place is also performed if the input stream is suitable for this. - */ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class); @@ -83,6 +71,18 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + /** + * Overriding in S3 vectored read api fails fast in case of EOF + * requested range. + */ + @Override + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + verifyExceptionalVectoredRead(fs, fileRanges, RangeNotSatisfiableEOFException.class); + } + /** * Verify response to a vector read request which is beyond the * real length of the file. @@ -98,27 +98,22 @@ public void testEOFRanges416Handling() throws Exception { CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) - .opt(FS_OPTION_OPENFILE_READ_POLICY, - FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) .build(); - List fileRanges = range(DATASET_LEN, 100); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); - // read starting past EOF generates a 416 response, mapped to - // RangeNotSatisfiableEOFException describe("Read starting from past EOF"); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, getAllocate()); FileRange res = fileRanges.get(0); CompletableFuture data = res.getData(); - interceptFuture(EOFException.class, - "", + interceptFuture(RangeNotSatisfiableEOFException.class, + "416", ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, data); } - // a read starting before the EOF and continuing past it does generate - // an EOF exception, but not a 416. 
describe("Read starting 0 continuing past EOF"); try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME)) .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) @@ -126,8 +121,8 @@ public void testEOFRanges416Handling() throws Exception { final FileRange range = FileRange.createFileRange(0, extendedLen); in.readVectored(Arrays.asList(range), getAllocate()); CompletableFuture data = range.getData(); - interceptFuture(RangeNotSatisfiableEOFException.class, - "", + interceptFuture(EOFException.class, + EOF_IN_READ_FULLY, ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, data); @@ -147,7 +142,7 @@ public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception { conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K"); conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M"); try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { - try (FSDataInputStream fis = openVectorFile(fs)) { + try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) { int newMinSeek = fis.minSeekForVectorReads(); int newMaxSize = fis.maxReadSizeForVectorReads(); assertEqual(newMinSeek, configuredMinSeek, @@ -165,7 +160,7 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE); try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { - try (FSDataInputStream fis = openVectorFile(fs)) { + try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) { int minSeek = fis.minSeekForVectorReads(); int maxSize = fis.maxReadSizeForVectorReads(); assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, @@ -178,42 +173,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { @Test public void testStopVectoredIoOperationsCloseStream() throws Exception { - + FileSystem fs = getFileSystem(); List fileRanges = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()){ + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ in.readVectored(fileRanges, getAllocate()); in.close(); LambdaTestUtils.intercept(InterruptedIOException.class, - () -> validateVectoredReadResult(fileRanges, DATASET, 0)); + () -> validateVectoredReadResult(fileRanges, DATASET)); } // reopening the stream should succeed. - try (FSDataInputStream in = openVectorFile()){ + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ in.readVectored(fileRanges, getAllocate()); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); } } - /** - * Verify that unbuffer() stops vectored IO operations. - * There's a small risk of a race condition where the unbuffer() call - * is made after the vector reads have completed. - */ @Test public void testStopVectoredIoOperationsUnbuffer() throws Exception { - + FileSystem fs = getFileSystem(); List fileRanges = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()){ + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ in.readVectored(fileRanges, getAllocate()); in.unbuffer(); LambdaTestUtils.intercept(InterruptedIOException.class, - () -> validateVectoredReadResult(fileRanges, DATASET, 0)); + () -> validateVectoredReadResult(fileRanges, DATASET)); // re-initiating the vectored reads after unbuffer should succeed. 
in.readVectored(fileRanges, getAllocate()); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); } } + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } + + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testSameRanges() throws Exception { + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } + /** * As the minimum seek value is 4*1024, the first three ranges will be * merged into and other two will remain as it is. @@ -223,35 +234,21 @@ public void testNormalReadVsVectoredReadStatsCollection() throws Exception { try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) { List fileRanges = new ArrayList<>(); - range(fileRanges, 10 * 1024, 100); - range(fileRanges, 8 * 1024, 100); - range(fileRanges, 14 * 1024, 100); - range(fileRanges, 2 * 1024 - 101, 100); - range(fileRanges, 40 * 1024, 1024); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); CompletableFuture builder = - fs.openFile(path(VECTORED_READ_FILE_NAME)) - .withFileStatus(fileStatus) - .opt(FS_OPTION_OPENFILE_READ_POLICY, - FS_OPTION_OPENFILE_READ_POLICY_VECTOR) - .build(); + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, getAllocate()); - validateVectoredReadResult(fileRanges, DATASET, 0); + validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, getPool()); - final InputStream wrappedStream = in.getWrappedStream(); - - // policy will be random. 
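The contract tests above repeatedly call validateVectoredReadResult(fileRanges, DATASET) after a vectored read. A simplified sketch of what such a check amounts to: wait on each range's future, then compare the returned bytes against the matching slice of the source dataset. The Range record and the 60-second timeout are illustrative, not the ContractTestUtils implementation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class VectoredReadValidationSketch {

  /** Hypothetical stand-in for FileRange: offset, length and the future buffer. */
  record Range(long offset, int length, CompletableFuture<ByteBuffer> data) { }

  static void validate(List<Range> ranges, byte[] expectedDataset) throws Exception {
    for (Range range : ranges) {
      // wait for the asynchronous read of this range to complete
      ByteBuffer buffer = range.data().get(60, TimeUnit.SECONDS);
      byte[] actual = new byte[range.length()];
      buffer.get(actual);
      byte[] expected = Arrays.copyOfRange(expectedDataset,
          (int) range.offset(), (int) range.offset() + range.length());
      if (!Arrays.equals(expected, actual)) {
        throw new AssertionError("Data mismatch in range starting at " + range.offset());
      }
    }
  }
}
```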
- if (wrappedStream instanceof S3AInputStream) { - S3AInputStream inner = (S3AInputStream) wrappedStream; - Assertions.assertThat(inner.getInputPolicy()) - .describedAs("Input policy of %s", inner) - .isEqualTo(S3AInputPolicy.Random); - Assertions.assertThat(inner.isObjectStreamOpen()) - .describedAs("Object stream open in %s", inner) - .isFalse(); - } // audit the io statistics for this stream IOStatistics st = in.getIOStatistics(); @@ -350,8 +347,8 @@ public void testMultiVectoredReadStatsCollection() throws Exception { try (FSDataInputStream in = builder.get()) { in.readVectored(ranges1, getAllocate()); in.readVectored(ranges2, getAllocate()); - validateVectoredReadResult(ranges1, DATASET, 0); - validateVectoredReadResult(ranges2, DATASET, 0); + validateVectoredReadResult(ranges1, DATASET); + validateVectoredReadResult(ranges2, DATASET); returnBuffersToPoolPostRead(ranges1, getPool()); returnBuffersToPoolPostRead(ranges2, getPool()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java index 69795b06aa..ba9746358c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java @@ -175,7 +175,7 @@ protected YarnConfiguration createConfiguration() { String host = jobResourceUri.getHost(); // and fix to the main endpoint if the caller has moved conf.set( - String.format("fs.s3a.bucket.%s.endpoint", host), "us-east-1"); + String.format("fs.s3a.bucket.%s.endpoint", host), ""); // set up DTs enableDelegationTokens(conf, tokenBinding); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 482a963b92..25ffc8fda8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.statistics.IOStatistics; +import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; @@ -437,7 +438,7 @@ public void testVectorReadPastEOF() throws Throwable { final FileRange range = FileRange.createFileRange(0, longLen); in.readVectored(Arrays.asList(range), (i) -> bb); interceptFuture(EOFException.class, - "", + EOF_IN_READ_FULLY, ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, range.getData()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 79e5a93371..9cf3c220d1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -21,7 +21,7 @@ 
import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; -import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; @@ -51,8 +51,6 @@ import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.io.ElasticByteBufferPool; -import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Progressable; @@ -61,6 +59,8 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -78,11 +78,11 @@ /** * Scale test which creates a huge file. - *

+ * * Important: the order in which these tests execute is fixed to * alphabetical order. Test cases are numbered {@code test_123_} to impose * an ordering based on the numbers. - *

+ * * Having this ordering allows the tests to assume that the huge file * exists. Even so: they should all have a {@link #assumeHugeFileExists()} * check at the start, in case an individual test is executed. @@ -584,94 +584,54 @@ public void test_040_PositionedReadHugeFile() throws Throwable { toHuman(timer.nanosPerOperation(ops))); } - /** - * Should this test suite use direct buffers for - * the Vector IO operations? - * @return true if direct buffers are desired. - */ - protected boolean isDirectVectorBuffer() { - return false; - } - @Test public void test_045_vectoredIOHugeFile() throws Throwable { assumeHugeFileExists(); - final ElasticByteBufferPool pool = - new WeakReferencedElasticByteBufferPool(); - boolean direct = isDirectVectorBuffer(); - IntFunction allocate = size -> pool.getBuffer(direct, size); - - // build a list of ranges for both reads. - final int rangeLength = 116770; - long base = 1520861; - long pos = base; - List rangeList = range(pos, rangeLength); - pos += rangeLength; - range(rangeList, pos, rangeLength); - pos += rangeLength; - range(rangeList, pos, rangeLength); - pos += rangeLength; - range(rangeList, pos, rangeLength); - pos += rangeLength; - range(rangeList, pos, rangeLength); - pos += rangeLength; - range(rangeList, pos, rangeLength); - + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(5856368, 116770)); + rangeList.add(FileRange.createFileRange(3520861, 116770)); + rangeList.add(FileRange.createFileRange(8191913, 116770)); + rangeList.add(FileRange.createFileRange(1520861, 116770)); + rangeList.add(FileRange.createFileRange(2520861, 116770)); + rangeList.add(FileRange.createFileRange(9191913, 116770)); + rangeList.add(FileRange.createFileRange(2820861, 156770)); + IntFunction allocate = ByteBuffer::allocate; FileSystem fs = getFileSystem(); - final int validateSize = (int) totalReadSize(rangeList); + // read into a buffer first + // using sequential IO - // read the same ranges using readFully into a buffer. - // this is to both validate the range resolution logic, - // and to compare performance of sequential GET requests - // with the vector IO. 
- byte[] readFullRes = new byte[validateSize]; - IOStatistics readIOStats, vectorIOStats; - DurationInfo readFullyTime = new DurationInfo(LOG, true, "Sequential read of %,d bytes", - validateSize); + int validateSize = (int) Math.min(filesize, 10 * _1MB); + byte[] readFullRes; + IOStatistics sequentialIOStats, vectorIOStats; try (FSDataInputStream in = fs.openFile(hugefile) - .optLong(FS_OPTION_OPENFILE_LENGTH, filesize) - .opt(FS_OPTION_OPENFILE_READ_POLICY, "random") + .optLong(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read + .optLong(FS_OPTION_OPENFILE_SPLIT_START, 0) + .opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize) + .opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential") .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize) - .build().get()) { - for (FileRange range : rangeList) { - in.readFully(range.getOffset(), - readFullRes, - (int)(range.getOffset() - base), - range.getLength()); - } - readIOStats = in.getIOStatistics(); - } finally { - readFullyTime.close(); + .build().get(); + DurationInfo ignored = new DurationInfo(LOG, "Sequential read of %,d bytes", + validateSize)) { + readFullRes = new byte[validateSize]; + in.readFully(0, readFullRes); + sequentialIOStats = in.getIOStatistics(); } // now do a vector IO read - DurationInfo vectorTime = new DurationInfo(LOG, true, "Vector Read"); try (FSDataInputStream in = fs.openFile(hugefile) .optLong(FS_OPTION_OPENFILE_LENGTH, filesize) .opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random") - .build().get()) { - // initiate the read. + .build().get(); + DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) { + in.readVectored(rangeList, allocate); - // Wait for the results and compare with read fully. - validateVectoredReadResult(rangeList, readFullRes, base); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); vectorIOStats = in.getIOStatistics(); - } finally { - vectorTime.close(); - // release the pool - pool.release(); } - final Duration readFullyDuration = readFullyTime.asDuration(); - final Duration vectorDuration = vectorTime.asDuration(); - final Duration diff = readFullyDuration.minus(vectorDuration); - double ratio = readFullyDuration.toNanos() / (double) vectorDuration.toNanos(); - String format = String.format("Vector read to %s buffer taking %s was %s faster than" - + " readFully() (%s); ratio=%,.2fX", - direct ? "direct" : "heap", - vectorDuration, diff, readFullyDuration, ratio); - LOG.info(format); - LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(readIOStats)); + LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(sequentialIOStats)); LOG.info("Vector IOStatistics={}", ioStatisticsToPrettyString(vectorIOStats)); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java index 6020f4c5f8..2be5769893 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java @@ -22,16 +22,10 @@ /** * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering. - * Also uses direct buffers for the vector IO. 
*/ public class ITestS3AHugeFilesDiskBlocks extends AbstractSTestS3AHugeFiles { protected String getBlockOutputBufferName() { return Constants.FAST_UPLOAD_BUFFER_DISK; } - - @Override - protected boolean isDirectVectorBuffer() { - return true; - } } diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml index ab33b0cd79..a5d98a32e6 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml @@ -147,9 +147,4 @@ true - - fs.contract.vector-io-early-eof-check - true - - diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractVectoredRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractVectoredRead.java deleted file mode 100644 index e553989008..0000000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractVectoredRead.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.azurebfs.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Contract test for vectored reads through ABFS connector. - */ -public class ITestAbfsFileSystemContractVectoredRead - extends AbstractContractVectoredReadTest { - - private final boolean isSecure; - private final ABFSContractTestBinding binding; - - public ITestAbfsFileSystemContractVectoredRead(final String bufferType) throws Exception { - super(bufferType); - this.binding = new ABFSContractTestBinding(); - this.isSecure = binding.isSecureMode(); - } - - @Override - public void setup() throws Exception { - binding.setup(); - super.setup(); - } - - @Override - protected Configuration createConfiguration() { - return this.binding.getRawConfiguration(); - } - - @Override - protected AbstractFSContract createContract(final Configuration conf) { - return new AbfsFileSystemContract(conf, this.isSecure); - } -}
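Earlier in this patch, test_045_vectoredIOHugeFile reverts to comparing a plain sequential read against readVectored() over the same file. A compact sketch of that style of comparison using the public FileSystem API is below; the method name, the timeout, and the sample offsets are illustrative only, and real measurements would need a configured store and warmed-up connections.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class VectorVsSequentialSketch {

  /** Read the same ranges with positioned readFully() and with readVectored(), timing both. */
  static void compare(FileSystem fs, Path file, List<FileRange> ranges) throws Exception {
    long t0 = System.nanoTime();
    try (FSDataInputStream in = fs.open(file)) {
      for (FileRange r : ranges) {
        byte[] buf = new byte[r.getLength()];
        in.readFully(r.getOffset(), buf);              // one positioned read per range
      }
    }
    long sequentialNanos = System.nanoTime() - t0;

    long t1 = System.nanoTime();
    try (FSDataInputStream in = fs.open(file)) {
      in.readVectored(ranges, ByteBuffer::allocate);   // issue all ranges at once
      for (FileRange r : ranges) {
        r.getData().get(60, TimeUnit.SECONDS);         // wait for each buffer to arrive
      }
    }
    long vectoredNanos = System.nanoTime() - t1;

    System.out.printf("sequential=%dms vectored=%dms%n",
        TimeUnit.NANOSECONDS.toMillis(sequentialNanos),
        TimeUnit.NANOSECONDS.toMillis(vectoredNanos));
  }

  /** Sample range list in the spirit of the scale test; offsets are made up. */
  static List<FileRange> sampleRanges() {
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(1_520_861, 116_770));
    ranges.add(FileRange.createFileRange(2_520_861, 116_770));
    ranges.add(FileRange.createFileRange(3_520_861, 116_770));
    return ranges;
  }
}
```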