HADOOP-19291. RawLocalFileSystem to allow overlapping ranges (#7101)
ChecksumFileSystem creates the chunked ranges based on the checksum chunk size and then calls readVectored on Raw Local, which may lead to overlapping ranges in some cases.
Contributed by: Mukund Thakur
This commit is contained in: parent dc56fc385a · commit e4b070025b
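To illustrate the problem described in the commit message: ChecksumFileSystem widens each requested range out to checksum-chunk boundaries before delegating to RawLocalFileSystem, so two requests that do not overlap can produce chunk-aligned ranges that do. The sketch below is illustrative only and is not the ChecksumFileSystem implementation; the 512-byte chunk size and the alignToChunk helper are assumptions made for the example.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileRange;

public class ChunkAlignmentExample {
  // Hypothetical chunk size; the real value comes from the checksum configuration.
  private static final int CHUNK = 512;

  // Widen a requested range to chunk boundaries (illustrative, not the real code).
  static FileRange alignToChunk(FileRange r) {
    long start = (r.getOffset() / CHUNK) * CHUNK;
    long end = ((r.getOffset() + r.getLength() + CHUNK - 1) / CHUNK) * CHUNK;
    return FileRange.createFileRange(start, (int) (end - start));
  }

  public static void main(String[] args) {
    // Two disjoint requests that fall inside the same 512-byte chunk...
    List<FileRange> requested = new ArrayList<>();
    requested.add(FileRange.createFileRange(100, 50));   // bytes 100..149
    requested.add(FileRange.createFileRange(300, 50));   // bytes 300..349

    // ...both become [0, 512) once aligned, i.e. the chunked ranges overlap.
    for (FileRange r : requested) {
      FileRange aligned = alignToChunk(r);
      System.out.println(r + " -> offset=" + aligned.getOffset()
          + " length=" + aligned.getLength());
    }
  }
}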
@@ -68,7 +68,8 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
-import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
+import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
+import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
@@ -320,10 +321,10 @@ public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
 
       // Validate, but do not pass in a file length as it may change.
-      List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
-          Optional.empty());
+      List<? extends FileRange> sortedRanges = sortRangeList(ranges);
       // Set up all of the futures, so that we can use them if things fail
       for(FileRange range: sortedRanges) {
+        validateRangeRequest(range);
         range.setData(new CompletableFuture<>());
       }
       try {
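With the precondition relaxed in RawLocalFileSystem, overlapping ranges can be passed straight through the public vectored-read API of the local filesystem. Below is a minimal usage sketch; the path /tmp/vectored-demo.bin (assumed to exist and to be at least 1536 bytes long) and the on-heap allocator are assumptions for the example. On releases without this change, the contract tests further down expect the same overlapping input to be rejected with an IllegalArgumentException.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverlappingVectoredReadDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/vectored-demo.bin");   // assumed test file

    // Two overlapping ranges: bytes 0..1023 and 512..1535.
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),
        FileRange.createFileRange(512, 1024));

    try (FSDataInputStream in = fs.open(path)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();      // completes when that range is read
        System.out.println("offset=" + range.getOffset()
            + " read=" + data.remaining() + " bytes");
      }
    }
  }
}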
@@ -623,8 +623,13 @@ support -and fallback everywhere else.
 
 The restriction "no overlapping ranges" was only initially enforced in
 the S3A connector, which would raise `UnsupportedOperationException`.
-Adding the range check as a precondition for all implementations guarantees
-consistent behavior everywhere.
+Adding the range check as a precondition for all implementations (Raw Local
+being an exception) guarantees consistent behavior everywhere.
+
+The reason Raw Local doesn't have this precondition is ChecksumFileSystem
+creates the chunked ranges based on the checksum chunk size and then calls
+readVectored on Raw Local which may lead to overlapping ranges in some cases.
+For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-19291)
 
 For reliable use with older hadoop releases with the API: sort the list of ranges
 and check for overlaps before calling `readVectored()`.
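As a concrete version of the advice above for older releases, the sketch below sorts the ranges by offset and fails fast on any overlap before the list is handed to readVectored(). The class and method names (RangeChecks, sortAndCheckNoOverlaps) are made up for the example; only the FileRange accessors come from the Hadoop API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.fs.FileRange;

public final class RangeChecks {
  private RangeChecks() {
  }

  /**
   * Sort ranges by offset and fail fast if any two overlap, so the list
   * is safe to pass to readVectored() on older Hadoop releases.
   */
  public static List<FileRange> sortAndCheckNoOverlaps(List<? extends FileRange> ranges) {
    List<FileRange> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(FileRange::getOffset));
    for (int i = 1; i < sorted.size(); i++) {
      FileRange prev = sorted.get(i - 1);
      FileRange curr = sorted.get(i);
      // A range overlaps its successor if it ends past the successor's start.
      if (prev.getOffset() + prev.getLength() > curr.getOffset()) {
        throw new IllegalArgumentException(
            "Overlapping ranges: " + prev + " and " + curr);
      }
    }
    return sorted;
  }
}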
@@ -270,13 +270,23 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
   }
 
   /**
-   * Vectored IO doesn't support overlapping ranges.
+   * Most file systems won't support overlapping ranges.
+   * Currently, only Raw Local supports it.
    */
   @Test
   public void testOverlappingRanges() throws Exception {
-    verifyExceptionalVectoredRead(
-        getSampleOverlappingRanges(),
-        IllegalArgumentException.class);
+    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
+      verifyExceptionalVectoredRead(
+          getSampleOverlappingRanges(),
+          IllegalArgumentException.class);
+    } else {
+      try (FSDataInputStream in = openVectorFile()) {
+        List<FileRange> fileRanges = getSampleOverlappingRanges();
+        in.readVectored(fileRanges, allocate);
+        validateVectoredReadResult(fileRanges, DATASET, 0);
+        returnBuffersToPoolPostRead(fileRanges, pool);
+      }
+    }
   }
 
   /**
@@ -284,9 +294,18 @@ public void testOverlappingRanges() throws Exception {
    */
   @Test
   public void testSameRanges() throws Exception {
-    verifyExceptionalVectoredRead(
-        getSampleSameRanges(),
-        IllegalArgumentException.class);
+    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
+      verifyExceptionalVectoredRead(
+          getSampleSameRanges(),
+          IllegalArgumentException.class);
+    } else {
+      try (FSDataInputStream in = openVectorFile()) {
+        List<FileRange> fileRanges = getSampleSameRanges();
+        in.readVectored(fileRanges, allocate);
+        validateVectoredReadResult(fileRanges, DATASET, 0);
+        returnBuffersToPoolPostRead(fileRanges, pool);
+      }
+    }
   }
 
   /**
@@ -329,10 +348,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception {
   public void testConsecutiveRanges() throws Exception {
     List<FileRange> fileRanges = new ArrayList<>();
     final int offset = 500;
-    final int length = 100;
+    final int length = 2011;
     range(fileRanges, offset, length);
-    range(fileRanges, 600, 200);
-    range(fileRanges, 800, 100);
+    range(fileRanges, offset + length, length);
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET, 0);
@@ -261,4 +261,6 @@ public interface ContractOptions {
    * Does vector read check file length on open rather than in the read call?
    */
  String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";
+
+  String VECTOR_IO_OVERLAPPING_RANGES = "vector-io-overlapping-ranges";
 }
@@ -142,4 +142,9 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.vector-io-overlapping-ranges</name>
+    <value>true</value>
+  </property>
+
 </configuration>