HADOOP-18106: Handle memory fragmentation in S3A Vectored IO. (#4445)

part of HADOOP-18103.
Handling memory fragmentation in S3A vectored IO implementation by
allocating smaller user range requested size buffers and directly
filling them from the remote S3 stream and skipping undesired
data in between ranges.
This patch also adds aborting active vectored reads when stream is
closed or unbuffer() is called.

Contributed By: Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-06-21 03:45:40 +05:30 committed by Steve Loughran
parent 0d49bd2004
commit 4d1f6f9b99
17 changed files with 616 additions and 264 deletions

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.impl.OpenFileParameters;
@ -55,6 +54,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
/**************************************************************** /****************************************************************
* Abstract Checksumed FileSystem. * Abstract Checksumed FileSystem.
@ -166,7 +166,7 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* It verifies that data matches checksums. * It verifies that data matches checksums.
*******************************************************/ *******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker implements private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource { IOStatisticsSource, StreamCapabilities {
private ChecksumFileSystem fs; private ChecksumFileSystem fs;
private FSDataInputStream datas; private FSDataInputStream datas;
private FSDataInputStream sums; private FSDataInputStream sums;
@ -408,7 +408,7 @@ public void readVectored(List<? extends FileRange> ranges,
int minSeek = minSeekForVectorReads(); int minSeek = minSeekForVectorReads();
int maxSize = maxReadSizeForVectorReads(); int maxSize = maxReadSizeForVectorReads();
List<CombinedFileRange> dataRanges = List<CombinedFileRange> dataRanges =
VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
minSeek, maxReadSizeForVectorReads()); minSeek, maxReadSizeForVectorReads());
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges, List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize); bytesPerSum, minSeek, maxSize);
@ -435,6 +435,11 @@ public void readVectored(List<? extends FileRange> ranges,
} }
} }
} }
@Override
public boolean hasCapability(String capability) {
return datas.hasCapability(capability);
}
} }
private static class FSDataBoundedInputStream extends FSDataInputStream { private static class FSDataBoundedInputStream extends FSDataInputStream {

View File

@ -20,6 +20,8 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.impl.FileRangeImpl;
/** /**
* A byte range of a file. * A byte range of a file.
* This is used for the asynchronous gather read API of * This is used for the asynchronous gather read API of
@ -52,4 +54,14 @@ public interface FileRange {
* @param data the future of the ByteBuffer that will have the data * @param data the future of the ByteBuffer that will have the data
*/ */
void setData(CompletableFuture<ByteBuffer> data); void setData(CompletableFuture<ByteBuffer> data);
/**
* Factory method to create a FileRange object.
* @param offset starting offset of the range.
* @param length length of the range.
* @return a new instance of FileRangeImpl.
*/
static FileRange createFileRange(long offset, int length) {
return new FileRangeImpl(offset, length);
}
} }

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
/** /**
* Stream that permits positional reading. * Stream that permits positional reading.
@ -121,7 +120,6 @@ default int maxReadSizeForVectorReads() {
*/ */
default void readVectored(List<? extends FileRange> ranges, default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException { IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), VectoredReadUtils.readVectored(this, ranges, allocate);
maxReadSizeForVectorReads());
} }
} }

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataOutput; import java.io.DataOutput;
@ -68,6 +67,7 @@
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
@ -278,6 +278,7 @@ public boolean hasCapability(String capability) {
// new capabilities. // new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) { switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.VECTOREDIO:
return true; return true;
default: default:
return false; return false;
@ -303,23 +304,24 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
public void readVectored(List<? extends FileRange> ranges, public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException { IntFunction<ByteBuffer> allocate) throws IOException {
List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
// Set up all of the futures, so that we can use them if things fail // Set up all of the futures, so that we can use them if things fail
for(FileRange range: ranges) { for(FileRange range: sortedRanges) {
VectoredReadUtils.validateRangeRequest(range); VectoredReadUtils.validateRangeRequest(range);
range.setData(new CompletableFuture<>()); range.setData(new CompletableFuture<>());
} }
try { try {
AsynchronousFileChannel channel = getAsyncChannel(); AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < ranges.size(); ++i) { for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = ranges.get(i); FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength()); buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler); channel.read(buffers[i], range.getOffset(), i, asyncHandler);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe); LOG.debug("Exception occurred during vectored read ", ioe);
for(FileRange range: ranges) { for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe); range.getData().completeExceptionally(ioe);
} }
} }

View File

