From 052d7bd2275c2ae8570f4362756802c97ab8e69d Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 28 Sep 2022 23:16:47 +0530 Subject: [PATCH] HADOOP-18463. Add an integration test to process data asynchronously during vectored read. (#4921) part of HADOOP-18103. Contributed by: Mukund Thakur --- .../AbstractContractVectoredReadTest.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index 86b645b9ec..a39201df24 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -19,12 +19,17 @@ package org.apache.hadoop.fs.contract; import java.io.EOFException; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.IntFunction; import org.assertj.core.api.Assertions; @@ -42,7 +47,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.functional.FutureIO; +import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -364,6 +372,66 @@ public void testMultipleVectoredReads() throws Exception { } } + /** + * This test creates list of ranges and then submit a readVectored + * operation and then uses a separate thread pool to process the + * results asynchronously. + */ + @Test + public void testVectoredIOEndToEnd() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + + ExecutorService dataProcessor = Executors.newFixedThreadPool(5); + CountDownLatch countDown = new CountDownLatch(fileRanges.size()); + + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, value -> pool.getBuffer(true, value)); + for (FileRange res : fileRanges) { + dataProcessor.submit(() -> { + try { + readBufferValidateDataAndReturnToPool(res, countDown); + } catch (Exception e) { + String error = String.format("Error while processing result for %s", res); + LOG.error(error, e); + ContractTestUtils.fail(error, e); + } + }); + } + // user can perform other computations while waiting for IO. + if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + ContractTestUtils.fail("Timeout/Error while processing vectored io results"); + } + } finally { + HadoopExecutors.shutdown(dataProcessor, LOG, + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + } + + private void readBufferValidateDataAndReturnToPool(FileRange res, + CountDownLatch countDownLatch) + throws IOException, TimeoutException { + CompletableFuture data = res.getData(); + // Read the data and perform custom operation. Here we are just + // validating it with original data. + FutureIO.awaitFuture(data.thenAccept(buffer -> { + assertDatasetEquals((int) res.getOffset(), + "vecRead", buffer, res.getLength(), DATASET); + // return buffer to the pool once read. + pool.putBuffer(buffer); + }), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + // countdown to notify main thread that processing has been done. + countDownLatch.countDown(); + } + + protected List createSampleNonOverlappingRanges() { List fileRanges = new ArrayList<>(); fileRanges.add(FileRange.createFileRange(0, 100));