HADOOP-18399. S3A Prefetch - SingleFilePerBlockCache to use LocalDirAllocator (#5054)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2023-04-18 08:37:48 -07:00 committed by Steve Loughran
parent 0e51a9b55e
commit 05edfee1f3
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
13 changed files with 356 additions and 70 deletions

View File

@ -23,6 +23,9 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
/** /**
* Provides functionality necessary for caching blocks of data read from FileSystem. * Provides functionality necessary for caching blocks of data read from FileSystem.
*/ */
@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
* *
* @param blockNumber the id of the given block. * @param blockNumber the id of the given block.
* @param buffer contents of the given block to be added to this cache. * @param buffer contents of the given block to be added to this cache.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IOException if there is an error writing the given block. * @throws IOException if there is an error writing the given block.
*/ */
void put(int blockNumber, ByteBuffer buffer) throws IOException; void put(int blockNumber, ByteBuffer buffer, Configuration conf,
LocalDirAllocator localDirAllocator) throws IOException;
} }

View File

@ -33,6 +33,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
@ -95,6 +97,10 @@ public abstract class CachingBlockManager extends BlockManager {
private final PrefetchingStatistics prefetchingStatistics; private final PrefetchingStatistics prefetchingStatistics;
private final Configuration conf;
private final LocalDirAllocator localDirAllocator;
/** /**
* Constructs an instance of a {@code CachingBlockManager}. * Constructs an instance of a {@code CachingBlockManager}.
* *
@ -102,14 +108,17 @@ public abstract class CachingBlockManager extends BlockManager {
* @param blockData information about each block of the underlying file. * @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param prefetchingStatistics statistics for this stream. * @param prefetchingStatistics statistics for this stream.
* * @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative. * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/ */
public CachingBlockManager( public CachingBlockManager(
ExecutorServiceFuturePool futurePool, ExecutorServiceFuturePool futurePool,
BlockData blockData, BlockData blockData,
int bufferPoolSize, int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics) { PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
super(blockData); super(blockData);
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@ -129,6 +138,8 @@ public CachingBlockManager(
this.ops = new BlockOperations(); this.ops = new BlockOperations();
this.ops.setDebug(false); this.ops.setDebug(false);
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
} }
/** /**
@ -468,7 +479,8 @@ public void requestCaching(BufferData data) {
blockFuture = cf; blockFuture = cf;
} }
CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now()); CachePutTask task =
new CachePutTask(data, blockFuture, this, Instant.now());
Future<Void> actionFuture = futurePool.executeFunction(task); Future<Void> actionFuture = futurePool.executeFunction(task);
data.setCaching(actionFuture); data.setCaching(actionFuture);
ops.end(op); ops.end(op);
@ -554,7 +566,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
return; return;
} }
cache.put(blockNumber, buffer); cache.put(blockNumber, buffer, conf, localDirAllocator);
} }
private static class CachePutTask implements Supplier<Void> { private static class CachePutTask implements Supplier<Void> {

View File

@ -27,10 +27,9 @@
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.OpenOption; import java.nio.file.OpenOption;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -39,9 +38,13 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull; import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache {
private final PrefetchingStatistics prefetchingStatistics; private final PrefetchingStatistics prefetchingStatistics;
/**
* File attributes attached to any intermediate temporary file created during index creation.
*/
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
/** /**
* Cache entry. * Cache entry.
* Each block is stored as a separate file. * Each block is stored as a separate file.
@ -172,11 +181,17 @@ private Entry getEntry(int blockNumber) {
/** /**
* Puts the given block in this cache. * Puts the given block in this cache.
* *
* @throws IllegalArgumentException if buffer is null. * @param blockNumber the block number, used as a key for blocks map.
* @throws IllegalArgumentException if buffer.limit() is zero or negative. * @param buffer buffer contents of the given block to be added to this cache.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IOException if either local dir allocator fails to allocate file or if IO error
* occurs while writing the buffer content to the file.
* @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
*/ */
@Override @Override
public void put(int blockNumber, ByteBuffer buffer) throws IOException { public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
LocalDirAllocator localDirAllocator) throws IOException {
if (closed) { if (closed) {
return; return;
} }
@ -191,7 +206,7 @@ public void put(int blockNumber, ByteBuffer buffer) throws IOException {
Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()"); Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
Path blockFilePath = getCacheFilePath(); Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
long size = Files.size(blockFilePath); long size = Files.size(blockFilePath);
if (size != 0) { if (size != 0) {
String message = String message =
@ -221,8 +236,19 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
writeChannel.close(); writeChannel.close();
} }
protected Path getCacheFilePath() throws IOException { /**
return getTempFilePath(); * Return temporary file created based on the file path retrieved from local dir allocator.
*
* @param conf The configuration object.
* @param localDirAllocator Local dir allocator instance.
* @return Path of the temporary file created.
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
* from local FS or file creation fails or permission set fails.
*/
protected Path getCacheFilePath(final Configuration conf,
final LocalDirAllocator localDirAllocator)
throws IOException {
return getTempFilePath(conf, localDirAllocator);
} }
@Override @Override
@ -323,9 +349,19 @@ private String getStats() {
private static final String CACHE_FILE_PREFIX = "fs-cache-"; private static final String CACHE_FILE_PREFIX = "fs-cache-";
public static boolean isCacheSpaceAvailable(long fileSize) { /**
* Determine if the cache space is available on the local FS.
*
* @param fileSize The size of the file.
* @param conf The configuration.
* @param localDirAllocator Local dir allocator instance.
* @return True if the given file size is less than the available free space on local FS,
* False otherwise.
*/
public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
LocalDirAllocator localDirAllocator) {
try { try {
Path cacheFilePath = getTempFilePath(); Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
long freeSpace = new File(cacheFilePath.toString()).getUsableSpace(); long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace); LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
Files.deleteIfExists(cacheFilePath); Files.deleteIfExists(cacheFilePath);
@ -339,16 +375,25 @@ public static boolean isCacheSpaceAvailable(long fileSize) {
// The suffix (file extension) of each serialized index file. // The suffix (file extension) of each serialized index file.
private static final String BINARY_FILE_SUFFIX = ".bin"; private static final String BINARY_FILE_SUFFIX = ".bin";
// File attributes attached to any intermediate temporary file created during index creation. /**
private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS = * Create temporary file based on the file path retrieved from local dir allocator
PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ, * instance. The file is created with .bin suffix. The created file has been granted
PosixFilePermission.OWNER_WRITE)); * posix file permissions available in TEMP_FILE_ATTRS.
*
private static Path getTempFilePath() throws IOException { * @param conf the configuration.
return Files.createTempFile( * @param localDirAllocator the local dir allocator instance.
CACHE_FILE_PREFIX, * @return path of the file created.
BINARY_FILE_SUFFIX, * @throws IOException if IO error occurs while local dir allocator tries to retrieve path
TEMP_FILE_ATTRS * from local FS or file creation fails or permission set fails.
); */
private static Path getTempFilePath(final Configuration conf,
final LocalDirAllocator localDirAllocator) throws IOException {
org.apache.hadoop.fs.Path path =
localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
Path tmpFilePath = Paths.get(tmpFile.toURI());
return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
} }
} }

View File

@ -23,8 +23,11 @@
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
private static final int BUFFER_SIZE = 16; private static final int BUFFER_SIZE = 16;
private static final Configuration CONF = new Configuration();
@Test @Test
public void testArgChecks() throws Exception { public void testArgChecks() throws Exception {
// Should not throw. // Should not throw.
@ -46,7 +51,7 @@ public void testArgChecks() throws Exception {
// Verify it throws correctly. // Verify it throws correctly.
intercept(IllegalArgumentException.class, "'buffer' must not be null", intercept(IllegalArgumentException.class, "'buffer' must not be null",
() -> cache.put(42, null)); () -> cache.put(42, null, null, null));
intercept(NullPointerException.class, null, intercept(NullPointerException.class, null,
@ -67,7 +72,7 @@ public void testPutAndGet() throws Exception {
assertEquals(0, cache.size()); assertEquals(0, cache.size());
assertFalse(cache.containsBlock(0)); assertFalse(cache.containsBlock(0));
cache.put(0, buffer1); cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
assertEquals(1, cache.size()); assertEquals(1, cache.size());
assertTrue(cache.containsBlock(0)); assertTrue(cache.containsBlock(0));
ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE); ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
@ -77,7 +82,7 @@ public void testPutAndGet() throws Exception {
assertEquals(1, cache.size()); assertEquals(1, cache.size());
assertFalse(cache.containsBlock(1)); assertFalse(cache.containsBlock(1));
cache.put(1, buffer1); cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
assertEquals(2, cache.size()); assertEquals(2, cache.size());
assertTrue(cache.containsBlock(1)); assertTrue(cache.containsBlock(1));
ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE); ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);

View File

@ -198,6 +198,9 @@
<exclude>**/ITestMarkerToolRootOperations.java</exclude> <exclude>**/ITestMarkerToolRootOperations.java</exclude>
<!-- leave this until the end for better statistics --> <!-- leave this until the end for better statistics -->
<exclude>**/ITestAggregateIOStatistics.java</exclude> <exclude>**/ITestAggregateIOStatistics.java</exclude>
<!-- cache file based assertions cannot be properly achieved with parallel
execution, let this be sequential -->
<exclude>**/ITestS3APrefetchingCacheFiles.java</exclude>
</excludes> </excludes>
</configuration> </configuration>
</execution> </execution>
@ -244,6 +247,8 @@
<include>**/ITestS3AContractRootDir.java</include> <include>**/ITestS3AContractRootDir.java</include>
<!-- leave this until the end for better statistics --> <!-- leave this until the end for better statistics -->
<include>**/ITestAggregateIOStatistics.java</include> <include>**/ITestAggregateIOStatistics.java</include>
<!-- sequential execution for the better cleanup -->
<include>**/ITestS3APrefetchingCacheFiles.java</include>
</includes> </includes>
</configuration> </configuration>
</execution> </execution>

View File

@ -1363,6 +1363,21 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
*/ */
File createTmpFileForWrite(String pathStr, long size, File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException { Configuration conf) throws IOException {
initLocalDirAllocatorIfNotInitialized(conf);
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
size, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
// create a temp file on this directory
return File.createTempFile(prefix, null, dir);
}
/**
* Initialize dir allocator if not already initialized.
*
* @param conf The Configuration object.
*/
private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
if (directoryAllocator == null) { if (directoryAllocator == null) {
synchronized (this) { synchronized (this) {
String bufferDir = conf.get(BUFFER_DIR) != null String bufferDir = conf.get(BUFFER_DIR) != null
@ -1370,12 +1385,6 @@ File createTmpFileForWrite(String pathStr, long size,
directoryAllocator = new LocalDirAllocator(bufferDir); directoryAllocator = new LocalDirAllocator(bufferDir);
} }
} }
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
size, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
// create a temp file on this directory
return File.createTempFile(prefix, null, dir);
} }
/** /**
@ -1568,12 +1577,16 @@ private FSDataInputStream executeOpen(
LOG.debug("Opening '{}'", readContext); LOG.debug("Opening '{}'", readContext);
if (this.prefetchEnabled) { if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
return new FSDataInputStream( return new FSDataInputStream(
new S3APrefetchingInputStream( new S3APrefetchingInputStream(
readContext.build(), readContext.build(),
createObjectAttributes(path, fileStatus), createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan), createInputStreamCallbacks(auditSpan),
inputStreamStats)); inputStreamStats,
configuration,
directoryAllocator));
} else { } else {
return new FSDataInputStream( return new FSDataInputStream(
new S3AInputStream( new S3AInputStream(

View File

@ -25,6 +25,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager; import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@ -52,7 +54,8 @@ public class S3ACachingBlockManager extends CachingBlockManager {
* @param blockData information about each block of the S3 file. * @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream. * @param streamStatistics statistics for this stream.
* * @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null. * @throws IllegalArgumentException if reader is null.
*/ */
public S3ACachingBlockManager( public S3ACachingBlockManager(
@ -60,8 +63,11 @@ public S3ACachingBlockManager(
S3ARemoteObjectReader reader, S3ARemoteObjectReader reader,
BlockData blockData, BlockData blockData,
int bufferPoolSize, int bufferPoolSize,
S3AInputStreamStatistics streamStatistics) { S3AInputStreamStatistics streamStatistics,
super(futurePool, blockData, bufferPoolSize, streamStatistics); Configuration conf,
LocalDirAllocator localDirAllocator) {
super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
Validate.checkNotNull(reader, "reader"); Validate.checkNotNull(reader, "reader");

View File

@ -24,6 +24,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManager; import org.apache.hadoop.fs.impl.prefetch.BlockManager;
import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.BufferData;
@ -61,7 +63,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
* @param s3Attributes attributes of the S3 object being read. * @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client. * @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream. * @param streamStatistics statistics for this stream.
* * @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null. * @throws IllegalArgumentException if s3Attributes is null.
* @throws IllegalArgumentException if client is null. * @throws IllegalArgumentException if client is null.
@ -70,7 +73,9 @@ public S3ACachingInputStream(
S3AReadOpContext context, S3AReadOpContext context,
S3ObjectAttributes s3Attributes, S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client, S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) { S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
super(context, s3Attributes, client, streamStatistics); super(context, s3Attributes, client, streamStatistics);
this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
@ -79,7 +84,9 @@ public S3ACachingInputStream(
this.getContext().getFuturePool(), this.getContext().getFuturePool(),
this.getReader(), this.getReader(),
this.getBlockData(), this.getBlockData(),
bufferPoolSize); bufferPoolSize,
conf,
localDirAllocator);
int fileSize = (int) s3Attributes.getLen(); int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})", this.getName(), LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
fileSize); fileSize);
@ -176,9 +183,15 @@ protected BlockManager createBlockManager(
ExecutorServiceFuturePool futurePool, ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader, S3ARemoteObjectReader reader,
BlockData blockData, BlockData blockData,
int bufferPoolSize) { int bufferPoolSize,
return new S3ACachingBlockManager(futurePool, reader, blockData, Configuration conf,
LocalDirAllocator localDirAllocator) {
return new S3ACachingBlockManager(futurePool,
reader,
blockData,
bufferPoolSize, bufferPoolSize,
getS3AStreamStatistics()); getS3AStreamStatistics(),
conf,
localDirAllocator);
} }
} }

View File

@ -27,9 +27,11 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AInputStream;
@ -79,7 +81,8 @@ public class S3APrefetchingInputStream
* @param s3Attributes attributes of the S3 object being read. * @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client. * @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream. * @param streamStatistics statistics for this stream.
* * @param conf the configuration.
* @param localDirAllocator the local dir allocator instance retrieved from S3A FS.
* @throws IllegalArgumentException if context is null. * @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null. * @throws IllegalArgumentException if s3Attributes is null.
* @throws IllegalArgumentException if client is null. * @throws IllegalArgumentException if client is null.
@ -88,7 +91,9 @@ public S3APrefetchingInputStream(
S3AReadOpContext context, S3AReadOpContext context,
S3ObjectAttributes s3Attributes, S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client, S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) { S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
Validate.checkNotNull(context, "context"); Validate.checkNotNull(context, "context");
Validate.checkNotNull(s3Attributes, "s3Attributes"); Validate.checkNotNull(s3Attributes, "s3Attributes");
@ -114,7 +119,9 @@ public S3APrefetchingInputStream(
context, context,
s3Attributes, s3Attributes,
client, client,
streamStatistics); streamStatistics,
conf,
localDirAllocator);
} }
} }

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.net.URI;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Test the cache file behaviour with prefetching input stream.
*/
public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
private Path testFile;
private FileSystem fs;
private int prefetchBlockSize;
private Configuration conf;
public ITestS3APrefetchingCacheFiles() {
super(true);
}
@Before
public void setUp() throws Exception {
super.setup();
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
conf = createConfiguration();
String testFileUri = S3ATestUtils.getCSVTestFile(conf);
testFile = new Path(testFileUri);
prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
fs = getFileSystem();
fs.initialize(new URI(testFileUri), conf);
}
@Override
public Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
return configuration;
}
@Override
public synchronized void teardown() throws Exception {
super.teardown();
File tmpFileDir = new File(conf.get(BUFFER_DIR));
File[] tmpFiles = tmpFileDir.listFiles();
if (tmpFiles != null) {
for (File filePath : tmpFiles) {
String path = filePath.getPath();
if (path.endsWith(".bin") && path.contains("fs-cache-")) {
filePath.delete();
}
}
}
cleanupWithLogger(LOG, fs);
fs = null;
testFile = null;
}
/**
* Test to verify the existence of the cache file.
* Tries to perform inputStream read and seek ops to make the prefetching take place and
* asserts whether file with .bin suffix is present. It also verifies certain file stats.
*/
@Test
public void testCacheFileExistence() throws Throwable {
describe("Verify that FS cache files exist on local FS");
try (FSDataInputStream in = fs.open(testFile)) {
byte[] buffer = new byte[prefetchBlockSize];
in.read(buffer, 0, prefetchBlockSize - 10240);
in.seek(prefetchBlockSize * 2);
in.read(buffer, 0, prefetchBlockSize);
File tmpFileDir = new File(conf.get(BUFFER_DIR));
assertTrue("The dir to keep cache files must exist", tmpFileDir.exists());
File[] tmpFiles = tmpFileDir
.listFiles((dir, name) -> name.endsWith(".bin") && name.contains("fs-cache-"));
boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0;
if (!isCacheFileForBlockFound) {
LOG.warn("No cache files found under " + tmpFileDir);
}
assertTrue("File to cache block data must exist", isCacheFileForBlockFound);
for (File tmpFile : tmpFiles) {
Path path = new Path(tmpFile.getAbsolutePath());
try (FileSystem localFs = FileSystem.getLocal(conf)) {
FileStatus stat = localFs.getFileStatus(path);
ContractTestUtils.assertIsFile(path, stat);
assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize,
stat.getLen());
assertEquals("User permissions should be RW", FsAction.READ_WRITE,
stat.getPermission().getUserAction());
assertEquals("Group permissions should be NONE", FsAction.NONE,
stat.getPermission().getGroupAction());
assertEquals("Other permissions should be NONE", FsAction.NONE,
stat.getPermission().getOtherAction());
}
}
}
}
}

