diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java index c18dc51918..2990696ee1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; + /** * Provides functionality necessary for caching blocks of data read from FileSystem. */ @@ -64,7 +67,10 @@ public interface BlockCache extends Closeable { * * @param blockNumber the id of the given block. * @param buffer contents of the given block to be added to this cache. + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. * @throws IOException if there is an error writing the given block. */ - void put(int blockNumber, ByteBuffer buffer) throws IOException; + void put(int blockNumber, ByteBuffer buffer, Configuration conf, + LocalDirAllocator localDirAllocator) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java index a0db4b308b..e43b176d0b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.statistics.DurationTracker; import static java.util.Objects.requireNonNull; @@ -95,6 +97,10 @@ public abstract class CachingBlockManager extends BlockManager { private final PrefetchingStatistics prefetchingStatistics; + private final Configuration conf; + + private final LocalDirAllocator localDirAllocator; + /** * Constructs an instance of a {@code CachingBlockManager}. * @@ -102,14 +108,17 @@ public abstract class CachingBlockManager extends BlockManager { * @param blockData information about each block of the underlying file. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. * @param prefetchingStatistics statistics for this stream. - * + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize, - PrefetchingStatistics prefetchingStatistics) { + PrefetchingStatistics prefetchingStatistics, + Configuration conf, + LocalDirAllocator localDirAllocator) { super(blockData); Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); @@ -129,6 +138,8 @@ public CachingBlockManager( this.ops = new BlockOperations(); this.ops.setDebug(false); + this.conf = requireNonNull(conf); + this.localDirAllocator = localDirAllocator; } /** @@ -468,7 +479,8 @@ public void requestCaching(BufferData data) { blockFuture = cf; } - CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now()); + CachePutTask task = + new CachePutTask(data, blockFuture, this, Instant.now()); Future actionFuture = futurePool.executeFunction(task); data.setCaching(actionFuture); ops.end(op); @@ -554,7 +566,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { return; } - cache.put(blockNumber, buffer); + cache.put(blockNumber, buffer, conf, localDirAllocator); } private static class CachePutTask implements Supplier { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index c84335a763..1141603265 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -27,10 +27,9 @@ import java.nio.file.Files; import java.nio.file.OpenOption; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -39,9 +38,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; + import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull; @@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache { private final PrefetchingStatistics prefetchingStatistics; + /** + * File attributes attached to any intermediate temporary file created during index creation. + */ + private static final Set TEMP_FILE_ATTRS = + ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE); + /** * Cache entry. * Each block is stored as a separate file. @@ -172,11 +181,17 @@ private Entry getEntry(int blockNumber) { /** * Puts the given block in this cache. * - * @throws IllegalArgumentException if buffer is null. - * @throws IllegalArgumentException if buffer.limit() is zero or negative. + * @param blockNumber the block number, used as a key for blocks map. + * @param buffer buffer contents of the given block to be added to this cache. + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. + * @throws IOException if either local dir allocator fails to allocate file or if IO error + * occurs while writing the buffer content to the file. + * @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative. */ @Override - public void put(int blockNumber, ByteBuffer buffer) throws IOException { + public void put(int blockNumber, ByteBuffer buffer, Configuration conf, + LocalDirAllocator localDirAllocator) throws IOException { if (closed) { return; } @@ -191,7 +206,7 @@ public void put(int blockNumber, ByteBuffer buffer) throws IOException { Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()"); - Path blockFilePath = getCacheFilePath(); + Path blockFilePath = getCacheFilePath(conf, localDirAllocator); long size = Files.size(blockFilePath); if (size != 0) { String message = @@ -221,8 +236,19 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException { writeChannel.close(); } - protected Path getCacheFilePath() throws IOException { - return getTempFilePath(); + /** + * Return temporary file created based on the file path retrieved from local dir allocator. + * + * @param conf The configuration object. + * @param localDirAllocator Local dir allocator instance. + * @return Path of the temporary file created. + * @throws IOException if IO error occurs while local dir allocator tries to retrieve path + * from local FS or file creation fails or permission set fails. + */ + protected Path getCacheFilePath(final Configuration conf, + final LocalDirAllocator localDirAllocator) + throws IOException { + return getTempFilePath(conf, localDirAllocator); } @Override @@ -323,9 +349,19 @@ private String getStats() { private static final String CACHE_FILE_PREFIX = "fs-cache-"; - public static boolean isCacheSpaceAvailable(long fileSize) { + /** + * Determine if the cache space is available on the local FS. + * + * @param fileSize The size of the file. + * @param conf The configuration. + * @param localDirAllocator Local dir allocator instance. + * @return True if the given file size is less than the available free space on local FS, + * False otherwise. + */ + public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf, + LocalDirAllocator localDirAllocator) { try { - Path cacheFilePath = getTempFilePath(); + Path cacheFilePath = getTempFilePath(conf, localDirAllocator); long freeSpace = new File(cacheFilePath.toString()).getUsableSpace(); LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace); Files.deleteIfExists(cacheFilePath); @@ -339,16 +375,25 @@ public static boolean isCacheSpaceAvailable(long fileSize) { // The suffix (file extension) of each serialized index file. private static final String BINARY_FILE_SUFFIX = ".bin"; - // File attributes attached to any intermediate temporary file created during index creation. - private static final FileAttribute> TEMP_FILE_ATTRS = - PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ, - PosixFilePermission.OWNER_WRITE)); - - private static Path getTempFilePath() throws IOException { - return Files.createTempFile( - CACHE_FILE_PREFIX, - BINARY_FILE_SUFFIX, - TEMP_FILE_ATTRS - ); + /** + * Create temporary file based on the file path retrieved from local dir allocator + * instance. The file is created with .bin suffix. The created file has been granted + * posix file permissions available in TEMP_FILE_ATTRS. + * + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. + * @return path of the file created. + * @throws IOException if IO error occurs while local dir allocator tries to retrieve path + * from local FS or file creation fails or permission set fails. + */ + private static Path getTempFilePath(final Configuration conf, + final LocalDirAllocator localDirAllocator) throws IOException { + org.apache.hadoop.fs.Path path = + localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf); + File dir = new File(path.getParent().toUri().getPath()); + String prefix = path.getName(); + File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir); + Path tmpFilePath = Paths.get(tmpFile.toURI()); + return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java index 2ea041283a..3b60c1c795 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java @@ -23,8 +23,11 @@ import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.test.AbstractHadoopTestBase; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase { private static final int BUFFER_SIZE = 16; + private static final Configuration CONF = new Configuration(); + @Test public void testArgChecks() throws Exception { // Should not throw. @@ -46,7 +51,7 @@ public void testArgChecks() throws Exception { // Verify it throws correctly. intercept(IllegalArgumentException.class, "'buffer' must not be null", - () -> cache.put(42, null)); + () -> cache.put(42, null, null, null)); intercept(NullPointerException.class, null, @@ -67,7 +72,7 @@ public void testPutAndGet() throws Exception { assertEquals(0, cache.size()); assertFalse(cache.containsBlock(0)); - cache.put(0, buffer1); + cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR)); assertEquals(1, cache.size()); assertTrue(cache.containsBlock(0)); ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE); @@ -77,7 +82,7 @@ public void testPutAndGet() throws Exception { assertEquals(1, cache.size()); assertFalse(cache.containsBlock(1)); - cache.put(1, buffer1); + cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR)); assertEquals(2, cache.size()); assertTrue(cache.containsBlock(1)); ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE); diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index ae8db93329..3bd973567c 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -200,6 +200,9 @@ **/ITestMarkerToolRootOperations.java **/ITestAggregateIOStatistics.java + + **/ITestS3APrefetchingCacheFiles.java @@ -246,6 +249,8 @@ **/ITestS3AContractRootDir.java **/ITestAggregateIOStatistics.java + + **/ITestS3APrefetchingCacheFiles.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a73bd55b55..30b2813caf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1368,6 +1368,21 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() { */ File createTmpFileForWrite(String pathStr, long size, Configuration conf) throws IOException { + initLocalDirAllocatorIfNotInitialized(conf); + Path path = directoryAllocator.getLocalPathForWrite(pathStr, + size, conf); + File dir = new File(path.getParent().toUri().getPath()); + String prefix = path.getName(); + // create a temp file on this directory + return File.createTempFile(prefix, null, dir); + } + + /** + * Initialize dir allocator if not already initialized. + * + * @param conf The Configuration object. + */ + private void initLocalDirAllocatorIfNotInitialized(Configuration conf) { if (directoryAllocator == null) { synchronized (this) { String bufferDir = conf.get(BUFFER_DIR) != null @@ -1375,12 +1390,6 @@ File createTmpFileForWrite(String pathStr, long size, directoryAllocator = new LocalDirAllocator(bufferDir); } } - Path path = directoryAllocator.getLocalPathForWrite(pathStr, - size, conf); - File dir = new File(path.getParent().toUri().getPath()); - String prefix = path.getName(); - // create a temp file on this directory - return File.createTempFile(prefix, null, dir); } /** @@ -1573,12 +1582,16 @@ private FSDataInputStream executeOpen( LOG.debug("Opening '{}'", readContext); if (this.prefetchEnabled) { + Configuration configuration = getConf(); + initLocalDirAllocatorIfNotInitialized(configuration); return new FSDataInputStream( new S3APrefetchingInputStream( readContext.build(), createObjectAttributes(path, fileStatus), createInputStreamCallbacks(auditSpan), - inputStreamStats)); + inputStreamStats, + configuration, + directoryAllocator)); } else { return new FSDataInputStream( new S3AInputStream( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java index f82786659d..c166943c00 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java @@ -25,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; @@ -52,7 +54,8 @@ public class S3ACachingBlockManager extends CachingBlockManager { * @param blockData information about each block of the S3 file. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. * @param streamStatistics statistics for this stream. - * + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. * @throws IllegalArgumentException if reader is null. */ public S3ACachingBlockManager( @@ -60,8 +63,11 @@ public S3ACachingBlockManager( S3ARemoteObjectReader reader, BlockData blockData, int bufferPoolSize, - S3AInputStreamStatistics streamStatistics) { - super(futurePool, blockData, bufferPoolSize, streamStatistics); + S3AInputStreamStatistics streamStatistics, + Configuration conf, + LocalDirAllocator localDirAllocator) { + + super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator); Validate.checkNotNull(reader, "reader"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index f9ee4e412f..fe95048648 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockManager; import org.apache.hadoop.fs.impl.prefetch.BufferData; @@ -61,7 +63,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream { * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. * @param streamStatistics statistics for this stream. - * + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance. * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. * @throws IllegalArgumentException if client is null. @@ -70,7 +73,9 @@ public S3ACachingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, S3AInputStream.InputStreamCallbacks client, - S3AInputStreamStatistics streamStatistics) { + S3AInputStreamStatistics streamStatistics, + Configuration conf, + LocalDirAllocator localDirAllocator) { super(context, s3Attributes, client, streamStatistics); this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); @@ -79,7 +84,9 @@ public S3ACachingInputStream( this.getContext().getFuturePool(), this.getReader(), this.getBlockData(), - bufferPoolSize); + bufferPoolSize, + conf, + localDirAllocator); int fileSize = (int) s3Attributes.getLen(); LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize); @@ -176,9 +183,15 @@ protected BlockManager createBlockManager( ExecutorServiceFuturePool futurePool, S3ARemoteObjectReader reader, BlockData blockData, - int bufferPoolSize) { - return new S3ACachingBlockManager(futurePool, reader, blockData, + int bufferPoolSize, + Configuration conf, + LocalDirAllocator localDirAllocator) { + return new S3ACachingBlockManager(futurePool, + reader, + blockData, bufferPoolSize, - getS3AStreamStatistics()); + getS3AStreamStatistics(), + conf, + localDirAllocator); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java index f778f40b74..9b9ee12ad7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java @@ -27,9 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.fs.s3a.S3AInputStream; @@ -79,7 +81,8 @@ public class S3APrefetchingInputStream * @param s3Attributes attributes of the S3 object being read. * @param client callbacks used for interacting with the underlying S3 client. * @param streamStatistics statistics for this stream. - * + * @param conf the configuration. + * @param localDirAllocator the local dir allocator instance retrieved from S3A FS. * @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if s3Attributes is null. * @throws IllegalArgumentException if client is null. @@ -88,7 +91,9 @@ public S3APrefetchingInputStream( S3AReadOpContext context, S3ObjectAttributes s3Attributes, S3AInputStream.InputStreamCallbacks client, - S3AInputStreamStatistics streamStatistics) { + S3AInputStreamStatistics streamStatistics, + Configuration conf, + LocalDirAllocator localDirAllocator) { Validate.checkNotNull(context, "context"); Validate.checkNotNull(s3Attributes, "s3Attributes"); @@ -114,7 +119,9 @@ public S3APrefetchingInputStream( context, s3Attributes, client, - streamStatistics); + streamStatistics, + conf, + localDirAllocator); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java new file mode 100644 index 0000000000..6ad8ef58a7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.net.URI; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; + +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Test the cache file behaviour with prefetching input stream. + */ +public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class); + + private Path testFile; + private FileSystem fs; + private int prefetchBlockSize; + private Configuration conf; + + public ITestS3APrefetchingCacheFiles() { + super(true); + } + + @Before + public void setUp() throws Exception { + super.setup(); + // Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration + conf = createConfiguration(); + String testFileUri = S3ATestUtils.getCSVTestFile(conf); + + testFile = new Path(testFileUri); + prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + fs = getFileSystem(); + fs.initialize(new URI(testFileUri), conf); + } + + @Override + public Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY); + configuration.setBoolean(PREFETCH_ENABLED_KEY, true); + return configuration; + } + + @Override + public synchronized void teardown() throws Exception { + super.teardown(); + File tmpFileDir = new File(conf.get(BUFFER_DIR)); + File[] tmpFiles = tmpFileDir.listFiles(); + if (tmpFiles != null) { + for (File filePath : tmpFiles) { + String path = filePath.getPath(); + if (path.endsWith(".bin") && path.contains("fs-cache-")) { + filePath.delete(); + } + } + } + cleanupWithLogger(LOG, fs); + fs = null; + testFile = null; + } + + /** + * Test to verify the existence of the cache file. + * Tries to perform inputStream read and seek ops to make the prefetching take place and + * asserts whether file with .bin suffix is present. It also verifies certain file stats. + */ + @Test + public void testCacheFileExistence() throws Throwable { + describe("Verify that FS cache files exist on local FS"); + + try (FSDataInputStream in = fs.open(testFile)) { + byte[] buffer = new byte[prefetchBlockSize]; + + in.read(buffer, 0, prefetchBlockSize - 10240); + in.seek(prefetchBlockSize * 2); + in.read(buffer, 0, prefetchBlockSize); + + File tmpFileDir = new File(conf.get(BUFFER_DIR)); + assertTrue("The dir to keep cache files must exist", tmpFileDir.exists()); + File[] tmpFiles = tmpFileDir + .listFiles((dir, name) -> name.endsWith(".bin") && name.contains("fs-cache-")); + boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0; + if (!isCacheFileForBlockFound) { + LOG.warn("No cache files found under " + tmpFileDir); + } + assertTrue("File to cache block data must exist", isCacheFileForBlockFound); + + for (File tmpFile : tmpFiles) { + Path path = new Path(tmpFile.getAbsolutePath()); + try (FileSystem localFs = FileSystem.getLocal(conf)) { + FileStatus stat = localFs.getFileStatus(path); + ContractTestUtils.assertIsFile(path, stat); + assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize, + stat.getLen()); + assertEquals("User permissions should be RW", FsAction.READ_WRITE, + stat.getPermission().getUserAction()); + assertEquals("Group permissions should be NONE", FsAction.NONE, + stat.getPermission().getGroupAction()); + assertEquals("Other permissions should be NONE", FsAction.NONE, + stat.getPermission().getOtherAction()); + } + } + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index bab07f4f9e..cf6aa7ba1a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -36,7 +36,9 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockCache; import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; @@ -60,6 +62,8 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.util.functional.CallableRaisingIOE; +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; /** @@ -86,6 +90,8 @@ private S3APrefetchFakes() { public static final long MODIFICATION_TIME = 0L; + private static final Configuration CONF = new Configuration(); + public static final ChangeDetectionPolicy CHANGE_POLICY = ChangeDetectionPolicy.createPolicy( ChangeDetectionPolicy.Mode.None, @@ -335,7 +341,9 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException { private long fileCount = 0; @Override - protected Path getCacheFilePath() throws IOException { + protected Path getCacheFilePath(final Configuration conf, + final LocalDirAllocator localDirAllocator) + throws IOException { fileCount++; return Paths.get(Long.toString(fileCount)); } @@ -363,9 +371,12 @@ public FakeS3ACachingBlockManager( ExecutorServiceFuturePool futurePool, S3ARemoteObjectReader reader, BlockData blockData, - int bufferPoolSize) { + int bufferPoolSize, + Configuration conf, + LocalDirAllocator localDirAllocator) { super(futurePool, reader, blockData, bufferPoolSize, - new EmptyS3AStatisticsContext().newInputStreamStatistics()); + new EmptyS3AStatisticsContext().newInputStreamStatistics(), + conf, localDirAllocator); } @Override @@ -390,7 +401,9 @@ public FakeS3ACachingInputStream( S3ObjectAttributes s3Attributes, S3AInputStream.InputStreamCallbacks client, S3AInputStreamStatistics streamStatistics) { - super(context, s3Attributes, client, streamStatistics); + super(context, s3Attributes, client, streamStatistics, CONF, + new LocalDirAllocator( + CONF.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR)); } @Override @@ -405,9 +418,11 @@ protected S3ACachingBlockManager createBlockManager( ExecutorServiceFuturePool futurePool, S3ARemoteObjectReader reader, BlockData blockData, - int bufferPoolSize) { + int bufferPoolSize, + Configuration conf, + LocalDirAllocator localDirAllocator) { return new FakeS3ACachingBlockManager(futurePool, reader, blockData, - bufferPoolSize); + bufferPoolSize, conf, localDirAllocator); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java index aecf8802be..cbfa643ee5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java @@ -26,13 +26,18 @@ import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.test.AbstractHadoopTestBase; +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; @@ -59,44 +64,45 @@ public void testArgChecks() throws Exception { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); + Configuration conf = new Configuration(); // Should not throw. S3ACachingBlockManager blockManager = new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, - streamStatistics); + streamStatistics, conf, null); // Verify it throws correctly. intercept( NullPointerException.class, () -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE, - streamStatistics)); + streamStatistics, conf, null)); intercept( IllegalArgumentException.class, "'reader' must not be null", () -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE, - streamStatistics)); + streamStatistics, conf, null)); intercept( IllegalArgumentException.class, "'blockData' must not be null", () -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE, - streamStatistics)); + streamStatistics, conf, null)); intercept( IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", () -> new S3ACachingBlockManager(futurePool, reader, blockData, 0, - streamStatistics)); + streamStatistics, conf, null)); intercept( IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", () -> new S3ACachingBlockManager(futurePool, reader, blockData, -1, - streamStatistics)); + streamStatistics, conf, null)); intercept(NullPointerException.class, () -> new S3ACachingBlockManager(futurePool, reader, blockData, - POOL_SIZE, null)); + POOL_SIZE, null, conf, null)); intercept( IllegalArgumentException.class, @@ -125,13 +131,17 @@ public void testArgChecks() throws Exception { private static final class BlockManagerForTesting extends S3ACachingBlockManager { + private static final Configuration CONF = + S3ATestUtils.prepareTestConfiguration(new Configuration()); + BlockManagerForTesting( ExecutorServiceFuturePool futurePool, S3ARemoteObjectReader reader, BlockData blockData, int bufferPoolSize, S3AInputStreamStatistics streamStatistics) { - super(futurePool, reader, blockData, bufferPoolSize, streamStatistics); + super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF, + new LocalDirAllocator(HADOOP_TMP_DIR)); } // If true, forces the next read operation to fail. @@ -154,8 +164,8 @@ public int read(ByteBuffer buffer, long offset, int size) private boolean forceNextCachePutToFail; @Override - protected void cachePut(int blockNumber, ByteBuffer buffer) - throws IOException { + protected void cachePut(int blockNumber, + ByteBuffer buffer) throws IOException { if (forceNextCachePutToFail) { forceNextCachePutToFail = false; throw new RuntimeException("bar"); @@ -262,9 +272,11 @@ public void testCachingOfPrefetched() throws IOException, InterruptedException { MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); + Configuration conf = new Configuration(); S3ACachingBlockManager blockManager = new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, - streamStatistics); + streamStatistics, conf, new LocalDirAllocator( + conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR)); assertInitialState(blockManager); for (int b = 0; b < blockData.getNumBlocks(); b++) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index d449a79a5a..8ce26033c1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -27,11 +27,13 @@ import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -63,24 +65,25 @@ public void testArgChecks() throws Exception { S3AInputStreamStatistics stats = readContext.getS3AStatisticsContext().newInputStreamStatistics(); + Configuration conf = S3ATestUtils.prepareTestConfiguration(new Configuration()); // Should not throw. - new S3ACachingInputStream(readContext, attrs, client, stats); + new S3ACachingInputStream(readContext, attrs, client, stats, conf, null); ExceptionAsserts.assertThrows( NullPointerException.class, - () -> new S3ACachingInputStream(null, attrs, client, stats)); + () -> new S3ACachingInputStream(null, attrs, client, stats, conf, null)); ExceptionAsserts.assertThrows( NullPointerException.class, - () -> new S3ACachingInputStream(readContext, null, client, stats)); + () -> new S3ACachingInputStream(readContext, null, client, stats, conf, null)); ExceptionAsserts.assertThrows( NullPointerException.class, - () -> new S3ACachingInputStream(readContext, attrs, null, stats)); + () -> new S3ACachingInputStream(readContext, attrs, null, stats, conf, null)); ExceptionAsserts.assertThrows( NullPointerException.class, - () -> new S3ACachingInputStream(readContext, attrs, client, null)); + () -> new S3ACachingInputStream(readContext, attrs, client, null, conf, null)); } @Test