HADOOP-18805. S3A prefetch tests to work with small files (#5851)

Contributed by Viraj Jasani
Viraj Jasani 2023-07-24 12:36:57 -06:00 committed by GitHub
parent caf1816e0e
commit 90793e1bce
2 changed files with 61 additions and 94 deletions
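
In outline, the change removes the tests' dependency on the external CSV test file: each test now writes its own small dataset through ContractTestUtils and pins the prefetch block size to 10 KiB, so even a 72 KiB object spans several blocks. A condensed sketch of that setup, assembled from the hunks below (a fragment, not the complete test class):

  private static final int S_1K = 1024;
  private static final int BLOCK_SIZE = S_1K * 10;   // 10 KiB prefetch blocks

  @Override
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    // clear any per-bucket overrides, then force prefetching with the small block size
    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
    return conf;
  }

  private void createLargeFile() throws Exception {
    // 72 KiB of generated data: "large" only relative to the 10 KiB block size
    byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
    ContractTestUtils.writeDataset(getFileSystem(), methodPath(), data, data.length, 16, true);
  }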

ITestS3APrefetchingInputStream.java

@@ -18,8 +18,6 @@
package org.apache.hadoop.fs.s3a;
import java.net.URI;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +34,6 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
@@ -49,7 +46,6 @@
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Test the prefetching input stream, validates that the underlying S3ACachingInputStream and
@@ -64,47 +60,39 @@ public ITestS3APrefetchingInputStream() {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);
private static final int S_1K = 1024;
private static final int S_500 = 512;
private static final int S_1K = S_500 * 2;
private static final int S_1M = S_1K * S_1K;
// Path for file which should have length > block size so S3ACachingInputStream is used
private Path largeFile;
private FileSystem largeFileFS;
private int numBlocks;
private int blockSize;
// Size should be > block size so S3ACachingInputStream is used
private long largeFileSize;
// Size should be < block size so S3AInMemoryInputStream is used
private static final int SMALL_FILE_SIZE = S_1K * 16;
private static final int SMALL_FILE_SIZE = S_1K * 9;
private static final int TIMEOUT_MILLIS = 5000;
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}
@Override
public void teardown() throws Exception {
super.teardown();
cleanupWithLogger(LOG, largeFileFS);
largeFileFS = null;
}
private void openFS() throws Exception {
Configuration conf = getConfiguration();
String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
largeFile = new Path(largeFileUri);
blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
largeFileFS = new S3AFileSystem();
largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
private void createLargeFile() throws Exception {
byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
Path largeFile = methodPath();
FileSystem largeFileFS = getFileSystem();
ContractTestUtils.writeDataset(getFileSystem(), largeFile, data, data.length, 16, true);
FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
largeFileSize = fileStatus.getLen();
numBlocks = calculateNumBlocks(largeFileSize, blockSize);
numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE);
}
private static int calculateNumBlocks(long largeFileSize, int blockSize) {
@@ -119,9 +107,9 @@ private static int calculateNumBlocks(long largeFileSize, int blockSize) {
public void testReadLargeFileFully() throws Throwable {
describe("read a large file fully, uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();
byte[] buffer = new byte[S_1M * 10];
@@ -152,9 +140,9 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
describe("read a large file using readFully(position,buffer,offset,length),"
+ " uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();
byte[] buffer = new byte[S_1M * 10];
@@ -183,25 +171,25 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
public void testRandomReadLargeFile() throws Throwable {
describe("random read on a large file, uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();
byte[] buffer = new byte[blockSize];
byte[] buffer = new byte[BLOCK_SIZE];
// Don't read block 0 completely so it gets cached on read after seek
in.read(buffer, 0, blockSize - S_1K * 10);
in.read(buffer, 0, BLOCK_SIZE - S_500 * 10);
// Seek to block 2 and read all of it
in.seek(blockSize * 2);
in.read(buffer, 0, blockSize);
in.seek(BLOCK_SIZE * 2);
in.read(buffer, 0, BLOCK_SIZE);
// Seek to block 4 but don't read: noop.
in.seek(blockSize * 4);
in.seek(BLOCK_SIZE * 4);
// Backwards seek, will use cached block 0
in.seek(S_1K * 5);
in.seek(S_500 * 5);
in.read();
// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
@@ -234,9 +222,9 @@ public void testRandomReadSmallFile() throws Throwable {
byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
in.read(buffer, 0, S_1K * 2);
in.seek(S_1K * 7);
in.read(buffer, 0, S_1K * 2);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
@@ -261,9 +249,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
FSDataInputStream in = getFileSystem().open(smallFile);
byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
in.read(buffer, 0, S_1K * 2);
in.seek(S_1K * 7);
in.read(buffer, 0, S_1K * 2);
long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
@@ -298,7 +286,6 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
inputStreamStatistics, newInputStreamStatistics);
assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
}
}
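
The read assertions in this file are sized with calculateNumBlocks(largeFileSize, BLOCK_SIZE); its body sits outside the visible hunks, but it is presumably a ceiling division of the file length by the block size, along these lines:

  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
    // round up: a trailing partial block still counts as a block
    return (int) ((largeFileSize + blockSize - 1) / blockSize);
  }

With the 72 KiB generated file and 10 KiB blocks used above, that gives numBlocks = 8: seven full blocks plus a 2 KiB tail.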

ITestS3APrefetchingLruEviction.java

@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@@ -37,19 +36,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
@@ -63,9 +60,7 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{"1"},
{"2"},
{"3"},
{"4"}
{"2"}
});
}
@@ -78,45 +73,32 @@ public ITestS3APrefetchingLruEviction(final String maxBlocks) {
LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);
private static final int S_1K = 1024;
// Path for file which should have length > block size so S3ACachingInputStream is used
private Path largeFile;
private FileSystem largeFileFS;
private int blockSize;
private static final int S_500 = 512;
private static final int SMALL_FILE_SIZE = S_1K * 56;
private static final int TIMEOUT_MILLIS = 5000;
private static final int TIMEOUT_MILLIS = 3000;
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}
@Override
public void teardown() throws Exception {
super.teardown();
cleanupWithLogger(LOG, largeFileFS);
largeFileFS = null;
}
private void openFS() throws Exception {
Configuration conf = getConfiguration();
String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
largeFile = new Path(largeFileUri);
blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
largeFileFS = new S3AFileSystem();
largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
}
@Test
public void testSeeksWithLruEviction() throws Throwable {
IOStatistics ioStats;
openFS();
byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26);
// Path for file which should have length > block size so S3ACachingInputStream is used
Path smallFile = methodPath();
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
ExecutorService executorService = Executors.newFixedThreadPool(5,
new ThreadFactoryBuilder()
@@ -125,7 +107,7 @@ public void testSeeksWithLruEviction() throws Throwable {
.build());
CountDownLatch countDownLatch = new CountDownLatch(7);
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();
// tests to add multiple blocks in the prefetch cache
// and let LRU eviction take place as more cache entries
@@ -135,43 +117,43 @@ public void testSeeksWithLruEviction() throws Throwable {
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
0,
blockSize - S_1K * 10));
BLOCK_SIZE - S_500 * 10));
// Seek to block 1 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize,
2 * S_1K));
BLOCK_SIZE,
2 * S_500));
// Seek to block 2 and don't read completely
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
blockSize * 2L,
2 * S_1K));
BLOCK_SIZE * 2L,
2 * S_500));
// Seek to block 3 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize * 3L,
2 * S_1K));
BLOCK_SIZE * 3L,
2 * S_500));
// Seek to block 4 and don't read completely
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
blockSize * 4L,
2 * S_1K));
BLOCK_SIZE * 4L,
2 * S_500));
// Seek to block 5 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize * 5L,
2 * S_1K));
BLOCK_SIZE * 5L,
2 * S_500));
// backward seek, can't use block 0 as it is evicted
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
S_1K * 5,
2 * S_1K));
S_500 * 5,
2 * S_500));
countDownLatch.await();
@@ -205,8 +187,7 @@ public void testSeeksWithLruEviction() throws Throwable {
*/
private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
long position, int len) {
byte[] buffer = new byte[blockSize];
// Don't read block 0 completely
byte[] buffer = new byte[BLOCK_SIZE];
try {
in.readFully(position, buffer, 0, len);
countDownLatch.countDown();
@@ -228,8 +209,7 @@ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDat
*/
private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
long position, int len) {
byte[] buffer = new byte[blockSize];
// Don't read block 0 completely
byte[] buffer = new byte[BLOCK_SIZE];
try {
in.seek(position);
in.readFully(buffer, 0, len);
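
The retained imports (LambdaTestUtils, verifyStatisticGaugeValue, STREAM_READ_BLOCKS_IN_FILE_CACHE) point at how the eviction test finishes outside the visible hunks: once the concurrent reads complete, the file-cache gauge is expected to drain as LRU eviction removes blocks. A sketch of that closing check, assuming the gauge reaches zero within the TIMEOUT_MILLIS and INTERVAL_MILLIS declared above:

      // after countDownLatch.await(): poll until LRU eviction has emptied the file cache
      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
        LOG.info("IO stats: {}", ioStats);
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
      });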