diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index b23df1713c..8ec2a1c67b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -174,6 +174,7 @@ private static class ChecksumFSInputChecker extends FSInputChecker implements private static final int HEADER_LENGTH = 8; private int bytesPerSum = 1; + private long fileLen = -1L; public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) throws IOException { @@ -320,6 +321,18 @@ public static long findChecksumOffset(long dataOffset, return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE; } + /** + * Calculate length of file if not already cached. + * @return file length. + * @throws IOException any IOE. + */ + private long getFileLength() throws IOException { + if (fileLen == -1L) { + fileLen = fs.getFileStatus(file).getLen(); + } + return fileLen; + } + /** * Find the checksum ranges that correspond to the given data ranges. * @param dataRanges the input data ranges, which are assumed to be sorted @@ -371,13 +384,28 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, IntBuffer sums = sumsBytes.asIntBuffer(); sums.position(offset / FSInputChecker.CHECKSUM_SIZE); ByteBuffer current = data.duplicate(); - int numChunks = data.remaining() / bytesPerSum; + int numFullChunks = data.remaining() / bytesPerSum; + boolean partialChunk = ((data.remaining() % bytesPerSum) != 0); + int totalChunks = numFullChunks; + if (partialChunk) { + totalChunks++; + } CRC32 crc = new CRC32(); // check each chunk to ensure they match - for(int c = 0; c < numChunks; ++c) { - // set the buffer position and the limit - current.limit((c + 1) * bytesPerSum); + for(int c = 0; c < totalChunks; ++c) { + // set the buffer position to the start of every chunk. current.position(c * bytesPerSum); + + if (c == numFullChunks) { + // During last chunk, there may be less than chunk size + // data preset, so setting the limit accordingly. + int lastIncompleteChunk = data.remaining() % bytesPerSum; + current.limit((c * bytesPerSum) + lastIncompleteChunk); + } else { + // set the buffer limit to end of every chunk. + current.limit((c + 1) * bytesPerSum); + } + // compute the crc crc.reset(); crc.update(current); @@ -396,11 +424,34 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, return data; } + /** + * Validates range parameters. + * In case of CheckSum FS, we already have calculated + * fileLength so failing fast here. + * @param ranges requested ranges. + * @param fileLength length of file. + * @throws EOFException end of file exception. + */ + private void validateRangeRequest(List ranges, + final long fileLength) throws EOFException { + for (FileRange range : ranges) { + VectoredReadUtils.validateRangeRequest(range); + if (range.getOffset() + range.getLength() > fileLength) { + final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", + range.getOffset(), range.getLength(), file); + LOG.warn(errMsg); + throw new EOFException(errMsg); + } + } + } + @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + final long length = getFileLength(); + validateRangeRequest(ranges, length); + // If the stream doesn't have checksums, just delegate. - VectoredReadUtils.validateVectoredReadRanges(ranges); if (sums == null) { datas.readVectored(ranges, allocate); return; @@ -410,15 +461,18 @@ public void readVectored(List ranges, List dataRanges = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum, minSeek, maxReadSizeForVectorReads()); + // While merging the ranges above, they are rounded up based on the value of bytesPerSum + // which leads to some ranges crossing the EOF thus they need to be fixed else it will + // cause EOFException during actual reads. + for (CombinedFileRange range : dataRanges) { + if (range.getOffset() + range.getLength() > length) { + range.setLength((int) (length - range.getOffset())); + } + } List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); sums.readVectored(checksumRanges, allocate); datas.readVectored(dataRanges, allocate); - // Data read is correct. I have verified content of dataRanges. - // There is some bug below here as test (testVectoredReadMultipleRanges) - // is failing, should be - // somewhere while slicing the merged data into smaller user ranges. - // Spend some time figuring out but it is a complex code. for(CombinedFileRange checksumRange: checksumRanges) { for(FileRange dataRange: checksumRange.getUnderlying()) { // when we have both the ranges, validate the checksum diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index c76f1839b7..86b645b9ec 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -272,6 +272,11 @@ public void testConsecutiveRanges() throws Exception { } } + /** + * Test to validate EOF ranges. Default implementation fails with EOFException + * while reading the ranges. Some implementation like s3, checksum fs fail fast + * as they already have the file length calculated. + */ @Test public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 5d6ca3f8f0..5ee8880153 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.localfs; +import java.io.EOFException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; @@ -52,9 +54,33 @@ protected AbstractFSContract createContract(Configuration conf) { @Test public void testChecksumValidationDuringVectoredRead() throws Exception { - Path testPath = path("big_range_checksum"); + Path testPath = path("big_range_checksum_file"); + List someRandomRanges = new ArrayList<>(); + someRandomRanges.add(FileRange.createFileRange(10, 1024)); + someRandomRanges.add(FileRange.createFileRange(1025, 1024)); + validateCheckReadException(testPath, DATASET_LEN, someRandomRanges); + } + + + /** + * Test for file size less than checksum chunk size. + * {@code ChecksumFileSystem#bytesPerChecksum}. + */ + @Test + public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception { + Path testPath = path("big_range_checksum_file"); + final int length = 471; + List smallFileRanges = new ArrayList<>(); + smallFileRanges.add(FileRange.createFileRange(10, 50)); + smallFileRanges.add(FileRange.createFileRange(100, 20)); + validateCheckReadException(testPath, length, smallFileRanges); + } + + private void validateCheckReadException(Path testPath, + int length, + List ranges) throws Exception { LocalFileSystem localFs = (LocalFileSystem) getFileSystem(); - final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32); try (FSDataOutputStream out = localFs.create(testPath, true)){ out.write(datasetCorrect); } @@ -63,24 +89,55 @@ public void testChecksumValidationDuringVectoredRead() throws Exception { .describedAs("Checksum file should be present") .isTrue(); CompletableFuture fis = localFs.openFile(testPath).build(); - List someRandomRanges = new ArrayList<>(); - someRandomRanges.add(FileRange.createFileRange(10, 1024)); - someRandomRanges.add(FileRange.createFileRange(1025, 1024)); try (FSDataInputStream in = fis.get()){ - in.readVectored(someRandomRanges, getAllocate()); - validateVectoredReadResult(someRandomRanges, datasetCorrect); + in.readVectored(ranges, getAllocate()); + validateVectoredReadResult(ranges, datasetCorrect); } - final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64); + final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64); try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ out.write(datasetCorrupted); } CompletableFuture fisN = localFs.openFile(testPath).build(); try (FSDataInputStream in = fisN.get()){ - in.readVectored(someRandomRanges, getAllocate()); + in.readVectored(ranges, getAllocate()); // Expect checksum exception when data is updated directly through // raw local fs instance. intercept(ChecksumException.class, - () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted)); + () -> validateVectoredReadResult(ranges, datasetCorrupted)); } } + @Test + public void tesChecksumVectoredReadBoundaries() throws Exception { + Path testPath = path("boundary_range_checksum_file"); + final int length = 1071; + LocalFileSystem localFs = (LocalFileSystem) getFileSystem(); + final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32); + try (FSDataOutputStream out = localFs.create(testPath, true)){ + out.write(datasetCorrect); + } + Path checksumPath = localFs.getChecksumFile(testPath); + Assertions.assertThat(localFs.exists(checksumPath)) + .describedAs("Checksum file should be present at {} ", checksumPath) + .isTrue(); + CompletableFuture fis = localFs.openFile(testPath).build(); + List smallRange = new ArrayList<>(); + smallRange.add(FileRange.createFileRange(1000, 71)); + try (FSDataInputStream in = fis.get()){ + in.readVectored(smallRange, getAllocate()); + validateVectoredReadResult(smallRange, datasetCorrect); + } + } + + + /** + * Overriding in checksum fs as vectored read api fails fast + * in case of EOF requested range. + */ + @Override + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 7ef1e216cf..b6ac8669a6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -1005,10 +1005,10 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ private void validateRangeRequest(FileRange range) throws EOFException { VectoredReadUtils.validateRangeRequest(range); if(range.getOffset() + range.getLength() > contentLength) { - LOG.warn("Requested range [{}, {}) is beyond EOF for path {}", + final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", range.getOffset(), range.getLength(), pathStr); - throw new EOFException("Requested range [" + range.getOffset() +", " - + range.getLength() + ") is beyond EOF for path " + pathStr); + LOG.warn(errMsg); + throw new EOFException(errMsg); } }