diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index e612713cfe..b23df1713c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; @@ -55,6 +54,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; /**************************************************************** * Abstract Checksumed FileSystem. @@ -166,7 +166,7 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) { * It verifies that data matches checksums. *******************************************************/ private static class ChecksumFSInputChecker extends FSInputChecker implements - IOStatisticsSource { + IOStatisticsSource, StreamCapabilities { private ChecksumFileSystem fs; private FSDataInputStream datas; private FSDataInputStream sums; @@ -408,7 +408,7 @@ public void readVectored(List ranges, int minSeek = minSeekForVectorReads(); int maxSize = maxReadSizeForVectorReads(); List dataRanges = - VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, + VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum, minSeek, maxReadSizeForVectorReads()); List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); @@ -435,6 +435,11 @@ public void readVectored(List ranges, } } } + + @Override + public boolean hasCapability(String capability) { + return datas.hasCapability(capability); + } } private static class FSDataBoundedInputStream extends FSDataInputStream { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java index 7388e462cc..e55696e965 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.fs.impl.FileRangeImpl; + /** * A byte range of a file. * This is used for the asynchronous gather read API of @@ -52,4 +54,14 @@ public interface FileRange { * @param data the future of the ByteBuffer that will have the data */ void setData(CompletableFuture data); + + /** + * Factory method to create a FileRange object. + * @param offset starting offset of the range. + * @param length length of the range. + * @return a new instance of FileRangeImpl. + */ + static FileRange createFileRange(long offset, int length) { + return new FileRangeImpl(offset, length); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 7e543ebf22..de76090512 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.impl.VectoredReadUtils; /** * Stream that permits positional reading. @@ -121,7 +120,6 @@ default int maxReadSizeForVectorReads() { */ default void readVectored(List ranges, IntFunction allocate) throws IOException { - VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), - maxReadSizeForVectorReads()); + VectoredReadUtils.readVectored(this, ranges, allocate); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index c71692e67e..e3baf8cd5b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import java.io.BufferedOutputStream; import java.io.DataOutput; @@ -68,6 +67,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; @@ -278,6 +278,7 @@ public boolean hasCapability(String capability) { // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.VECTOREDIO: return true; default: return false; @@ -303,23 +304,24 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { public void readVectored(List ranges, IntFunction allocate) throws IOException { + List sortedRanges = Arrays.asList(sortRanges(ranges)); // Set up all of the futures, so that we can use them if things fail - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { VectoredReadUtils.validateRangeRequest(range); range.setData(new CompletableFuture<>()); } try { AsynchronousFileChannel channel = getAsyncChannel(); - ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; - AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); - for(int i = 0; i < ranges.size(); ++i) { - FileRange range = ranges.get(i); + ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers); + for(int i = 0; i < sortedRanges.size(); ++i) { + FileRange range = sortedRanges.get(i); buffers[i] = allocate.apply(range.getLength()); channel.read(buffers[i], range.getOffset(), i, asyncHandler); } } catch (IOException ioe) { LOG.debug("Exception occurred during vectored read ", ioe); - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { range.getData().completeExceptionally(ioe); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 8611780195..d68ef505dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -80,6 +80,12 @@ public interface StreamCapabilities { */ String IOSTATISTICS = "iostatistics"; + /** + * Support for vectored IO api. + * See {@code PositionedReadable#readVectored(List, IntFunction)}. + */ + String VECTOREDIO = "readvectored"; + /** * Stream abort() capability implemented by {@link Abortable#abort()}. * This matches the Path Capability diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java similarity index 79% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 9a16e6841d..64107f1a18 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.EOFException; import java.io.IOException; @@ -28,9 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.IntFunction; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.Preconditions; /** @@ -68,35 +66,19 @@ public static void validateVectoredReadRanges(List ranges) /** - * Read fully a list of file ranges asynchronously from this file. - * The default iterates through the ranges to read each synchronously, but - * the intent is that subclasses can make more efficient readers. + * This is the default implementation which iterates through the ranges + * to read each synchronously, but the intent is that subclasses + * can make more efficient readers. * The data or exceptions are pushed into {@link FileRange#getData()}. * @param stream the stream to read the data from * @param ranges the byte ranges to read * @param allocate the byte buffer allocation - * @param minimumSeek the minimum number of bytes to seek over - * @param maximumRead the largest number of bytes to combine into a single read */ public static void readVectored(PositionedReadable stream, List ranges, - IntFunction allocate, - int minimumSeek, - int maximumRead) { - if (isOrderedDisjoint(ranges, 1, minimumSeek)) { - for(FileRange range: ranges) { - range.setData(readRangeFrom(stream, range, allocate)); - } - } else { - for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, - maximumRead)) { - CompletableFuture read = - readRangeFrom(stream, range, allocate); - for(FileRange child: range.getUnderlying()) { - child.setData(read.thenApply( - (b) -> sliceTo(b, range.getOffset(), child))); - } - } + IntFunction allocate) { + for (FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); } } @@ -166,7 +148,7 @@ public static boolean isOrderedDisjoint(List input, int chunkSize, int minimumSeek) { long previous = -minimumSeek; - for(FileRange range: input) { + for (FileRange range: input) { long offset = range.getOffset(); long end = range.getOffset() + range.getLength(); if (offset % chunkSize != 0 || @@ -209,7 +191,42 @@ public static long roundUp(long offset, int chunkSize) { } /** - * Sort and merge ranges to optimize the access from the underlying file + * Check if the input ranges are overlapping in nature. + * We call two ranges to be overlapping when start offset + * of second is less than the end offset of first. + * End offset is calculated as start offset + length. + * @param input list if input ranges. + * @return true/false based on logic explained above. + */ + public static List validateNonOverlappingAndReturnSortedRanges( + List input) { + + if (input.size() <= 1) { + return input; + } + FileRange[] sortedRanges = sortRanges(input); + FileRange prev = sortedRanges[0]; + for (int i=1; i input) { + FileRange[] sortedRanges = input.toArray(new FileRange[0]); + Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset)); + return sortedRanges; + } + + /** + * Merge sorted ranges to optimize the access from the underlying file * system. * The motivations are that: *
    @@ -219,24 +236,22 @@ public static long roundUp(long offset, int chunkSize) { *
  • Some file systems want to round ranges to be at checksum boundaries.
  • *
* - * @param input the list of input ranges + * @param sortedRanges already sorted list of ranges based on offset. * @param chunkSize round the start and end points to multiples of chunkSize * @param minimumSeek the smallest gap that we should seek over in bytes * @param maxSize the largest combined file range in bytes * @return the list of sorted CombinedFileRanges that cover the input */ - public static List sortAndMergeRanges(List input, - int chunkSize, - int minimumSeek, - int maxSize) { - // sort the ranges by offset - FileRange[] ranges = input.toArray(new FileRange[0]); - Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset)); + public static List mergeSortedRanges(List sortedRanges, + int chunkSize, + int minimumSeek, + int maxSize) { + CombinedFileRange current = null; - List result = new ArrayList<>(ranges.length); + List result = new ArrayList<>(sortedRanges.size()); // now merge together the ones that merge - for(FileRange range: ranges) { + for (FileRange range: sortedRanges) { long start = roundDown(range.getOffset(), chunkSize); long end = roundUp(range.getOffset() + range.getLength(), chunkSize); if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java index 828a50b4f7..516bbb2c70 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.impl; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import java.util.ArrayList; import java.util.List; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java similarity index 85% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index ef5851154b..041e5f0a8d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -15,15 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.fs; +package org.apache.hadoop.fs.impl; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileRange; + /** * A range of bytes from a file with an optional buffer to read those bytes - * for zero copy. + * for zero copy. This shouldn't be created directly via constructor rather + * factory defined in {@code FileRange#createFileRange} should be used. */ +@InterfaceAudience.Private public class FileRangeImpl implements FileRange { private long offset; private int length; diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index e4a2830967..197b999c81 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -449,7 +449,14 @@ Read fully data for a list of ranges asynchronously. The default implementation iterates through the ranges, tries to coalesce the ranges based on values of `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged ranges synchronously, but the intent is sub classes can implement efficient -implementation. +implementation. Reading in both direct and heap byte buffers are supported. +Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for +allocating buffers such that even direct buffers are garbage collected when +they are no longer referenced. + +Note: Don't use direct buffers for reading from ChecksumFileSystem as that may +lead to memory fragmentation explained in HADOOP-18296. + #### Preconditions @@ -467,7 +474,7 @@ For each requested range: ### `minSeekForVectorReads()` -Smallest reasonable seek. Two ranges won't be merged together if the difference between +The smallest reasonable seek. Two ranges won't be merged together if the difference between end of first and start of next range is more than this value. ### `maxReadSizeForVectorReads()` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java similarity index 74% rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java index cfd366701b..5d08b02e11 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,12 +31,10 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.test.HadoopTestBase; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; @@ -56,7 +54,7 @@ public void testSliceTo() { } // ensure we don't make unnecessary slices ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, - new FileRangeImpl(100, size)); + FileRange.createFileRange(100, size)); Assertions.assertThat(buffer) .describedAs("Slicing on the same offset shouldn't " + "create a new buffer") @@ -67,7 +65,7 @@ public void testSliceTo() { final int sliceStart = 1024; final int sliceLength = 16 * 1024; slice = VectoredReadUtils.sliceTo(buffer, offset, - new FileRangeImpl(offset + sliceStart, sliceLength)); + FileRange.createFileRange(offset + sliceStart, sliceLength)); // make sure they aren't the same, but use the same backing data Assertions.assertThat(buffer) .describedAs("Slicing on new offset should " + @@ -96,12 +94,12 @@ public void testRounding() { @Test public void testMerge() { - FileRange base = new FileRangeImpl(2000, 1000); + FileRange base = FileRange.createFileRange(2000, 1000); CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); // test when the gap between is too big assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2000, 4000)); + FileRange.createFileRange(5000, 1000), 2000, 4000)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -109,7 +107,7 @@ public void testMerge() { // test when the total size gets exceeded assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 3999)); + FileRange.createFileRange(5000, 1000), 2001, 3999)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -117,7 +115,7 @@ public void testMerge() { // test when the merge works assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 4000)); + FileRange.createFileRange(5000, 1000), 2001, 4000)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge length", 4000, mergeBase.getLength()); @@ -127,7 +125,7 @@ public void testMerge() { assertEquals(200, mergeBase.getOffset()); assertEquals(100, mergeBase.getLength()); assertTrue("ranges should get merged ", mergeBase.merge(500, 600, - new FileRangeImpl(5000, 1000), 201, 400)); + FileRange.createFileRange(5000, 1000), 201, 400)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 200, mergeBase.getOffset()); assertEquals("post merge length", 400, mergeBase.getLength()); @@ -136,42 +134,58 @@ public void testMerge() { @Test public void testSortAndMerge() { List input = Arrays.asList( - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 3, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(3); assertEquals("range[1000,3100)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // the minSeek doesn't allow the first two to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1000, 2100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,1100)", outputList.get(0).toString()); assertEquals("range[2100,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); // the maxSize doesn't allow the third range to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1001, 2099); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,2200)", outputList.get(0).toString()); assertEquals("range[3000,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // test the round up and round down (the maxSize doesn't allow any merges) - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); - assertEquals("merged range size", 3, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 16, 1001, 100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(3); assertEquals("range[992,1104)", outputList.get(0).toString()); assertEquals("range[2096,2208)", outputList.get(1).toString()); assertEquals("range[2992,3104)", outputList.get(2).toString()); @@ -182,26 +196,35 @@ public void testSortAndMerge() { @Test public void testSortAndMergeMoreCases() throws Exception { List input = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 1, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 1, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); assertEquals("range[1000,3110)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); assertEquals("range[1000,3200)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); @@ -211,9 +234,9 @@ public void testSortAndMergeMoreCases() throws Exception { @Test public void testMaxSizeZeroDisablesMering() throws Exception { List randomRanges = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100) ); assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); @@ -225,7 +248,7 @@ private void assertEqualRangeCountsAfterMerging(List inputRanges, int minimumSeek, int maxSize) { List combinedFileRanges = VectoredReadUtils - .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize); + .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize); Assertions.assertThat(combinedFileRanges) .describedAs("Mismatch in number of ranges post merging") .hasSize(inputRanges.size()); @@ -251,7 +274,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -267,7 +290,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { .when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -286,7 +309,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -303,7 +326,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -328,9 +351,9 @@ static void validateBuffer(String message, ByteBuffer buffer, int start) { @Test public void testReadVectored() throws Exception { - List input = Arrays.asList(new FileRangeImpl(0, 100), - new FileRangeImpl(100_000, 100), - new FileRangeImpl(200_000, 100)); + List input = Arrays.asList(FileRange.createFileRange(0, 100), + FileRange.createFileRange(100_000, 100), + FileRange.createFileRange(200_000, 100)); Stream stream = Mockito.mock(Stream.class); Mockito.doAnswer(invocation -> { fillBuffer(invocation.getArgument(1)); @@ -338,31 +361,11 @@ public void testReadVectored() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); // should not merge the ranges - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); Mockito.verify(stream, Mockito.times(3)) .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); for(int b=0; b < input.size(); ++b) { validateBuffer("buffer " + b, input.get(b).getData().get(), 0); } } - - @Test - public void testReadVectoredMerge() throws Exception { - List input = Arrays.asList(new FileRangeImpl(2000, 100), - new FileRangeImpl(1000, 100), - new FileRangeImpl(0, 100)); - Stream stream = Mockito.mock(Stream.class); - Mockito.doAnswer(invocation -> { - fillBuffer(invocation.getArgument(1)); - return null; - }).when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(ByteBuffer.class)); - // should merge the ranges into a single read - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100); - Mockito.verify(stream, Mockito.times(1)) - .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); - for(int b=0; b < input.size(); ++b) { - validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000); - } - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index e8c86b5dbb..77bcc496ff 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.contract; import java.io.EOFException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -38,13 +37,18 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; +import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; @RunWith(Parameterized.class) @@ -53,11 +57,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); public static final int DATASET_LEN = 64 * 1024; - private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; private final IntFunction allocate; + private final WeakReferencedElasticByteBufferPool pool = + new WeakReferencedElasticByteBufferPool(); + private final String bufferType; @Parameterized.Parameters(name = "Buffer type : {0}") @@ -67,8 +74,14 @@ public static List params() { public AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - this.allocate = "array".equals(bufferType) ? - ByteBuffer::allocate : ByteBuffer::allocateDirect; + this.allocate = value -> { + boolean isDirect = !"array".equals(bufferType); + return pool.getBuffer(isDirect, value); + }; + } + + public IntFunction getAllocate() { + return allocate; } @Override @@ -79,12 +92,27 @@ public void setup() throws Exception { createFile(fs, path, true, DATASET); } + @Override + public void teardown() throws Exception { + super.teardown(); + pool.release(); + } + + @Test + public void testVectoredReadCapability() throws Exception { + FileSystem fs = getFileSystem(); + String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO}; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + assertCapabilities(in, vectoredReadCapability, null); + } + } + @Test public void testVectoredReadMultipleRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); for (int i = 0; i < 10; i++) { - FileRange fileRange = new FileRangeImpl(i * 100, 100); + FileRange fileRange = FileRange.createFileRange(i * 100, 100); fileRanges.add(fileRange); } try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { @@ -98,6 +126,7 @@ public void testVectoredReadMultipleRanges() throws Exception { combinedFuture.get(); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -105,7 +134,7 @@ public void testVectoredReadMultipleRanges() throws Exception { public void testVectoredReadAndReadFully() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(100, 100)); + fileRanges.add(FileRange.createFileRange(100, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); byte[] readFullRes = new byte[100]; @@ -114,6 +143,7 @@ public void testVectoredReadAndReadFully() throws Exception { Assertions.assertThat(vecRes) .describedAs("Result from vectored read and readFully must match") .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -125,12 +155,13 @@ public void testVectoredReadAndReadFully() throws Exception { public void testDisjointRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100)); - fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 + 101, 100)); + fileRanges.add(FileRange.createFileRange(16_000 + 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -142,12 +173,13 @@ public void testDisjointRanges() throws Exception { public void testAllRangesMergedIntoOne() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(8*1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 - 101, 100)); + fileRanges.add(FileRange.createFileRange(8_000 - 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -159,44 +191,80 @@ public void testAllRangesMergedIntoOne() throws Exception { public void testSomeRangesMergedSomeUnmerged() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 100)); - fileRanges.add(new FileRangeImpl(14*1024, 100)); - fileRanges.add(new FileRangeImpl(10*1024, 100)); - fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(40*1024, 1024)); - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET); - } - } - - @Test - public void testSameRanges() throws Exception { - FileSystem fs = getFileSystem(); - List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) .build(); try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSameRanges() throws Exception { + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSomeRandomNonOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 1000)); - fileRanges.add(new FileRangeImpl(90, 900)); - fileRanges.add(new FileRangeImpl(50, 900)); - fileRanges.add(new FileRangeImpl(10, 980)); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(1000, 200)); + fileRanges.add(FileRange.createFileRange(50, 10)); + fileRanges.add(FileRange.createFileRange(10, 5)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testConsecutiveRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(600, 200)); + fileRanges.add(FileRange.createFileRange(800, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -204,7 +272,7 @@ public void testOverlappingRanges() throws Exception { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); for (FileRange res : fileRanges) { @@ -227,22 +295,22 @@ public void testEOFRanges() throws Exception { public void testNegativeLengthRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, -50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(0, -50)); + verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class); } @Test public void testNegativeOffsetRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(-1, 50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(-1, 50)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test public void testNormalReadAfterVectoredRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); // read starting 200 bytes @@ -254,13 +322,14 @@ public void testNormalReadAfterVectoredRead() throws Exception { .describedAs("Vectored read shouldn't change file pointer.") .isEqualTo(200); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testVectoredReadAfterNormalRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { // read starting 200 bytes byte[] res = new byte[200]; @@ -272,43 +341,66 @@ public void testVectoredReadAfterNormalRead() throws Exception { .isEqualTo(200); in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @Test public void testMultipleVectoredReads() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges1 = createSomeOverlappingRanges(); - List fileRanges2 = createSomeOverlappingRanges(); + List fileRanges1 = createSampleNonOverlappingRanges(); + List fileRanges2 = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges2, allocate); validateVectoredReadResult(fileRanges2, DATASET); validateVectoredReadResult(fileRanges1, DATASET); + returnBuffersToPoolPostRead(fileRanges1, pool); + returnBuffersToPoolPostRead(fileRanges2, pool); } } - protected List createSomeOverlappingRanges() { + protected List createSampleNonOverlappingRanges() { List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(90, 50)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(110, 50)); return fileRanges; } + protected List getSampleSameRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + return fileRanges; + } - protected void testExceptionalVectoredRead(FileSystem fs, - List fileRanges, - String s) throws IOException { - boolean exRaised = false; - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - // Can we intercept here as done in S3 tests ?? - in.readVectored(fileRanges, allocate); - } catch (EOFException | IllegalArgumentException ex) { - // expected. - exRaised = true; + protected List getSampleOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(400, 500)); + return fileRanges; + } + + /** + * Validate that exceptions must be thrown during a vectored + * read operation with specific input ranges. + * @param fs FileSystem instance. + * @param fileRanges input file ranges. + * @param clazz type of exception expected. + * @throws Exception any other IOE. + */ + protected void verifyExceptionalVectoredRead( + FileSystem fs, + List fileRanges, + Class clazz) throws Exception { + + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + LambdaTestUtils.intercept(clazz, + () -> in.readVectored(fileRanges, allocate)); } - Assertions.assertThat(exRaised) - .describedAs(s) - .isTrue(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 8c071282dc..b61abddd43 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.PathCapabilities; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.FutureIO; @@ -1137,6 +1138,25 @@ public static void validateVectoredReadResult(List fileRanges, } } + /** + * Utility to return buffers back to the pool once all + * data has been read for each file range. + * @param fileRanges list of file range. + * @param pool buffer pool. + * @throws IOException any IOE + * @throws TimeoutException ideally this should never occur. + */ + public static void returnBuffersToPoolPostRead(List fileRanges, + ByteBufferPool pool) + throws IOException, TimeoutException { + for (FileRange range : fileRanges) { + ByteBuffer buffer = FutureIO.awaitFuture(range.getData(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + pool.putBuffer(buffer); + } + } + /** * Assert that the data read matches the dataset at the given offset. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 099e3b946d..5d6ca3f8f0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -18,9 +18,26 @@ package org.apache.hadoop.fs.contract.localfs; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest { @@ -32,4 +49,38 @@ public TestLocalFSContractVectoredRead(String bufferType) { protected AbstractFSContract createContract(Configuration conf) { return new LocalFSContract(conf); } + + @Test + public void testChecksumValidationDuringVectoredRead() throws Exception { + Path testPath = path("big_range_checksum"); + LocalFileSystem localFs = (LocalFileSystem) getFileSystem(); + final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + try (FSDataOutputStream out = localFs.create(testPath, true)){ + out.write(datasetCorrect); + } + Path checksumPath = localFs.getChecksumFile(testPath); + Assertions.assertThat(localFs.exists(checksumPath)) + .describedAs("Checksum file should be present") + .isTrue(); + CompletableFuture fis = localFs.openFile(testPath).build(); + List someRandomRanges = new ArrayList<>(); + someRandomRanges.add(FileRange.createFileRange(10, 1024)); + someRandomRanges.add(FileRange.createFileRange(1025, 1024)); + try (FSDataInputStream in = fis.get()){ + in.readVectored(someRandomRanges, getAllocate()); + validateVectoredReadResult(someRandomRanges, datasetCorrect); + } + final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64); + try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ + out.write(datasetCorrupted); + } + CompletableFuture fisN = localFs.openFile(testPath).build(); + try (FSDataInputStream in = fisN.get()){ + in.readVectored(someRandomRanges, getAllocate()); + // Expect checksum exception when data is updated directly through + // raw local fs instance. + intercept(ChecksumException.class, + () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted)); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 9d87f26c3c..3069f17289 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -22,11 +22,13 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntFunction; import com.amazonaws.services.s3.model.GetObjectRequest; @@ -46,8 +48,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; -import org.apache.hadoop.fs.impl.FutureIOSupport; -import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.VectoredReadUtils; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -59,9 +60,9 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -107,6 +108,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; + /** + * Atomic boolean variable to stop all ongoing vectored read operation + * for this input stream. This will be set to true when the stream is + * closed or unbuffer is called. + */ + private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false); + /** * This is the public position; the one set in {@link #seek(long)} * and returned in {@link #getPos()}. @@ -589,6 +597,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; try { + stopVectoredIOOperations.set(true); // close or abort the stream; blocking awaitFuture(closeStream("close() operation", false, true)); LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); @@ -940,31 +949,32 @@ public void readVectored(List ranges, LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + } + List sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges); for (FileRange range : ranges) { validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); range.setData(result); } - if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { + if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); - for(FileRange range: ranges) { + for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); - List combinedFileRanges = sortAndMergeRanges(ranges, + List combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); - for(CombinedFileRange combinedFileRange: combinedFileRanges) { - CompletableFuture result = new CompletableFuture<>(); - ByteBuffer buffer = allocate.apply(combinedFileRange.getLength()); - combinedFileRange.setData(result); + for (CombinedFileRange combinedFileRange: combinedFileRanges) { unboundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); } } LOG.debug("Finished submitting vectored read to threadpool" + @@ -972,58 +982,102 @@ public void readVectored(List ranges, } /** - * Read data in the combinedFileRange and update data in buffers - * of all underlying ranges. - * @param combinedFileRange combined range. - * @param buffer combined buffer. + * Read the data from S3 for the bigger combined file range and update all the + * underlying ranges. + * @param combinedFileRange big combined file range. + * @param allocate method to create byte buffers to hold result data. */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - ByteBuffer buffer) { - // Not putting read single range call inside try block as - // exception if any occurred during this call will be raised - // during awaitFuture call while getting the combined buffer. - readSingleRange(combinedFileRange, buffer); + IntFunction allocate) { + LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); + // This reference is must be kept till all buffers are populated as this is a + // finalizable object which closes the internal stream when gc triggers. + S3Object objectRange = null; + S3ObjectInputStream objectContent = null; try { - // In case of single range we return the original byte buffer else - // we return slice byte buffers for each child ranges. - ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); - if (combinedFileRange.getUnderlying().size() == 1) { - combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer); - } else { - for (FileRange child : combinedFileRange.getUnderlying()) { - updateOriginalRange(child, combinedBuffer, combinedFileRange); - } + checkIfVectoredIOStopped(); + final String operationName = "readCombinedFileRange"; + objectRange = getS3Object(operationName, + combinedFileRange.getOffset(), + combinedFileRange.getLength()); + objectContent = objectRange.getObjectContent(); + if (objectContent == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); } + populateChildBuffers(combinedFileRange, objectContent, allocate); } catch (Exception ex) { - LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex); + LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex); for(FileRange child : combinedFileRange.getUnderlying()) { child.getData().completeExceptionally(ex); } + } finally { + IOUtils.cleanupWithLogger(LOG, objectRange, objectContent); + } + LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr); + } + + /** + * Populate underlying buffers of the child ranges. + * @param combinedFileRange big combined file range. + * @param objectContent data from s3. + * @param allocate method to allocate child byte buffers. + * @throws IOException any IOE. + */ + private void populateChildBuffers(CombinedFileRange combinedFileRange, + S3ObjectInputStream objectContent, + IntFunction allocate) throws IOException { + // If the combined file range just contains a single child + // range, we only have to fill that one child buffer else + // we drain the intermediate data between consecutive ranges + // and fill the buffers one by one. + if (combinedFileRange.getUnderlying().size() == 1) { + FileRange child = combinedFileRange.getUnderlying().get(0); + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + } else { + FileRange prev = null; + for (FileRange child : combinedFileRange.getUnderlying()) { + if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) { + long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength(); + drainUnnecessaryData(objectContent, drainQuantity); + } + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + prev = child; + } } } /** - * Update data in child range from combined range. - * @param child child range. - * @param combinedBuffer combined buffer. - * @param combinedFileRange combined range. + * Drain unnecessary data in between ranges. + * @param objectContent s3 data stream. + * @param drainQuantity how many bytes to drain. + * @throws IOException any IOE. */ - private void updateOriginalRange(FileRange child, - ByteBuffer combinedBuffer, - CombinedFileRange combinedFileRange) { - LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); - ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); - child.getData().complete(childBuffer); - LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); + private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity) + throws IOException { + int drainBytes = 0; + int readCount; + while (drainBytes < drainQuantity) { + if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) { + byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE]; + readCount = objectContent.read(drainBuffer); + } else { + byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)]; + readCount = objectContent.read(drainBuffer); + } + drainBytes += readCount; + } + LOG.debug("{} bytes drained from stream ", drainBytes); } /** - * // Check if we can use contentLength returned by http GET request. * Validates range parameters. + * In case of S3 we already have contentLength from the first GET request + * during an open file operation so failing fast here. * @param range requested range. * @throws EOFException end of file exception. */ @@ -1038,13 +1092,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { } /** - * TODO: Add retry in client.getObject(). not present in older reads why here?? - * Okay retry is being done in the top layer during read. - * But if we do here in the top layer, one issue I am thinking is - * what if there is some error which happened during filling the buffer - * If we retry that old offsets of heap buffers can be overwritten ? - * I think retry should be only added in {@link S3AInputStream#getS3Object} - * Read data from S3 for this range and populate the bufffer. + * Read data from S3 for this range and populate the buffer. * @param range range of data to read. * @param buffer buffer to fill. */ @@ -1053,6 +1101,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { + checkIfVectoredIOStopped(); long position = range.getOffset(); int length = range.getLength(); final String operationName = "readRange"; @@ -1089,6 +1138,7 @@ private void populateBuffer(int length, int offset = 0; byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE]; while (readBytes < length) { + checkIfVectoredIOStopped(); int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ? TMP_BUFFER_MAX_SIZE : length - readBytes; @@ -1103,7 +1153,15 @@ private void populateBuffer(int length, } } - public void readByteArray(S3ObjectInputStream objectContent, + /** + * Read data into destination buffer from s3 object content. + * @param objectContent result from S3. + * @param dest destination buffer. + * @param offset start offset of dest buffer. + * @param length number of bytes to fill in dest. + * @throws IOException any IOE. + */ + private void readByteArray(S3ObjectInputStream objectContent, byte[] dest, int offset, int length) throws IOException { @@ -1120,14 +1178,14 @@ public void readByteArray(S3ObjectInputStream objectContent, } /** - * Read data from S3 using a http request. - * This also handles if file has been changed while http call - * is getting executed. If file has been changed RemoteFileChangedException - * is thrown. + * Read data from S3 using a http request with retries. + * This also handles if file has been changed while the + * http call is getting executed. If the file has been + * changed RemoteFileChangedException is thrown. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. - * @return S3Object + * @return S3Object result s3 object. * @throws IOException exception if any. */ private S3Object getS3Object(String operationName, long position, @@ -1140,7 +1198,11 @@ private S3Object getS3Object(String operationName, long position, Invoker invoker = context.getReadInvoker(); try { objectRange = invoker.retry(operationName, pathStr, true, - () -> client.getObject(request)); + () -> { + checkIfVectoredIOStopped(); + return client.getObject(request); + }); + } catch (IOException ex) { tracker.failed(); throw ex; @@ -1152,6 +1214,19 @@ private S3Object getS3Object(String operationName, long position, return objectRange; } + /** + * Check if vectored io operation has been stooped. This happens + * when the stream is closed or unbuffer is called. + * @throws InterruptedIOException throw InterruptedIOException such + * that all running vectored io is + * terminated thus releasing resources. + */ + private void checkIfVectoredIOStopped() throws InterruptedIOException { + if (stopVectoredIOOperations.get()) { + throw new InterruptedIOException("Stream closed or unbuffer is called"); + } + } + /** * Access the input stream statistics. * This is for internal testing and may be removed without warning. @@ -1237,10 +1312,15 @@ public static long validateReadahead(@Nullable Long readahead) { /** * Closes the underlying S3 stream, and merges the {@link #streamStatistics} * instance associated with the stream. + * Also sets the {@code stopVectoredIOOperations} flag to true such that + * active vectored read operations are terminated. However termination of + * old vectored reads are not guaranteed if a new vectored read operation + * is initiated after unbuffer is called. */ @Override public synchronized void unbuffer() { try { + stopVectoredIOOperations.set(true); closeStream("unbuffer()", false, false); } finally { streamStatistics.unbuffered(); @@ -1253,6 +1333,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.READAHEAD: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.VECTOREDIO: return true; default: return false; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 0752e75d24..18a727dcdc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.EOFException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; @@ -26,14 +28,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -55,8 +58,8 @@ protected AbstractFSContract createContract(Configuration conf) { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); - testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test @@ -99,4 +102,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { } } } + + @Test + public void testStopVectoredIoOperationsCloseStream() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + in.close(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + } + // reopening the stream should succeed. + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + } + } + + @Test + public void testStopVectoredIoOperationsUnbuffer() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, getAllocate()); + in.unbuffer(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + // re-initiating the vectored reads after unbuffer should succeed. + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + } + + } + + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } + + /** + * S3 vectored IO doesn't support overlapping ranges. + */ + @Override + public void testSameRanges() throws Exception { + // Same ranges are special case of overlapping only. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 956e23a3f1..f8d47011de 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -459,13 +458,13 @@ public void test_040_PositionedReadHugeFile() throws Throwable { public void test_045_vectoredIOHugeFile() throws Throwable { assumeHugeFileExists(); List rangeList = new ArrayList<>(); - rangeList.add(new FileRangeImpl(5856368, 1167716)); - rangeList.add(new FileRangeImpl(3520861, 1167700)); - rangeList.add(new FileRangeImpl(8191913, 1167775)); - rangeList.add(new FileRangeImpl(1520861, 1167700)); - rangeList.add(new FileRangeImpl(2520861, 116770)); - rangeList.add(new FileRangeImpl(9191913, 116770)); - rangeList.add(new FileRangeImpl(2820861, 156770)); + rangeList.add(FileRange.createFileRange(5856368, 116770)); + rangeList.add(FileRange.createFileRange(3520861, 116770)); + rangeList.add(FileRange.createFileRange(8191913, 116770)); + rangeList.add(FileRange.createFileRange(1520861, 116770)); + rangeList.add(FileRange.createFileRange(2520861, 116770)); + rangeList.add(FileRange.createFileRange(9191913, 116770)); + rangeList.add(FileRange.createFileRange(2820861, 156770)); IntFunction allocate = ByteBuffer::allocate; FileSystem fs = getFileSystem(); CompletableFuture builder = diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java index aaee951d72..631842f78e 100644 --- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ -47,7 +47,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -107,7 +107,7 @@ public void asyncRead(FileSystemChoice fsChoice, FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); List ranges = new ArrayList<>(); for(int m=0; m < 100; ++m) { - FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE); + FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE); ranges.add(range); } stream.readVectored(ranges, bufferChoice.allocate);