HADOOP-18399. S3A Prefetch - SingleFilePerBlockCache to use LocalDirAllocator (#5054)
Contributed by Viraj Jasani
This commit is contained in:
parent
405ed1dde6
commit
0e3aafe6c0
@ -23,6 +23,9 @@ import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
|
||||
/**
|
||||
* Provides functionality necessary for caching blocks of data read from FileSystem.
|
||||
*/
|
||||
@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
|
||||
*
|
||||
* @param blockNumber the id of the given block.
|
||||
* @param buffer contents of the given block to be added to this cache.
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @throws IOException if there is an error writing the given block.
|
||||
*/
|
||||
void put(int blockNumber, ByteBuffer buffer) throws IOException;
|
||||
void put(int blockNumber, ByteBuffer buffer, Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) throws IOException;
|
||||
}
|
||||
|
@ -33,6 +33,8 @@ import java.util.function.Supplier;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
@ -95,6 +97,10 @@ public abstract class CachingBlockManager extends BlockManager {
|
||||
|
||||
private final PrefetchingStatistics prefetchingStatistics;
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final LocalDirAllocator localDirAllocator;
|
||||
|
||||
/**
|
||||
* Constructs an instance of a {@code CachingBlockManager}.
|
||||
*
|
||||
@ -102,14 +108,17 @@ public abstract class CachingBlockManager extends BlockManager {
|
||||
* @param blockData information about each block of the underlying file.
|
||||
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
|
||||
* @param prefetchingStatistics statistics for this stream.
|
||||
*
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
|
||||
*/
|
||||
public CachingBlockManager(
|
||||
ExecutorServiceFuturePool futurePool,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize,
|
||||
PrefetchingStatistics prefetchingStatistics) {
|
||||
PrefetchingStatistics prefetchingStatistics,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
super(blockData);
|
||||
|
||||
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
|
||||
@ -129,6 +138,8 @@ public abstract class CachingBlockManager extends BlockManager {
|
||||
|
||||
this.ops = new BlockOperations();
|
||||
this.ops.setDebug(false);
|
||||
this.conf = requireNonNull(conf);
|
||||
this.localDirAllocator = localDirAllocator;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -468,7 +479,8 @@ public abstract class CachingBlockManager extends BlockManager {
|
||||
blockFuture = cf;
|
||||
}
|
||||
|
||||
CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
|
||||
CachePutTask task =
|
||||
new CachePutTask(data, blockFuture, this, Instant.now());
|
||||
Future<Void> actionFuture = futurePool.executeFunction(task);
|
||||
data.setCaching(actionFuture);
|
||||
ops.end(op);
|
||||
@ -554,7 +566,7 @@ public abstract class CachingBlockManager extends BlockManager {
|
||||
return;
|
||||
}
|
||||
|
||||
cache.put(blockNumber, buffer);
|
||||
cache.put(blockNumber, buffer, conf, localDirAllocator);
|
||||
}
|
||||
|
||||
private static class CachePutTask implements Supplier<Void> {
|
||||
|
@ -27,10 +27,9 @@ import java.nio.channels.WritableByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.nio.file.attribute.FileAttribute;
|
||||
import java.nio.file.attribute.PosixFilePermission;
|
||||
import java.nio.file.attribute.PosixFilePermissions;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
@ -39,9 +38,13 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
|
||||
|
||||
@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
|
||||
private final PrefetchingStatistics prefetchingStatistics;
|
||||
|
||||
/**
|
||||
* File attributes attached to any intermediate temporary file created during index creation.
|
||||
*/
|
||||
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
|
||||
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
|
||||
|
||||
/**
|
||||
* Cache entry.
|
||||
* Each block is stored as a separate file.
|
||||
@ -172,11 +181,17 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
/**
|
||||
* Puts the given block in this cache.
|
||||
*
|
||||
* @throws IllegalArgumentException if buffer is null.
|
||||
* @throws IllegalArgumentException if buffer.limit() is zero or negative.
|
||||
* @param blockNumber the block number, used as a key for blocks map.
|
||||
* @param buffer buffer contents of the given block to be added to this cache.
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @throws IOException if either local dir allocator fails to allocate file or if IO error
|
||||
* occurs while writing the buffer content to the file.
|
||||
* @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
|
||||
*/
|
||||
@Override
|
||||
public void put(int blockNumber, ByteBuffer buffer) throws IOException {
|
||||
public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) throws IOException {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
@ -191,7 +206,7 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
|
||||
Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
|
||||
|
||||
Path blockFilePath = getCacheFilePath();
|
||||
Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
|
||||
long size = Files.size(blockFilePath);
|
||||
if (size != 0) {
|
||||
String message =
|
||||
@ -221,8 +236,19 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
writeChannel.close();
|
||||
}
|
||||
|
||||
protected Path getCacheFilePath() throws IOException {
|
||||
return getTempFilePath();
|
||||
/**
|
||||
* Return temporary file created based on the file path retrieved from local dir allocator.
|
||||
*
|
||||
* @param conf The configuration object.
|
||||
* @param localDirAllocator Local dir allocator instance.
|
||||
* @return Path of the temporary file created.
|
||||
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
|
||||
* from local FS or file creation fails or permission set fails.
|
||||
*/
|
||||
protected Path getCacheFilePath(final Configuration conf,
|
||||
final LocalDirAllocator localDirAllocator)
|
||||
throws IOException {
|
||||
return getTempFilePath(conf, localDirAllocator);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -323,9 +349,19 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
|
||||
private static final String CACHE_FILE_PREFIX = "fs-cache-";
|
||||
|
||||
public static boolean isCacheSpaceAvailable(long fileSize) {
|
||||
/**
|
||||
* Determine if the cache space is available on the local FS.
|
||||
*
|
||||
* @param fileSize The size of the file.
|
||||
* @param conf The configuration.
|
||||
* @param localDirAllocator Local dir allocator instance.
|
||||
* @return True if the given file size is less than the available free space on local FS,
|
||||
* False otherwise.
|
||||
*/
|
||||
public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
try {
|
||||
Path cacheFilePath = getTempFilePath();
|
||||
Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
|
||||
long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
|
||||
LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
|
||||
Files.deleteIfExists(cacheFilePath);
|
||||
@ -339,16 +375,25 @@ public class SingleFilePerBlockCache implements BlockCache {
|
||||
// The suffix (file extension) of each serialized index file.
|
||||
private static final String BINARY_FILE_SUFFIX = ".bin";
|
||||
|
||||
// File attributes attached to any intermediate temporary file created during index creation.
|
||||
private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
|
||||
PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
|
||||
PosixFilePermission.OWNER_WRITE));
|
||||
|
||||
private static Path getTempFilePath() throws IOException {
|
||||
return Files.createTempFile(
|
||||
CACHE_FILE_PREFIX,
|
||||
BINARY_FILE_SUFFIX,
|
||||
TEMP_FILE_ATTRS
|
||||
);
|
||||
/**
|
||||
* Create temporary file based on the file path retrieved from local dir allocator
|
||||
* instance. The file is created with .bin suffix. The created file has been granted
|
||||
* posix file permissions available in TEMP_FILE_ATTRS.
|
||||
*
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @return path of the file created.
|
||||
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
|
||||
* from local FS or file creation fails or permission set fails.
|
||||
*/
|
||||
private static Path getTempFilePath(final Configuration conf,
|
||||
final LocalDirAllocator localDirAllocator) throws IOException {
|
||||
org.apache.hadoop.fs.Path path =
|
||||
localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
|
||||
File dir = new File(path.getParent().toUri().getPath());
|
||||
String prefix = path.getName();
|
||||
File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
|
||||
Path tmpFilePath = Paths.get(tmpFile.toURI());
|
||||
return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
|
||||
}
|
||||
}
|
||||
|
@ -23,8 +23,11 @@ import java.nio.ByteBuffer;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
|
||||
|
||||
private static final int BUFFER_SIZE = 16;
|
||||
|
||||
private static final Configuration CONF = new Configuration();
|
||||
|
||||
@Test
|
||||
public void testArgChecks() throws Exception {
|
||||
// Should not throw.
|
||||
@ -46,7 +51,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
|
||||
|
||||
// Verify it throws correctly.
|
||||
intercept(IllegalArgumentException.class, "'buffer' must not be null",
|
||||
() -> cache.put(42, null));
|
||||
() -> cache.put(42, null, null, null));
|
||||
|
||||
|
||||
intercept(NullPointerException.class, null,
|
||||
@ -67,7 +72,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
|
||||
|
||||
assertEquals(0, cache.size());
|
||||
assertFalse(cache.containsBlock(0));
|
||||
cache.put(0, buffer1);
|
||||
cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
|
||||
assertEquals(1, cache.size());
|
||||
assertTrue(cache.containsBlock(0));
|
||||
ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
|
||||
@ -77,7 +82,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
|
||||
|
||||
assertEquals(1, cache.size());
|
||||
assertFalse(cache.containsBlock(1));
|
||||
cache.put(1, buffer1);
|
||||
cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
|
||||
assertEquals(2, cache.size());
|
||||
assertTrue(cache.containsBlock(1));
|
||||
ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);
|
||||
|
@ -200,6 +200,9 @@
|
||||
<exclude>**/ITestMarkerToolRootOperations.java</exclude>
|
||||
<!-- leave this until the end for better statistics -->
|
||||
<exclude>**/ITestAggregateIOStatistics.java</exclude>
|
||||
<!-- cache file based assertions cannot be properly achieved with parallel
|
||||
execution, let this be sequential -->
|
||||
<exclude>**/ITestS3APrefetchingCacheFiles.java</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</execution>
|
||||
@ -246,6 +249,8 @@
|
||||
<include>**/ITestS3AContractRootDir.java</include>
|
||||
<!-- leave this until the end for better statistics -->
|
||||
<include>**/ITestAggregateIOStatistics.java</include>
|
||||
<!-- sequential execution for the better cleanup -->
|
||||
<include>**/ITestS3APrefetchingCacheFiles.java</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
@ -1368,6 +1368,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||
*/
|
||||
File createTmpFileForWrite(String pathStr, long size,
|
||||
Configuration conf) throws IOException {
|
||||
initLocalDirAllocatorIfNotInitialized(conf);
|
||||
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
|
||||
size, conf);
|
||||
File dir = new File(path.getParent().toUri().getPath());
|
||||
String prefix = path.getName();
|
||||
// create a temp file on this directory
|
||||
return File.createTempFile(prefix, null, dir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize dir allocator if not already initialized.
|
||||
*
|
||||
* @param conf The Configuration object.
|
||||
*/
|
||||
private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
|
||||
if (directoryAllocator == null) {
|
||||
synchronized (this) {
|
||||
String bufferDir = conf.get(BUFFER_DIR) != null
|
||||
@ -1375,12 +1390,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||
directoryAllocator = new LocalDirAllocator(bufferDir);
|
||||
}
|
||||
}
|
||||
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
|
||||
size, conf);
|
||||
File dir = new File(path.getParent().toUri().getPath());
|
||||
String prefix = path.getName();
|
||||
// create a temp file on this directory
|
||||
return File.createTempFile(prefix, null, dir);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1573,12 +1582,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||
LOG.debug("Opening '{}'", readContext);
|
||||
|
||||
if (this.prefetchEnabled) {
|
||||
Configuration configuration = getConf();
|
||||
initLocalDirAllocatorIfNotInitialized(configuration);
|
||||
return new FSDataInputStream(
|
||||
new S3APrefetchingInputStream(
|
||||
readContext.build(),
|
||||
createObjectAttributes(path, fileStatus),
|
||||
createInputStreamCallbacks(auditSpan),
|
||||
inputStreamStats));
|
||||
inputStreamStats,
|
||||
configuration,
|
||||
directoryAllocator));
|
||||
} else {
|
||||
return new FSDataInputStream(
|
||||
new S3AInputStream(
|
||||
|
@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockData;
|
||||
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
|
||||
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
||||
@ -52,7 +54,8 @@ public class S3ACachingBlockManager extends CachingBlockManager {
|
||||
* @param blockData information about each block of the S3 file.
|
||||
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
|
||||
* @param streamStatistics statistics for this stream.
|
||||
*
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @throws IllegalArgumentException if reader is null.
|
||||
*/
|
||||
public S3ACachingBlockManager(
|
||||
@ -60,8 +63,11 @@ public class S3ACachingBlockManager extends CachingBlockManager {
|
||||
S3ARemoteObjectReader reader,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize,
|
||||
S3AInputStreamStatistics streamStatistics) {
|
||||
super(futurePool, blockData, bufferPoolSize, streamStatistics);
|
||||
S3AInputStreamStatistics streamStatistics,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
|
||||
super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
|
||||
|
||||
Validate.checkNotNull(reader, "reader");
|
||||
|
||||
|
@ -24,6 +24,8 @@ import java.io.IOException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockData;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BufferData;
|
||||
@ -61,7 +63,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
|
||||
* @param s3Attributes attributes of the S3 object being read.
|
||||
* @param client callbacks used for interacting with the underlying S3 client.
|
||||
* @param streamStatistics statistics for this stream.
|
||||
*
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance.
|
||||
* @throws IllegalArgumentException if context is null.
|
||||
* @throws IllegalArgumentException if s3Attributes is null.
|
||||
* @throws IllegalArgumentException if client is null.
|
||||
@ -70,7 +73,9 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
|
||||
S3AReadOpContext context,
|
||||
S3ObjectAttributes s3Attributes,
|
||||
S3AInputStream.InputStreamCallbacks client,
|
||||
S3AInputStreamStatistics streamStatistics) {
|
||||
S3AInputStreamStatistics streamStatistics,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
super(context, s3Attributes, client, streamStatistics);
|
||||
|
||||
this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
|
||||
@ -79,7 +84,9 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
|
||||
this.getContext().getFuturePool(),
|
||||
this.getReader(),
|
||||
this.getBlockData(),
|
||||
bufferPoolSize);
|
||||
bufferPoolSize,
|
||||
conf,
|
||||
localDirAllocator);
|
||||
int fileSize = (int) s3Attributes.getLen();
|
||||
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
|
||||
fileSize);
|
||||
@ -176,9 +183,15 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
|
||||
ExecutorServiceFuturePool futurePool,
|
||||
S3ARemoteObjectReader reader,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize) {
|
||||
return new S3ACachingBlockManager(futurePool, reader, blockData,
|
||||
int bufferPoolSize,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
return new S3ACachingBlockManager(futurePool,
|
||||
reader,
|
||||
blockData,
|
||||
bufferPoolSize,
|
||||
getS3AStreamStatistics());
|
||||
getS3AStreamStatistics(),
|
||||
conf,
|
||||
localDirAllocator);
|
||||
}
|
||||
}
|
||||
|
@ -27,9 +27,11 @@ import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CanSetReadahead;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.impl.prefetch.Validate;
|
||||
import org.apache.hadoop.fs.s3a.S3AInputStream;
|
||||
@ -79,7 +81,8 @@ public class S3APrefetchingInputStream
|
||||
* @param s3Attributes attributes of the S3 object being read.
|
||||
* @param client callbacks used for interacting with the underlying S3 client.
|
||||
* @param streamStatistics statistics for this stream.
|
||||
*
|
||||
* @param conf the configuration.
|
||||
* @param localDirAllocator the local dir allocator instance retrieved from S3A FS.
|
||||
* @throws IllegalArgumentException if context is null.
|
||||
* @throws IllegalArgumentException if s3Attributes is null.
|
||||
* @throws IllegalArgumentException if client is null.
|
||||
@ -88,7 +91,9 @@ public class S3APrefetchingInputStream
|
||||
S3AReadOpContext context,
|
||||
S3ObjectAttributes s3Attributes,
|
||||
S3AInputStream.InputStreamCallbacks client,
|
||||
S3AInputStreamStatistics streamStatistics) {
|
||||
S3AInputStreamStatistics streamStatistics,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
|
||||
Validate.checkNotNull(context, "context");
|
||||
Validate.checkNotNull(s3Attributes, "s3Attributes");
|
||||
@ -114,7 +119,9 @@ public class S3APrefetchingInputStream
|
||||
context,
|
||||
s3Attributes,
|
||||
client,
|
||||
streamStatistics);
|
||||
streamStatistics,
|
||||
conf,
|
||||
localDirAllocator);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
||||
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
||||
|
||||
/**
|
||||
* Test the cache file behaviour with prefetching input stream.
|
||||
*/
|
||||
public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
|
||||
|
||||
private Path testFile;
|
||||
private FileSystem fs;
|
||||
private int prefetchBlockSize;
|
||||
private Configuration conf;
|
||||
|
||||
public ITestS3APrefetchingCacheFiles() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setup();
|
||||
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
|
||||
conf = createConfiguration();
|
||||
String testFileUri = S3ATestUtils.getCSVTestFile(conf);
|
||||
|
||||
testFile = new Path(testFileUri);
|
||||
prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
|
||||
fs = getFileSystem();
|
||||
fs.initialize(new URI(testFileUri), conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration createConfiguration() {
|
||||
Configuration configuration = super.createConfiguration();
|
||||
S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
|
||||
configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void teardown() throws Exception {
|
||||
super.teardown();
|
||||
File tmpFileDir = new File(conf.get(BUFFER_DIR));
|
||||
File[] tmpFiles = tmpFileDir.listFiles();
|
||||
if (tmpFiles != null) {
|
||||
for (File filePath : tmpFiles) {
|
||||
String path = filePath.getPath();
|
||||
if (path.endsWith(".bin") && path.contains("fs-cache-")) {
|
||||
filePath.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupWithLogger(LOG, fs);
|
||||
fs = null;
|
||||
testFile = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify the existence of the cache file.
|
||||
* Tries to perform inputStream read and seek ops to make the prefetching take place and
|
||||
* asserts whether file with .bin suffix is present. It also verifies certain file stats.
|
||||
*/
|
||||
@Test
|
||||
public void testCacheFileExistence() throws Throwable {
|
||||
describe("Verify that FS cache files exist on local FS");
|
||||
|
||||
try (FSDataInputStream in = fs.open(testFile)) {
|
||||
byte[] buffer = new byte[prefetchBlockSize];
|
||||
|
||||
in.read(buffer, 0, prefetchBlockSize - 10240);
|
||||
in.seek(prefetchBlockSize * 2);
|
||||
in.read(buffer, 0, prefetchBlockSize);
|
||||
|
||||
File tmpFileDir = new File(conf.get(BUFFER_DIR));
|
||||
assertTrue("The dir to keep cache files must exist", tmpFileDir.exists());
|
||||
File[] tmpFiles = tmpFileDir
|
||||
.listFiles((dir, name) -> name.endsWith(".bin") && name.contains("fs-cache-"));
|
||||
boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0;
|
||||
if (!isCacheFileForBlockFound) {
|
||||
LOG.warn("No cache files found under " + tmpFileDir);
|
||||
}
|
||||
assertTrue("File to cache block data must exist", isCacheFileForBlockFound);
|
||||
|
||||
for (File tmpFile : tmpFiles) {
|
||||
Path path = new Path(tmpFile.getAbsolutePath());
|
||||
try (FileSystem localFs = FileSystem.getLocal(conf)) {
|
||||
FileStatus stat = localFs.getFileStatus(path);
|
||||
ContractTestUtils.assertIsFile(path, stat);
|
||||
assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize,
|
||||
stat.getLen());
|
||||
assertEquals("User permissions should be RW", FsAction.READ_WRITE,
|
||||
stat.getPermission().getUserAction());
|
||||
assertEquals("Group permissions should be NONE", FsAction.NONE,
|
||||
stat.getPermission().getGroupAction());
|
||||
assertEquals("Other permissions should be NONE", FsAction.NONE,
|
||||
stat.getPermission().getOtherAction());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -36,7 +36,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockCache;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockData;
|
||||
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
||||
@ -60,6 +62,8 @@ import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
|
||||
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
|
||||
|
||||
/**
|
||||
@ -86,6 +90,8 @@ public final class S3APrefetchFakes {
|
||||
|
||||
public static final long MODIFICATION_TIME = 0L;
|
||||
|
||||
private static final Configuration CONF = new Configuration();
|
||||
|
||||
public static final ChangeDetectionPolicy CHANGE_POLICY =
|
||||
ChangeDetectionPolicy.createPolicy(
|
||||
ChangeDetectionPolicy.Mode.None,
|
||||
@ -335,7 +341,9 @@ public final class S3APrefetchFakes {
|
||||
private long fileCount = 0;
|
||||
|
||||
@Override
|
||||
protected Path getCacheFilePath() throws IOException {
|
||||
protected Path getCacheFilePath(final Configuration conf,
|
||||
final LocalDirAllocator localDirAllocator)
|
||||
throws IOException {
|
||||
fileCount++;
|
||||
return Paths.get(Long.toString(fileCount));
|
||||
}
|
||||
@ -363,9 +371,12 @@ public final class S3APrefetchFakes {
|
||||
ExecutorServiceFuturePool futurePool,
|
||||
S3ARemoteObjectReader reader,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize) {
|
||||
int bufferPoolSize,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
super(futurePool, reader, blockData, bufferPoolSize,
|
||||
new EmptyS3AStatisticsContext().newInputStreamStatistics());
|
||||
new EmptyS3AStatisticsContext().newInputStreamStatistics(),
|
||||
conf, localDirAllocator);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -390,7 +401,9 @@ public final class S3APrefetchFakes {
|
||||
S3ObjectAttributes s3Attributes,
|
||||
S3AInputStream.InputStreamCallbacks client,
|
||||
S3AInputStreamStatistics streamStatistics) {
|
||||
super(context, s3Attributes, client, streamStatistics);
|
||||
super(context, s3Attributes, client, streamStatistics, CONF,
|
||||
new LocalDirAllocator(
|
||||
CONF.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -405,9 +418,11 @@ public final class S3APrefetchFakes {
|
||||
ExecutorServiceFuturePool futurePool,
|
||||
S3ARemoteObjectReader reader,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize) {
|
||||
int bufferPoolSize,
|
||||
Configuration conf,
|
||||
LocalDirAllocator localDirAllocator) {
|
||||
return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
|
||||
bufferPoolSize);
|
||||
bufferPoolSize, conf, localDirAllocator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,13 +26,18 @@ import java.util.concurrent.Executors;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BlockData;
|
||||
import org.apache.hadoop.fs.impl.prefetch.BufferData;
|
||||
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
||||
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
|
||||
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@ -59,44 +64,45 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
|
||||
MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
|
||||
S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Should not throw.
|
||||
S3ACachingBlockManager blockManager =
|
||||
new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
|
||||
streamStatistics);
|
||||
streamStatistics, conf, null);
|
||||
|
||||
// Verify it throws correctly.
|
||||
intercept(
|
||||
NullPointerException.class,
|
||||
() -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE,
|
||||
streamStatistics));
|
||||
streamStatistics, conf, null));
|
||||
|
||||
intercept(
|
||||
IllegalArgumentException.class,
|
||||
"'reader' must not be null",
|
||||
() -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE,
|
||||
streamStatistics));
|
||||
streamStatistics, conf, null));
|
||||
|
||||
intercept(
|
||||
IllegalArgumentException.class,
|
||||
"'blockData' must not be null",
|
||||
() -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE,
|
||||
streamStatistics));
|
||||
streamStatistics, conf, null));
|
||||
|
||||
intercept(
|
||||
IllegalArgumentException.class,
|
||||
"'bufferPoolSize' must be a positive integer",
|
||||
() -> new S3ACachingBlockManager(futurePool, reader, blockData, 0,
|
||||
streamStatistics));
|
||||
streamStatistics, conf, null));
|
||||
|
||||
intercept(
|
||||
IllegalArgumentException.class,
|
||||
"'bufferPoolSize' must be a positive integer",
|
||||
() -> new S3ACachingBlockManager(futurePool, reader, blockData, -1,
|
||||
streamStatistics));
|
||||
streamStatistics, conf, null));
|
||||
|
||||
intercept(NullPointerException.class,
|
||||
() -> new S3ACachingBlockManager(futurePool, reader, blockData,
|
||||
POOL_SIZE, null));
|
||||
POOL_SIZE, null, conf, null));
|
||||
|
||||
intercept(
|
||||
IllegalArgumentException.class,
|
||||
@ -125,13 +131,17 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
|
||||
private static final class BlockManagerForTesting
|
||||
extends S3ACachingBlockManager {
|
||||
|
||||
private static final Configuration CONF =
|
||||
S3ATestUtils.prepareTestConfiguration(new Configuration());
|
||||
|
||||
BlockManagerForTesting(
|
||||
ExecutorServiceFuturePool futurePool,
|
||||
S3ARemoteObjectReader reader,
|
||||
BlockData blockData,
|
||||
int bufferPoolSize,
|
||||
S3AInputStreamStatistics streamStatistics) {
|
||||
super(futurePool, reader, blockData, bufferPoolSize, streamStatistics);
|
||||
super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF,
|
||||
new LocalDirAllocator(HADOOP_TMP_DIR));
|
||||
}
|
||||
|
||||
// If true, forces the next read operation to fail.
|
||||
@ -154,8 +164,8 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
|
||||
private boolean forceNextCachePutToFail;
|
||||
|
||||
@Override
|
||||
protected void cachePut(int blockNumber, ByteBuffer buffer)
|
||||
throws IOException {
|
||||
protected void cachePut(int blockNumber,
|
||||
ByteBuffer buffer) throws IOException {
|
||||
if (forceNextCachePutToFail) {
|
||||
forceNextCachePutToFail = false;
|
||||
throw new RuntimeException("bar");
|
||||
@ -262,9 +272,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
|
||||
throws IOException, InterruptedException {
|
||||
MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
|
||||
S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
|
||||
Configuration conf = new Configuration();
|
||||
S3ACachingBlockManager blockManager =
|
||||
new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
|
||||
streamStatistics);
|
||||
streamStatistics, conf, new LocalDirAllocator(
|
||||
conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
|
||||
assertInitialState(blockManager);
|
||||
|
||||
for (int b = 0; b < blockData.getNumBlocks(); b++) {
|
||||
|
@ -27,11 +27,13 @@ import java.util.concurrent.Executors;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts;
|
||||
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
||||
import org.apache.hadoop.fs.s3a.S3AInputStream;
|
||||
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
|
||||
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
|
||||
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||
@ -63,24 +65,25 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
|
||||
S3AInputStreamStatistics stats =
|
||||
readContext.getS3AStatisticsContext().newInputStreamStatistics();
|
||||
|
||||
Configuration conf = S3ATestUtils.prepareTestConfiguration(new Configuration());
|
||||
// Should not throw.
|
||||
new S3ACachingInputStream(readContext, attrs, client, stats);
|
||||
new S3ACachingInputStream(readContext, attrs, client, stats, conf, null);
|
||||
|
||||
ExceptionAsserts.assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new S3ACachingInputStream(null, attrs, client, stats));
|
||||
() -> new S3ACachingInputStream(null, attrs, client, stats, conf, null));
|
||||
|
||||
ExceptionAsserts.assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new S3ACachingInputStream(readContext, null, client, stats));
|
||||
() -> new S3ACachingInputStream(readContext, null, client, stats, conf, null));
|
||||
|
||||
ExceptionAsserts.assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new S3ACachingInputStream(readContext, attrs, null, stats));
|
||||
() -> new S3ACachingInputStream(readContext, attrs, null, stats, conf, null));
|
||||
|
||||
ExceptionAsserts.assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new S3ACachingInputStream(readContext, attrs, client, null));
|
||||
() -> new S3ACachingInputStream(readContext, attrs, client, null, conf, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
x
Reference in New Issue
Block a user