From 90793e1bce58e0d12234d080a26515d8a81d82ed Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Mon, 24 Jul 2023 12:36:57 -0600
Subject: [PATCH] HADOOP-18805. S3A prefetch tests to work with small files (#5851)

Contributed by Viraj Jasani
---
 .../s3a/ITestS3APrefetchingInputStream.java   | 79 ++++++++-----------
 .../s3a/ITestS3APrefetchingLruEviction.java   | 76 +++++++-----------
 2 files changed, 61 insertions(+), 94 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 2146e17b6a..a7b59bb5d4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import java.net.URI;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +34,6 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
@@ -49,7 +46,6 @@
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
  * Test the prefetching input stream, validates that the underlying S3ACachingInputStream and
@@ -64,47 +60,39 @@ public ITestS3APrefetchingInputStream() {
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);
 
-  private static final int S_1K = 1024;
+  private static final int S_500 = 512;
+  private static final int S_1K = S_500 * 2;
   private static final int S_1M = S_1K * S_1K;
 
-  // Path for file which should have length > block size so S3ACachingInputStream is used
-  private Path largeFile;
-  private FileSystem largeFileFS;
   private int numBlocks;
-  private int blockSize;
+
+  // Size should be > block size so S3ACachingInputStream is used
   private long largeFileSize;
+
   // Size should be < block size so S3AInMemoryInputStream is used
-  private static final int SMALL_FILE_SIZE = S_1K * 16;
+  private static final int SMALL_FILE_SIZE = S_1K * 9;
 
   private static final int TIMEOUT_MILLIS = 5000;
   private static final int INTERVAL_MILLIS = 500;
-
+  private static final int BLOCK_SIZE = S_1K * 10;
 
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
     return conf;
   }
 
-  @Override
-  public void teardown() throws Exception {
-    super.teardown();
-    cleanupWithLogger(LOG, largeFileFS);
-    largeFileFS = null;
-  }
-
-  private void openFS() throws Exception {
-    Configuration conf = getConfiguration();
-    String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
-
-    largeFile = new Path(largeFileUri);
-    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
-    largeFileFS = new S3AFileSystem();
-    largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
+  private void createLargeFile() throws Exception {
+    byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
+    Path largeFile = methodPath();
+    FileSystem largeFileFS = getFileSystem();
+    ContractTestUtils.writeDataset(getFileSystem(), largeFile, data, data.length, 16, true);
     FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
     largeFileSize = fileStatus.getLen();
-    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+    numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE);
   }
 
   private static int calculateNumBlocks(long largeFileSize, int blockSize) {
@@ -119,9 +107,9 @@ private static int calculateNumBlocks(long largeFileSize, int blockSize) {
   public void testReadLargeFileFully() throws Throwable {
     describe("read a large file fully, uses S3ACachingInputStream");
     IOStatistics ioStats;
-    openFS();
+    createLargeFile();
 
-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
       ioStats = in.getIOStatistics();
 
       byte[] buffer = new byte[S_1M * 10];
@@ -152,9 +140,9 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
     describe("read a large file using readFully(position,buffer,offset,length),"
         + " uses S3ACachingInputStream");
     IOStatistics ioStats;
-    openFS();
+    createLargeFile();
 
-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
       ioStats = in.getIOStatistics();
 
       byte[] buffer = new byte[S_1M * 10];
@@ -183,25 +171,25 @@ public void testRandomReadLargeFile() throws Throwable {
     describe("random read on a large file, uses S3ACachingInputStream");
     IOStatistics ioStats;
-    openFS();
+    createLargeFile();
 
-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
       ioStats = in.getIOStatistics();
 
-      byte[] buffer = new byte[blockSize];
+      byte[] buffer = new byte[BLOCK_SIZE];
 
       // Don't read block 0 completely so it gets cached on read after seek
-      in.read(buffer, 0, blockSize - S_1K * 10);
+      in.read(buffer, 0, BLOCK_SIZE - S_500 * 10);
 
       // Seek to block 2 and read all of it
-      in.seek(blockSize * 2);
-      in.read(buffer, 0, blockSize);
+      in.seek(BLOCK_SIZE * 2);
+      in.read(buffer, 0, BLOCK_SIZE);
 
       // Seek to block 4 but don't read: noop.
-      in.seek(blockSize * 4);
+      in.seek(BLOCK_SIZE * 4);
 
       // Backwards seek, will use cached block 0
-      in.seek(S_1K * 5);
+      in.seek(S_500 * 5);
       in.read();
 
       // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
@@ -234,9 +222,9 @@ public void testRandomReadSmallFile() throws Throwable {
 
       byte[] buffer = new byte[SMALL_FILE_SIZE];
 
-      in.read(buffer, 0, S_1K * 4);
-      in.seek(S_1K * 12);
-      in.read(buffer, 0, S_1K * 4);
+      in.read(buffer, 0, S_1K * 2);
+      in.seek(S_1K * 7);
+      in.read(buffer, 0, S_1K * 2);
 
       verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
       verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
@@ -261,9 +249,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
     FSDataInputStream in = getFileSystem().open(smallFile);
     byte[] buffer = new byte[SMALL_FILE_SIZE];
 
-    in.read(buffer, 0, S_1K * 4);
-    in.seek(S_1K * 12);
-    in.read(buffer, 0, S_1K * 4);
+    in.read(buffer, 0, S_1K * 2);
+    in.seek(S_1K * 7);
+    in.read(buffer, 0, S_1K * 2);
 
     long pos = in.getPos();
     IOStatistics ioStats = in.getIOStatistics();
@@ -298,7 +286,6 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
         inputStreamStatistics, newInputStreamStatistics);
 
     assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
-
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index bbe0188758..a8211e24ce 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
@@ -37,19 +36,17 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
  * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
@@ -63,9 +60,7 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {"1"},
-        {"2"},
-        {"3"},
-        {"4"}
+        {"2"}
     });
   }
 
@@ -78,45 +73,32 @@ public ITestS3APrefetchingLruEviction(final String maxBlocks) {
       LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);
 
   private static final int S_1K = 1024;
-  // Path for file which should have length > block size so S3ACachingInputStream is used
-  private Path largeFile;
-  private FileSystem largeFileFS;
-  private int blockSize;
+  private static final int S_500 = 512;
+  private static final int SMALL_FILE_SIZE = S_1K * 56;
 
-  private static final int TIMEOUT_MILLIS = 5000;
+  private static final int TIMEOUT_MILLIS = 3000;
   private static final int INTERVAL_MILLIS = 500;
+  private static final int BLOCK_SIZE = S_1K * 10;
 
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
+    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
     return conf;
   }
 
-  @Override
-  public void teardown() throws Exception {
-    super.teardown();
-    cleanupWithLogger(LOG, largeFileFS);
-    largeFileFS = null;
-  }
-
-  private void openFS() throws Exception {
-    Configuration conf = getConfiguration();
-    String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
-
-    largeFile = new Path(largeFileUri);
-    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
-    largeFileFS = new S3AFileSystem();
-    largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
-  }
-
   @Test
   public void testSeeksWithLruEviction() throws Throwable {
     IOStatistics ioStats;
-    openFS();
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26);
+    // Path for file which should have length > block size so S3ACachingInputStream is used
+    Path smallFile = methodPath();
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
 
     ExecutorService executorService = Executors.newFixedThreadPool(5,
         new ThreadFactoryBuilder()
            .build());
     CountDownLatch countDownLatch = new CountDownLatch(7);
 
-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
       ioStats = in.getIOStatistics();
       // tests to add multiple blocks in the prefetch cache
       // and let LRU eviction take place as more cache entries
@@ -135,43 +117,43 @@
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
           0,
-          blockSize - S_1K * 10));
+          BLOCK_SIZE - S_500 * 10));
 
       // Seek to block 1 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
-          blockSize,
-          2 * S_1K));
+          BLOCK_SIZE,
+          2 * S_500));
 
       // Seek to block 2 and don't read completely
       executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
-          blockSize * 2L,
-          2 * S_1K));
+          BLOCK_SIZE * 2L,
+          2 * S_500));
 
       // Seek to block 3 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
-          blockSize * 3L,
-          2 * S_1K));
+          BLOCK_SIZE * 3L,
+          2 * S_500));
 
       // Seek to block 4 and don't read completely
       executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
-          blockSize * 4L,
-          2 * S_1K));
+          BLOCK_SIZE * 4L,
+          2 * S_500));
 
       // Seek to block 5 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
-          blockSize * 5L,
-          2 * S_1K));
+          BLOCK_SIZE * 5L,
+          2 * S_500));
 
       // backward seek, can't use block 0 as it is evicted
      executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
-          S_1K * 5,
-          2 * S_1K));
+          S_500 * 5,
+          2 * S_500));
 
       countDownLatch.await();
 
@@ -205,8 +187,7 @@ public void testSeeksWithLruEviction() throws Throwable {
    */
   private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
       long position, int len) {
-    byte[] buffer = new byte[blockSize];
-    // Don't read block 0 completely
+    byte[] buffer = new byte[BLOCK_SIZE];
     try {
       in.readFully(position, buffer, 0, len);
       countDownLatch.countDown();
@@ -228,8 +209,7 @@ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDat
    */
   private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
       long position, int len) {
-    byte[] buffer = new byte[blockSize];
-    // Don't read block 0 completely
+    byte[] buffer = new byte[BLOCK_SIZE];
     try {
       in.seek(position);
       in.readFully(buffer, 0, len);
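
Editor's note, not part of the patch: the pattern the changed tests adopt is to generate their own multi-block data under the test's method path instead of reading the shared CSV test file. A minimal illustrative sketch of that pattern follows; it assumes an S3A integration-test class in the style of the files above (so methodPath(), getFileSystem() and the S_1K/BLOCK_SIZE constants are available), and the test method name here is hypothetical:

  @Test
  public void testPrefetchOverGeneratedFile() throws Throwable {
    // 72 KiB of repeating data; with the 10 KiB prefetch block size configured above
    // this spans 8 blocks (7 full blocks plus a partial final block).
    byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
    Path file = methodPath();
    ContractTestUtils.writeDataset(getFileSystem(), file, data, data.length, 16, true);

    try (FSDataInputStream in = getFileSystem().open(file)) {
      byte[] buffer = new byte[BLOCK_SIZE];
      // positioned readFully pulls exactly one block's worth of data from offset 0
      in.readFully(0, buffer, 0, BLOCK_SIZE);
    }
  }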