diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 756c3de85c..e8c86b5dbb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -43,7 +43,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -53,8 +55,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
-  private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
-  private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
   private final IntFunction<ByteBuffer> allocate;
 
@@ -77,8 +77,6 @@ public void setup() throws Exception {
     Path path = path(VECTORED_READ_FILE_NAME);
     FileSystem fs = getFileSystem();
     createFile(fs, path, true, DATASET);
-    Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
-    createFile(fs, bigFile, true, DATASET_MB);
   }
 
   @Test
@@ -99,7 +97,7 @@ public void testVectoredReadMultipleRanges() throws Exception {
       CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
       combinedFuture.get();
 
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -132,7 +130,7 @@ public void testDisjointRanges() throws Exception {
     fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -149,7 +147,7 @@ public void testAllRangesMergedIntoOne() throws Exception {
     fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -168,7 +166,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
     fileRanges.add(new FileRangeImpl(40*1024, 1024));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -184,24 +182,7 @@ public void testSameRanges() throws Exception {
             .build();
     try (FSDataInputStream in = builder.get()) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
-    }
-  }
-
-  @Test
-  public void testVectoredRead1MBFile() throws Exception {
-    FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(new FileRangeImpl(1293, 25837));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
-                    .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, allocate);
-      ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
-      FileRange resRange = fileRanges.get(0);
-      assertDatasetEquals((int) resRange.getOffset(), "vecRead",
-              vecRes, resRange.getLength(), DATASET_MB);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -215,7 +196,7 @@ public void testOverlappingRanges() throws Exception {
     fileRanges.add(new FileRangeImpl(10, 980));
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -272,7 +253,7 @@ public void testNormalReadAfterVectoredRead() throws Exception {
       Assertions.assertThat(in.getPos())
               .describedAs("Vectored read shouldn't change file pointer.")
              .isEqualTo(200);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -290,7 +271,7 @@ public void testVectoredReadAfterNormalRead() throws Exception {
              .describedAs("Vectored read shouldn't change file pointer.")
              .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
-      validateVectoredReadResult(fileRanges);
+      validateVectoredReadResult(fileRanges, DATASET);
     }
   }
 
@@ -302,8 +283,8 @@ public void testMultipleVectoredReads() throws Exception {
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
-      validateVectoredReadResult(fileRanges2);
-      validateVectoredReadResult(fileRanges1);
+      validateVectoredReadResult(fileRanges2, DATASET);
+      validateVectoredReadResult(fileRanges1, DATASET);
     }
   }
 
@@ -314,27 +295,6 @@ protected List<FileRange> createSomeOverlappingRanges() {
     return fileRanges;
   }
 
-  protected void validateVectoredReadResult(List<FileRange> fileRanges)
-          throws ExecutionException, InterruptedException {
-    CompletableFuture<?>[] completableFutures = new CompletableFuture[fileRanges.size()];
-    int i = 0;
-    for (FileRange res : fileRanges) {
-      completableFutures[i++] = res.getData();
-    }
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
-    combinedFuture.get();
-
-    for (FileRange res : fileRanges) {
-      CompletableFuture<ByteBuffer> data = res.getData();
-      try {
-        ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
-        assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
-      } catch (Exception ex) {
-        LOG.error("Exception while running vectored read ", ex);
-        Assert.fail("Exception while running vectored read " + ex);
-      }
-    }
-  }
 
   protected void testExceptionalVectoredRead(FileSystem fs,
                                              List<FileRange> fileRanges,
@@ -351,26 +311,4 @@ protected void testExceptionalVectoredRead(FileSystem fs,
             .describedAs(s)
             .isTrue();
   }
-
-  /**
-   * Assert that the data read matches the dataset at the given offset.
-   * This helps verify that the seek process is moving the read pointer
-   * to the correct location in the file.
-   * @param readOffset the offset in the file where the read began.
-   * @param operation operation name for the assertion.
-   * @param data data read in.
-   * @param length length of data to check.
-   * @param originalData
-   */
-  private void assertDatasetEquals(
-      final int readOffset, final String operation,
-      final ByteBuffer data,
-      int length, byte[] originalData) {
-    for (int i = 0; i < length; i++) {
-      int o = readOffset + i;
-      assertEquals(operation + " with read offset " + readOffset
-          + ": data[" + i + "] != DATASET[" + o + "]",
-          originalData[o], data.get());
-    }
-  }
 }
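For orientation, here is a minimal sketch (not part of the patch) of the client-side API these contract tests exercise. It uses only types already referenced in the diff above (`FileRangeImpl`, `PositionedReadable.readVectored`, `FutureIOSupport.awaitFuture`); the file path is illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;

public final class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("file:///tmp/vectored_file.txt");  // illustrative path
    FileSystem fs = path.getFileSystem(new Configuration());
    // Ranges are (offset, length) pairs; implementations may coalesce nearby
    // ranges into a single store read, which is what the "merged" tests probe.
    List<FileRange> ranges = Arrays.asList(
        new FileRangeImpl(0, 100),
        new FileRangeImpl(8 * 1024 - 101, 100));
    try (FSDataInputStream in = fs.open(path)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        // Each range is populated asynchronously through its own future.
        ByteBuffer buffer = FutureIOSupport.awaitFuture(range.getData());
        System.out.printf("read %d bytes at offset %d%n",
            buffer.remaining(), range.getOffset());
      }
    }
  }
}
```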
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 90e3649c1b..8c071282dc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import org.junit.Assert;
 import org.junit.AssumptionViolatedException;
@@ -41,6 +43,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +54,9 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -70,6 +76,11 @@ public class ContractTestUtils extends Assert {
   public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
   public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
 
+  /**
+   * Timeout in seconds for vectored read operation in tests : {@value}.
+   */
+  public static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60;
+
   /**
    * Assert that a property in the property set matches the expected value.
    * @param props property set
@@ -1097,6 +1108,59 @@ public static void validateFileContent(byte[] concat, byte[][] bytes) {
                  mismatch);
   }
 
+  /**
+   * Utility to validate vectored read results.
+   * @param fileRanges input ranges.
+   * @param originalData original data.
+   * @throws IOException any ioe.
+   */
+  public static void validateVectoredReadResult(List<FileRange> fileRanges,
+                                                byte[] originalData)
+          throws IOException, TimeoutException {
+    CompletableFuture<?>[] completableFutures = new CompletableFuture[fileRanges.size()];
+    int i = 0;
+    for (FileRange res : fileRanges) {
+      completableFutures[i++] = res.getData();
+    }
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+    FutureIO.awaitFuture(combinedFuture,
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS);
+
+    for (FileRange res : fileRanges) {
+      CompletableFuture<ByteBuffer> data = res.getData();
+      ByteBuffer buffer = FutureIO.awaitFuture(data,
+              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+              TimeUnit.SECONDS);
+      assertDatasetEquals((int) res.getOffset(), "vecRead",
+              buffer, res.getLength(), originalData);
+    }
+  }
+
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   * @param originalData original data.
+   */
+  public static void assertDatasetEquals(
+      final int readOffset,
+      final String operation,
+      final ByteBuffer data,
+      int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+              + ": data[" + i + "] != DATASET[" + o + "]",
+          originalData[o], data.get());
+    }
+  }
+
   /**
    * Receives test data from the given input file and checks the size of the
    * data as well as the pattern inside the received data.
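With the helpers now public and static, any filesystem test can reuse them. The following hypothetical, standalone usage (not part of the patch; the path and sizes are illustrative) runs against the local filesystem, which supports `readVectored` through the default `PositionedReadable` implementation:

```java
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ValidateVectoredReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path file = new Path("/tmp/vectored_validate.txt");  // illustrative path
    byte[] expected = dataset(8 * 1024, 'a', 32);
    createFile(fs, file, true, expected);

    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(0, 1024));
    ranges.add(new FileRangeImpl(4 * 1024, 512));
    try (FSDataInputStream in = fs.open(file)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      // Awaits each range future, bounded by the five-minute test timeout,
      // then byte-compares every buffer with `expected` at the range's offset.
      validateVectoredReadResult(ranges, expected);
    }
  }
}
```

Note the design change the move brings with it: the old test-class copy swallowed failures in a try/catch and called `Assert.fail`, while the shared version lets `FutureIO.awaitFuture` propagate the underlying `IOException` directly, preserving the real stack trace.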
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 23f31df164..9d87f26c3c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -186,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    * @param ctx operation context
    * @param s3Attributes object attributes
    * @param client S3 client to use
+   * @param streamStatistics stream io stats.
    * @param unboundedThreadPool thread pool to use.
    */
   public S3AInputStream(S3AReadOpContext ctx,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 29e3df1af1..803b7757d2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -153,7 +153,6 @@ public AuditSpan getAuditSpan() {
   }
 
   /**
-<<<<<<< HEAD
    * Set builder value.
    * @param value new value
    * @return the builder
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
index 248bffb940..257cef8192 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetContentSummaryOperation.java
@@ -220,8 +220,7 @@ public interface GetContentSummaryCallbacks {
 
     /***
      * List all entries under a path.
-     *
-     * @param path
+     * @param path path.
      * @param recursive if the subdirectories need to be traversed recursively
      * @return an iterator over the listing.
      * @throws IOException failure
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 15700ce953..956e23a3f1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -19,8 +19,13 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntFunction;
 
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
@@ -35,7 +40,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.Constants;
@@ -47,6 +55,7 @@
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
@@ -446,6 +455,30 @@ public void test_040_PositionedReadHugeFile() throws Throwable {
         toHuman(timer.nanosPerOperation(ops)));
   }
 
+  @Test
+  public void test_045_vectoredIOHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    List<FileRange> rangeList = new ArrayList<>();
+    rangeList.add(new FileRangeImpl(5856368, 1167716));
+    rangeList.add(new FileRangeImpl(3520861, 1167700));
+    rangeList.add(new FileRangeImpl(8191913, 1167775));
+    rangeList.add(new FileRangeImpl(1520861, 1167700));
+    rangeList.add(new FileRangeImpl(2520861, 116770));
+    rangeList.add(new FileRangeImpl(9191913, 116770));
+    rangeList.add(new FileRangeImpl(2820861, 156770));
+    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+    FileSystem fs = getFileSystem();
+    CompletableFuture<FSDataInputStream> builder =
+        fs.openFile(hugefile).build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(rangeList, allocate);
+      byte[] readFullRes = new byte[(int)filesize];
+      in.readFully(0, readFullRes);
+      // Compare vectored read results with readFully().
+      validateVectoredReadResult(rangeList, readFullRes);
+    }
+  }
+
   /**
    * Read in the entire file using read() calls.
    * @throws Throwable failure
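One more pattern worth calling out: the shared helper replaces the old unbounded `combinedFuture.get()` with `FutureIO.awaitFuture(future, timeout, unit)`, so a wedged vectored read against a real store now fails after `VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS` with a diagnostic instead of hanging the whole suite. A minimal sketch of that bounded-await pattern, assuming only the `FutureIO` class imported in ContractTestUtils above (the method name is illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.util.functional.FutureIO;

final class BoundedAwaitSketch {
  /**
   * Block for one range's data, converting ExecutionException causes into
   * IOException/RuntimeException and giving up after five minutes.
   */
  static ByteBuffer awaitRange(FileRange range)
      throws IOException, TimeoutException {
    CompletableFuture<ByteBuffer> data = range.getData();
    return FutureIO.awaitFuture(data, 5 * 60, TimeUnit.SECONDS);
  }
}
```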