diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java new file mode 100644 index 0000000000..581d53016d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.impl.prefetch; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; + +/** + * This class is used to provide parameters to {@link BlockManager}. + */ +@InterfaceAudience.Private +public final class BlockManagerParameters { + + /** + * Asynchronous tasks are performed in this pool. + */ + private ExecutorServiceFuturePool futurePool; + + /** + * Information about each block of the underlying file. + */ + private BlockData blockData; + + /** + * Size of the in-memory cache in terms of number of blocks. + */ + private int bufferPoolSize; + + /** + * Statistics for the stream. + */ + private PrefetchingStatistics prefetchingStatistics; + + /** + * The configuration object. + */ + private Configuration conf; + + /** + * The local dir allocator instance. + */ + private LocalDirAllocator localDirAllocator; + + /** + * Max blocks count to be kept in cache at any time. + */ + private int maxBlocksCount; + + /** + * Tracker with statistics to update. + */ + private DurationTrackerFactory trackerFactory; + + /** + * @return The Executor future pool to perform async prefetch tasks. + */ + public ExecutorServiceFuturePool getFuturePool() { + return futurePool; + } + + /** + * @return The object holding blocks data info for the underlying file. + */ + public BlockData getBlockData() { + return blockData; + } + + /** + * @return The size of the in-memory cache. + */ + public int getBufferPoolSize() { + return bufferPoolSize; + } + + /** + * @return The prefetching statistics for the stream. + */ + public PrefetchingStatistics getPrefetchingStatistics() { + return prefetchingStatistics; + } + + /** + * @return The configuration object. + */ + public Configuration getConf() { + return conf; + } + + /** + * @return The local dir allocator instance. + */ + public LocalDirAllocator getLocalDirAllocator() { + return localDirAllocator; + } + + /** + * @return The max blocks count to be kept in cache at any time. + */ + public int getMaxBlocksCount() { + return maxBlocksCount; + } + + /** + * @return The duration tracker with statistics to update. + */ + public DurationTrackerFactory getTrackerFactory() { + return trackerFactory; + } + + /** + * Sets the executor service future pool that is later used to perform + * async prefetch tasks. + * + * @param pool The future pool. + * @return The builder. + */ + public BlockManagerParameters withFuturePool( + final ExecutorServiceFuturePool pool) { + this.futurePool = pool; + return this; + } + + /** + * Sets the object holding blocks data info for the underlying file. + * + * @param data The block data object. + * @return The builder. + */ + public BlockManagerParameters withBlockData( + final BlockData data) { + this.blockData = data; + return this; + } + + /** + * Sets the in-memory cache size as number of blocks. + * + * @param poolSize The buffer pool size as number of blocks. + * @return The builder. + */ + public BlockManagerParameters withBufferPoolSize( + final int poolSize) { + this.bufferPoolSize = poolSize; + return this; + } + + /** + * Sets the prefetching statistics for the stream. + * + * @param statistics The prefetching statistics. + * @return The builder. + */ + public BlockManagerParameters withPrefetchingStatistics( + final PrefetchingStatistics statistics) { + this.prefetchingStatistics = statistics; + return this; + } + + /** + * Sets the configuration object. + * + * @param configuration The configuration object. + * @return The builder. + */ + public BlockManagerParameters withConf( + final Configuration configuration) { + this.conf = configuration; + return this; + } + + /** + * Sets the local dir allocator for round-robin disk allocation + * while creating files. + * + * @param dirAllocator The local dir allocator object. + * @return The builder. + */ + public BlockManagerParameters withLocalDirAllocator( + final LocalDirAllocator dirAllocator) { + this.localDirAllocator = dirAllocator; + return this; + } + + /** + * Sets the max blocks count to be kept in cache at any time. + * + * @param blocksCount The max blocks count. + * @return The builder. + */ + public BlockManagerParameters withMaxBlocksCount( + final int blocksCount) { + this.maxBlocksCount = blocksCount; + return this; + } + + /** + * Sets the duration tracker with statistics to update. + * + * @param factory The tracker factory object. + * @return The builder. + */ + public BlockManagerParameters withTrackerFactory( + final DurationTrackerFactory factory) { + this.trackerFactory = factory; + return this; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java index 407cd63004..86612a6d2c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,47 +107,33 @@ public abstract class CachingBlockManager extends BlockManager { /** * Constructs an instance of a {@code CachingBlockManager}. * - * @param futurePool asynchronous tasks are performed in this pool. - * @param blockData information about each block of the underlying file. - * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. - * @param prefetchingStatistics statistics for this stream. - * @param conf the configuration. - * @param localDirAllocator the local dir allocator instance. - * @param maxBlocksCount max blocks count to be kept in cache at any time. - * @param trackerFactory tracker with statistics to update. + * @param blockManagerParameters params for block manager. * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ - @SuppressWarnings("checkstyle:parameternumber") - public CachingBlockManager( - ExecutorServiceFuturePool futurePool, - BlockData blockData, - int bufferPoolSize, - PrefetchingStatistics prefetchingStatistics, - Configuration conf, - LocalDirAllocator localDirAllocator, - int maxBlocksCount, - DurationTrackerFactory trackerFactory) { - super(blockData); + public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerParameters) { + super(blockManagerParameters.getBlockData()); - Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); + Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize"); - this.futurePool = requireNonNull(futurePool); - this.bufferPoolSize = bufferPoolSize; + this.futurePool = requireNonNull(blockManagerParameters.getFuturePool()); + this.bufferPoolSize = blockManagerParameters.getBufferPoolSize(); this.numCachingErrors = new AtomicInteger(); this.numReadErrors = new AtomicInteger(); this.cachingDisabled = new AtomicBoolean(); - this.prefetchingStatistics = requireNonNull(prefetchingStatistics); - this.conf = requireNonNull(conf); + this.prefetchingStatistics = requireNonNull( + blockManagerParameters.getPrefetchingStatistics()); + this.conf = requireNonNull(blockManagerParameters.getConf()); if (this.getBlockData().getFileSize() > 0) { this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), this.prefetchingStatistics); - this.cache = this.createCache(maxBlocksCount, trackerFactory); + this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(), + blockManagerParameters.getTrackerFactory()); } this.ops = new BlockOperations(); this.ops.setDebug(false); - this.localDirAllocator = localDirAllocator; + this.localDirAllocator = blockManagerParameters.getLocalDirAllocator(); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java index e008de3a79..35ce7dd7a1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java @@ -22,28 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.impl.prefetch.BlockData; +import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters; import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager; -import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.Validate; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; - -import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; -import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; /** * Provides access to S3 file one block at a time. */ public class S3ACachingBlockManager extends CachingBlockManager { - private static final Logger LOG = LoggerFactory.getLogger( - S3ACachingBlockManager.class); - /** * Reader that reads from S3 file. */ @@ -52,32 +41,15 @@ public class S3ACachingBlockManager extends CachingBlockManager { /** * Constructs an instance of a {@code S3ACachingBlockManager}. * - * @param futurePool asynchronous tasks are performed in this pool. + * @param blockManagerParameters params for block manager. * @param reader reader that reads from S3 file. - * @param blockData information about each block of the S3 file. - * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. - * @param streamStatistics statistics for this stream. - * @param conf the configuration. - * @param localDirAllocator the local dir allocator instance. * @throws IllegalArgumentException if reader is null. */ public S3ACachingBlockManager( - ExecutorServiceFuturePool futurePool, - S3ARemoteObjectReader reader, - BlockData blockData, - int bufferPoolSize, - S3AInputStreamStatistics streamStatistics, - Configuration conf, - LocalDirAllocator localDirAllocator) { + @Nonnull final BlockManagerParameters blockManagerParameters, + final S3ARemoteObjectReader reader) { - super(futurePool, - blockData, - bufferPoolSize, - streamStatistics, - conf, - localDirAllocator, - conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT), - streamStatistics); + super(blockManagerParameters); Validate.checkNotNull(reader, "reader"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index fe95048648..e05ad7e38b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -21,21 +21,24 @@ import java.io.IOException; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockManager; +import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters; import org.apache.hadoop.fs.impl.prefetch.BufferData; -import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; @@ -80,13 +83,19 @@ public S3ACachingInputStream( this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); int bufferPoolSize = this.numBlocksToPrefetch + 1; - this.blockManager = this.createBlockManager( - this.getContext().getFuturePool(), - this.getReader(), - this.getBlockData(), - bufferPoolSize, - conf, - localDirAllocator); + BlockManagerParameters blockManagerParamsBuilder = + new BlockManagerParameters() + .withFuturePool(this.getContext().getFuturePool()) + .withBlockData(this.getBlockData()) + .withBufferPoolSize(bufferPoolSize) + .withConf(conf) + .withLocalDirAllocator(localDirAllocator) + .withMaxBlocksCount( + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)) + .withPrefetchingStatistics(getS3AStreamStatistics()) + .withTrackerFactory(getS3AStreamStatistics()); + this.blockManager = this.createBlockManager(blockManagerParamsBuilder, + this.getReader()); int fileSize = (int) s3Attributes.getLen(); LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize); @@ -180,18 +189,8 @@ public String toString() { } protected BlockManager createBlockManager( - ExecutorServiceFuturePool futurePool, - S3ARemoteObjectReader reader, - BlockData blockData, - int bufferPoolSize, - Configuration conf, - LocalDirAllocator localDirAllocator) { - return new S3ACachingBlockManager(futurePool, - reader, - blockData, - bufferPoolSize, - getS3AStreamStatistics(), - conf, - localDirAllocator); + @Nonnull final BlockManagerParameters blockManagerParameters, + final S3ARemoteObjectReader reader) { + return new S3ACachingBlockManager(blockManagerParameters, reader); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java index e678df700b..fe00cb5b0e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java @@ -75,6 +75,7 @@ public void setUp() throws Exception { public Configuration createConfiguration() { Configuration configuration = super.createConfiguration(); S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY); + S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_BLOCK_SIZE_KEY); configuration.setBoolean(PREFETCH_ENABLED_KEY, true); return configuration; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index 89477a9071..1c50970218 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; + import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -40,7 +42,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockCache; -import org.apache.hadoop.fs.impl.prefetch.BlockData; +import org.apache.hadoop.fs.impl.prefetch.BlockManager; +import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache; import org.apache.hadoop.fs.impl.prefetch.Validate; @@ -363,15 +366,9 @@ public static class FakeS3ACachingBlockManager extends S3ACachingBlockManager { public FakeS3ACachingBlockManager( - ExecutorServiceFuturePool futurePool, - S3ARemoteObjectReader reader, - BlockData blockData, - int bufferPoolSize, - Configuration conf, - LocalDirAllocator localDirAllocator) { - super(futurePool, reader, blockData, bufferPoolSize, - new EmptyS3AStatisticsContext().newInputStreamStatistics(), - conf, localDirAllocator); + @Nonnull final BlockManagerParameters blockManagerParameters, + final S3ARemoteObjectReader reader) { + super(blockManagerParameters, reader); } @Override @@ -409,15 +406,10 @@ protected S3ARemoteObject getS3File() { } @Override - protected S3ACachingBlockManager createBlockManager( - ExecutorServiceFuturePool futurePool, - S3ARemoteObjectReader reader, - BlockData blockData, - int bufferPoolSize, - Configuration conf, - LocalDirAllocator localDirAllocator) { - return new FakeS3ACachingBlockManager(futurePool, reader, blockData, - bufferPoolSize, conf, localDirAllocator); + protected BlockManager createBlockManager( + @Nonnull final BlockManagerParameters blockManagerParameters, + final S3ARemoteObjectReader reader) { + return new FakeS3ACachingBlockManager(blockManagerParameters, reader); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java index 8ec94d469d..87e2b68f1e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockData; +import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters; import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3ATestUtils; @@ -43,6 +44,9 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; +/** + * Tests to perform read from S3ACachingBlockManager. + */ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase { static final int FILE_SIZE = 15; @@ -61,50 +65,126 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase { private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); + private static final Configuration CONF = + S3ATestUtils.prepareTestConfiguration(new Configuration()); + + @Test + public void testFuturePoolNull() throws Exception { + MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); + Configuration conf = new Configuration(); + try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) { + BlockManagerParameters blockManagerParams = + new BlockManagerParameters() + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf); + + intercept(NullPointerException.class, + () -> new S3ACachingBlockManager(blockManagerParams, reader)); + } + } + + @Test + public void testNullReader() throws Exception { + Configuration conf = new Configuration(); + BlockManagerParameters blockManagerParams = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf) + .withMaxBlocksCount( + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); + + intercept(IllegalArgumentException.class, "'reader' must not be null", + () -> new S3ACachingBlockManager(blockManagerParams, null)); + } + + @Test + public void testNullBlockData() throws Exception { + MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); + Configuration conf = new Configuration(); + try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) { + BlockManagerParameters blockManagerParams = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf); + + intercept(IllegalArgumentException.class, "'blockData' must not be null", + () -> new S3ACachingBlockManager(blockManagerParams, reader)); + } + } + + @Test + public void testNonPositiveBufferPoolSize() throws Exception { + MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); + Configuration conf = new Configuration(); + try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) { + BlockManagerParameters blockManagerParams = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(0) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf); + + intercept(IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", + () -> new S3ACachingBlockManager(blockManagerParams, reader)); + + BlockManagerParameters blockManagerParamsWithNegativeSize = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(-1) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf); + + intercept(IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", + () -> new S3ACachingBlockManager(blockManagerParamsWithNegativeSize, reader)); + } + } + + @Test + public void testNullPrefetchingStatistics() throws Exception { + MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); + Configuration conf = new Configuration(); + try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) { + + BlockManagerParameters blockManagerParamsBuilder7 = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withConf(conf); + + intercept(NullPointerException.class, + () -> new S3ACachingBlockManager(blockManagerParamsBuilder7, reader)); + } + } + @Test public void testArgChecks() throws Exception { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); Configuration conf = new Configuration(); + BlockManagerParameters blockManagerParams = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withConf(conf) + .withMaxBlocksCount( + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); + // Should not throw. S3ACachingBlockManager blockManager = - new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, - streamStatistics, conf, null); - - // Verify it throws correctly. - intercept( - NullPointerException.class, - () -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE, - streamStatistics, conf, null)); - - intercept( - IllegalArgumentException.class, - "'reader' must not be null", - () -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE, - streamStatistics, conf, null)); - - intercept( - IllegalArgumentException.class, - "'blockData' must not be null", - () -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE, - streamStatistics, conf, null)); - - intercept( - IllegalArgumentException.class, - "'bufferPoolSize' must be a positive integer", - () -> new S3ACachingBlockManager(futurePool, reader, blockData, 0, - streamStatistics, conf, null)); - - intercept( - IllegalArgumentException.class, - "'bufferPoolSize' must be a positive integer", - () -> new S3ACachingBlockManager(futurePool, reader, blockData, -1, - streamStatistics, conf, null)); - - intercept(NullPointerException.class, - () -> new S3ACachingBlockManager(futurePool, reader, blockData, - POOL_SIZE, null, conf, null)); + new S3ACachingBlockManager(blockManagerParams, reader); intercept( IllegalArgumentException.class, @@ -133,17 +213,9 @@ public void testArgChecks() throws Exception { private static final class BlockManagerForTesting extends S3ACachingBlockManager { - private static final Configuration CONF = - S3ATestUtils.prepareTestConfiguration(new Configuration()); - - BlockManagerForTesting( - ExecutorServiceFuturePool futurePool, - S3ARemoteObjectReader reader, - BlockData blockData, - int bufferPoolSize, - S3AInputStreamStatistics streamStatistics) { - super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF, - new LocalDirAllocator(HADOOP_TMP_DIR)); + BlockManagerForTesting(BlockManagerParameters blockManagerParameters, + S3ARemoteObjectReader reader) { + super(blockManagerParameters, reader); } // If true, forces the next read operation to fail. @@ -196,9 +268,9 @@ public void testGetFailure() throws Exception { private void testGetHelper(boolean forceReadFailure) throws Exception { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, true); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); + BlockManagerParameters blockManagerParams = getBlockManagerParameters(); BlockManagerForTesting blockManager = - new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE, - streamStatistics); + new BlockManagerForTesting(blockManagerParams, reader); for (int b = 0; b < blockData.getNumBlocks(); b++) { // We simulate caching failure for all even numbered blocks. @@ -244,9 +316,9 @@ private void testPrefetchHelper(boolean forcePrefetchFailure) throws IOException, InterruptedException { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); + BlockManagerParameters blockManagerParams = getBlockManagerParameters(); BlockManagerForTesting blockManager = - new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE, - streamStatistics); + new BlockManagerForTesting(blockManagerParams, reader); assertInitialState(blockManager); int expectedNumErrors = 0; @@ -272,6 +344,18 @@ private void testPrefetchHelper(boolean forcePrefetchFailure) assertEquals(expectedNumSuccesses, blockManager.numCached()); } + private BlockManagerParameters getBlockManagerParameters() { + return new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withLocalDirAllocator(new LocalDirAllocator(HADOOP_TMP_DIR)) + .withConf(CONF) + .withMaxBlocksCount( + CONF.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); + } + // @Ignore @Test public void testCachingOfPrefetched() @@ -279,10 +363,19 @@ public void testCachingOfPrefetched() MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); Configuration conf = new Configuration(); + BlockManagerParameters blockManagerParamsBuilder = + new BlockManagerParameters() + .withFuturePool(futurePool) + .withBlockData(blockData) + .withBufferPoolSize(POOL_SIZE) + .withPrefetchingStatistics(streamStatistics) + .withLocalDirAllocator( + new LocalDirAllocator(conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR)) + .withConf(conf) + .withMaxBlocksCount( + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); S3ACachingBlockManager blockManager = - new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, - streamStatistics, conf, new LocalDirAllocator( - conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR)); + new S3ACachingBlockManager(blockManagerParamsBuilder, reader); assertInitialState(blockManager); for (int b = 0; b < blockData.getNumBlocks(); b++) { @@ -316,9 +409,9 @@ public void testCachingOfGetHelper(boolean forceCachingFailure) throws IOException, InterruptedException { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); + BlockManagerParameters blockManagerParams = getBlockManagerParameters(); BlockManagerForTesting blockManager = - new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE, - streamStatistics); + new BlockManagerForTesting(blockManagerParams, reader); assertInitialState(blockManager); int expectedNumErrors = 0;