HADOOP-18439. Fix VectoredIO for LocalFileSystem when checksum is enabled. (#4862)

part of HADOOP-18103.

While merging ranges in ChecksumFileSystem, they are rounded up to multiples of
the checksum chunk size (bytesPerSum), which can push some merged ranges past
the end of the file; these have to be clipped back to EOF, otherwise the actual
reads fail with an EOFException.
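To make the rounding concrete, here is a standalone sketch (illustrative only, not the patch; the class name and constants are assumptions): with a 512-byte checksum chunk and a 1071-byte file, a request for bytes [1000, 1071) is widened to chunk boundaries [512, 1536), which crosses EOF and must be clipped back to [512, 1071).

    public class ClipToEofExample {
      public static void main(String[] args) {
        final int bytesPerSum = 512;    // checksum chunk size (assumed value)
        final long fileLength = 1071;   // file length
        long offset = 1000, length = 71;                  // user range [1000, 1071)
        // merging widens the range to whole checksum chunks
        long start = (offset / bytesPerSum) * bytesPerSum;                            // 512
        long end = ((offset + length + bytesPerSum - 1) / bytesPerSum) * bytesPerSum; // 1536
        if (end > fileLength) {
          end = fileLength;             // the fix: clip the merged range back to EOF
        }
        System.out.println("merged read = [" + start + ", " + end + ")");  // [512, 1071)
      }
    }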

Contributed By: Mukund Thakur
Mukund Thakur 2022-09-09 21:46:08 +05:30 committed by GitHub
parent 5b85af87f0
commit 8732625f50
4 changed files with 139 additions and 23 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -174,6 +174,7 @@ private static class ChecksumFSInputChecker extends FSInputChecker implements
     private static final int HEADER_LENGTH = 8;
     private int bytesPerSum = 1;
+    private long fileLen = -1L;

     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
         throws IOException {
@@ -320,6 +321,18 @@ public static long findChecksumOffset(long dataOffset,
       return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE;
     }

+    /**
+     * Calculate the length of the file, if not already cached.
+     * @return file length.
+     * @throws IOException any IOE.
+     */
+    private long getFileLength() throws IOException {
+      if (fileLen == -1L) {
+        fileLen = fs.getFileStatus(file).getLen();
+      }
+      return fileLen;
+    }
+
     /**
      * Find the checksum ranges that correspond to the given data ranges.
      * @param dataRanges the input data ranges, which are assumed to be sorted
@@ -371,13 +384,28 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
       IntBuffer sums = sumsBytes.asIntBuffer();
       sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
       ByteBuffer current = data.duplicate();
-      int numChunks = data.remaining() / bytesPerSum;
+      int numFullChunks = data.remaining() / bytesPerSum;
+      boolean partialChunk = ((data.remaining() % bytesPerSum) != 0);
+      int totalChunks = numFullChunks;
+      if (partialChunk) {
+        totalChunks++;
+      }
       CRC32 crc = new CRC32();
       // check each chunk to ensure they match
-      for(int c = 0; c < numChunks; ++c) {
-        // set the buffer position and the limit
-        current.limit((c + 1) * bytesPerSum);
+      for(int c = 0; c < totalChunks; ++c) {
+        // set the buffer position to the start of each chunk.
         current.position(c * bytesPerSum);
+
+        if (c == numFullChunks) {
+          // For the last chunk there may be less than a full chunk of
+          // data present, so set the limit accordingly.
+          int lastIncompleteChunk = data.remaining() % bytesPerSum;
+          current.limit((c * bytesPerSum) + lastIncompleteChunk);
+        } else {
+          // set the buffer limit to the end of each chunk.
+          current.limit((c + 1) * bytesPerSum);
+        }
+
         // compute the crc
         crc.reset();
         crc.update(current);
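As a worked example of the chunk arithmetic above (a standalone sketch assuming the default 512-byte bytesPerSum; not committed code): a 1071-byte buffer yields two full chunks plus one trailing partial chunk, and only the partial chunk gets the shortened limit.

    public class ChunkCountExample {
      public static void main(String[] args) {
        int bytesPerSum = 512;                                  // assumed chunk size
        int remaining = 1071;                                   // bytes in the buffer
        int numFullChunks = remaining / bytesPerSum;            // 2
        boolean partialChunk = (remaining % bytesPerSum) != 0;  // true: 47 bytes left over
        int totalChunks = partialChunk ? numFullChunks + 1 : numFullChunks; // 3
        for (int c = 0; c < totalChunks; c++) {
          int limit = (c == numFullChunks)
              ? c * bytesPerSum + remaining % bytesPerSum       // last, partial chunk
              : (c + 1) * bytesPerSum;                          // full chunk
          System.out.println("chunk " + c + " -> [" + c * bytesPerSum + ", " + limit + ")");
        }
        // prints [0, 512), [512, 1024), [1024, 1071)
      }
    }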
@@ -396,11 +424,34 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
       return data;
     }

+    /**
+     * Validates range parameters.
+     * In the case of checksum FS the file length is already known,
+     * so fail fast here.
+     * @param ranges requested ranges.
+     * @param fileLength length of the file.
+     * @throws EOFException if a range extends beyond the end of the file.
+     */
+    private void validateRangeRequest(List<? extends FileRange> ranges,
+                                      final long fileLength) throws EOFException {
+      for (FileRange range : ranges) {
+        VectoredReadUtils.validateRangeRequest(range);
+        if (range.getOffset() + range.getLength() > fileLength) {
+          final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
+              range.getOffset(), range.getLength(), file);
+          LOG.warn(errMsg);
+          throw new EOFException(errMsg);
+        }
+      }
+    }
+
     @Override
     public void readVectored(List<? extends FileRange> ranges,
                              IntFunction<ByteBuffer> allocate) throws IOException {
+      final long length = getFileLength();
+      validateRangeRequest(ranges, length);
+
       // If the stream doesn't have checksums, just delegate.
-      VectoredReadUtils.validateVectoredReadRanges(ranges);
       if (sums == null) {
         datas.readVectored(ranges, allocate);
         return;
@@ -410,15 +461,18 @@ public void readVectored(List<? extends FileRange> ranges,
       List<CombinedFileRange> dataRanges =
           VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
               minSeek, maxReadSizeForVectorReads());
+      // While merging the ranges above, they are rounded up based on the value of bytesPerSum,
+      // which leads to some ranges crossing the EOF; these need to be clipped back here, else
+      // the actual reads will fail with an EOFException.
+      for (CombinedFileRange range : dataRanges) {
+        if (range.getOffset() + range.getLength() > length) {
+          range.setLength((int) (length - range.getOffset()));
+        }
+      }
       List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
           bytesPerSum, minSeek, maxSize);
       sums.readVectored(checksumRanges, allocate);
       datas.readVectored(dataRanges, allocate);
       for(CombinedFileRange checksumRange: checksumRanges) {
         for(FileRange dataRange: checksumRange.getUnderlying()) {
           // when we have both the ranges, validate the checksum
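For context, a hedged sketch of how a caller exercises this path (the file path and class name are illustrative; FileSystem, FileRange and readVectored are the real Hadoop APIs): FileSystem.getLocal() returns a checksummed LocalFileSystem, so vectored reads on it go through the code above.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class VectoredReadDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration()); // checksummed local FS
        List<FileRange> ranges = Arrays.asList(
            FileRange.createFileRange(10, 1024),
            FileRange.createFileRange(1025, 1024));
        try (FSDataInputStream in = fs.open(new Path("/tmp/vectored_demo_data"))) {
          in.readVectored(ranges, ByteBuffer::allocate);    // issues all ranges at once
          for (FileRange r : ranges) {
            ByteBuffer buf = r.getData().get();             // waits for that range
            System.out.println("read " + buf.remaining() + " bytes");
          }
        }
      }
    }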

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

@@ -272,6 +272,11 @@ public void testConsecutiveRanges() throws Exception {
     }
   }

+  /**
+   * Test to validate EOF ranges. The default implementation fails with an
+   * EOFException while reading the ranges; some implementations, such as S3A
+   * and checksum FS, fail fast as they already know the file length.
+   */
   @Test
   public void testEOFRanges() throws Exception {
     FileSystem fs = getFileSystem();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.contract.localfs;

+import java.io.EOFException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;

@@ -30,6 +31,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
@@ -52,9 +54,33 @@ protected AbstractFSContract createContract(Configuration conf) {
   @Test
   public void testChecksumValidationDuringVectoredRead() throws Exception {
-    Path testPath = path("big_range_checksum");
+    Path testPath = path("big_range_checksum_file");
+    List<FileRange> someRandomRanges = new ArrayList<>();
+    someRandomRanges.add(FileRange.createFileRange(10, 1024));
+    someRandomRanges.add(FileRange.createFileRange(1025, 1024));
+    validateCheckReadException(testPath, DATASET_LEN, someRandomRanges);
+  }
+
+  /**
+   * Test for a file whose size is smaller than the checksum chunk size,
+   * {@code ChecksumFileSystem#bytesPerChecksum}.
+   */
+  @Test
+  public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception {
+    Path testPath = path("big_range_checksum_file");
+    final int length = 471;
+    List<FileRange> smallFileRanges = new ArrayList<>();
+    smallFileRanges.add(FileRange.createFileRange(10, 50));
+    smallFileRanges.add(FileRange.createFileRange(100, 20));
+    validateCheckReadException(testPath, length, smallFileRanges);
+  }
+
+  private void validateCheckReadException(Path testPath,
+                                          int length,
+                                          List<FileRange> ranges) throws Exception {
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
-    final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
     try (FSDataOutputStream out = localFs.create(testPath, true)){
       out.write(datasetCorrect);
     }
@@ -63,24 +89,55 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
         .describedAs("Checksum file should be present")
         .isTrue();
     CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
-    List<FileRange> someRandomRanges = new ArrayList<>();
-    someRandomRanges.add(FileRange.createFileRange(10, 1024));
-    someRandomRanges.add(FileRange.createFileRange(1025, 1024));
     try (FSDataInputStream in = fis.get()){
-      in.readVectored(someRandomRanges, getAllocate());
-      validateVectoredReadResult(someRandomRanges, datasetCorrect);
+      in.readVectored(ranges, getAllocate());
+      validateVectoredReadResult(ranges, datasetCorrect);
     }
-    final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
+    final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
     try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
       out.write(datasetCorrupted);
     }
     CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
     try (FSDataInputStream in = fisN.get()){
-      in.readVectored(someRandomRanges, getAllocate());
+      in.readVectored(ranges, getAllocate());
       // Expect a checksum exception when the data has been overwritten
       // directly through the raw local fs instance.
       intercept(ChecksumException.class,
-          () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
+          () -> validateVectoredReadResult(ranges, datasetCorrupted));
     }
   }
+
+  @Test
+  public void testChecksumVectoredReadBoundaries() throws Exception {
+    Path testPath = path("boundary_range_checksum_file");
+    final int length = 1071;
+    LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
+    final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
+    try (FSDataOutputStream out = localFs.create(testPath, true)){
+      out.write(datasetCorrect);
+    }
+    Path checksumPath = localFs.getChecksumFile(testPath);
+    Assertions.assertThat(localFs.exists(checksumPath))
+        .describedAs("Checksum file should be present at %s", checksumPath)
+        .isTrue();
+    CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
+    List<FileRange> smallRange = new ArrayList<>();
+    smallRange.add(FileRange.createFileRange(1000, 71));
+    try (FSDataInputStream in = fis.get()){
+      in.readVectored(smallRange, getAllocate());
+      validateVectoredReadResult(smallRange, datasetCorrect);
+    }
+  }
+
+  /**
+   * Overridden for checksum FS, as the vectored read API fails fast
+   * when a requested range extends beyond EOF.
+   */
+  @Override
+  public void testEOFRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
+  }
 }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -1005,10 +1005,10 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ
   private void validateRangeRequest(FileRange range) throws EOFException {
     VectoredReadUtils.validateRangeRequest(range);
     if(range.getOffset() + range.getLength() > contentLength) {
-      LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
-          range.getOffset(), range.getLength(), pathStr);
-      throw new EOFException("Requested range [" + range.getOffset() +", "
-          + range.getLength() + ") is beyond EOF for path " + pathStr);
+      final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
+          range.getOffset(), range.getLength(), pathStr);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
     }
   }
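A standalone sketch of the half-open check above (illustrative class name and values, not S3A code): a request is rejected up front, before any bytes are read, whenever offset + length runs past contentLength.

    public class EofCheckExample {
      public static void main(String[] args) {
        long contentLength = 100;        // assumed object length
        long offset = 90, length = 20;   // requests bytes [90, 110)
        if (offset + length > contentLength) {
          // mirrors the fail-fast path: warn and raise EOF before any read
          System.out.println("Requested range [" + offset + ", " + (offset + length)
              + ") is beyond EOF");
        }
      }
    }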