From 4d1f6f9b995cbf65fc8a00cb6c8fabe70d3b5474 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 21 Jun 2022 03:45:40 +0530 Subject: [PATCH] HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445) part of HADOOP-18103. Handling memory fragmentation in S3A vectored IO implementation by allocating smaller user range requested size buffers and directly filling them from the remote S3 stream and skipping undesired data in between ranges. This patch also adds aborting active vectored reads when stream is closed or unbuffer() is called. Contributed By: Mukund Thakur --- .../apache/hadoop/fs/ChecksumFileSystem.java | 11 +- .../java/org/apache/hadoop/fs/FileRange.java | 12 + .../apache/hadoop/fs/PositionedReadable.java | 4 +- .../apache/hadoop/fs/RawLocalFileSystem.java | 16 +- .../apache/hadoop/fs/StreamCapabilities.java | 6 + .../fs/{impl => }/VectoredReadUtils.java | 91 ++++---- .../hadoop/fs/impl/CombinedFileRange.java | 1 - .../hadoop/fs/{ => impl}/FileRangeImpl.java | 9 +- .../markdown/filesystem/fsdatainputstream.md | 11 +- .../fs/{impl => }/TestVectoredReadUtils.java | 149 ++++++------ .../AbstractContractVectoredReadTest.java | 212 +++++++++++++----- .../hadoop/fs/contract/ContractTestUtils.java | 20 ++ .../TestLocalFSContractVectoredRead.java | 51 +++++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 205 ++++++++++++----- .../s3a/ITestS3AContractVectoredRead.java | 63 +++++- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 15 +- .../benchmark/VectoredReadBenchmark.java | 4 +- 17 files changed, 616 insertions(+), 264 deletions(-) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{impl => }/VectoredReadUtils.java (79%) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{ => impl}/FileRangeImpl.java (85%) rename hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/{impl => }/TestVectoredReadUtils.java (74%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index a6bdc220ba..1cca9fe2bf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; @@ -55,6 +54,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; /**************************************************************** * Abstract Checksumed FileSystem. @@ -166,7 +166,7 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) { * It verifies that data matches checksums. *******************************************************/ private static class ChecksumFSInputChecker extends FSInputChecker implements - IOStatisticsSource { + IOStatisticsSource, StreamCapabilities { private ChecksumFileSystem fs; private FSDataInputStream datas; private FSDataInputStream sums; @@ -408,7 +408,7 @@ public void readVectored(List ranges, int minSeek = minSeekForVectorReads(); int maxSize = maxReadSizeForVectorReads(); List dataRanges = - VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, + VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum, minSeek, maxReadSizeForVectorReads()); List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); @@ -435,6 +435,11 @@ public void readVectored(List ranges, } } } + + @Override + public boolean hasCapability(String capability) { + return datas.hasCapability(capability); + } } private static class FSDataBoundedInputStream extends FSDataInputStream { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java index 7388e462cc..e55696e965 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.fs.impl.FileRangeImpl; + /** * A byte range of a file. * This is used for the asynchronous gather read API of @@ -52,4 +54,14 @@ public interface FileRange { * @param data the future of the ByteBuffer that will have the data */ void setData(CompletableFuture data); + + /** + * Factory method to create a FileRange object. + * @param offset starting offset of the range. + * @param length length of the range. + * @return a new instance of FileRangeImpl. + */ + static FileRange createFileRange(long offset, int length) { + return new FileRangeImpl(offset, length); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 7e543ebf22..de76090512 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.impl.VectoredReadUtils; /** * Stream that permits positional reading. @@ -121,7 +120,6 @@ default int maxReadSizeForVectorReads() { */ default void readVectored(List ranges, IntFunction allocate) throws IOException { - VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), - maxReadSizeForVectorReads()); + VectoredReadUtils.readVectored(this, ranges, allocate); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 208d1668b6..f525c3cba7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import java.io.BufferedOutputStream; import java.io.DataOutput; @@ -68,6 +67,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; @@ -278,6 +278,7 @@ public boolean hasCapability(String capability) { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.VECTOREDIO: return true; default: return false; @@ -303,23 +304,24 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { public void readVectored(List ranges, IntFunction allocate) throws IOException { + List sortedRanges = Arrays.asList(sortRanges(ranges)); // Set up all of the futures, so that we can use them if things fail - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { VectoredReadUtils.validateRangeRequest(range); range.setData(new CompletableFuture<>()); } try { AsynchronousFileChannel channel = getAsyncChannel(); - ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; - AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); - for(int i = 0; i < ranges.size(); ++i) { - FileRange range = ranges.get(i); + ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers); + for(int i = 0; i < sortedRanges.size(); ++i) { + FileRange range = sortedRanges.get(i); buffers[i] = allocate.apply(range.getLength()); channel.read(buffers[i], range.getOffset(), i, asyncHandler); } } catch (IOException ioe) { LOG.debug("Exception occurred during vectored read ", ioe); - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { range.getData().completeExceptionally(ioe); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 8611780195..d68ef505dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -80,6 +80,12 @@ public interface StreamCapabilities { */ String IOSTATISTICS = "iostatistics"; + /** + * Support for vectored IO api. + * See {@code PositionedReadable#readVectored(List, IntFunction)}. + */ + String VECTOREDIO = "readvectored"; + /** * Stream abort() capability implemented by {@link Abortable#abort()}. * This matches the Path Capability diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java similarity index 79% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 9a16e6841d..64107f1a18 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.EOFException; import java.io.IOException; @@ -28,9 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.IntFunction; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.Preconditions; /** @@ -68,35 +66,19 @@ public static void validateVectoredReadRanges(List ranges) /** - * Read fully a list of file ranges asynchronously from this file. - * The default iterates through the ranges to read each synchronously, but - * the intent is that subclasses can make more efficient readers. + * This is the default implementation which iterates through the ranges + * to read each synchronously, but the intent is that subclasses + * can make more efficient readers. * The data or exceptions are pushed into {@link FileRange#getData()}. * @param stream the stream to read the data from * @param ranges the byte ranges to read * @param allocate the byte buffer allocation - * @param minimumSeek the minimum number of bytes to seek over - * @param maximumRead the largest number of bytes to combine into a single read */ public static void readVectored(PositionedReadable stream, List ranges, - IntFunction allocate, - int minimumSeek, - int maximumRead) { - if (isOrderedDisjoint(ranges, 1, minimumSeek)) { - for(FileRange range: ranges) { - range.setData(readRangeFrom(stream, range, allocate)); - } - } else { - for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, - maximumRead)) { - CompletableFuture read = - readRangeFrom(stream, range, allocate); - for(FileRange child: range.getUnderlying()) { - child.setData(read.thenApply( - (b) -> sliceTo(b, range.getOffset(), child))); - } - } + IntFunction allocate) { + for (FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); } } @@ -166,7 +148,7 @@ public static boolean isOrderedDisjoint(List input, int chunkSize, int minimumSeek) { long previous = -minimumSeek; - for(FileRange range: input) { + for (FileRange range: input) { long offset = range.getOffset(); long end = range.getOffset() + range.getLength(); if (offset % chunkSize != 0 || @@ -209,7 +191,42 @@ public static long roundUp(long offset, int chunkSize) { } /** - * Sort and merge ranges to optimize the access from the underlying file + * Check if the input ranges are overlapping in nature. + * We call two ranges to be overlapping when start offset + * of second is less than the end offset of first. + * End offset is calculated as start offset + length. + * @param input list if input ranges. + * @return true/false based on logic explained above. + */ + public static List validateNonOverlappingAndReturnSortedRanges( + List input) { + + if (input.size() <= 1) { + return input; + } + FileRange[] sortedRanges = sortRanges(input); + FileRange prev = sortedRanges[0]; + for (int i=1; i input) { + FileRange[] sortedRanges = input.toArray(new FileRange[0]); + Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset)); + return sortedRanges; + } + + /** + * Merge sorted ranges to optimize the access from the underlying file * system. * The motivations are that: *
    @@ -219,24 +236,22 @@ public static long roundUp(long offset, int chunkSize) { *
  • Some file systems want to round ranges to be at checksum boundaries.
  • *
* - * @param input the list of input ranges + * @param sortedRanges already sorted list of ranges based on offset. * @param chunkSize round the start and end points to multiples of chunkSize * @param minimumSeek the smallest gap that we should seek over in bytes * @param maxSize the largest combined file range in bytes * @return the list of sorted CombinedFileRanges that cover the input */ - public static List sortAndMergeRanges(List input, - int chunkSize, - int minimumSeek, - int maxSize) { - // sort the ranges by offset - FileRange[] ranges = input.toArray(new FileRange[0]); - Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset)); + public static List mergeSortedRanges(List sortedRanges, + int chunkSize, + int minimumSeek, + int maxSize) { + CombinedFileRange current = null; - List result = new ArrayList<>(ranges.length); + List result = new ArrayList<>(sortedRanges.size()); // now merge together the ones that merge - for(FileRange range: ranges) { + for (FileRange range: sortedRanges) { long start = roundDown(range.getOffset(), chunkSize); long end = roundUp(range.getOffset() + range.getLength(), chunkSize); if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java index 828a50b4f7..516bbb2c70 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.impl; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import java.util.ArrayList; import java.util.List; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java similarity index 85% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index ef5851154b..041e5f0a8d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -15,15 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.fs; +package org.apache.hadoop.fs.impl; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileRange; + /** * A range of bytes from a file with an optional buffer to read those bytes - * for zero copy. + * for zero copy. This shouldn't be created directly via constructor rather + * factory defined in {@code FileRange#createFileRange} should be used. */ +@InterfaceAudience.Private public class FileRangeImpl implements FileRange { private long offset; private int length; diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index e4a2830967..197b999c81 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -449,7 +449,14 @@ Read fully data for a list of ranges asynchronously. The default implementation iterates through the ranges, tries to coalesce the ranges based on values of `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged ranges synchronously, but the intent is sub classes can implement efficient -implementation. +implementation. Reading in both direct and heap byte buffers are supported. +Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for +allocating buffers such that even direct buffers are garbage collected when +they are no longer referenced. + +Note: Don't use direct buffers for reading from ChecksumFileSystem as that may +lead to memory fragmentation explained in HADOOP-18296. + #### Preconditions @@ -467,7 +474,7 @@ For each requested range: ### `minSeekForVectorReads()` -Smallest reasonable seek. Two ranges won't be merged together if the difference between +The smallest reasonable seek. Two ranges won't be merged together if the difference between end of first and start of next range is more than this value. ### `maxReadSizeForVectorReads()` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java similarity index 74% rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java index cfd366701b..5d08b02e11 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,12 +31,10 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.test.HadoopTestBase; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; @@ -56,7 +54,7 @@ public void testSliceTo() { } // ensure we don't make unnecessary slices ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, - new FileRangeImpl(100, size)); + FileRange.createFileRange(100, size)); Assertions.assertThat(buffer) .describedAs("Slicing on the same offset shouldn't " + "create a new buffer") @@ -67,7 +65,7 @@ public void testSliceTo() { final int sliceStart = 1024; final int sliceLength = 16 * 1024; slice = VectoredReadUtils.sliceTo(buffer, offset, - new FileRangeImpl(offset + sliceStart, sliceLength)); + FileRange.createFileRange(offset + sliceStart, sliceLength)); // make sure they aren't the same, but use the same backing data Assertions.assertThat(buffer) .describedAs("Slicing on new offset should " + @@ -96,12 +94,12 @@ public void testRounding() { @Test public void testMerge() { - FileRange base = new FileRangeImpl(2000, 1000); + FileRange base = FileRange.createFileRange(2000, 1000); CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); // test when the gap between is too big assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2000, 4000)); + FileRange.createFileRange(5000, 1000), 2000, 4000)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -109,7 +107,7 @@ public void testMerge() { // test when the total size gets exceeded assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 3999)); + FileRange.createFileRange(5000, 1000), 2001, 3999)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -117,7 +115,7 @@ public void testMerge() { // test when the merge works assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 4000)); + FileRange.createFileRange(5000, 1000), 2001, 4000)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge length", 4000, mergeBase.getLength()); @@ -127,7 +125,7 @@ public void testMerge() { assertEquals(200, mergeBase.getOffset()); assertEquals(100, mergeBase.getLength()); assertTrue("ranges should get merged ", mergeBase.merge(500, 600, - new FileRangeImpl(5000, 1000), 201, 400)); + FileRange.createFileRange(5000, 1000), 201, 400)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 200, mergeBase.getOffset()); assertEquals("post merge length", 400, mergeBase.getLength()); @@ -136,42 +134,58 @@ public void testMerge() { @Test public void testSortAndMerge() { List input = Arrays.asList( - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 3, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(3); assertEquals("range[1000,3100)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // the minSeek doesn't allow the first two to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1000, 2100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,1100)", outputList.get(0).toString()); assertEquals("range[2100,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); // the maxSize doesn't allow the third range to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1001, 2099); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,2200)", outputList.get(0).toString()); assertEquals("range[3000,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // test the round up and round down (the maxSize doesn't allow any merges) - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); - assertEquals("merged range size", 3, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 16, 1001, 100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(3); assertEquals("range[992,1104)", outputList.get(0).toString()); assertEquals("range[2096,2208)", outputList.get(1).toString()); assertEquals("range[2992,3104)", outputList.get(2).toString()); @@ -182,26 +196,35 @@ public void testSortAndMerge() { @Test public void testSortAndMergeMoreCases() throws Exception { List input = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 1, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 1, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); assertEquals("range[1000,3110)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); assertEquals("range[1000,3200)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); @@ -211,9 +234,9 @@ public void testSortAndMergeMoreCases() throws Exception { @Test public void testMaxSizeZeroDisablesMering() throws Exception { List randomRanges = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100) ); assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); @@ -225,7 +248,7 @@ private void assertEqualRangeCountsAfterMerging(List inputRanges, int minimumSeek, int maxSize) { List combinedFileRanges = VectoredReadUtils - .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize); + .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize); Assertions.assertThat(combinedFileRanges) .describedAs("Mismatch in number of ranges post merging") .hasSize(inputRanges.size()); @@ -251,7 +274,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -267,7 +290,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { .when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -286,7 +309,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -303,7 +326,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -328,9 +351,9 @@ static void validateBuffer(String message, ByteBuffer buffer, int start) { @Test public void testReadVectored() throws Exception { - List input = Arrays.asList(new FileRangeImpl(0, 100), - new FileRangeImpl(100_000, 100), - new FileRangeImpl(200_000, 100)); + List input = Arrays.asList(FileRange.createFileRange(0, 100), + FileRange.createFileRange(100_000, 100), + FileRange.createFileRange(200_000, 100)); Stream stream = Mockito.mock(Stream.class); Mockito.doAnswer(invocation -> { fillBuffer(invocation.getArgument(1)); @@ -338,31 +361,11 @@ public void testReadVectored() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); // should not merge the ranges - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); Mockito.verify(stream, Mockito.times(3)) .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); for(int b=0; b < input.size(); ++b) { validateBuffer("buffer " + b, input.get(b).getData().get(), 0); } } - - @Test - public void testReadVectoredMerge() throws Exception { - List input = Arrays.asList(new FileRangeImpl(2000, 100), - new FileRangeImpl(1000, 100), - new FileRangeImpl(0, 100)); - Stream stream = Mockito.mock(Stream.class); - Mockito.doAnswer(invocation -> { - fillBuffer(invocation.getArgument(1)); - return null; - }).when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(ByteBuffer.class)); - // should merge the ranges into a single read - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100); - Mockito.verify(stream, Mockito.times(1)) - .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); - for(int b=0; b < input.size(); ++b) { - validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000); - } - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index e8c86b5dbb..77bcc496ff 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.contract; import java.io.EOFException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -38,13 +37,18 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; +import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; @RunWith(Parameterized.class) @@ -53,11 +57,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); public static final int DATASET_LEN = 64 * 1024; - private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; private final IntFunction allocate; + private final WeakReferencedElasticByteBufferPool pool = + new WeakReferencedElasticByteBufferPool(); + private final String bufferType; @Parameterized.Parameters(name = "Buffer type : {0}") @@ -67,8 +74,14 @@ public static List params() { public AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - this.allocate = "array".equals(bufferType) ? - ByteBuffer::allocate : ByteBuffer::allocateDirect; + this.allocate = value -> { + boolean isDirect = !"array".equals(bufferType); + return pool.getBuffer(isDirect, value); + }; + } + + public IntFunction getAllocate() { + return allocate; } @Override @@ -79,12 +92,27 @@ public void setup() throws Exception { createFile(fs, path, true, DATASET); } + @Override + public void teardown() throws Exception { + super.teardown(); + pool.release(); + } + + @Test + public void testVectoredReadCapability() throws Exception { + FileSystem fs = getFileSystem(); + String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO}; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + assertCapabilities(in, vectoredReadCapability, null); + } + } + @Test public void testVectoredReadMultipleRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); for (int i = 0; i < 10; i++) { - FileRange fileRange = new FileRangeImpl(i * 100, 100); + FileRange fileRange = FileRange.createFileRange(i * 100, 100); fileRanges.add(fileRange); } try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { @@ -98,6 +126,7 @@ public void testVectoredReadMultipleRanges() throws Exception { combinedFuture.get(); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -105,7 +134,7 @@ public void testVectoredReadMultipleRanges() throws Exception { public void testVectoredReadAndReadFully() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(100, 100)); + fileRanges.add(FileRange.createFileRange(100, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); byte[] readFullRes = new byte[100]; @@ -114,6 +143,7 @@ public void testVectoredReadAndReadFully() throws Exception { Assertions.assertThat(vecRes) .describedAs("Result from vectored read and readFully must match") .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -125,12 +155,13 @@ public void testVectoredReadAndReadFully() throws Exception { public void testDisjointRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100)); - fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 + 101, 100)); + fileRanges.add(FileRange.createFileRange(16_000 + 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -142,12 +173,13 @@ public void testDisjointRanges() throws Exception { public void testAllRangesMergedIntoOne() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(8*1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 - 101, 100)); + fileRanges.add(FileRange.createFileRange(8_000 - 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -159,44 +191,80 @@ public void testAllRangesMergedIntoOne() throws Exception { public void testSomeRangesMergedSomeUnmerged() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 100)); - fileRanges.add(new FileRangeImpl(14*1024, 100)); - fileRanges.add(new FileRangeImpl(10*1024, 100)); - fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(40*1024, 1024)); - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET); - } - } - - @Test - public void testSameRanges() throws Exception { - FileSystem fs = getFileSystem(); - List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) .build(); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSameRanges() throws Exception { + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSomeRandomNonOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 1000)); - fileRanges.add(new FileRangeImpl(90, 900)); - fileRanges.add(new FileRangeImpl(50, 900)); - fileRanges.add(new FileRangeImpl(10, 980)); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(1000, 200)); + fileRanges.add(FileRange.createFileRange(50, 10)); + fileRanges.add(FileRange.createFileRange(10, 5)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testConsecutiveRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(600, 200)); + fileRanges.add(FileRange.createFileRange(800, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -204,7 +272,7 @@ public void testOverlappingRanges() throws Exception { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); for (FileRange res : fileRanges) { @@ -227,22 +295,22 @@ public void testEOFRanges() throws Exception { public void testNegativeLengthRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, -50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(0, -50)); + verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class); } @Test public void testNegativeOffsetRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(-1, 50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(-1, 50)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test public void testNormalReadAfterVectoredRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); // read starting 200 bytes @@ -254,13 +322,14 @@ public void testNormalReadAfterVectoredRead() throws Exception { .describedAs("Vectored read shouldn't change file pointer.") .isEqualTo(200); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testVectoredReadAfterNormalRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { // read starting 200 bytes byte[] res = new byte[200]; @@ -272,43 +341,66 @@ public void testVectoredReadAfterNormalRead() throws Exception { .isEqualTo(200); in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testMultipleVectoredReads() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges1 = createSomeOverlappingRanges(); - List fileRanges2 = createSomeOverlappingRanges(); + List fileRanges1 = createSampleNonOverlappingRanges(); + List fileRanges2 = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges2, allocate); validateVectoredReadResult(fileRanges2, DATASET); validateVectoredReadResult(fileRanges1, DATASET); + returnBuffersToPoolPostRead(fileRanges1, pool); + returnBuffersToPoolPostRead(fileRanges2, pool); } } - protected List createSomeOverlappingRanges() { + protected List createSampleNonOverlappingRanges() { List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(90, 50)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(110, 50)); return fileRanges; } + protected List getSampleSameRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + return fileRanges; + } - protected void testExceptionalVectoredRead(FileSystem fs, - List fileRanges, - String s) throws IOException { - boolean exRaised = false; - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - // Can we intercept here as done in S3 tests ?? - in.readVectored(fileRanges, allocate); - } catch (EOFException | IllegalArgumentException ex) { - // expected. - exRaised = true; + protected List getSampleOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(400, 500)); + return fileRanges; + } + + /** + * Validate that exceptions must be thrown during a vectored + * read operation with specific input ranges. + * @param fs FileSystem instance. + * @param fileRanges input file ranges. + * @param clazz type of exception expected. + * @throws Exception any other IOE. + */ + protected void verifyExceptionalVectoredRead( + FileSystem fs, + List fileRanges, + Class clazz) throws Exception { + + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + LambdaTestUtils.intercept(clazz, + () -> in.readVectored(fileRanges, allocate)); } - Assertions.assertThat(exRaised) - .describedAs(s) - .isTrue(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 8c071282dc..b61abddd43 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.PathCapabilities; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.FutureIO; @@ -1137,6 +1138,25 @@ public static void validateVectoredReadResult(List fileRanges, } } + /** + * Utility to return buffers back to the pool once all + * data has been read for each file range. + * @param fileRanges list of file range. + * @param pool buffer pool. + * @throws IOException any IOE + * @throws TimeoutException ideally this should never occur. + */ + public static void returnBuffersToPoolPostRead(List fileRanges, + ByteBufferPool pool) + throws IOException, TimeoutException { + for (FileRange range : fileRanges) { + ByteBuffer buffer = FutureIO.awaitFuture(range.getData(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + pool.putBuffer(buffer); + } + } + /** * Assert that the data read matches the dataset at the given offset. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 099e3b946d..5d6ca3f8f0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -18,9 +18,26 @@ package org.apache.hadoop.fs.contract.localfs; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest { @@ -32,4 +49,38 @@ public TestLocalFSContractVectoredRead(String bufferType) { protected AbstractFSContract createContract(Configuration conf) { return new LocalFSContract(conf); } + + @Test + public void testChecksumValidationDuringVectoredRead() throws Exception { + Path testPath = path("big_range_checksum"); + LocalFileSystem localFs = (LocalFileSystem) getFileSystem(); + final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + try (FSDataOutputStream out = localFs.create(testPath, true)){ + out.write(datasetCorrect); + } + Path checksumPath = localFs.getChecksumFile(testPath); + Assertions.assertThat(localFs.exists(checksumPath)) + .describedAs("Checksum file should be present") + .isTrue(); + CompletableFuture fis = localFs.openFile(testPath).build(); + List someRandomRanges = new ArrayList<>(); + someRandomRanges.add(FileRange.createFileRange(10, 1024)); + someRandomRanges.add(FileRange.createFileRange(1025, 1024)); + try (FSDataInputStream in = fis.get()){ + in.readVectored(someRandomRanges, getAllocate()); + validateVectoredReadResult(someRandomRanges, datasetCorrect); + } + final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64); + try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ + out.write(datasetCorrupted); + } + CompletableFuture fisN = localFs.openFile(testPath).build(); + try (FSDataInputStream in = fisN.get()){ + in.readVectored(someRandomRanges, getAllocate()); + // Expect checksum exception when data is updated directly through + // raw local fs instance. + intercept(ChecksumException.class, + () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted)); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 9d87f26c3c..3069f17289 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -22,11 +22,13 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntFunction; import com.amazonaws.services.s3.model.GetObjectRequest; @@ -46,8 +48,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; -import org.apache.hadoop.fs.impl.FutureIOSupport; -import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.VectoredReadUtils; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -59,9 +60,9 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -107,6 +108,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; + /** + * Atomic boolean variable to stop all ongoing vectored read operation + * for this input stream. This will be set to true when the stream is + * closed or unbuffer is called. + */ + private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false); + /** * This is the public position; the one set in {@link #seek(long)} * and returned in {@link #getPos()}. @@ -589,6 +597,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; try { + stopVectoredIOOperations.set(true); // close or abort the stream; blocking awaitFuture(closeStream("close() operation", false, true)); LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); @@ -940,31 +949,32 @@ public void readVectored(List ranges, LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + } + List sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges); for (FileRange range : ranges) { validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); range.setData(result); } - if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { + if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); - for(FileRange range: ranges) { + for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); - List combinedFileRanges = sortAndMergeRanges(ranges, + List combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); - for(CombinedFileRange combinedFileRange: combinedFileRanges) { - CompletableFuture result = new CompletableFuture<>(); - ByteBuffer buffer = allocate.apply(combinedFileRange.getLength()); - combinedFileRange.setData(result); + for (CombinedFileRange combinedFileRange: combinedFileRanges) { unboundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); } } LOG.debug("Finished submitting vectored read to threadpool" + @@ -972,58 +982,102 @@ public void readVectored(List ranges, } /** - * Read data in the combinedFileRange and update data in buffers - * of all underlying ranges. - * @param combinedFileRange combined range. - * @param buffer combined buffer. + * Read the data from S3 for the bigger combined file range and update all the + * underlying ranges. + * @param combinedFileRange big combined file range. + * @param allocate method to create byte buffers to hold result data. */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - ByteBuffer buffer) { - // Not putting read single range call inside try block as - // exception if any occurred during this call will be raised - // during awaitFuture call while getting the combined buffer. - readSingleRange(combinedFileRange, buffer); + IntFunction allocate) { + LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); + // This reference is must be kept till all buffers are populated as this is a + // finalizable object which closes the internal stream when gc triggers. + S3Object objectRange = null; + S3ObjectInputStream objectContent = null; try { - // In case of single range we return the original byte buffer else - // we return slice byte buffers for each child ranges. - ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); - if (combinedFileRange.getUnderlying().size() == 1) { - combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer); - } else { - for (FileRange child : combinedFileRange.getUnderlying()) { - updateOriginalRange(child, combinedBuffer, combinedFileRange); - } + checkIfVectoredIOStopped(); + final String operationName = "readCombinedFileRange"; + objectRange = getS3Object(operationName, + combinedFileRange.getOffset(), + combinedFileRange.getLength()); + objectContent = objectRange.getObjectContent(); + if (objectContent == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); } + populateChildBuffers(combinedFileRange, objectContent, allocate); } catch (Exception ex) { - LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex); + LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex); for(FileRange child : combinedFileRange.getUnderlying()) { child.getData().completeExceptionally(ex); } + } finally { + IOUtils.cleanupWithLogger(LOG, objectRange, objectContent); + } + LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr); + } + + /** + * Populate underlying buffers of the child ranges. + * @param combinedFileRange big combined file range. + * @param objectContent data from s3. + * @param allocate method to allocate child byte buffers. + * @throws IOException any IOE. + */ + private void populateChildBuffers(CombinedFileRange combinedFileRange, + S3ObjectInputStream objectContent, + IntFunction allocate) throws IOException { + // If the combined file range just contains a single child + // range, we only have to fill that one child buffer else + // we drain the intermediate data between consecutive ranges + // and fill the buffers one by one. + if (combinedFileRange.getUnderlying().size() == 1) { + FileRange child = combinedFileRange.getUnderlying().get(0); + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + } else { + FileRange prev = null; + for (FileRange child : combinedFileRange.getUnderlying()) { + if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) { + long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength(); + drainUnnecessaryData(objectContent, drainQuantity); + } + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + prev = child; + } } } /** - * Update data in child range from combined range. - * @param child child range. - * @param combinedBuffer combined buffer. - * @param combinedFileRange combined range. + * Drain unnecessary data in between ranges. + * @param objectContent s3 data stream. + * @param drainQuantity how many bytes to drain. + * @throws IOException any IOE. */ - private void updateOriginalRange(FileRange child, - ByteBuffer combinedBuffer, - CombinedFileRange combinedFileRange) { - LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); - ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); - child.getData().complete(childBuffer); - LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); + private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity) + throws IOException { + int drainBytes = 0; + int readCount; + while (drainBytes < drainQuantity) { + if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) { + byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE]; + readCount = objectContent.read(drainBuffer); + } else { + byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)]; + readCount = objectContent.read(drainBuffer); + } + drainBytes += readCount; + } + LOG.debug("{} bytes drained from stream ", drainBytes); } /** - * // Check if we can use contentLength returned by http GET request. * Validates range parameters. + * In case of S3 we already have contentLength from the first GET request + * during an open file operation so failing fast here. * @param range requested range. * @throws EOFException end of file exception. */ @@ -1038,13 +1092,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { } /** - * TODO: Add retry in client.getObject(). not present in older reads why here?? - * Okay retry is being done in the top layer during read. - * But if we do here in the top layer, one issue I am thinking is - * what if there is some error which happened during filling the buffer - * If we retry that old offsets of heap buffers can be overwritten ? - * I think retry should be only added in {@link S3AInputStream#getS3Object} - * Read data from S3 for this range and populate the bufffer. + * Read data from S3 for this range and populate the buffer. * @param range range of data to read. * @param buffer buffer to fill. */ @@ -1053,6 +1101,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { + checkIfVectoredIOStopped(); long position = range.getOffset(); int length = range.getLength(); final String operationName = "readRange"; @@ -1089,6 +1138,7 @@ private void populateBuffer(int length, int offset = 0; byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE]; while (readBytes < length) { + checkIfVectoredIOStopped(); int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ? TMP_BUFFER_MAX_SIZE : length - readBytes; @@ -1103,7 +1153,15 @@ private void populateBuffer(int length, } } - public void readByteArray(S3ObjectInputStream objectContent, + /** + * Read data into destination buffer from s3 object content. + * @param objectContent result from S3. + * @param dest destination buffer. + * @param offset start offset of dest buffer. + * @param length number of bytes to fill in dest. + * @throws IOException any IOE. + */ + private void readByteArray(S3ObjectInputStream objectContent, byte[] dest, int offset, int length) throws IOException { @@ -1120,14 +1178,14 @@ public void readByteArray(S3ObjectInputStream objectContent, } /** - * Read data from S3 using a http request. - * This also handles if file has been changed while http call - * is getting executed. If file has been changed RemoteFileChangedException - * is thrown. + * Read data from S3 using a http request with retries. + * This also handles if file has been changed while the + * http call is getting executed. If the file has been + * changed RemoteFileChangedException is thrown. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. - * @return S3Object + * @return S3Object result s3 object. * @throws IOException exception if any. */ private S3Object getS3Object(String operationName, long position, @@ -1140,7 +1198,11 @@ private S3Object getS3Object(String operationName, long position, Invoker invoker = context.getReadInvoker(); try { objectRange = invoker.retry(operationName, pathStr, true, - () -> client.getObject(request)); + () -> { + checkIfVectoredIOStopped(); + return client.getObject(request); + }); + } catch (IOException ex) { tracker.failed(); throw ex; @@ -1152,6 +1214,19 @@ private S3Object getS3Object(String operationName, long position, return objectRange; } + /** + * Check if vectored io operation has been stooped. This happens + * when the stream is closed or unbuffer is called. + * @throws InterruptedIOException throw InterruptedIOException such + * that all running vectored io is + * terminated thus releasing resources. + */ + private void checkIfVectoredIOStopped() throws InterruptedIOException { + if (stopVectoredIOOperations.get()) { + throw new InterruptedIOException("Stream closed or unbuffer is called"); + } + } + /** * Access the input stream statistics. * This is for internal testing and may be removed without warning. @@ -1237,10 +1312,15 @@ public static long validateReadahead(@Nullable Long readahead) { /** * Closes the underlying S3 stream, and merges the {@link #streamStatistics} * instance associated with the stream. + * Also sets the {@code stopVectoredIOOperations} flag to true such that + * active vectored read operations are terminated. However termination of + * old vectored reads are not guaranteed if a new vectored read operation + * is initiated after unbuffer is called. */ @Override public synchronized void unbuffer() { try { + stopVectoredIOOperations.set(true); closeStream("unbuffer()", false, false); } finally { streamStatistics.unbuffered(); @@ -1253,6 +1333,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.READAHEAD: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.VECTOREDIO: return true; default: return false; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 0752e75d24..18a727dcdc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.EOFException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; @@ -26,14 +28,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -55,8 +58,8 @@ protected AbstractFSContract createContract(Configuration conf) { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); - testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test @@ -99,4 +102,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { } } } + + @Test + public void testStopVectoredIoOperationsCloseStream() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + in.close(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + } + // reopening the stream should succeed. + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + } + } + + @Test + public void testStopVectoredIoOperationsUnbuffer() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + in.unbuffer(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + // re-initiating the vectored reads after unbuffer should succeed. + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + } + + } + + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } + + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testSameRanges() throws Exception { + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 956e23a3f1..f8d47011de 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -459,13 +458,13 @@ public void test_040_PositionedReadHugeFile() throws Throwable { public void test_045_vectoredIOHugeFile() throws Throwable { assumeHugeFileExists(); List rangeList = new ArrayList<>(); - rangeList.add(new FileRangeImpl(5856368, 1167716)); - rangeList.add(new FileRangeImpl(3520861, 1167700)); - rangeList.add(new FileRangeImpl(8191913, 1167775)); - rangeList.add(new FileRangeImpl(1520861, 1167700)); - rangeList.add(new FileRangeImpl(2520861, 116770)); - rangeList.add(new FileRangeImpl(9191913, 116770)); - rangeList.add(new FileRangeImpl(2820861, 156770)); + rangeList.add(FileRange.createFileRange(5856368, 116770)); + rangeList.add(FileRange.createFileRange(3520861, 116770)); + rangeList.add(FileRange.createFileRange(8191913, 116770)); + rangeList.add(FileRange.createFileRange(1520861, 116770)); + rangeList.add(FileRange.createFileRange(2520861, 116770)); + rangeList.add(FileRange.createFileRange(9191913, 116770)); + rangeList.add(FileRange.createFileRange(2820861, 156770)); IntFunction allocate = ByteBuffer::allocate; FileSystem fs = getFileSystem(); CompletableFuture builder = diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java index aaee951d72..631842f78e 100644 --- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ -47,7 +47,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -107,7 +107,7 @@ public void asyncRead(FileSystemChoice fsChoice, FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); List ranges = new ArrayList<>(); for(int m=0; m < 100; ++m) { - FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE); + FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE); ranges.add(range); } stream.readVectored(ranges, bufferChoice.allocate);