HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

Part of HADOOP-18103. Introduces fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size to configure the minimum seek and maximum merged read size during a vectored IO operation in the S3A connector. These properties define how ranges are merged. To disable merging completely, set fs.s3a.vectored.read.max.merged.size to 0.

Contributed by: Mukund Thakur
parent 2daf0a814f
commit 5db0f34e29
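The new settings can be placed in core-site.xml (see the performance.md addition below) or set programmatically. A minimal sketch, assuming only the property names from this commit; the 2K/10M values are illustrative, and size suffixes such as K and M are accepted because the connector reads these keys via longBytesOption:

```java
import org.apache.hadoop.conf.Configuration;

public class VectoredReadConfigExample {
  public static Configuration tunedConf() {
    Configuration conf = new Configuration();
    // Illustrative values; the shipped defaults are 4K and 1M.
    conf.set("fs.s3a.vectored.read.min.seek.size", "2K");
    conf.set("fs.s3a.vectored.read.max.merged.size", "10M");
    // Setting the max merged size to 0 disables range merging entirely.
    return conf;
  }
}
```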
fsdatainputstream.md
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
 
 Maximum number of bytes which can be read in one go after merging the ranges.
 Two ranges won't be merged if the combined data to be read is more than this value.
+Essentially setting this to 0 will disable the merging of ranges.
 
 ## Consistency
 
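To make the two thresholds concrete, here is a sketch of the merge rules documented above, using the sortAndMergeRanges helper exercised by the new unit test further down. The offsets are illustrative, and the import locations of VectoredReadUtils and CombinedFileRange are assumptions matching the TestVectoredReadUtils class in this commit:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
// Assumed import locations; match those of TestVectoredReadUtils below.
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.VectoredReadUtils;

public class MergeSemanticsSketch {
  public static void main(String[] args) {
    // Two ranges separated by an 800-byte gap.
    List<FileRange> input = Arrays.asList(
        new FileRangeImpl(2100, 100),    // bytes 2100..2199
        new FileRangeImpl(3000, 110));   // bytes 3000..3109

    // The gap (800) is within minSeek (1000) and the combined read
    // (1010 bytes) is within maxSize (2048), so the ranges merge into one.
    List<CombinedFileRange> merged =
        VectoredReadUtils.sortAndMergeRanges(input, 1, 1000, 2048);

    // With maxSize 0 no combined read fits, so nothing merges; this is
    // what testMaxSizeZeroDisablesMerging below asserts.
    List<CombinedFileRange> unmerged =
        VectoredReadUtils.sortAndMergeRanges(input, 1, 1000, 0);

    System.out.println(merged.size() + " vs " + unmerged.size()); // 1 vs 2
  }
}
```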
AbstractContractVectoredReadTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.fs.contract;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,6 +33,15 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 
@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContract
 
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
-  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
   private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
   private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
@@ -172,6 +172,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
     }
   }
 
+  @Test
   public void testSameRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
TestVectoredReadUtils.java
@@ -207,6 +207,30 @@ public void testSortAndMergeMoreCases() throws Exception {
         VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
   }
 
+  @Test
+  public void testMaxSizeZeroDisablesMerging() throws Exception {
+    List<FileRange> randomRanges = Arrays.asList(
+            new FileRangeImpl(3000, 110),
+            new FileRangeImpl(3000, 100),
+            new FileRangeImpl(2100, 100)
+    );
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
+  }
+
+  private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
+                                                  int chunkSize,
+                                                  int minimumSeek,
+                                                  int maxSize) {
+    List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
+            .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
+    Assertions.assertThat(combinedFileRanges)
+            .describedAs("Mismatch in number of ranges post merging")
+            .hasSize(inputRanges.size());
+  }
+
   interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
     // nothing
   }
MoreAsserts.java
@@ -86,4 +86,16 @@ public static <T> void assertFutureFailedExceptionally(CompletableFuture<T> future)
             "completed exceptionally")
         .isTrue();
   }
+
+  /**
+   * Assert that two values of the same type are equal.
+   * @param actual actual value.
+   * @param expected expected value.
+   * @param message error message to print in case of mismatch.
+   */
+  public static <T> void assertEqual(T actual, T expected, String message) {
+    Assertions.assertThat(actual)
+        .describedAs("Mismatch in %s", message)
+        .isEqualTo(expected);
+  }
 }
Constants.java
@@ -1177,4 +1177,30 @@ private Constants() {
    */
   public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";
 
+  /**
+   * The smallest reasonable seek in bytes such that
+   * we group ranges together during a vectored read operation.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
+      "fs.s3a.vectored.read.min.seek.size";
+
+  /**
+   * The largest merged read size in bytes such that
+   * we group ranges together during a vectored read.
+   * Setting this value to 0 will disable merging of ranges.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
+      "fs.s3a.vectored.read.max.merged.size";
+
+  /**
+   * Default minimum seek in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K
+
+  /**
+   * Default maximum read size in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; // 1M
 }
S3AFileSystem.java
@@ -313,6 +313,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * {@code openFile()}.
    */
   private S3AInputPolicy inputPolicy;
+  /** Vectored IO context. */
+  private VectoredIOContext vectoredIOContext;
+
+  private long readAhead;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private volatile boolean isClosed = false;
@@ -584,6 +588,7 @@ public void initialize(URI name, Configuration originalConf)
           longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
               DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
           inputPolicy);
+      vectoredIOContext = populateVectoredIOContext(conf);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -597,6 +602,23 @@ public void initialize(URI name, Configuration originalConf)
     }
   }
 
+  /**
+   * Populates the configuration related to vectored IO operations
+   * in the context which has to be passed down to input streams.
+   * @param conf configuration object.
+   * @return VectoredIOContext.
+   */
+  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
+    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+        DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
+    final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+        DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
+    return new VectoredIOContext()
+        .setMinSeekForVectoredReads(minSeekVectored)
+        .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
+        .build();
+  }
+
   /**
    * Set the client side encryption gauge to 0 or 1, indicating if CSE is
    * enabled through the gauge or not.
@@ -1552,7 +1574,8 @@ protected S3AReadOpContext createReadContext(
         invoker,
         statistics,
         statisticsContext,
-        fileStatus)
+        fileStatus,
+        vectoredIOContext)
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
     return roc.build();
S3AInputStream.java
@@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
+  /** Vectored IO context. */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -212,6 +215,7 @@ public S3AInputStream(S3AReadOpContext ctx,
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
     this.unboundedThreadPool = unboundedThreadPool;
+    this.vectoredIOContext = context.getVectoredIOContext();
   }
 
   /**
@@ -859,6 +863,7 @@ public String toString() {
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
       sb.append(" ").append(changeTracker);
+      sb.append(" ").append(vectoredIOContext);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
@@ -905,6 +910,22 @@ public void readFully(long position, byte[] buffer, int offset, int length)
     }
   }
 
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
   /**
    * {@inheritDoc}
    * Vectored read implementation for S3AInputStream.
S3AReadOpContext.java
@@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext {
    */
   private long asyncDrainThreshold;
 
+  /**
+   * Vectored IO context for the vectored read API
+   * in {@code S3AInputStream#readVectored(List, IntFunction)}.
+   */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param stats Filesystem statistics (may be null)
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
+   * @param vectoredIOContext context for vectored read operation.
    */
   public S3AReadOpContext(
       final Path path,
       Invoker invoker,
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus) {
+      FileStatus dstFileStatus,
+      VectoredIOContext vectoredIOContext) {
     super(invoker, stats, instrumentation,
         dstFileStatus);
     this.path = requireNonNull(path);
+    this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
   }
 
   /**
@@ -199,6 +208,14 @@ public long getAsyncDrainThreshold() {
     return asyncDrainThreshold;
   }
 
+  /**
+   * Get the vectored IO context for this read op.
+   * @return vectored IO context.
+   */
+  public VectoredIOContext getVectoredIOContext() {
+    return vectoredIOContext;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
VectoredIOContext.java (new file)
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.List;
+import java.util.function.IntFunction;
+
+/**
+ * Context related to vectored IO operations.
+ * See {@link S3AInputStream#readVectored(List, IntFunction)}.
+ */
+public class VectoredIOContext {
+
+  /**
+   * The smallest reasonable seek below which we group
+   * ranges together during a vectored read operation.
+   */
+  private int minSeekForVectorReads;
+
+  /**
+   * The largest size up to which we group ranges
+   * together during a vectored read operation.
+   * Setting this value to 0 will disable merging of ranges.
+   */
+  private int maxReadSizeForVectorReads;
+
+  /**
+   * Default no-arg constructor.
+   */
+  public VectoredIOContext() {
+  }
+
+  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
+    this.minSeekForVectorReads = minSeek;
+    return this;
+  }
+
+  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
+    this.maxReadSizeForVectorReads = maxSize;
+    return this;
+  }
+
+  public VectoredIOContext build() {
+    return this;
+  }
+
+  public int getMinSeekForVectorReads() {
+    return minSeekForVectorReads;
+  }
+
+  public int getMaxReadSizeForVectorReads() {
+    return maxReadSizeForVectorReads;
+  }
+
+  @Override
+  public String toString() {
+    return "VectoredIOContext{" +
+        "minSeekForVectorReads=" + minSeekForVectorReads +
+        ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
+        '}';
+  }
+}
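A design note on the new class: it acts as its own builder, with the fluent setters returning this and build() returning the same object, which is what populateVectoredIOContext in S3AFileSystem relies on. A minimal standalone sketch, using the documented defaults:

```java
import org.apache.hadoop.fs.s3a.VectoredIOContext;

public class VectoredIOContextSketch {
  public static void main(String[] args) {
    // Values mirror the documented defaults: 4K min seek, 1M max merged size.
    VectoredIOContext context = new VectoredIOContext()
        .setMinSeekForVectoredReads(4096)
        .setMaxReadSizeForVectoredReads(1048576)
        .build();
    // Prints both thresholds, as appended to S3AInputStream#toString():
    // VectoredIOContext{minSeekForVectorReads=4096, maxReadSizeForVectorReads=1048576}
    System.out.println(context);
  }
}
```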
performance.md
@@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptimal".
 
 To make most efficient use of S3, care is needed.
 
+## <a name="vectoredIO"></a> Improving read performance using Vectored IO
+The S3A FileSystem supports the vectored read API, through which a client
+can supply a list of file ranges to read and receive a future read
+object for each range. For the full API specification see
+[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
+
+The following properties can be configured to optimise vectored reads based
+on client requirements.
+
+```xml
+<property>
+  <name>fs.s3a.vectored.read.min.seek.size</name>
+  <value>4K</value>
+  <description>
+    The smallest reasonable seek in bytes such that
+    we group ranges together during a vectored read operation.
+  </description>
+</property>
+<property>
+  <name>fs.s3a.vectored.read.max.merged.size</name>
+  <value>1M</value>
+  <description>
+    The largest merged read size in bytes such that
+    we group ranges together during a vectored read.
+    Setting this value to 0 will disable merging of ranges.
+  </description>
+</property>
+```
+
 ## <a name="fadvise"></a> Improving data input performance through fadvise
 
 The S3A Filesystem client supports the notion of input policies, similar
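For context (not part of this commit's documentation): a sketch of a client issuing a vectored read against these settings. FileRangeImpl, readVectored and FileRange#getData() come from the HADOOP-18103 API used by the contract tests in this commit; FutureIOSupport.awaitFuture is assumed to behave as in those tests.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;

public class VectoredReadClientSketch {
  public static void readTwoRanges(FileSystem fs, Path path) throws Exception {
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(0, 4096));
    ranges.add(new FileRangeImpl(8192, 4096));
    try (FSDataInputStream in = fs.open(path)) {
      // Nearby ranges may be coalesced into a single GET, governed by
      // fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = FutureIOSupport.awaitFuture(range.getData());
        // process data ...
      }
    }
  }
}
```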
ITestS3AContractVectoredRead.java
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
 
|
|||||||
/**
|
/**
|
||||||
* Overriding in S3 vectored read api fails fast in case of EOF
|
* Overriding in S3 vectored read api fails fast in case of EOF
|
||||||
* requested range.
|
* requested range.
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void testEOFRanges() throws Exception {
|
public void testEOFRanges() throws Exception {
|
||||||
@ -51,4 +58,45 @@ public void testEOFRanges() throws Exception {
|
|||||||
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
|
fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
|
||||||
testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
|
testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
|
||||||
|
Configuration conf = getFileSystem().getConf();
|
||||||
|
S3ATestUtils.removeBaseAndBucketOverrides(conf,
|
||||||
|
Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
|
||||||
|
Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
|
||||||
|
S3ATestUtils.disableFilesystemCaching(conf);
|
||||||
|
final int configuredMinSeek = 2 * 1024;
|
||||||
|
final int configuredMaxSize = 10 * 1024 * 1024;
|
||||||
|
conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
|
||||||
|
conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
|
||||||
|
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
|
||||||
|
try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
||||||
|
int newMinSeek = fis.minSeekForVectorReads();
|
||||||
|
int newMaxSize = fis.maxReadSizeForVectorReads();
|
||||||
|
assertEqual(newMinSeek, configuredMinSeek,
|
||||||
|
"configured s3a min seek for vectored reads");
|
||||||
|
assertEqual(newMaxSize, configuredMaxSize,
|
||||||
|
"configured s3a max size for vectored reads");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
|
||||||
|
Configuration conf = getFileSystem().getConf();
|
||||||
|
S3ATestUtils.removeBaseAndBucketOverrides(conf,
|
||||||
|
Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
|
||||||
|
Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
|
||||||
|
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
|
||||||
|
try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
|
||||||
|
int minSeek = fis.minSeekForVectorReads();
|
||||||
|
int maxSize = fis.maxReadSizeForVectorReads();
|
||||||
|
assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
|
||||||
|
"default s3a min seek for vectored reads");
|
||||||
|
assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
|
||||||
|
"default s3a max read size for vectored reads");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|