View File

@ -36,7 +36,9 @@
import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockCache; import org.apache.hadoop.fs.impl.prefetch.BlockCache;
import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@ -60,6 +62,8 @@
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
/** /**
@ -86,6 +90,8 @@ private S3APrefetchFakes() {
public static final long MODIFICATION_TIME = 0L; public static final long MODIFICATION_TIME = 0L;
private static final Configuration CONF = new Configuration();
public static final ChangeDetectionPolicy CHANGE_POLICY = public static final ChangeDetectionPolicy CHANGE_POLICY =
ChangeDetectionPolicy.createPolicy( ChangeDetectionPolicy.createPolicy(
ChangeDetectionPolicy.Mode.None, ChangeDetectionPolicy.Mode.None,
@ -335,7 +341,9 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
private long fileCount = 0; private long fileCount = 0;
@Override @Override
protected Path getCacheFilePath() throws IOException { protected Path getCacheFilePath(final Configuration conf,
final LocalDirAllocator localDirAllocator)
throws IOException {
fileCount++; fileCount++;
return Paths.get(Long.toString(fileCount)); return Paths.get(Long.toString(fileCount));
} }
@ -363,9 +371,12 @@ public FakeS3ACachingBlockManager(
ExecutorServiceFuturePool futurePool, ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader, S3ARemoteObjectReader reader,
BlockData blockData, BlockData blockData,
int bufferPoolSize) { int bufferPoolSize,
Configuration conf,
LocalDirAllocator localDirAllocator) {
super(futurePool, reader, blockData, bufferPoolSize, super(futurePool, reader, blockData, bufferPoolSize,
new EmptyS3AStatisticsContext().newInputStreamStatistics()); new EmptyS3AStatisticsContext().newInputStreamStatistics(),
conf, localDirAllocator);
} }
@Override @Override
@ -390,7 +401,9 @@ public FakeS3ACachingInputStream(
S3ObjectAttributes s3Attributes, S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client, S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) { S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics); super(context, s3Attributes, client, streamStatistics, CONF,
new LocalDirAllocator(
CONF.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
} }
@Override @Override
@ -405,9 +418,11 @@ protected S3ACachingBlockManager createBlockManager(
ExecutorServiceFuturePool futurePool, ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader, S3ARemoteObjectReader reader,
BlockData blockData, BlockData blockData,
int bufferPoolSize) { int bufferPoolSize,
Configuration conf,
LocalDirAllocator localDirAllocator) {
return new FakeS3ACachingBlockManager(futurePool, reader, blockData, return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
bufferPoolSize); bufferPoolSize, conf, localDirAllocator);
} }
} }
} }

View File

@ -26,13 +26,18 @@
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -59,44 +64,45 @@ public void testArgChecks() throws Exception {
MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
Configuration conf = new Configuration();
// Should not throw. // Should not throw.
S3ACachingBlockManager blockManager = S3ACachingBlockManager blockManager =
new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
streamStatistics); streamStatistics, conf, null);
// Verify it throws correctly. // Verify it throws correctly.
intercept( intercept(
NullPointerException.class, NullPointerException.class,
() -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE, () -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE,
streamStatistics)); streamStatistics, conf, null));
intercept( intercept(
IllegalArgumentException.class, IllegalArgumentException.class,
"'reader' must not be null", "'reader' must not be null",
() -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE, () -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE,
streamStatistics)); streamStatistics, conf, null));
intercept( intercept(
IllegalArgumentException.class, IllegalArgumentException.class,
"'blockData' must not be null", "'blockData' must not be null",
() -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE, () -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE,
streamStatistics)); streamStatistics, conf, null));
intercept( intercept(
IllegalArgumentException.class, IllegalArgumentException.class,
"'bufferPoolSize' must be a positive integer", "'bufferPoolSize' must be a positive integer",
() -> new S3ACachingBlockManager(futurePool, reader, blockData, 0, () -> new S3ACachingBlockManager(futurePool, reader, blockData, 0,
streamStatistics)); streamStatistics, conf, null));
intercept( intercept(
IllegalArgumentException.class, IllegalArgumentException.class,
"'bufferPoolSize' must be a positive integer", "'bufferPoolSize' must be a positive integer",
() -> new S3ACachingBlockManager(futurePool, reader, blockData, -1, () -> new S3ACachingBlockManager(futurePool, reader, blockData, -1,
streamStatistics)); streamStatistics, conf, null));
intercept(NullPointerException.class, intercept(NullPointerException.class,
() -> new S3ACachingBlockManager(futurePool, reader, blockData, () -> new S3ACachingBlockManager(futurePool, reader, blockData,
POOL_SIZE, null)); POOL_SIZE, null, conf, null));
intercept( intercept(
IllegalArgumentException.class, IllegalArgumentException.class,
@ -125,13 +131,17 @@ public void testArgChecks() throws Exception {
private static final class BlockManagerForTesting private static final class BlockManagerForTesting
extends S3ACachingBlockManager { extends S3ACachingBlockManager {
private static final Configuration CONF =
S3ATestUtils.prepareTestConfiguration(new Configuration());
BlockManagerForTesting( BlockManagerForTesting(
ExecutorServiceFuturePool futurePool, ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader, S3ARemoteObjectReader reader,
BlockData blockData, BlockData blockData,
int bufferPoolSize, int bufferPoolSize,
S3AInputStreamStatistics streamStatistics) { S3AInputStreamStatistics streamStatistics) {
super(futurePool, reader, blockData, bufferPoolSize, streamStatistics); super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF,
new LocalDirAllocator(HADOOP_TMP_DIR));
} }
// If true, forces the next read operation to fail. // If true, forces the next read operation to fail.
@ -154,8 +164,8 @@ public int read(ByteBuffer buffer, long offset, int size)
private boolean forceNextCachePutToFail; private boolean forceNextCachePutToFail;
@Override @Override
protected void cachePut(int blockNumber, ByteBuffer buffer) protected void cachePut(int blockNumber,
throws IOException { ByteBuffer buffer) throws IOException {
if (forceNextCachePutToFail) { if (forceNextCachePutToFail) {
forceNextCachePutToFail = false; forceNextCachePutToFail = false;
throw new RuntimeException("bar"); throw new RuntimeException("bar");
@ -262,9 +272,11 @@ public void testCachingOfPrefetched()
throws IOException, InterruptedException { throws IOException, InterruptedException {
MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false); MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File); S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
Configuration conf = new Configuration();
S3ACachingBlockManager blockManager = S3ACachingBlockManager blockManager =
new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE, new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
streamStatistics); streamStatistics, conf, new LocalDirAllocator(
conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
assertInitialState(blockManager); assertInitialState(blockManager);
for (int b = 0; b < blockData.getNumBlocks(); b++) { for (int b = 0; b < blockData.getNumBlocks(); b++) {

View File

@ -27,11 +27,13 @@
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts; import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.test.AbstractHadoopTestBase;
@ -63,24 +65,25 @@ public void testArgChecks() throws Exception {
S3AInputStreamStatistics stats = S3AInputStreamStatistics stats =
readContext.getS3AStatisticsContext().newInputStreamStatistics(); readContext.getS3AStatisticsContext().newInputStreamStatistics();
Configuration conf = S3ATestUtils.prepareTestConfiguration(new Configuration());
// Should not throw. // Should not throw.
new S3ACachingInputStream(readContext, attrs, client, stats); new S3ACachingInputStream(readContext, attrs, client, stats, conf, null);
ExceptionAsserts.assertThrows( ExceptionAsserts.assertThrows(
NullPointerException.class, NullPointerException.class,
() -> new S3ACachingInputStream(null, attrs, client, stats)); () -> new S3ACachingInputStream(null, attrs, client, stats, conf, null));
ExceptionAsserts.assertThrows( ExceptionAsserts.assertThrows(
NullPointerException.class, NullPointerException.class,
() -> new S3ACachingInputStream(readContext, null, client, stats)); () -> new S3ACachingInputStream(readContext, null, client, stats, conf, null));
ExceptionAsserts.assertThrows( ExceptionAsserts.assertThrows(
NullPointerException.class, NullPointerException.class,
() -> new S3ACachingInputStream(readContext, attrs, null, stats)); () -> new S3ACachingInputStream(readContext, attrs, null, stats, conf, null));
ExceptionAsserts.assertThrows( ExceptionAsserts.assertThrows(
NullPointerException.class, NullPointerException.class,
() -> new S3ACachingInputStream(readContext, attrs, client, null)); () -> new S3ACachingInputStream(readContext, attrs, client, null, conf, null));
} }
@Test @Test