@ -80,6 +80,12 @@ public interface StreamCapabilities {
*/ */
String IOSTATISTICS = "iostatistics"; String IOSTATISTICS = "iostatistics";
/**
* Support for vectored IO api.
* See {@code PositionedReadable#readVectored(List, IntFunction)}.
*/
String VECTOREDIO = "readvectored";
/** /**
* Stream abort() capability implemented by {@link Abortable#abort()}. * Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability * This matches the Path Capability

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.impl; package org.apache.hadoop.fs;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -28,9 +28,7 @@
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction; import java.util.function.IntFunction;
import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
/** /**
@ -68,36 +66,20 @@ public static void validateVectoredReadRanges(List<? extends FileRange> ranges)
/** /**
* Read fully a list of file ranges asynchronously from this file. * This is the default implementation which iterates through the ranges
* The default iterates through the ranges to read each synchronously, but * to read each synchronously, but the intent is that subclasses
* the intent is that subclasses can make more efficient readers. * can make more efficient readers.
* The data or exceptions are pushed into {@link FileRange#getData()}. * The data or exceptions are pushed into {@link FileRange#getData()}.
* @param stream the stream to read the data from * @param stream the stream to read the data from
* @param ranges the byte ranges to read * @param ranges the byte ranges to read
* @param allocate the byte buffer allocation * @param allocate the byte buffer allocation
* @param minimumSeek the minimum number of bytes to seek over
* @param maximumRead the largest number of bytes to combine into a single read
*/ */
public static void readVectored(PositionedReadable stream, public static void readVectored(PositionedReadable stream,
List<? extends FileRange> ranges, List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate, IntFunction<ByteBuffer> allocate) {
int minimumSeek,
int maximumRead) {
if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
for (FileRange range: ranges) { for (FileRange range: ranges) {
range.setData(readRangeFrom(stream, range, allocate)); range.setData(readRangeFrom(stream, range, allocate));
} }
} else {
for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
maximumRead)) {
CompletableFuture<ByteBuffer> read =
readRangeFrom(stream, range, allocate);
for(FileRange child: range.getUnderlying()) {
child.setData(read.thenApply(
(b) -> sliceTo(b, range.getOffset(), child)));
}
}
}
} }
/** /**
@ -209,7 +191,42 @@ public static long roundUp(long offset, int chunkSize) {
} }
/** /**
* Sort and merge ranges to optimize the access from the underlying file * Check if the input ranges are overlapping in nature.
* We call two ranges to be overlapping when start offset
* of second is less than the end offset of first.
* End offset is calculated as start offset + length.
* @param input list if input ranges.
* @return true/false based on logic explained above.
*/
public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
List<? extends FileRange> input) {
if (input.size() <= 1) {
return input;
}
FileRange[] sortedRanges = sortRanges(input);
FileRange prev = sortedRanges[0];
for (int i=1; i<sortedRanges.length; i++) {
if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
throw new UnsupportedOperationException("Overlapping ranges are not supported");
}
}
return Arrays.asList(sortedRanges);
}
/**
* Sort the input ranges by offset.
* @param input input ranges.
* @return sorted ranges.
*/
public static FileRange[] sortRanges(List<? extends FileRange> input) {
FileRange[] sortedRanges = input.toArray(new FileRange[0]);
Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset));
return sortedRanges;
}
/**
* Merge sorted ranges to optimize the access from the underlying file
* system. * system.
* The motivations are that: * The motivations are that:
* <ul> * <ul>
@ -219,24 +236,22 @@ public static long roundUp(long offset, int chunkSize) {
* <li>Some file systems want to round ranges to be at checksum boundaries.</li> * <li>Some file systems want to round ranges to be at checksum boundaries.</li>
* </ul> * </ul>
* *
* @param input the list of input ranges * @param sortedRanges already sorted list of ranges based on offset.
* @param chunkSize round the start and end points to multiples of chunkSize * @param chunkSize round the start and end points to multiples of chunkSize
* @param minimumSeek the smallest gap that we should seek over in bytes * @param minimumSeek the smallest gap that we should seek over in bytes
* @param maxSize the largest combined file range in bytes * @param maxSize the largest combined file range in bytes
* @return the list of sorted CombinedFileRanges that cover the input * @return the list of sorted CombinedFileRanges that cover the input
*/ */
public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input, public static List<CombinedFileRange> mergeSortedRanges(List<? extends FileRange> sortedRanges,
int chunkSize, int chunkSize,
int minimumSeek, int minimumSeek,
int maxSize) { int maxSize) {
// sort the ranges by offset
FileRange[] ranges = input.toArray(new FileRange[0]);
Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
CombinedFileRange current = null; CombinedFileRange current = null;
List<CombinedFileRange> result = new ArrayList<>(ranges.length); List<CombinedFileRange> result = new ArrayList<>(sortedRanges.size());
// now merge together the ones that merge // now merge together the ones that merge
for(FileRange range: ranges) { for (FileRange range: sortedRanges) {
long start = roundDown(range.getOffset(), chunkSize); long start = roundDown(range.getOffset(), chunkSize);
long end = roundUp(range.getOffset() + range.getLength(), chunkSize); long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) {

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.impl; package org.apache.hadoop.fs.impl;
import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -15,15 +15,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileRange;
/** /**
* A range of bytes from a file with an optional buffer to read those bytes * A range of bytes from a file with an optional buffer to read those bytes
* for zero copy. * for zero copy. This shouldn't be created directly via constructor rather
* factory defined in {@code FileRange#createFileRange} should be used.
*/ */
@InterfaceAudience.Private
public class FileRangeImpl implements FileRange { public class FileRangeImpl implements FileRange {
private long offset; private long offset;
private int length; private int length;

View File

@ -449,7 +449,14 @@ Read fully data for a list of ranges asynchronously. The default implementation
iterates through the ranges, tries to coalesce the ranges based on values of iterates through the ranges, tries to coalesce the ranges based on values of
`minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged
ranges synchronously, but the intent is sub classes can implement efficient ranges synchronously, but the intent is sub classes can implement efficient
implementation. implementation. Reading in both direct and heap byte buffers are supported.
Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
allocating buffers such that even direct buffers are garbage collected when
they are no longer referenced.
Note: Don't use direct buffers for reading from ChecksumFileSystem as that may
lead to memory fragmentation explained in HADOOP-18296.
#### Preconditions #### Preconditions
@ -467,7 +474,7 @@ For each requested range:
### `minSeekForVectorReads()` ### `minSeekForVectorReads()`
Smallest reasonable seek. Two ranges won't be merged together if the difference between The smallest reasonable seek. Two ranges won't be merged together if the difference between
end of first and start of next range is more than this value. end of first and start of next range is more than this value.
### `maxReadSizeForVectorReads()` ### `maxReadSizeForVectorReads()`

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.impl; package org.apache.hadoop.fs;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -31,12 +31,10 @@
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.test.HadoopTestBase; import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully;
import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally;
@ -56,7 +54,7 @@ public void testSliceTo() {
} }
// ensure we don't make unnecessary slices // ensure we don't make unnecessary slices
ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100,
new FileRangeImpl(100, size)); FileRange.createFileRange(100, size));
Assertions.assertThat(buffer) Assertions.assertThat(buffer)
.describedAs("Slicing on the same offset shouldn't " + .describedAs("Slicing on the same offset shouldn't " +
"create a new buffer") "create a new buffer")
@ -67,7 +65,7 @@ public void testSliceTo() {
final int sliceStart = 1024; final int sliceStart = 1024;
final int sliceLength = 16 * 1024; final int sliceLength = 16 * 1024;
slice = VectoredReadUtils.sliceTo(buffer, offset, slice = VectoredReadUtils.sliceTo(buffer, offset,
new FileRangeImpl(offset + sliceStart, sliceLength)); FileRange.createFileRange(offset + sliceStart, sliceLength));
// make sure they aren't the same, but use the same backing data // make sure they aren't the same, but use the same backing data
Assertions.assertThat(buffer) Assertions.assertThat(buffer)
.describedAs("Slicing on new offset should " + .describedAs("Slicing on new offset should " +
@ -96,12 +94,12 @@ public void testRounding() {
@Test @Test
public void testMerge() { public void testMerge() {
FileRange base = new FileRangeImpl(2000, 1000); FileRange base = FileRange.createFileRange(2000, 1000);
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
// test when the gap between is too big // test when the gap between is too big
assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2000, 4000)); FileRange.createFileRange(5000, 1000), 2000, 4000));
assertEquals("Number of ranges in merged range shouldn't increase", assertEquals("Number of ranges in merged range shouldn't increase",
1, mergeBase.getUnderlying().size()); 1, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge offset", 2000, mergeBase.getOffset());
@ -109,7 +107,7 @@ public void testMerge() {
// test when the total size gets exceeded // test when the total size gets exceeded
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2001, 3999)); FileRange.createFileRange(5000, 1000), 2001, 3999));
assertEquals("Number of ranges in merged range shouldn't increase", assertEquals("Number of ranges in merged range shouldn't increase",
1, mergeBase.getUnderlying().size()); 1, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge offset", 2000, mergeBase.getOffset());
@ -117,7 +115,7 @@ public void testMerge() {
// test when the merge works // test when the merge works
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
new FileRangeImpl(5000, 1000), 2001, 4000)); FileRange.createFileRange(5000, 1000), 2001, 4000));
assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge offset", 2000, mergeBase.getOffset());
assertEquals("post merge length", 4000, mergeBase.getLength()); assertEquals("post merge length", 4000, mergeBase.getLength());
@ -127,7 +125,7 @@ public void testMerge() {
assertEquals(200, mergeBase.getOffset()); assertEquals(200, mergeBase.getOffset());
assertEquals(100, mergeBase.getLength()); assertEquals(100, mergeBase.getLength());
assertTrue("ranges should get merged ", mergeBase.merge(500, 600, assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
new FileRangeImpl(5000, 1000), 201, 400)); FileRange.createFileRange(5000, 1000), 201, 400));
assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
assertEquals("post merge offset", 200, mergeBase.getOffset()); assertEquals("post merge offset", 200, mergeBase.getOffset());
assertEquals("post merge length", 400, mergeBase.getLength()); assertEquals("post merge length", 400, mergeBase.getLength());
@ -136,42 +134,58 @@ public void testMerge() {
@Test @Test
public void testSortAndMerge() { public void testSortAndMerge() {
List<FileRange> input = Arrays.asList( List<FileRange> input = Arrays.asList(
new FileRangeImpl(3000, 100), FileRange.createFileRange(3000, 100),
new FileRangeImpl(2100, 100), FileRange.createFileRange(2100, 100),
new FileRangeImpl(1000, 100) FileRange.createFileRange(1000, 100)
); );
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges( List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
input, 100, 1001, 2500); Arrays.asList(sortRanges(input)), 100, 1001, 2500);
assertEquals("merged range size", 1, outputList.size()); Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(1);
CombinedFileRange output = outputList.get(0); CombinedFileRange output = outputList.get(0);
assertEquals("merged range underlying size", 3, output.getUnderlying().size()); Assertions.assertThat(output.getUnderlying())
.describedAs("merged range underlying size")
.hasSize(3);
assertEquals("range[1000,3100)", output.toString()); assertEquals("range[1000,3100)", output.toString());
assertTrue("merged output ranges are disjoint", assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
// the minSeek doesn't allow the first two to merge // the minSeek doesn't allow the first two to merge
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); assertFalse("Ranges are non disjoint",
outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
assertEquals("merged range size", 2, outputList.size()); outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
100, 1000, 2100);
Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(2);
assertEquals("range[1000,1100)", outputList.get(0).toString()); assertEquals("range[1000,1100)", outputList.get(0).toString());
assertEquals("range[2100,3100)", outputList.get(1).toString()); assertEquals("range[2100,3100)", outputList.get(1).toString());
assertTrue("merged output ranges are disjoint", assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
// the maxSize doesn't allow the third range to merge // the maxSize doesn't allow the third range to merge
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); assertFalse("Ranges are non disjoint",
outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
assertEquals("merged range size", 2, outputList.size()); outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
100, 1001, 2099);
Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(2);
assertEquals("range[1000,2200)", outputList.get(0).toString()); assertEquals("range[1000,2200)", outputList.get(0).toString());
assertEquals("range[3000,3100)", outputList.get(1).toString()); assertEquals("range[3000,3100)", outputList.get(1).toString());
assertTrue("merged output ranges are disjoint", assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
// test the round up and round down (the maxSize doesn't allow any merges) // test the round up and round down (the maxSize doesn't allow any merges)
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); assertFalse("Ranges are non disjoint",
outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
assertEquals("merged range size", 3, outputList.size()); outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
16, 1001, 100);
Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(3);
assertEquals("range[992,1104)", outputList.get(0).toString()); assertEquals("range[992,1104)", outputList.get(0).toString());
assertEquals("range[2096,2208)", outputList.get(1).toString()); assertEquals("range[2096,2208)", outputList.get(1).toString());
assertEquals("range[2992,3104)", outputList.get(2).toString()); assertEquals("range[2992,3104)", outputList.get(2).toString());
@ -182,26 +196,35 @@ public void testSortAndMerge() {
@Test @Test
public void testSortAndMergeMoreCases() throws Exception { public void testSortAndMergeMoreCases() throws Exception {
List<FileRange> input = Arrays.asList( List<FileRange> input = Arrays.asList(
new FileRangeImpl(3000, 110), FileRange.createFileRange(3000, 110),
new FileRangeImpl(3000, 100), FileRange.createFileRange(3000, 100),
new FileRangeImpl(2100, 100), FileRange.createFileRange(2100, 100),
new FileRangeImpl(1000, 100) FileRange.createFileRange(1000, 100)
); );
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); assertFalse("Ranges are non disjoint",
List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges( VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
input, 1, 1001, 2500); List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
assertEquals("merged range size", 1, outputList.size()); Arrays.asList(sortRanges(input)), 1, 1001, 2500);
Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(1);
CombinedFileRange output = outputList.get(0); CombinedFileRange output = outputList.get(0);
assertEquals("merged range underlying size", 4, output.getUnderlying().size()); Assertions.assertThat(output.getUnderlying())
.describedAs("merged range underlying size")
.hasSize(4);
assertEquals("range[1000,3110)", output.toString()); assertEquals("range[1000,3110)", output.toString());
assertTrue("merged output ranges are disjoint", assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
outputList = VectoredReadUtils.sortAndMergeRanges( outputList = VectoredReadUtils.mergeSortedRanges(
input, 100, 1001, 2500); Arrays.asList(sortRanges(input)), 100, 1001, 2500);
assertEquals("merged range size", 1, outputList.size()); Assertions.assertThat(outputList)
.describedAs("merged range size")
.hasSize(1);
output = outputList.get(0); output = outputList.get(0);
assertEquals("merged range underlying size", 4, output.getUnderlying().size()); Assertions.assertThat(output.getUnderlying())
.describedAs("merged range underlying size")
.hasSize(4);
assertEquals("range[1000,3200)", output.toString()); assertEquals("range[1000,3200)", output.toString());
assertTrue("merged output ranges are disjoint", assertTrue("merged output ranges are disjoint",
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
@ -211,9 +234,9 @@ public void testSortAndMergeMoreCases() throws Exception {
@Test @Test
public void testMaxSizeZeroDisablesMering() throws Exception { public void testMaxSizeZeroDisablesMering() throws Exception {
List<FileRange> randomRanges = Arrays.asList( List<FileRange> randomRanges = Arrays.asList(
new FileRangeImpl(3000, 110), FileRange.createFileRange(3000, 110),
new FileRangeImpl(3000, 100), FileRange.createFileRange(3000, 100),
new FileRangeImpl(2100, 100) FileRange.createFileRange(2100, 100)
); );
assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
@ -225,7 +248,7 @@ private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
int minimumSeek, int minimumSeek,
int maxSize) { int maxSize) {
List<CombinedFileRange> combinedFileRanges = VectoredReadUtils List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
.sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize); .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize);
Assertions.assertThat(combinedFileRanges) Assertions.assertThat(combinedFileRanges)
.describedAs("Mismatch in number of ranges post merging") .describedAs("Mismatch in number of ranges post merging")
.hasSize(inputRanges.size()); .hasSize(inputRanges.size());
@ -251,7 +274,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception {
}).when(stream).readFully(ArgumentMatchers.anyLong(), }).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class)); ArgumentMatchers.any(ByteBuffer.class));
CompletableFuture<ByteBuffer> result = CompletableFuture<ByteBuffer> result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
ByteBuffer::allocate); ByteBuffer::allocate);
assertFutureCompletedSuccessfully(result); assertFutureCompletedSuccessfully(result);
ByteBuffer buffer = result.get(); ByteBuffer buffer = result.get();
@ -267,7 +290,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception {
.when(stream).readFully(ArgumentMatchers.anyLong(), .when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class)); ArgumentMatchers.any(ByteBuffer.class));
result = result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
ByteBuffer::allocate); ByteBuffer::allocate);
assertFutureFailedExceptionally(result); assertFutureFailedExceptionally(result);
} }
@ -286,7 +309,7 @@ static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate)
ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
ArgumentMatchers.anyInt()); ArgumentMatchers.anyInt());
CompletableFuture<ByteBuffer> result = CompletableFuture<ByteBuffer> result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
allocate); allocate);
assertFutureCompletedSuccessfully(result); assertFutureCompletedSuccessfully(result);
ByteBuffer buffer = result.get(); ByteBuffer buffer = result.get();
@ -303,7 +326,7 @@ static void runReadRangeFromPositionedReadable(IntFunction<ByteBuffer> allocate)
ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.any(), ArgumentMatchers.anyInt(),
ArgumentMatchers.anyInt()); ArgumentMatchers.anyInt());
result = result =
VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100),
ByteBuffer::allocate); ByteBuffer::allocate);
assertFutureFailedExceptionally(result); assertFutureFailedExceptionally(result);
} }
@ -328,9 +351,9 @@ static void validateBuffer(String message, ByteBuffer buffer, int start) {
@Test @Test
public void testReadVectored() throws Exception { public void testReadVectored() throws Exception {
List<FileRange> input = Arrays.asList(new FileRangeImpl(0, 100), List<FileRange> input = Arrays.asList(FileRange.createFileRange(0, 100),
new FileRangeImpl(100_000, 100), FileRange.createFileRange(100_000, 100),
new FileRangeImpl(200_000, 100)); FileRange.createFileRange(200_000, 100));
Stream stream = Mockito.mock(Stream.class); Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> { Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1)); fillBuffer(invocation.getArgument(1));
@ -338,31 +361,11 @@ public void testReadVectored() throws Exception {
}).when(stream).readFully(ArgumentMatchers.anyLong(), }).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class)); ArgumentMatchers.any(ByteBuffer.class));
// should not merge the ranges // should not merge the ranges
VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
Mockito.verify(stream, Mockito.times(3)) Mockito.verify(stream, Mockito.times(3))
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) { for(int b=0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), 0); validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
} }
} }
@Test
public void testReadVectoredMerge() throws Exception {
List<FileRange> input = Arrays.asList(new FileRangeImpl(2000, 100),
new FileRangeImpl(1000, 100),
new FileRangeImpl(0, 100));
Stream stream = Mockito.mock(Stream.class);
Mockito.doAnswer(invocation -> {
fillBuffer(invocation.getArgument(1));
return null;
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
// should merge the ranges into a single read
VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100);
Mockito.verify(stream, Mockito.times(1))
.readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000);
}
}
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.contract; package org.apache.hadoop.fs.contract;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -38,13 +37,18 @@
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -53,11 +57,14 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
public static final int DATASET_LEN = 64 * 1024; public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
private final IntFunction<ByteBuffer> allocate; private final IntFunction<ByteBuffer> allocate;
private final WeakReferencedElasticByteBufferPool pool =
new WeakReferencedElasticByteBufferPool();
private final String bufferType; private final String bufferType;
@Parameterized.Parameters(name = "Buffer type : {0}") @Parameterized.Parameters(name = "Buffer type : {0}")
@ -67,8 +74,14 @@ public static List<String> params() {
public AbstractContractVectoredReadTest(String bufferType) { public AbstractContractVectoredReadTest(String bufferType) {
this.bufferType = bufferType; this.bufferType = bufferType;
this.allocate = "array".equals(bufferType) ? this.allocate = value -> {
ByteBuffer::allocate : ByteBuffer::allocateDirect; boolean isDirect = !"array".equals(bufferType);
return pool.getBuffer(isDirect, value);
};
}
public IntFunction<ByteBuffer> getAllocate() {
return allocate;
} }
@Override @Override
@ -79,12 +92,27 @@ public void setup() throws Exception {
createFile(fs, path, true, DATASET); createFile(fs, path, true, DATASET);
} }
@Override
public void teardown() throws Exception {
super.teardown();
pool.release();
}
@Test
public void testVectoredReadCapability() throws Exception {
FileSystem fs = getFileSystem();
String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO};
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
assertCapabilities(in, vectoredReadCapability, null);
}
}
@Test @Test
public void testVectoredReadMultipleRanges() throws Exception { public void testVectoredReadMultipleRanges() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
FileRange fileRange = new FileRangeImpl(i * 100, 100); FileRange fileRange = FileRange.createFileRange(i * 100, 100);
fileRanges.add(fileRange); fileRanges.add(fileRange);
} }
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
@ -98,6 +126,7 @@ public void testVectoredReadMultipleRanges() throws Exception {
combinedFuture.get(); combinedFuture.get();
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@ -105,7 +134,7 @@ public void testVectoredReadMultipleRanges() throws Exception {
public void testVectoredReadAndReadFully() throws Exception { public void testVectoredReadAndReadFully() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(100, 100)); fileRanges.add(FileRange.createFileRange(100, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
byte[] readFullRes = new byte[100]; byte[] readFullRes = new byte[100];
@ -114,6 +143,7 @@ public void testVectoredReadAndReadFully() throws Exception {
Assertions.assertThat(vecRes) Assertions.assertThat(vecRes)
.describedAs("Result from vectored read and readFully must match") .describedAs("Result from vectored read and readFully must match")
.isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@ -125,12 +155,13 @@ public void testVectoredReadAndReadFully() throws Exception {
public void testDisjointRanges() throws Exception { public void testDisjointRanges() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100)); fileRanges.add(FileRange.createFileRange(0, 100));
fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100)); fileRanges.add(FileRange.createFileRange(4_000 + 101, 100));
fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100)); fileRanges.add(FileRange.createFileRange(16_000 + 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@ -142,12 +173,13 @@ public void testDisjointRanges() throws Exception {
public void testAllRangesMergedIntoOne() throws Exception { public void testAllRangesMergedIntoOne() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100)); fileRanges.add(FileRange.createFileRange(0, 100));
fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100)); fileRanges.add(FileRange.createFileRange(4_000 - 101, 100));
fileRanges.add(new FileRangeImpl(8*1024 - 101, 100)); fileRanges.add(FileRange.createFileRange(8_000 - 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@ -159,44 +191,80 @@ public void testAllRangesMergedIntoOne() throws Exception {
public void testSomeRangesMergedSomeUnmerged() throws Exception { public void testSomeRangesMergedSomeUnmerged() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 100)); fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
fileRanges.add(new FileRangeImpl(14*1024, 100)); fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
fileRanges.add(new FileRangeImpl(10*1024, 100)); fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100)); fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
fileRanges.add(new FileRangeImpl(40*1024, 1024)); fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
}
}
@Test
public void testSameRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
CompletableFuture<FSDataInputStream> builder = CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME)) fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build(); .build();
try (FSDataInputStream in = builder.get()) { try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@Test @Test
public void testOverlappingRanges() throws Exception { public void testOverlappingRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = getSampleOverlappingRanges();
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testSameRanges() throws Exception {
// Same ranges are special case of overlapping only.
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = getSampleSameRanges();
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testSomeRandomNonOverlappingRanges() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 1000)); fileRanges.add(FileRange.createFileRange(500, 100));
fileRanges.add(new FileRangeImpl(90, 900)); fileRanges.add(FileRange.createFileRange(1000, 200));
fileRanges.add(new FileRangeImpl(50, 900)); fileRanges.add(FileRange.createFileRange(50, 10));
fileRanges.add(new FileRangeImpl(10, 980)); fileRanges.add(FileRange.createFileRange(10, 5));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testConsecutiveRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(500, 100));
fileRanges.add(FileRange.createFileRange(600, 200));
fileRanges.add(FileRange.createFileRange(800, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@ -204,7 +272,7 @@ public void testOverlappingRanges() throws Exception {
public void testEOFRanges() throws Exception { public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
for (FileRange res : fileRanges) { for (FileRange res : fileRanges) {
@ -227,22 +295,22 @@ public void testEOFRanges() throws Exception {
public void testNegativeLengthRange() throws Exception { public void testNegativeLengthRange() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, -50)); fileRanges.add(FileRange.createFileRange(0, -50));
testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class);
} }
@Test @Test
public void testNegativeOffsetRange() throws Exception { public void testNegativeOffsetRange() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(-1, 50)); fileRanges.add(FileRange.createFileRange(-1, 50));
testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
} }
@Test @Test
public void testNormalReadAfterVectoredRead() throws Exception { public void testNormalReadAfterVectoredRead() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSomeOverlappingRanges(); List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
// read starting 200 bytes // read starting 200 bytes
@ -254,13 +322,14 @@ public void testNormalReadAfterVectoredRead() throws Exception {
.describedAs("Vectored read shouldn't change file pointer.") .describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200); .isEqualTo(200);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@Test @Test
public void testVectoredReadAfterNormalRead() throws Exception { public void testVectoredReadAfterNormalRead() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSomeOverlappingRanges(); List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
// read starting 200 bytes // read starting 200 bytes
byte[] res = new byte[200]; byte[] res = new byte[200];
@ -272,43 +341,66 @@ public void testVectoredReadAfterNormalRead() throws Exception {
.isEqualTo(200); .isEqualTo(200);
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET); validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, pool);
} }
} }
@Test @Test
public void testMultipleVectoredReads() throws Exception { public void testMultipleVectoredReads() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges1 = createSomeOverlappingRanges(); List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
List<FileRange> fileRanges2 = createSomeOverlappingRanges(); List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges1, allocate);
in.readVectored(fileRanges2, allocate); in.readVectored(fileRanges2, allocate);
validateVectoredReadResult(fileRanges2, DATASET); validateVectoredReadResult(fileRanges2, DATASET);
validateVectoredReadResult(fileRanges1, DATASET); validateVectoredReadResult(fileRanges1, DATASET);
returnBuffersToPoolPostRead(fileRanges1, pool);
returnBuffersToPoolPostRead(fileRanges2, pool);
} }
} }
protected List<FileRange> createSomeOverlappingRanges() { protected List<FileRange> createSampleNonOverlappingRanges() {
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(0, 100)); fileRanges.add(FileRange.createFileRange(0, 100));
fileRanges.add(new FileRangeImpl(90, 50)); fileRanges.add(FileRange.createFileRange(110, 50));
return fileRanges; return fileRanges;
} }
protected List<FileRange> getSampleSameRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(8_000, 1000));
fileRanges.add(FileRange.createFileRange(8_000, 1000));
fileRanges.add(FileRange.createFileRange(8_000, 1000));
return fileRanges;
}
protected void testExceptionalVectoredRead(FileSystem fs, protected List<FileRange> getSampleOverlappingRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(100, 500));
fileRanges.add(FileRange.createFileRange(400, 500));
return fileRanges;
}
/**
* Validate that exceptions must be thrown during a vectored
* read operation with specific input ranges.
* @param fs FileSystem instance.
* @param fileRanges input file ranges.
* @param clazz type of exception expected.
* @throws Exception any other IOE.
*/
protected <T extends Throwable> void verifyExceptionalVectoredRead(
FileSystem fs,
List<FileRange> fileRanges, List<FileRange> fileRanges,
String s) throws IOException { Class<T> clazz) throws Exception {
boolean exRaised = false;
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { CompletableFuture<FSDataInputStream> builder =
// Can we intercept here as done in S3 tests ?? fs.openFile(path(VECTORED_READ_FILE_NAME))
in.readVectored(fileRanges, allocate); .build();
} catch (EOFException | IllegalArgumentException ex) { try (FSDataInputStream in = builder.get()) {
// expected. LambdaTestUtils.intercept(clazz,
exRaised = true; () -> in.readVectored(fileRanges, allocate));
} }
Assertions.assertThat(exRaised)
.describedAs(s)
.isTrue();
} }
} }

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.PathCapabilities; import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.FutureIO; import org.apache.hadoop.util.functional.FutureIO;
@ -1137,6 +1138,25 @@ public static void validateVectoredReadResult(List<FileRange> fileRanges,
} }
} }
/**
* Utility to return buffers back to the pool once all
* data has been read for each file range.
* @param fileRanges list of file range.
* @param pool buffer pool.
* @throws IOException any IOE
* @throws TimeoutException ideally this should never occur.
*/
public static void returnBuffersToPoolPostRead(List<FileRange> fileRanges,
ByteBufferPool pool)
throws IOException, TimeoutException {
for (FileRange range : fileRanges) {
ByteBuffer buffer = FutureIO.awaitFuture(range.getData(),
VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
pool.putBuffer(buffer);
}
}
/** /**
* Assert that the data read matches the dataset at the given offset. * Assert that the data read matches the dataset at the given offset.

View File

@ -18,9 +18,26 @@
package org.apache.hadoop.fs.contract.localfs; package org.apache.hadoop.fs.contract.localfs;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest { public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {
@ -32,4 +49,38 @@ public TestLocalFSContractVectoredRead(String bufferType) {
protected AbstractFSContract createContract(Configuration conf) { protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf); return new LocalFSContract(conf);
} }
@Test
public void testChecksumValidationDuringVectoredRead() throws Exception {
Path testPath = path("big_range_checksum");
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
try (FSDataOutputStream out = localFs.create(testPath, true)){
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
.describedAs("Checksum file should be present")
.isTrue();
CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
List<FileRange> someRandomRanges = new ArrayList<>();
someRandomRanges.add(FileRange.createFileRange(10, 1024));
someRandomRanges.add(FileRange.createFileRange(1025, 1024));
try (FSDataInputStream in = fis.get()){
in.readVectored(someRandomRanges, getAllocate());
validateVectoredReadResult(someRandomRanges, datasetCorrect);
}
final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
out.write(datasetCorrupted);
}
CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
try (FSDataInputStream in = fisN.get()){
in.readVectored(someRandomRanges, getAllocate());
// Expect checksum exception when data is updated directly through
// raw local fs instance.
intercept(ChecksumException.class,
() -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
}
}
} }

View File

@ -22,11 +22,13 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction; import java.util.function.IntFunction;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
@ -46,8 +48,7 @@
import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.impl.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
@ -59,9 +60,9 @@
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.StringUtils.toLowerCase;
@ -107,6 +108,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
/**
* Atomic boolean variable to stop all ongoing vectored read operation
* for this input stream. This will be set to true when the stream is
* closed or unbuffer is called.
*/
private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false);
/** /**
* This is the public position; the one set in {@link #seek(long)} * This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}. * and returned in {@link #getPos()}.
@ -589,6 +597,7 @@ public synchronized void close() throws IOException {
if (!closed) { if (!closed) {
closed = true; closed = true;
try { try {
stopVectoredIOOperations.set(true);
// close or abort the stream; blocking // close or abort the stream; blocking
awaitFuture(closeStream("close() operation", false, true)); awaitFuture(closeStream("close() operation", false, true));
LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
@ -940,31 +949,32 @@ public void readVectored(List<? extends FileRange> ranges,
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
checkNotClosed(); checkNotClosed();
if (stopVectoredIOOperations.getAndSet(false)) {
LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
}
List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
for (FileRange range : ranges) { for (FileRange range : ranges) {
validateRangeRequest(range); validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>(); CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result); range.setData(result);
} }
if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint"); LOG.debug("Not merging the ranges as they are disjoint");
for(FileRange range: ranges) { for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength()); ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
} }
} else { } else {
LOG.debug("Trying to merge the ranges as they are not disjoint"); LOG.debug("Trying to merge the ranges as they are not disjoint");
List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges, List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(), 1, minSeekForVectorReads(),
maxReadSizeForVectorReads()); maxReadSizeForVectorReads());
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size()); ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) { for (CombinedFileRange combinedFileRange: combinedFileRanges) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
combinedFileRange.setData(result);
unboundedThreadPool.submit( unboundedThreadPool.submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
} }
} }
LOG.debug("Finished submitting vectored read to threadpool" + LOG.debug("Finished submitting vectored read to threadpool" +
@ -972,58 +982,102 @@ public void readVectored(List<? extends FileRange> ranges,
} }
/** /**
* Read data in the combinedFileRange and update data in buffers * Read the data from S3 for the bigger combined file range and update all the
* of all underlying ranges. * underlying ranges.
* @param combinedFileRange combined range. * @param combinedFileRange big combined file range.
* @param buffer combined buffer. * @param allocate method to create byte buffers to hold result data.
*/ */
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
ByteBuffer buffer) { IntFunction<ByteBuffer> allocate) {
// Not putting read single range call inside try block as LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
// exception if any occurred during this call will be raised // This reference is must be kept till all buffers are populated as this is a
// during awaitFuture call while getting the combined buffer. // finalizable object which closes the internal stream when gc triggers.
readSingleRange(combinedFileRange, buffer); S3Object objectRange = null;
S3ObjectInputStream objectContent = null;
try { try {
// In case of single range we return the original byte buffer else checkIfVectoredIOStopped();
// we return slice byte buffers for each child ranges. final String operationName = "readCombinedFileRange";
ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); objectRange = getS3Object(operationName,
if (combinedFileRange.getUnderlying().size() == 1) { combinedFileRange.getOffset(),
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer); combinedFileRange.getLength());
} else { objectContent = objectRange.getObjectContent();
for (FileRange child : combinedFileRange.getUnderlying()) { if (objectContent == null) {
updateOriginalRange(child, combinedBuffer, combinedFileRange); throw new PathIOException(uri,
} "Null IO stream received during " + operationName);
} }
populateChildBuffers(combinedFileRange, objectContent, allocate);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex); LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
for(FileRange child : combinedFileRange.getUnderlying()) { for(FileRange child : combinedFileRange.getUnderlying()) {
child.getData().completeExceptionally(ex); child.getData().completeExceptionally(ex);
} }
} finally {
IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
} }
LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr);
} }
/** /**
* Update data in child range from combined range. * Populate underlying buffers of the child ranges.
* @param child child range. * @param combinedFileRange big combined file range.
* @param combinedBuffer combined buffer. * @param objectContent data from s3.
* @param combinedFileRange combined range. * @param allocate method to allocate child byte buffers.
* @throws IOException any IOE.
*/ */
private void updateOriginalRange(FileRange child, private void populateChildBuffers(CombinedFileRange combinedFileRange,
ByteBuffer combinedBuffer, S3ObjectInputStream objectContent,
CombinedFileRange combinedFileRange) { IntFunction<ByteBuffer> allocate) throws IOException {
LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ", // If the combined file range just contains a single child
child.getOffset(), child.getLength(), // range, we only have to fill that one child buffer else
combinedFileRange.getOffset(), combinedFileRange.getLength()); // we drain the intermediate data between consecutive ranges
ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); // and fill the buffers one by one.
child.getData().complete(childBuffer); if (combinedFileRange.getUnderlying().size() == 1) {
LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ", FileRange child = combinedFileRange.getUnderlying().get(0);
child.getOffset(), child.getLength(), ByteBuffer buffer = allocate.apply(child.getLength());
combinedFileRange.getOffset(), combinedFileRange.getLength()); populateBuffer(child.getLength(), buffer, objectContent);
child.getData().complete(buffer);
} else {
FileRange prev = null;
for (FileRange child : combinedFileRange.getUnderlying()) {
if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
drainUnnecessaryData(objectContent, drainQuantity);
}
ByteBuffer buffer = allocate.apply(child.getLength());
populateBuffer(child.getLength(), buffer, objectContent);
child.getData().complete(buffer);
prev = child;
}
}
}
/**
* Drain unnecessary data in between ranges.
* @param objectContent s3 data stream.
* @param drainQuantity how many bytes to drain.
* @throws IOException any IOE.
*/
private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity)
throws IOException {
int drainBytes = 0;
int readCount;
while (drainBytes < drainQuantity) {
if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
readCount = objectContent.read(drainBuffer);
} else {
byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
readCount = objectContent.read(drainBuffer);
}
drainBytes += readCount;
}
LOG.debug("{} bytes drained from stream ", drainBytes);
} }
/** /**
* // Check if we can use contentLength returned by http GET request.
* Validates range parameters. * Validates range parameters.
* In case of S3 we already have contentLength from the first GET request
* during an open file operation so failing fast here.
* @param range requested range. * @param range requested range.
* @throws EOFException end of file exception. * @throws EOFException end of file exception.
*/ */
@ -1038,13 +1092,7 @@ private void validateRangeRequest(FileRange range) throws EOFException {
} }
/** /**
* TODO: Add retry in client.getObject(). not present in older reads why here?? * Read data from S3 for this range and populate the buffer.
* Okay retry is being done in the top layer during read.
* But if we do here in the top layer, one issue I am thinking is
* what if there is some error which happened during filling the buffer
* If we retry that old offsets of heap buffers can be overwritten ?
* I think retry should be only added in {@link S3AInputStream#getS3Object}
* Read data from S3 for this range and populate the bufffer.
* @param range range of data to read. * @param range range of data to read.
* @param buffer buffer to fill. * @param buffer buffer to fill.
*/ */
@ -1053,6 +1101,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
S3Object objectRange = null; S3Object objectRange = null;
S3ObjectInputStream objectContent = null; S3ObjectInputStream objectContent = null;
try { try {
checkIfVectoredIOStopped();
long position = range.getOffset(); long position = range.getOffset();
int length = range.getLength(); int length = range.getLength();
final String operationName = "readRange"; final String operationName = "readRange";
@ -1089,6 +1138,7 @@ private void populateBuffer(int length,
int offset = 0; int offset = 0;
byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE]; byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE];
while (readBytes < length) { while (readBytes < length) {
checkIfVectoredIOStopped();
int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ? int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ?
TMP_BUFFER_MAX_SIZE TMP_BUFFER_MAX_SIZE
: length - readBytes; : length - readBytes;
@ -1103,7 +1153,15 @@ private void populateBuffer(int length,
} }
} }
public void readByteArray(S3ObjectInputStream objectContent, /**
* Read data into destination buffer from s3 object content.
* @param objectContent result from S3.
* @param dest destination buffer.
* @param offset start offset of dest buffer.
* @param length number of bytes to fill in dest.
* @throws IOException any IOE.
*/
private void readByteArray(S3ObjectInputStream objectContent,
byte[] dest, byte[] dest,
int offset, int offset,
int length) throws IOException { int length) throws IOException {
@ -1120,14 +1178,14 @@ public void readByteArray(S3ObjectInputStream objectContent,
} }
/** /**
* Read data from S3 using a http request. * Read data from S3 using a http request with retries.
* This also handles if file has been changed while http call * This also handles if file has been changed while the
* is getting executed. If file has been changed RemoteFileChangedException * http call is getting executed. If the file has been
* is thrown. * changed RemoteFileChangedException is thrown.
* @param operationName name of the operation for which get object on S3 is called. * @param operationName name of the operation for which get object on S3 is called.
* @param position position of the object to be read from S3. * @param position position of the object to be read from S3.
* @param length length from position of the object to be read from S3. * @param length length from position of the object to be read from S3.
* @return S3Object * @return S3Object result s3 object.
* @throws IOException exception if any. * @throws IOException exception if any.
*/ */
private S3Object getS3Object(String operationName, long position, private S3Object getS3Object(String operationName, long position,
@ -1140,7 +1198,11 @@ private S3Object getS3Object(String operationName, long position,
Invoker invoker = context.getReadInvoker(); Invoker invoker = context.getReadInvoker();
try { try {
objectRange = invoker.retry(operationName, pathStr, true, objectRange = invoker.retry(operationName, pathStr, true,
() -> client.getObject(request)); () -> {
checkIfVectoredIOStopped();
return client.getObject(request);
});
} catch (IOException ex) { } catch (IOException ex) {
tracker.failed(); tracker.failed();
throw ex; throw ex;
@ -1152,6 +1214,19 @@ private S3Object getS3Object(String operationName, long position,
return objectRange; return objectRange;
} }
/**
* Check if vectored io operation has been stooped. This happens
* when the stream is closed or unbuffer is called.
* @throws InterruptedIOException throw InterruptedIOException such
* that all running vectored io is
* terminated thus releasing resources.
*/
private void checkIfVectoredIOStopped() throws InterruptedIOException {
if (stopVectoredIOOperations.get()) {
throw new InterruptedIOException("Stream closed or unbuffer is called");
}
}
/** /**
* Access the input stream statistics. * Access the input stream statistics.
* This is for internal testing and may be removed without warning. * This is for internal testing and may be removed without warning.
@ -1237,10 +1312,15 @@ public static long validateReadahead(@Nullable Long readahead) {
/** /**
* Closes the underlying S3 stream, and merges the {@link #streamStatistics} * Closes the underlying S3 stream, and merges the {@link #streamStatistics}
* instance associated with the stream. * instance associated with the stream.
* Also sets the {@code stopVectoredIOOperations} flag to true such that
* active vectored read operations are terminated. However termination of
* old vectored reads are not guaranteed if a new vectored read operation
* is initiated after unbuffer is called.
*/ */
@Override @Override
public synchronized void unbuffer() { public synchronized void unbuffer() {
try { try {
stopVectoredIOOperations.set(true);
closeStream("unbuffer()", false, false); closeStream("unbuffer()", false, false);
} finally { } finally {
streamStatistics.unbuffered(); streamStatistics.unbuffered();
@ -1253,6 +1333,7 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.READAHEAD: case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER: case StreamCapabilities.UNBUFFER:
case StreamCapabilities.VECTOREDIO:
return true; return true;
default: default:
return false; return false;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.fs.contract.s3a; package org.apache.hadoop.fs.contract.s3a;
import java.io.EOFException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -26,14 +28,15 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.test.MoreAsserts.assertEqual; import static org.apache.hadoop.test.MoreAsserts.assertEqual;
public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
@ -55,8 +58,8 @@ protected AbstractFSContract createContract(Configuration conf) {
public void testEOFRanges() throws Exception { public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>(); List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
} }
@Test @Test
@ -99,4 +102,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
} }
} }
} }
@Test
public void testStopVectoredIoOperationsCloseStream() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
in.readVectored(fileRanges, getAllocate());
in.close();
LambdaTestUtils.intercept(InterruptedIOException.class,
() -> validateVectoredReadResult(fileRanges, DATASET));
}
// reopening the stream should succeed.
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET);
}
}
@Test
public void testStopVectoredIoOperationsUnbuffer() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
in.readVectored(fileRanges, getAllocate());
in.unbuffer();
LambdaTestUtils.intercept(InterruptedIOException.class,
() -> validateVectoredReadResult(fileRanges, DATASET));
// re-initiating the vectored reads after unbuffer should succeed.
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET);
}
}
/**
* S3 vectored IO doesn't support overlapping ranges.
*/
@Override
public void testOverlappingRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = getSampleOverlappingRanges();
verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
}
/**
* S3 vectored IO doesn't support overlapping ranges.
*/
@Override
public void testSameRanges() throws Exception {
// Same ranges are special case of overlapping only.
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = getSampleSameRanges();
verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
}
} }

View File

@ -41,7 +41,6 @@
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -459,13 +458,13 @@ public void test_040_PositionedReadHugeFile() throws Throwable {
public void test_045_vectoredIOHugeFile() throws Throwable { public void test_045_vectoredIOHugeFile() throws Throwable {
assumeHugeFileExists(); assumeHugeFileExists();
List<FileRange> rangeList = new ArrayList<>(); List<FileRange> rangeList = new ArrayList<>();
rangeList.add(new FileRangeImpl(5856368, 1167716)); rangeList.add(FileRange.createFileRange(5856368, 116770));
rangeList.add(new FileRangeImpl(3520861, 1167700)); rangeList.add(FileRange.createFileRange(3520861, 116770));
rangeList.add(new FileRangeImpl(8191913, 1167775)); rangeList.add(FileRange.createFileRange(8191913, 116770));
rangeList.add(new FileRangeImpl(1520861, 1167700)); rangeList.add(FileRange.createFileRange(1520861, 116770));
rangeList.add(new FileRangeImpl(2520861, 116770)); rangeList.add(FileRange.createFileRange(2520861, 116770));
rangeList.add(new FileRangeImpl(9191913, 116770)); rangeList.add(FileRange.createFileRange(9191913, 116770));
rangeList.add(new FileRangeImpl(2820861, 156770)); rangeList.add(FileRange.createFileRange(2820861, 156770));
IntFunction<ByteBuffer> allocate = ByteBuffer::allocate; IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
CompletableFuture<FSDataInputStream> builder = CompletableFuture<FSDataInputStream> builder =

View File

@ -47,7 +47,7 @@
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl; import org.apache.hadoop.fs.impl.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -107,7 +107,7 @@ public void asyncRead(FileSystemChoice fsChoice,
FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
List<FileRange> ranges = new ArrayList<>(); List<FileRange> ranges = new ArrayList<>();
for(int m=0; m < 100; ++m) { for(int m=0; m < 100; ++m) {
FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE); FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE);
ranges.add(range); ranges.add(range);
} }
stream.readVectored(ranges, bufferChoice.allocate); stream.readVectored(ranges, bufferChoice.allocate);