HADOOP-16202. Enhanced openFile(): hadoop-aws changes. (#2584/3)

S3A input stream support for the few fs.option.openfile settings.
As well as supporting the read policy option and values,
if the file length is declared in fs.option.openfile.length
then no HEAD request will be issued when opening a file.
This can cut a few tens of milliseconds off the operation.

The patch adds a new openfile parameter/FS configuration option
fs.s3a.input.async.drain.threshold (default: 16000).
It declares the number of bytes remaining in the http input stream
above which any operation to read and discard the rest of the stream,
"draining", is executed asynchronously.
This asynchronous draining offers some performance benefit on seek-heavy
file IO.

Contributed by Steve Loughran.

Change-Id: I9b0626bbe635e9fd97ac0f463f5e7167e0111e39
This commit is contained in:
Steve Loughran 2022-04-24 17:23:19 +01:00
parent e123de9f19
commit 9db61adeda
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
38 changed files with 2070 additions and 380 deletions

View File

@ -28,6 +28,11 @@
<Method name="s3Exists" /> <Method name="s3Exists" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" /> <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
</Match> </Match>
<!-- we are using completable futures, so ignore the Future which submit() returns -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
<!-- <!--
findbugs gets confused by lambda expressions in synchronized methods findbugs gets confused by lambda expressions in synchronized methods

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -602,37 +603,69 @@ public final class Constants {
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range"; public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024; public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
/**
* The threshold at which drain operations switch
* to being asynchronous with the schedule/wait overhead
* compared to synchronous.
* Value: {@value}
*/
public static final String ASYNC_DRAIN_THRESHOLD = "fs.s3a.input.async.drain.threshold";
/**
* This is a number based purely on experimentation in
* {@code ITestS3AInputStreamPerformance}.
* Value: {@value}
*/
public static final int DEFAULT_ASYNC_DRAIN_THRESHOLD = 16_000;
/** /**
* Which input strategy to use for buffering, seeking and similar when * Which input strategy to use for buffering, seeking and similar when
* reading data. * reading data.
* Value: {@value} * Value: {@value}
*/ */
@InterfaceStability.Unstable
public static final String INPUT_FADVISE = public static final String INPUT_FADVISE =
"fs.s3a.experimental.input.fadvise"; "fs.s3a.experimental.input.fadvise";
/**
* The default value for this FS.
* Which for S3A, is adaptive.
* Value: {@value}
* @deprecated use the {@link Options.OpenFileOptions} value
* in code which only needs to be compiled against newer hadoop
* releases.
*/
public static final String INPUT_FADV_DEFAULT =
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
/** /**
* General input. Some seeks, some reads. * General input. Some seeks, some reads.
* The policy name "default" is standard across different stores,
* and should be preferred.
* Value: {@value} * Value: {@value}
*/ */
@InterfaceStability.Unstable
public static final String INPUT_FADV_NORMAL = "normal"; public static final String INPUT_FADV_NORMAL = "normal";
/** /**
* Optimized for sequential access. * Optimized for sequential access.
* Value: {@value} * Value: {@value}
* @deprecated use the {@link Options.OpenFileOptions} value
* in code which only needs to be compiled against newer hadoop
* releases.
*/ */
@InterfaceStability.Unstable public static final String INPUT_FADV_SEQUENTIAL =
public static final String INPUT_FADV_SEQUENTIAL = "sequential"; Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
/** /**
* Optimized purely for random seek+read/positionedRead operations; * Optimized purely for random seek+read/positionedRead operations;
* The performance of sequential IO may be reduced in exchange for * The performance of sequential IO may be reduced in exchange for
* more efficient {@code seek()} operations. * more efficient {@code seek()} operations.
* Value: {@value} * Value: {@value}
* @deprecated use the {@link Options.OpenFileOptions} value
* in code which only needs to be compiled against newer hadoop
* releases.
*/ */
@InterfaceStability.Unstable public static final String INPUT_FADV_RANDOM =
public static final String INPUT_FADV_RANDOM = "random"; Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
/** /**
* Gauge name for the input policy : {@value}. * Gauge name for the input policy : {@value}.

View File

@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.CallableRaisingIOE;
@ -38,6 +39,8 @@ import org.apache.hadoop.util.functional.FutureIO;
import org.apache.hadoop.util.functional.InvocationRaisingIOE; import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
/** /**
* Class to provide lambda expression invocation of AWS operations. * Class to provide lambda expression invocation of AWS operations.
* *
@ -122,6 +125,31 @@ public class Invoker {
} }
} }
/**
* Execute a function, translating any exception into an IOException.
* The supplied duration tracker instance is updated with success/failure.
* @param action action to execute (used in error messages)
* @param path path of work (used in error messages)
* @param tracker tracker to update
* @param operation operation to execute
* @param <T> type of return value
* @return the result of the function call
* @throws IOException any IOE raised, or translated exception
*/
@Retries.OnceTranslated
public static <T> T onceTrackingDuration(
final String action,
final String path,
final DurationTracker tracker,
final CallableRaisingIOE<T> operation)
throws IOException {
try {
return invokeTrackingDuration(tracker, operation);
} catch (AmazonClientException e) {
throw S3AUtils.translateException(action, path, e);
}
}
/** /**
* Execute an operation with no result. * Execute an operation with no result.
* @param action action to execute (used in error messages) * @param action action to execute (used in error messages)

View File

@ -90,6 +90,7 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Globber; import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
@ -109,6 +110,7 @@ import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants; import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MkdirOperation; import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl; import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
@ -116,7 +118,6 @@ import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
@ -169,6 +170,7 @@ import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@ -187,7 +189,8 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*; import static org.apache.hadoop.fs.s3a.Invoker.*;
@ -298,7 +301,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Storage Statistics Bonded to the instrumentation. */ /** Storage Statistics Bonded to the instrumentation. */
private S3AStorageStatistics storageStatistics; private S3AStorageStatistics storageStatistics;
private long readAhead; /**
* Default input policy; may be overridden in
* {@code openFile()}.
*/
private S3AInputPolicy inputPolicy; private S3AInputPolicy inputPolicy;
private ChangeDetectionPolicy changeDetectionPolicy; private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
@ -327,6 +333,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final ListingOperationCallbacks listingOperationCallbacks = private final ListingOperationCallbacks listingOperationCallbacks =
new ListingOperationCallbacksImpl(); new ListingOperationCallbacksImpl();
/**
* Helper for the openFile() method.
*/
private OpenFileSupport openFileHelper;
/** /**
* Directory policy. * Directory policy.
*/ */
@ -465,9 +477,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
readAhead = longBytesOption(conf, READAHEAD_RANGE,
DEFAULT_READAHEAD_RANGE, 0);
initThreadPools(conf); initThreadPools(conf);
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@ -508,7 +517,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
doBucketProbing(); doBucketProbing();
inputPolicy = S3AInputPolicy.getPolicy( inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL)); conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
S3AInputPolicy.Normal);
LOG.debug("Input fadvise policy = {}", inputPolicy); LOG.debug("Input fadvise policy = {}", inputPolicy);
changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf); changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
LOG.debug("Change detection policy = {}", changeDetectionPolicy); LOG.debug("Change detection policy = {}", changeDetectionPolicy);
@ -555,6 +566,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize); "page size out of range: %s", pageSize);
listing = new Listing(listingOperationCallbacks, createStoreContext()); listing = new Listing(listingOperationCallbacks, createStoreContext());
// now the open file logic
openFileHelper = new OpenFileSupport(
changeDetectionPolicy,
longBytesOption(conf, READAHEAD_RANGE,
DEFAULT_READAHEAD_RANGE, 0),
username,
intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT, 0),
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation // amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span); cleanupWithLogger(LOG, span);
@ -1178,15 +1200,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return fixBucketRegion(region); return fixBucketRegion(region);
} }
/**
* Returns the read ahead range value used by this filesystem.
* @return the readahead range
*/
@VisibleForTesting
long getReadAheadRange() {
return readAhead;
}
/** /**
* Get the input policy for this FS instance. * Get the input policy for this FS instance.
* @return the input policy * @return the input policy
@ -1268,13 +1281,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** /**
* Change the input policy for this FS. * Change the input policy for this FS.
* This is now a no-op, retained in case some application
* or external test invokes it.
*
* @deprecated use openFile() options
* @param inputPolicy new policy * @param inputPolicy new policy
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@Deprecated
public void setInputPolicy(S3AInputPolicy inputPolicy) { public void setInputPolicy(S3AInputPolicy inputPolicy) {
Objects.requireNonNull(inputPolicy, "Null inputStrategy"); LOG.warn("setInputPolicy is no longer supported");
LOG.debug("Setting input strategy: {}", inputPolicy);
this.inputPolicy = inputPolicy;
} }
/** /**
@ -1392,64 +1408,46 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Retries.RetryTranslated @Retries.RetryTranslated
public FSDataInputStream open(Path f, int bufferSize) public FSDataInputStream open(Path f, int bufferSize)
throws IOException { throws IOException {
return open(f, Optional.empty(), Optional.empty()); return executeOpen(qualify(f),
openFileHelper.openSimpleFile(bufferSize));
} }
/** /**
* Opens an FSDataInputStream at the indicated Path. * Opens an FSDataInputStream at the indicated Path.
* if status contains an S3AFileStatus reference, it is used * The {@code fileInformation} parameter controls how the file
* and so a HEAD request to the store is avoided. * is opened, whether it is normal vs. an S3 select call,
* * can a HEAD be skipped, etc.
* @param file the file to open * @param path the file to open
* @param options configuration options if opened with the builder API. * @param fileInformation information about the file to open
* @param providedStatus optional file status.
* @throws IOException IO failure. * @throws IOException IO failure.
*/ */
@Retries.RetryTranslated
@AuditEntryPoint @AuditEntryPoint
private FSDataInputStream open( @Retries.RetryTranslated
final Path file, private FSDataInputStream executeOpen(
final Optional<Configuration> options, final Path path,
final Optional<S3AFileStatus> providedStatus) final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException { throws IOException {
// create the input stream statistics before opening
final Path path = qualify(file); // the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
statisticsContext.newInputStreamStatistics();
// this span is passed into the stream. // this span is passed into the stream.
final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path); final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path);
S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, final S3AFileStatus fileStatus =
providedStatus); trackDuration(inputStreamStats,
ACTION_FILE_OPENED.getSymbol(), () ->
S3AReadOpContext readContext; extractOrFetchSimpleFileStatus(path, fileInformation));
if (options.isPresent()) { S3AReadOpContext readContext = createReadContext(
Configuration o = options.get(); fileStatus,
// normal path. Open the file with the chosen seek policy, if different auditSpan);
// from the normal one. fileInformation.applyOptions(readContext);
// and readahead.
S3AInputPolicy policy = S3AInputPolicy.getPolicy(
o.get(INPUT_FADVISE, inputPolicy.toString()));
long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
// TODO support change detection policy from options?
readContext = createReadContext(
fileStatus,
policy,
changeDetectionPolicy,
readAheadRange2,
auditSpan);
} else {
readContext = createReadContext(
fileStatus,
inputPolicy,
changeDetectionPolicy,
readAhead,
auditSpan);
}
LOG.debug("Opening '{}'", readContext); LOG.debug("Opening '{}'", readContext);
return new FSDataInputStream( return new FSDataInputStream(
new S3AInputStream( new S3AInputStream(
readContext, readContext.build(),
createObjectAttributes(fileStatus), createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan))); createInputStreamCallbacks(auditSpan),
inputStreamStats));
} }
/** /**
@ -1503,34 +1501,40 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return s3.getObject(request); return s3.getObject(request);
} }
} }
@Override
public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
CompletableFuture<T> result = new CompletableFuture<>();
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, () -> {
try (AuditSpan span = auditSpan.activate()) {
return operation.apply();
}
}));
return result;
}
} }
/** /**
* Create the read context for reading from the referenced file, * Create the read context for reading from the referenced file,
* using FS state as well as the status. * using FS state as well as the status.
* @param fileStatus file status. * @param fileStatus file status.
* @param seekPolicy input policy for this operation
* @param changePolicy change policy for this operation.
* @param readAheadRange readahead value.
* @param auditSpan audit span. * @param auditSpan audit span.
* @return a context for read and select operations. * @return a context for read and select operations.
*/ */
@VisibleForTesting @VisibleForTesting
protected S3AReadOpContext createReadContext( protected S3AReadOpContext createReadContext(
final FileStatus fileStatus, final FileStatus fileStatus,
final S3AInputPolicy seekPolicy,
final ChangeDetectionPolicy changePolicy,
final long readAheadRange,
final AuditSpan auditSpan) { final AuditSpan auditSpan) {
return new S3AReadOpContext(fileStatus.getPath(), final S3AReadOpContext roc = new S3AReadOpContext(
fileStatus.getPath(),
invoker, invoker,
statistics, statistics,
statisticsContext, statisticsContext,
fileStatus, fileStatus)
seekPolicy, .withAuditSpan(auditSpan);
changePolicy, openFileHelper.applyDefaultOptions(roc);
readAheadRange, return roc.build();
auditSpan);
} }
/** /**
@ -1558,13 +1562,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** /**
* Create the attributes of an object for subsequent use. * Create the attributes of an object for subsequent use.
* @param path path -this is used over the file status path.
* @param fileStatus file status to build from. * @param fileStatus file status to build from.
* @return attributes to use when building the query. * @return attributes to use when building the query.
*/ */
private S3ObjectAttributes createObjectAttributes( private S3ObjectAttributes createObjectAttributes(
final Path path,
final S3AFileStatus fileStatus) { final S3AFileStatus fileStatus) {
return createObjectAttributes( return createObjectAttributes(
fileStatus.getPath(), path,
fileStatus.getEtag(), fileStatus.getEtag(),
fileStatus.getVersionId(), fileStatus.getVersionId(),
fileStatus.getLen()); fileStatus.getLen());
@ -1980,14 +1986,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Override @Override
public S3ObjectAttributes createObjectAttributes( public S3ObjectAttributes createObjectAttributes(
final S3AFileStatus fileStatus) { final S3AFileStatus fileStatus) {
return S3AFileSystem.this.createObjectAttributes(fileStatus); return S3AFileSystem.this.createObjectAttributes(
fileStatus.getPath(),
fileStatus);
} }
@Override @Override
public S3AReadOpContext createReadContext(final FileStatus fileStatus) { public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
return S3AFileSystem.this.createReadContext(fileStatus, return S3AFileSystem.this.createReadContext(fileStatus,
inputPolicy,
changeDetectionPolicy, readAhead,
auditSpan); auditSpan);
} }
@ -4084,9 +4090,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** /**
* Return the number of bytes that large input files should be optimally * Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time. * be split into to minimize I/O time.
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
*/ */
@Deprecated
public long getDefaultBlockSize() { public long getDefaultBlockSize() {
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
} }
@ -4105,14 +4109,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
"S3AFileSystem{"); "S3AFileSystem{");
sb.append("uri=").append(uri); sb.append("uri=").append(uri);
sb.append(", workingDir=").append(workingDir); sb.append(", workingDir=").append(workingDir);
sb.append(", inputPolicy=").append(inputPolicy);
sb.append(", partSize=").append(partSize); sb.append(", partSize=").append(partSize);
sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete); sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
sb.append(", maxKeys=").append(maxKeys); sb.append(", maxKeys=").append(maxKeys);
if (cannedACL != null) { if (cannedACL != null) {
sb.append(", cannedACL=").append(cannedACL.toString()); sb.append(", cannedACL=").append(cannedACL);
}
if (openFileHelper != null) {
sb.append(", ").append(openFileHelper);
} }
sb.append(", readAhead=").append(readAhead);
if (getConf() != null) { if (getConf() != null) {
sb.append(", blockSize=").append(getDefaultBlockSize()); sb.append(", blockSize=").append(getDefaultBlockSize());
} }
@ -4798,23 +4803,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Retries.RetryTranslated @Retries.RetryTranslated
@AuditEntryPoint @AuditEntryPoint
private FSDataInputStream select(final Path source, private FSDataInputStream select(final Path source,
final String expression,
final Configuration options, final Configuration options,
final Optional<S3AFileStatus> providedStatus) final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException { throws IOException {
final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
requireSelectSupport(source); requireSelectSupport(source);
final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
final Path path = makeQualified(source); final Path path = makeQualified(source);
String expression = fileInformation.getSql();
final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
providedStatus); fileInformation);
// readahead range can be dynamically set // readahead range can be dynamically set
long ra = options.getLong(READAHEAD_RANGE, readAhead); S3ObjectAttributes objectAttributes = createObjectAttributes(
S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus); path, fileStatus);
S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
changeDetectionPolicy, ra, auditSpan); S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);
if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
&& fileStatus.getEtag() != null) { && fileStatus.getEtag() != null) {
// if there is change detection, and the status includes at least an // if there is change detection, and the status includes at least an
// etag, // etag,
@ -4826,7 +4834,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// version in the final read; nor can we check the etag match) // version in the final read; nor can we check the etag match)
ChangeTracker changeTracker = ChangeTracker changeTracker =
new ChangeTracker(uri.toString(), new ChangeTracker(uri.toString(),
changeDetectionPolicy, changePolicy,
readContext.getS3AStatisticsContext() readContext.getS3AStatisticsContext()
.newInputStreamStatistics() .newInputStreamStatistics()
.getChangeTrackerStatistics(), .getChangeTrackerStatistics(),
@ -4864,38 +4872,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
} }
/** /**
* Extract the status from the optional parameter, querying * Get the file status of the source file.
* S3 if it is absent. * If in the fileInformation parameter return that
* @param path path of the status * if not found, issue a HEAD request, looking for a
* @param optStatus optional status * file only.
* @param path path of the file to open
* @param fileInformation information on the file to open
* @return a file status * @return a file status
* @throws FileNotFoundException if there is no normal file at that path * @throws FileNotFoundException if a HEAD request found no file
* @throws IOException IO failure * @throws IOException IO failure
*/ */
private S3AFileStatus extractOrFetchSimpleFileStatus( private S3AFileStatus extractOrFetchSimpleFileStatus(
final Path path, final Optional<S3AFileStatus> optStatus) final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException { throws IOException {
S3AFileStatus fileStatus; S3AFileStatus fileStatus = fileInformation.getStatus();
if (optStatus.isPresent()) { if (fileStatus == null) {
fileStatus = optStatus.get();
// we check here for the passed in status // we check here for the passed in status
// being a directory // being a directory
if (fileStatus.isDirectory()) {
throw new FileNotFoundException(path.toString() + " is a directory");
}
} else {
// Executes a HEAD only.
// therefore: if there is is a dir marker, this
// will raise a FileNotFoundException
fileStatus = innerGetFileStatus(path, false, fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.HEAD_ONLY); StatusProbeEnum.HEAD_ONLY);
} }
if (fileStatus.isDirectory()) {
throw new FileNotFoundException(path.toString() + " is a directory");
}
return fileStatus; return fileStatus;
} }
/** /**
* Initiate the open or select operation. * Initiate the open() or select() operation.
* This is invoked from both the FileSystem and FileContext APIs. * This is invoked from both the FileSystem and FileContext APIs.
* It's declared as an audit entry point but the span creation is pushed * It's declared as an audit entry point but the span creation is pushed
* down into the open/select methods it ultimately calls. * down into the open/select methods it ultimately calls.
@ -4914,54 +4920,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final Path rawPath, final Path rawPath,
final OpenFileParameters parameters) throws IOException { final OpenFileParameters parameters) throws IOException {
final Path path = qualify(rawPath); final Path path = qualify(rawPath);
Configuration options = parameters.getOptions(); OpenFileSupport.OpenFileInformation fileInformation =
Set<String> mandatoryKeys = parameters.getMandatoryKeys(); openFileHelper.prepareToOpenFile(
String sql = options.get(SelectConstants.SELECT_SQL, null); path,
boolean isSelect = sql != null; parameters,
// choice of keys depends on open type getDefaultBlockSize());
if (isSelect) {
rejectUnknownMandatoryKeys(
mandatoryKeys,
InternalSelectConstants.SELECT_OPTIONS,
"for " + path + " in S3 Select operation");
} else {
rejectUnknownMandatoryKeys(
mandatoryKeys,
InternalConstants.STANDARD_OPENFILE_KEYS,
"for " + path + " in non-select file I/O");
}
FileStatus providedStatus = parameters.getStatus();
S3AFileStatus fileStatus;
if (providedStatus != null) {
Preconditions.checkArgument(path.equals(providedStatus.getPath()),
"FileStatus parameter is not for the path %s: %s",
path, providedStatus);
if (providedStatus instanceof S3AFileStatus) {
// can use this status to skip our own probes,
// including etag and version.
LOG.debug("File was opened with a supplied S3AFileStatus;"
+ " skipping getFileStatus call in open() operation: {}",
providedStatus);
fileStatus = (S3AFileStatus) providedStatus;
} else if (providedStatus instanceof S3ALocatedFileStatus) {
LOG.debug("File was opened with a supplied S3ALocatedFileStatus;"
+ " skipping getFileStatus call in open() operation: {}",
providedStatus);
fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus();
} else {
LOG.debug("Ignoring file status {}", providedStatus);
fileStatus = null;
}
} else {
fileStatus = null;
}
Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus);
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>(); CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
if (!isSelect) { if (!fileInformation.isS3Select()) {
// normal path. // normal path.
unboundedThreadPool.submit(() -> unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, LambdaUtils.eval(result,
() -> open(path, Optional.of(options), ost))); () -> executeOpen(path, fileInformation)));
} else { } else {
// it is a select statement. // it is a select statement.
// fail fast if the operation is not available // fail fast if the operation is not available
@ -4969,7 +4938,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// submit the query // submit the query
unboundedThreadPool.submit(() -> unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, LambdaUtils.eval(result,
() -> select(path, sql, options, ost))); () -> select(path, parameters.getOptions(), fileInformation)));
} }
return result; return result;
} }

View File

@ -18,32 +18,46 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience; import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceStability; import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale; import java.util.Locale;
import static org.apache.hadoop.fs.s3a.Constants.*; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
/** /**
* Filesystem input policy. * Stream input policy.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public enum S3AInputPolicy { public enum S3AInputPolicy {
Normal(INPUT_FADV_NORMAL), Normal(FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, false, true),
Sequential(INPUT_FADV_SEQUENTIAL), Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, true, false),
Random(INPUT_FADV_RANDOM); Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, false, false);
private static final Logger LOG = /** Policy name. */
LoggerFactory.getLogger(S3AInputPolicy.class);
private final String policy; private final String policy;
S3AInputPolicy(String policy) { /** Is this random IO? */
private final boolean randomIO;
/** Is this an adaptive policy? */
private final boolean adaptive;
S3AInputPolicy(String policy,
boolean randomIO,
boolean adaptive) {
this.policy = policy; this.policy = policy;
this.randomIO = randomIO;
this.adaptive = adaptive;
} }
@Override @Override
@ -51,26 +65,63 @@ public enum S3AInputPolicy {
return policy; return policy;
} }
String getPolicy() {
return policy;
}
boolean isRandomIO() {
return randomIO;
}
boolean isAdaptive() {
return adaptive;
}
/** /**
* Choose an FS access policy. * Choose an access policy.
* Always returns something,
* primarily by downgrading to "normal" if there is no other match.
* @param name strategy name from a configuration option, etc. * @param name strategy name from a configuration option, etc.
* @param defaultPolicy default policy to fall back to.
* @return the chosen strategy * @return the chosen strategy
*/ */
public static S3AInputPolicy getPolicy(String name) { public static S3AInputPolicy getPolicy(
String name,
@Nullable S3AInputPolicy defaultPolicy) {
String trimmed = name.trim().toLowerCase(Locale.ENGLISH); String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
switch (trimmed) { switch (trimmed) {
case INPUT_FADV_NORMAL: case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE:
case FS_OPTION_OPENFILE_READ_POLICY_DEFAULT:
case Constants.INPUT_FADV_NORMAL:
return Normal; return Normal;
case INPUT_FADV_RANDOM:
// all these options currently map to random IO.
case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
case FS_OPTION_OPENFILE_READ_POLICY_VECTOR:
return Random; return Random;
case INPUT_FADV_SEQUENTIAL:
case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
return Sequential; return Sequential;
default: default:
LOG.warn("Unrecognized " + INPUT_FADVISE + " value: \"{}\"", trimmed); return defaultPolicy;
return Normal;
} }
} }
/**
* Scan the list of input policies, returning the first one supported.
* @param policies list of policies.
* @param defaultPolicy fallback
* @return a policy or the defaultPolicy, which may be null
*/
public static S3AInputPolicy getFirstSupportedPolicy(
Collection<String> policies,
@Nullable S3AInputPolicy defaultPolicy) {
for (String s : policies) {
S3AInputPolicy nextPolicy = S3AInputPolicy.getPolicy(s, null);
if (nextPolicy != null) {
return nextPolicy;
}
}
return defaultPolicy;
}
} }

View File

@ -23,8 +23,9 @@ import javax.annotation.Nullable;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanSetReadahead;
@ -37,7 +38,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,9 +47,14 @@ import java.io.Closeable;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/** /**
* The input stream for an S3A object. * The input stream for an S3A object.
@ -78,6 +84,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open"; public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open"; public static final String OPERATION_REOPEN = "re-open";
/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;
/** /**
* This is the public position; the one set in {@link #seek(long)} * This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}. * and returned in {@link #getPos()}.
@ -136,6 +147,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
private final IOStatistics ioStatistics; private final IOStatistics ioStatistics;
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
private long asyncDrainThreshold;
/** /**
* Create the stream. * Create the stream.
* This does not attempt to open it; that is only done on the first * This does not attempt to open it; that is only done on the first
@ -143,10 +160,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param ctx operation context * @param ctx operation context
* @param s3Attributes object attributes * @param s3Attributes object attributes
* @param client S3 client to use * @param client S3 client to use
* @param streamStatistics statistics for this stream
*/ */
public S3AInputStream(S3AReadOpContext ctx, public S3AInputStream(S3AReadOpContext ctx,
S3ObjectAttributes s3Attributes, S3ObjectAttributes s3Attributes,
InputStreamCallbacks client) { InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket"); "No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@ -155,12 +174,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
this.context = ctx; this.context = ctx;
this.bucket = s3Attributes.getBucket(); this.bucket = s3Attributes.getBucket();
this.key = s3Attributes.getKey(); this.key = s3Attributes.getKey();
this.pathStr = ctx.dstFileStatus.getPath().toString(); this.pathStr = s3Attributes.getPath().toString();
this.contentLength = l; this.contentLength = l;
this.client = client; this.client = client;
this.uri = "s3a://" + this.bucket + "/" + this.key; this.uri = "s3a://" + this.bucket + "/" + this.key;
this.streamStatistics = ctx.getS3AStatisticsContext() this.streamStatistics = streamStatistics;
.newInputStreamStatistics();
this.ioStatistics = streamStatistics.getIOStatistics(); this.ioStatistics = streamStatistics.getIOStatistics();
this.changeTracker = new ChangeTracker(uri, this.changeTracker = new ChangeTracker(uri,
ctx.getChangeDetectionPolicy(), ctx.getChangeDetectionPolicy(),
@ -168,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
s3Attributes); s3Attributes);
setInputPolicy(ctx.getInputPolicy()); setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead()); setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
} }
/** /**
@ -193,7 +212,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
boolean forceAbort) throws IOException { boolean forceAbort) throws IOException {
if (isObjectStreamOpen()) { if (isObjectStreamOpen()) {
closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort); closeStream("reopen(" + reason + ")", forceAbort, false);
} }
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos, contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@ -211,21 +230,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
operation, uri, targetPos); operation, uri, targetPos);
changeTracker.maybeApplyConstraint(request); changeTracker.maybeApplyConstraint(request);
DurationTracker tracker = streamStatistics.initiateGetRequest(); object = onceTrackingDuration(text, uri,
try { streamStatistics.initiateGetRequest(), () ->
object = Invoker.once(text, uri, client.getObject(request));
() -> client.getObject(request));
} catch(IOException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after any catch() call will have
// set the failed flag.
tracker.close();
}
changeTracker.processResponse(object, operation, changeTracker.processResponse(object, operation,
targetPos); targetPos);
@ -333,7 +341,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.seekBackwards(diff); streamStatistics.seekBackwards(diff);
// if the stream is in "Normal" mode, switch to random IO at this // if the stream is in "Normal" mode, switch to random IO at this
// point, as it is indicative of columnar format IO // point, as it is indicative of columnar format IO
if (inputPolicy.equals(S3AInputPolicy.Normal)) { if (inputPolicy.isAdaptive()) {
LOG.info("Switching to Random IO seek policy"); LOG.info("Switching to Random IO seek policy");
setInputPolicy(S3AInputPolicy.Random); setInputPolicy(S3AInputPolicy.Random);
} }
@ -348,7 +356,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
// if the code reaches here, the stream needs to be reopened. // if the code reaches here, the stream needs to be reopened.
// close the stream; if read the object will be opened at the new pos // close the stream; if read the object will be opened at the new pos
closeStream("seekInStream()", this.contentRangeFinish, false); closeStream("seekInStream()", false, false);
pos = targetPos; pos = targetPos;
} }
@ -458,7 +466,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
uri, client, object); uri, client, object);
} }
streamStatistics.readException(); streamStatistics.readException();
closeStream("failure recovery", contentRangeFinish, forceAbort); closeStream("failure recovery", forceAbort, false);
} }
/** /**
@ -551,8 +559,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
if (!closed) { if (!closed) {
closed = true; closed = true;
try { try {
// close or abort the stream // close or abort the stream; blocking
closeStream("close() operation", this.contentRangeFinish, false); awaitFuture(closeStream("close() operation", false, true));
LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
// end the client+audit span. // end the client+audit span.
client.close(); client.close();
@ -571,16 +579,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* If a close() is attempted and fails, the operation escalates to * If a close() is attempted and fails, the operation escalates to
* an abort. * an abort.
* *
* The close is potentially; a future is returned.
* It's the draining of a stream which is time consuming so
* worth scheduling on a separate thread.
* In stream close, when an abort is issued or when there's no
* data to drain, block.
* This does not set the {@link #closed} flag. * This does not set the {@link #closed} flag.
* @param reason reason for stream being closed; used in messages * @param reason reason for stream being closed; used in messages
* @param length length of the stream.
* @param forceAbort force an abort; used if explicitly requested. * @param forceAbort force an abort; used if explicitly requested.
* @param blocking should the call block for completion, or is async IO allowed
* @return a future for the async operation
*/ */
@Retries.OnceRaw @Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) { private CompletableFuture<Boolean> closeStream(
final String reason,
final boolean forceAbort,
final boolean blocking) {
if (!isObjectStreamOpen()) { if (!isObjectStreamOpen()) {
// steam is already closed // steam is already closed
return; return CompletableFuture.completedFuture(false);
} }
// if the amount of data remaining in the current request is greater // if the amount of data remaining in the current request is greater
@ -589,58 +607,139 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
LOG.debug("Closing stream {}: {}", reason, LOG.debug("Closing stream {}: {}", reason,
forceAbort ? "abort" : "soft"); forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead; boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async io.
operation = CompletableFuture.completedFuture(
drain(shouldAbort, reason, remaining, object, wrappedStream));
} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort with references to the fields so they
// can be reused
operation = client.submit(
() -> drain(false, reason, remaining, object, wrappedStream));
}
// either the stream is closed in the blocking call or the async call is
// submitted with its own copy of the references
wrappedStream = null;
object = null;
return operation;
}
/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {
try { try {
if (!shouldAbort) { return invokeTrackingDuration(
try { streamStatistics.initiateInnerStreamClose(shouldAbort),
// clean close. This will read to the end of the stream, () -> drainOrAbortHttpStream(
// so, while cleaner, can be pathological on a multi-GB object shouldAbort,
reason,
// explicitly drain the stream remaining,
long drained = 0; requestObject,
while (wrappedStream.read() >= 0) { inner));
drained++; } catch (IOException e) {
} // this is only here because invokeTrackingDuration() has it in its
LOG.debug("Drained stream of {} bytes", drained); // signature
return shouldAbort;
// now close it
wrappedStream.close();
// this MUST come after the close, so that if the IO operations fail
// and an abort is triggered, the initial attempt's statistics
// aren't collected.
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream {}", uri);
try {
wrappedStream.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
+ " nextReadPos={}," +
" request range {}-{} length={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish,
length);
} finally {
wrappedStream = null;
object = null;
} }
} }
/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* This does not set the {@link #closed} flag.
*
* A reference to the stream is passed in so that the instance
* {@link #wrappedStream} field can be reused as soon as this
* method is submitted;
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {
// force a use of the request object so IDEs don't warn of
// lack of use.
requireNonNull(requestObject);
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object
// explicitly drain the stream
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inner.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
// now close it
inner.close();
// this MUST come after the close, so that if the IO operations fail
// and an abort is triggered, the initial attempt's statistics
// aren't collected.
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream {}", uri);
try {
inner.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
/** /**
* Forcibly reset the stream, by aborting the connection. The next * Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS * {@code read()} operation will trigger the opening of a new HTTPS
@ -648,17 +747,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* *
* This is potentially very inefficient, and should only be invoked * This is potentially very inefficient, and should only be invoked
* in extreme circumstances. It logs at info for this reason. * in extreme circumstances. It logs at info for this reason.
*
* Blocks until the abort is completed.
*
* @return true if the connection was actually reset. * @return true if the connection was actually reset.
* @throws IOException if invoked on a closed stream. * @throws IOException if invoked on a closed stream.
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public synchronized boolean resetConnection() throws IOException { public synchronized boolean resetConnection() throws IOException {
checkNotClosed(); checkNotClosed();
if (isObjectStreamOpen()) { LOG.info("Forcing reset of connection to {}", uri);
LOG.info("Forced reset of connection to {}", uri); return awaitFuture(closeStream("reset()", true, true));
closeStream("reset()", contentRangeFinish, true);
}
return isObjectStreamOpen();
} }
@Override @Override
@ -870,7 +969,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
@Override @Override
public synchronized void unbuffer() { public synchronized void unbuffer() {
try { try {
closeStream("unbuffer()", contentRangeFinish, false); closeStream("unbuffer()", false, false);
} finally { } finally {
streamStatistics.unbuffered(); streamStatistics.unbuffered();
} }
@ -918,6 +1017,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
@Retries.OnceRaw @Retries.OnceRaw
S3Object getObject(GetObjectRequest request); S3Object getObject(GetObjectRequest request);
/**
* Submit some asynchronous work, for example, draining a stream.
* @param operation operation to invoke
* @param <T> return type
* @return a future.
*/
<T> CompletableFuture<T> submit(CallableRaisingIOE<T> operation);
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
@ -837,7 +838,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
StreamStatisticNames.STREAM_READ_UNBUFFERED, StreamStatisticNames.STREAM_READ_UNBUFFERED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES) StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
.withDurationTracking(ACTION_HTTP_GET_REQUEST) .withDurationTracking(ACTION_HTTP_GET_REQUEST,
StoreStatisticNames.ACTION_FILE_OPENED,
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED)
.build(); .build();
setIOStatistics(st); setIOStatistics(st);
aborted = st.getCounterReference( aborted = st.getCounterReference(
@ -1271,6 +1275,12 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
return trackDuration(ACTION_HTTP_GET_REQUEST); return trackDuration(ACTION_HTTP_GET_REQUEST);
} }
@Override
public DurationTracker initiateInnerStreamClose(final boolean abort) {
return trackDuration(abort
? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
: StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
}
} }
/** /**

View File

@ -29,7 +29,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull;
/** /**
* Read-specific operation context struct. * Read-specific operation context struct.
@ -44,19 +44,25 @@ public class S3AReadOpContext extends S3AOpContext {
/** /**
* Initial input policy of the stream. * Initial input policy of the stream.
*/ */
private final S3AInputPolicy inputPolicy; private S3AInputPolicy inputPolicy;
/** /**
* How to detect and deal with the object being updated during read. * How to detect and deal with the object being updated during read.
*/ */
private final ChangeDetectionPolicy changeDetectionPolicy; private ChangeDetectionPolicy changeDetectionPolicy;
/** /**
* Readahead for GET operations/skip, etc. * Readahead for GET operations/skip, etc.
*/ */
private final long readahead; private long readahead;
private final AuditSpan auditSpan; private AuditSpan auditSpan;
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
private long asyncDrainThreshold;
/** /**
* Instantiate. * Instantiate.
@ -65,31 +71,33 @@ public class S3AReadOpContext extends S3AOpContext {
* @param stats Fileystem statistics (may be null) * @param stats Fileystem statistics (may be null)
* @param instrumentation statistics context * @param instrumentation statistics context
* @param dstFileStatus target file status * @param dstFileStatus target file status
* @param inputPolicy the input policy
* @param changeDetectionPolicy change detection policy.
* @param readahead readahead for GET operations/skip, etc.
* @param auditSpan active audit
*/ */
public S3AReadOpContext( public S3AReadOpContext(
final Path path, final Path path,
Invoker invoker, Invoker invoker,
@Nullable FileSystem.Statistics stats, @Nullable FileSystem.Statistics stats,
S3AStatisticsContext instrumentation, S3AStatisticsContext instrumentation,
FileStatus dstFileStatus, FileStatus dstFileStatus) {
S3AInputPolicy inputPolicy,
ChangeDetectionPolicy changeDetectionPolicy,
final long readahead,
final AuditSpan auditSpan) {
super(invoker, stats, instrumentation, super(invoker, stats, instrumentation,
dstFileStatus); dstFileStatus);
this.path = checkNotNull(path); this.path = requireNonNull(path);
this.auditSpan = auditSpan; }
/**
* validate the context.
* @return a read operation context ready for use.
*/
public S3AReadOpContext build() {
requireNonNull(inputPolicy, "inputPolicy");
requireNonNull(changeDetectionPolicy, "changeDetectionPolicy");
requireNonNull(auditSpan, "auditSpan");
requireNonNull(inputPolicy, "inputPolicy");
Preconditions.checkArgument(readahead >= 0, Preconditions.checkArgument(readahead >= 0,
"invalid readahead %d", readahead); "invalid readahead %d", readahead);
this.inputPolicy = checkNotNull(inputPolicy); Preconditions.checkArgument(asyncDrainThreshold >= 0,
this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy); "invalid drainThreshold %d", asyncDrainThreshold);
this.readahead = readahead; return this;
} }
/** /**
@ -136,6 +144,61 @@ public class S3AReadOpContext extends S3AOpContext {
return auditSpan; return auditSpan;
} }
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withInputPolicy(final S3AInputPolicy value) {
inputPolicy = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withChangeDetectionPolicy(
final ChangeDetectionPolicy value) {
changeDetectionPolicy = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withReadahead(final long value) {
readahead = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withAuditSpan(final AuditSpan value) {
auditSpan = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withAsyncDrainThreshold(final long value) {
asyncDrainThreshold = value;
return this;
}
public long getAsyncDrainThreshold() {
return asyncDrainThreshold;
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
/** /**
* This class holds attributed of an object independent of the * This class holds attributes of an object independent of the
* file status type. * file status type.
* It is used in {@link S3AInputStream} and the select equivalent. * It is used in {@link S3AInputStream} and the select equivalent.
* as a way to reduce parameters being passed * as a way to reduce parameters being passed
@ -44,6 +44,17 @@ public class S3ObjectAttributes {
private final String versionId; private final String versionId;
private final long len; private final long len;
/**
* Constructor.
* @param bucket s3 bucket
* @param path path
* @param key object key
* @param serverSideEncryptionAlgorithm current encryption algorithm
* @param serverSideEncryptionKey any server side encryption key?
* @param len object length
* @param eTag optional etag
* @param versionId optional version id
*/
public S3ObjectAttributes( public S3ObjectAttributes(
String bucket, String bucket,
Path path, Path path,
@ -70,7 +81,7 @@ public class S3ObjectAttributes {
* @param copyResult copy result. * @param copyResult copy result.
* @param serverSideEncryptionAlgorithm current encryption algorithm * @param serverSideEncryptionAlgorithm current encryption algorithm
* @param serverSideEncryptionKey any server side encryption key? * @param serverSideEncryptionKey any server side encryption key?
* @param len * @param len object length
*/ */
public S3ObjectAttributes( public S3ObjectAttributes(
final Path path, final Path path,

View File

@ -55,6 +55,10 @@ public enum Statistic {
StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST, StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
"HEAD request.", "HEAD request.",
TYPE_DURATION), TYPE_DURATION),
ACTION_FILE_OPENED(
StoreStatisticNames.ACTION_FILE_OPENED,
"File opened.",
TYPE_DURATION),
ACTION_HTTP_GET_REQUEST( ACTION_HTTP_GET_REQUEST(
StoreStatisticNames.ACTION_HTTP_GET_REQUEST, StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
"GET request.", "GET request.",
@ -175,6 +179,10 @@ public enum Statistic {
StoreStatisticNames.OP_OPEN, StoreStatisticNames.OP_OPEN,
"Calls of open()", "Calls of open()",
TYPE_COUNTER), TYPE_COUNTER),
INVOCATION_OPENFILE(
StoreStatisticNames.OP_OPENFILE,
"Calls of openFile()",
TYPE_COUNTER),
INVOCATION_RENAME( INVOCATION_RENAME(
StoreStatisticNames.OP_RENAME, StoreStatisticNames.OP_RENAME,
"Calls of rename()", "Calls of rename()",
@ -296,6 +304,15 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_OPERATIONS, StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream", "Count of read() operations in an input stream",
TYPE_COUNTER), TYPE_COUNTER),
STREAM_READ_REMOTE_STREAM_ABORTED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
"Duration of aborting a remote stream during stream IO",
TYPE_DURATION),
STREAM_READ_REMOTE_STREAM_CLOSED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
"Duration of closing a remote stream during stream IO",
TYPE_DURATION),
STREAM_READ_OPERATIONS_INCOMPLETE( STREAM_READ_OPERATIONS_INCOMPLETE(
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE, StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE,
"Count of incomplete read() operations in an input stream", "Count of incomplete read() operations in an input stream",

View File

@ -267,7 +267,7 @@ public class CommitOperations extends AbstractStoreOperation
List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1); List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
for (LocatedFileStatus status : statusList) { for (LocatedFileStatus status : statusList) {
try { try {
commits.add(SinglePendingCommit.load(fs, status.getPath())); commits.add(SinglePendingCommit.load(fs, status.getPath(), status));
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to load commit file {}", status.getPath(), e); LOG.warn("Failed to load commit file {}", status.getPath(), e);
failures.add(Pair.of(status, e)); failures.add(Pair.of(status, e));
@ -350,10 +350,12 @@ public class CommitOperations extends AbstractStoreOperation
LOG.debug("No files to abort under {}", pendingDir); LOG.debug("No files to abort under {}", pendingDir);
} }
while (pendingFiles.hasNext()) { while (pendingFiles.hasNext()) {
Path pendingFile = pendingFiles.next().getPath(); LocatedFileStatus status = pendingFiles.next();
Path pendingFile = status.getPath();
if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) { if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) {
try { try {
abortSingleCommit(SinglePendingCommit.load(fs, pendingFile)); abortSingleCommit(SinglePendingCommit.load(fs, pendingFile,
status));
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.debug("listed file already deleted: {}", pendingFile); LOG.debug("listed file already deleted: {}", pendingFile);
} catch (IOException | IllegalArgumentException e) { } catch (IOException | IllegalArgumentException e) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a.commit.files; package org.apache.hadoop.fs.s3a.commit.files;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.util.ArrayList; import java.util.ArrayList;
@ -128,10 +129,7 @@ public class PendingSet extends PersistentCommitData
*/ */
public static PendingSet load(FileSystem fs, Path path) public static PendingSet load(FileSystem fs, Path path)
throws IOException { throws IOException {
LOG.debug("Reading pending commits in file {}", path); return load(fs, path, null);
PendingSet instance = serializer().load(fs, path);
instance.validate();
return instance;
} }
/** /**
@ -144,7 +142,25 @@ public class PendingSet extends PersistentCommitData
*/ */
public static PendingSet load(FileSystem fs, FileStatus status) public static PendingSet load(FileSystem fs, FileStatus status)
throws IOException { throws IOException {
return load(fs, status.getPath()); return load(fs, status.getPath(), status);
}
/**
* Load an instance from a file, then validate it.
* @param fs filesystem
* @param path path
* @param status status of file to load
* @return the loaded instance
* @throws IOException IO failure
* @throws ValidationFailure if the data is invalid
*/
public static PendingSet load(FileSystem fs, Path path,
@Nullable FileStatus status)
throws IOException {
LOG.debug("Reading pending commits in file {}", path);
PendingSet instance = serializer().load(fs, path, status);
instance.validate();
return instance;
} }
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a.commit.files; package org.apache.hadoop.fs.s3a.commit.files;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.Serializable; import java.io.Serializable;
@ -38,6 +39,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.ValidationFailure; import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
@ -152,7 +154,23 @@ public class SinglePendingCommit extends PersistentCommitData
*/ */
public static SinglePendingCommit load(FileSystem fs, Path path) public static SinglePendingCommit load(FileSystem fs, Path path)
throws IOException { throws IOException {
SinglePendingCommit instance = serializer().load(fs, path); return load(fs, path, null);
}
/**
* Load an instance from a file, then validate it.
* @param fs filesystem
* @param path path
* @param status status of file to load or null
* @return the loaded instance
* @throws IOException IO failure
* @throws ValidationFailure if the data is invalid
*/
public static SinglePendingCommit load(FileSystem fs,
Path path,
@Nullable FileStatus status)
throws IOException {
SinglePendingCommit instance = serializer().load(fs, path, status);
instance.filename = path.toString(); instance.filename = path.toString();
instance.validate(); instance.validate();
return instance; return instance;

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.fs.s3a.impl; package org.apache.hadoop.fs.s3a.impl;
import org.apache.hadoop.fs.store.audit.AuditSpan; import javax.annotation.Nullable;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; import org.apache.hadoop.fs.store.audit.AuditSpan;
/** /**
* Base class of operations in the store. * Base class of operations in the store.
@ -37,7 +37,7 @@ public abstract class AbstractStoreOperation {
/** /**
* Audit Span. * Audit Span.
*/ */
private AuditSpan auditSpan; private final AuditSpan auditSpan;
/** /**
* Constructor. * Constructor.
@ -45,8 +45,11 @@ public abstract class AbstractStoreOperation {
* stores it for later. * stores it for later.
* @param storeContext store context. * @param storeContext store context.
*/ */
protected AbstractStoreOperation(final StoreContext storeContext) { protected AbstractStoreOperation(final @Nullable StoreContext storeContext) {
this(storeContext, storeContext.getActiveAuditSpan()); this(storeContext,
storeContext != null
? storeContext.getActiveAuditSpan()
: null);
} }
/** /**
@ -54,10 +57,11 @@ public abstract class AbstractStoreOperation {
* @param storeContext store context. * @param storeContext store context.
* @param auditSpan active span * @param auditSpan active span
*/ */
protected AbstractStoreOperation(final StoreContext storeContext, protected AbstractStoreOperation(
final @Nullable StoreContext storeContext,
final AuditSpan auditSpan) { final AuditSpan auditSpan) {
this.storeContext = checkNotNull(storeContext); this.storeContext = storeContext;
this.auditSpan = checkNotNull(auditSpan); this.auditSpan = auditSpan;
} }
/** /**
@ -70,7 +74,7 @@ public abstract class AbstractStoreOperation {
/** /**
* Get the audit span this object was created with. * Get the audit span this object was created with.
* @return the current span * @return the current span or null
*/ */
public AuditSpan getAuditSpan() { public AuditSpan getAuditSpan() {
return auditSpan; return auditSpan;
@ -80,6 +84,8 @@ public abstract class AbstractStoreOperation {
* Activate the audit span. * Activate the audit span.
*/ */
public void activateAuditSpan() { public void activateAuditSpan() {
auditSpan.activate(); if (auditSpan != null) {
auditSpan.activate();
}
} }
} }

View File

@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause; import static org.apache.hadoop.util.functional.FutureIO.raiseInnerCause;
/** /**
* A bridge from Callable to Supplier; catching exceptions * A bridge from Callable to Supplier; catching exceptions

View File

@ -18,16 +18,18 @@
package org.apache.hadoop.fs.s3a.impl; package org.apache.hadoop.fs.s3a.impl;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Constants;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
/** /**
* Internal constants private only to the S3A codebase. * Internal constants private only to the S3A codebase.
* Please don't refer to these outside of this module &amp; its tests. * Please don't refer to these outside of this module &amp; its tests.
@ -89,11 +91,16 @@ public final class InternalConstants {
* used becomes that of the select operation. * used becomes that of the select operation.
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static final Set<String> STANDARD_OPENFILE_KEYS = public static final Set<String> S3A_OPENFILE_KEYS;
Collections.unmodifiableSet(
new HashSet<>( static {
Arrays.asList(Constants.INPUT_FADVISE, Set<String> keys = Stream.of(
Constants.READAHEAD_RANGE))); Constants.INPUT_FADVISE,
Constants.READAHEAD_RANGE)
.collect(Collectors.toSet());
keys.addAll(FS_OPTION_OPENFILE_STANDARD_OPTIONS);
S3A_OPENFILE_KEYS = Collections.unmodifiableSet(keys);
}
/** 403 error code. */ /** 403 error code. */
public static final int SC_403 = 403; public static final int SC_403 = 403;

View File

@ -0,0 +1,600 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Helper class for openFile() logic, especially processing file status
* args and length/etag/versionID.
* <p>
* This got complex enough it merited removal from S3AFileSystem -which
* also permits unit testing.
* </p>
* <p>
* The default values are those from the FileSystem configuration.
* in openFile(), they can all be changed by specific options;
* in FileSystem.open(path, buffersize) only the buffer size is
* set.
* </p>
*/
public class OpenFileSupport {
private static final Logger LOG =
LoggerFactory.getLogger(OpenFileSupport.class);
/**
* For use when a value of an split/file length is unknown.
*/
private static final int LENGTH_UNKNOWN = -1;
/** Default change detection policy. */
private final ChangeDetectionPolicy changePolicy;
/** Default read ahead range. */
private final long defaultReadAhead;
/** Username. */
private final String username;
/** Default buffer size. */
private final int defaultBufferSize;
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
private final long defaultAsyncDrainThreshold;
/**
* Default input policy; may be overridden in
* {@code openFile()}.
*/
private final S3AInputPolicy defaultInputPolicy;
/**
* Instantiate with the default options from the filesystem.
* @param changePolicy change detection policy
* @param defaultReadAhead read ahead range
* @param username username
* @param defaultBufferSize buffer size
* @param defaultAsyncDrainThreshold drain threshold
* @param defaultInputPolicy input policy
*/
public OpenFileSupport(
final ChangeDetectionPolicy changePolicy,
final long defaultReadAhead,
final String username,
final int defaultBufferSize,
final long defaultAsyncDrainThreshold,
final S3AInputPolicy defaultInputPolicy) {
this.changePolicy = changePolicy;
this.defaultReadAhead = defaultReadAhead;
this.username = username;
this.defaultBufferSize = defaultBufferSize;
this.defaultAsyncDrainThreshold = defaultAsyncDrainThreshold;
this.defaultInputPolicy = defaultInputPolicy;
}
public ChangeDetectionPolicy getChangePolicy() {
return changePolicy;
}
public long getDefaultReadAhead() {
return defaultReadAhead;
}
public int getDefaultBufferSize() {
return defaultBufferSize;
}
public long getDefaultAsyncDrainThreshold() {
return defaultAsyncDrainThreshold;
}
/**
* Propagate the default options to the operation context
* being built up.
* @param roc context
* @return the context
*/
public S3AReadOpContext applyDefaultOptions(S3AReadOpContext roc) {
return roc
.withInputPolicy(defaultInputPolicy)
.withChangeDetectionPolicy(changePolicy)
.withAsyncDrainThreshold(defaultAsyncDrainThreshold)
.withReadahead(defaultReadAhead);
}
/**
* Prepare to open a file from the openFile parameters.
* @param path path to the file
* @param parameters open file parameters from the builder.
* @param blockSize for fileStatus
* @return open file options
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
*/
@SuppressWarnings("ChainOfInstanceofChecks")
public OpenFileInformation prepareToOpenFile(
final Path path,
final OpenFileParameters parameters,
final long blockSize) throws IOException {
Configuration options = parameters.getOptions();
Set<String> mandatoryKeys = parameters.getMandatoryKeys();
String sql = options.get(SelectConstants.SELECT_SQL, null);
boolean isSelect = sql != null;
// choice of keys depends on open type
if (isSelect) {
// S3 Select call adds a large set of supported mandatory keys
rejectUnknownMandatoryKeys(
mandatoryKeys,
InternalSelectConstants.SELECT_OPTIONS,
"for " + path + " in S3 Select operation");
} else {
rejectUnknownMandatoryKeys(
mandatoryKeys,
InternalConstants.S3A_OPENFILE_KEYS,
"for " + path + " in non-select file I/O");
}
// where does a read end?
long fileLength = LENGTH_UNKNOWN;
// was a status passed in via a withStatus() invocation in
// the builder API?
FileStatus providedStatus = parameters.getStatus();
S3AFileStatus fileStatus = null;
if (providedStatus != null) {
// there's a file status
// make sure the file name matches -the rest of the path
// MUST NOT be checked.
Path providedStatusPath = providedStatus.getPath();
checkArgument(path.getName().equals(providedStatusPath.getName()),
"Filename mismatch between file being opened %s and"
+ " supplied filestatus %s",
path, providedStatusPath);
// make sure the status references a file
if (providedStatus.isDirectory()) {
throw new FileNotFoundException(
"Supplied status references a directory " + providedStatus);
}
// build up the values
long len = providedStatus.getLen();
long modTime = providedStatus.getModificationTime();
String versionId;
String eTag;
// can use this status to skip our own probes,
LOG.debug("File was opened with a supplied FileStatus;"
+ " skipping getFileStatus call in open() operation: {}",
providedStatus);
// what type is the status (and hence: what information does it contain?)
if (providedStatus instanceof S3AFileStatus) {
// is it an S3AFileSystem status?
S3AFileStatus st = (S3AFileStatus) providedStatus;
versionId = st.getVersionId();
eTag = st.getEtag();
} else if (providedStatus instanceof S3ALocatedFileStatus) {
// S3ALocatedFileStatus instance may supply etag and version.
S3ALocatedFileStatus st = (S3ALocatedFileStatus) providedStatus;
versionId = st.getVersionId();
eTag = st.getEtag();
} else {
// it is another type.
// build a status struct without etag or version.
LOG.debug("Converting file status {}", providedStatus);
versionId = null;
eTag = null;
}
// Construct a new file status with the real path of the file.
fileStatus = new S3AFileStatus(
len,
modTime,
path,
blockSize,
username,
eTag,
versionId);
// set the end of the read to the file length
fileLength = fileStatus.getLen();
}
// determine start and end of file.
long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
// split end
long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
LENGTH_UNKNOWN);
if (splitStart > 0 && splitStart > splitEnd) {
LOG.warn("Split start {} is greater than split end {}, resetting",
splitStart, splitEnd);
splitStart = 0;
}
// read end is the open file value
fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
// if the read end has come from options, use that
// in creating a file status
if (fileLength >= 0 && fileStatus == null) {
fileStatus = createStatus(path, fileLength, blockSize);
}
// Build up the input policy.
// seek policy from default, s3a opt or standard option
// read from the FS standard option.
Collection<String> policies =
options.getStringCollection(FS_OPTION_OPENFILE_READ_POLICY);
if (policies.isEmpty()) {
// fall back to looking at the S3A-specific option.
policies = options.getStringCollection(INPUT_FADVISE);
}
return new OpenFileInformation()
.withS3Select(isSelect)
.withSql(sql)
.withAsyncDrainThreshold(
options.getLong(ASYNC_DRAIN_THRESHOLD,
defaultReadAhead))
.withBufferSize(
options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
.withChangePolicy(changePolicy)
.withFileLength(fileLength)
.withInputPolicy(
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
.withReadAheadRange(
options.getLong(READAHEAD_RANGE, defaultReadAhead))
.withSplitStart(splitStart)
.withSplitEnd(splitEnd)
.withStatus(fileStatus)
.build();
}
/**
* Create a minimal file status.
* @param path path
* @param length file length/read end
* @param blockSize block size
* @return a new status
*/
private S3AFileStatus createStatus(Path path, long length, long blockSize) {
return new S3AFileStatus(
length,
0,
path,
blockSize,
username,
null,
null);
}
/**
* Open a simple file, using all the default
* options.
* @return the parameters needed to open a file through
* {@code open(path, bufferSize)}.
* @param bufferSize buffer size
*/
public OpenFileInformation openSimpleFile(final int bufferSize) {
return new OpenFileInformation()
.withS3Select(false)
.withAsyncDrainThreshold(defaultAsyncDrainThreshold)
.withBufferSize(bufferSize)
.withChangePolicy(changePolicy)
.withFileLength(LENGTH_UNKNOWN)
.withInputPolicy(defaultInputPolicy)
.withReadAheadRange(defaultReadAhead)
.withSplitStart(0)
.withSplitEnd(LENGTH_UNKNOWN)
.build();
}
@Override
public String toString() {
return "OpenFileSupport{" +
"changePolicy=" + changePolicy +
", defaultReadAhead=" + defaultReadAhead +
", defaultBufferSize=" + defaultBufferSize +
", defaultAsyncDrainThreshold=" + defaultAsyncDrainThreshold +
", defaultInputPolicy=" + defaultInputPolicy +
'}';
}
/**
* The information on a file needed to open it.
*/
public static final class OpenFileInformation {
/** Is this SQL? */
private boolean isS3Select;
/** File status; may be null. */
private S3AFileStatus status;
/** SQL string if this is a SQL select file. */
private String sql;
/** Active input policy. */
private S3AInputPolicy inputPolicy;
/** Change detection policy. */
private ChangeDetectionPolicy changePolicy;
/** Read ahead range. */
private long readAheadRange;
/** Buffer size. Currently ignored. */
private int bufferSize;
/**
* Where does the read start from. 0 unless known.
*/
private long splitStart;
/**
* What is the split end?
* Negative if not known.
*/
private long splitEnd = -1;
/**
* What is the file length?
* Negative if not known.
*/
private long fileLength = -1;
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
private long asyncDrainThreshold;
/**
* Constructor.
*/
public OpenFileInformation() {
}
/**
* Build.
* @return this object
*/
public OpenFileInformation build() {
return this;
}
public boolean isS3Select() {
return isS3Select;
}
public S3AFileStatus getStatus() {
return status;
}
public String getSql() {
return sql;
}
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
public ChangeDetectionPolicy getChangePolicy() {
return changePolicy;
}
public long getReadAheadRange() {
return readAheadRange;
}
public int getBufferSize() {
return bufferSize;
}
public long getSplitStart() {
return splitStart;
}
public long getSplitEnd() {
return splitEnd;
}
@Override
public String toString() {
return "OpenFileInformation{" +
"isSql=" + isS3Select +
", status=" + status +
", sql='" + sql + '\'' +
", inputPolicy=" + inputPolicy +
", changePolicy=" + changePolicy +
", readAheadRange=" + readAheadRange +
", splitStart=" + splitStart +
", splitEnd=" + splitEnd +
", bufferSize=" + bufferSize +
", drainThreshold=" + asyncDrainThreshold +
'}';
}
/**
* Get the file length.
* @return the file length; -1 if not known.
*/
public long getFileLength() {
return fileLength;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withS3Select(final boolean value) {
isS3Select = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withStatus(final S3AFileStatus value) {
status = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withSql(final String value) {
sql = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withInputPolicy(final S3AInputPolicy value) {
inputPolicy = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withChangePolicy(final ChangeDetectionPolicy value) {
changePolicy = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withReadAheadRange(final long value) {
readAheadRange = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withBufferSize(final int value) {
bufferSize = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withSplitStart(final long value) {
splitStart = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withSplitEnd(final long value) {
splitEnd = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withFileLength(final long value) {
fileLength = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withAsyncDrainThreshold(final long value) {
asyncDrainThreshold = value;
return this;
}
/**
* Propagate the options to the operation context
* being built up.
* @param roc context
* @return the context
*/
public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
return roc
.withInputPolicy(inputPolicy)
.withChangeDetectionPolicy(changePolicy)
.withAsyncDrainThreshold(asyncDrainThreshold)
.withReadahead(readAheadRange);
}
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.MultipartUtils; import org.apache.hadoop.fs.s3a.MultipartUtils;
@ -426,7 +427,8 @@ public abstract class S3GuardTool extends Configured implements Tool,
String encryption = String encryption =
printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM, printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM,
"none"); "none");
printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_NORMAL); printOption(out, "\tInput seek policy", INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE, printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE,
CHANGE_DETECT_SOURCE_DEFAULT); CHANGE_DETECT_SOURCE_DEFAULT);
printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE, printOption(out, "\tChange Detection Mode", CHANGE_DETECT_MODE,

View File

@ -71,7 +71,7 @@ public final class InternalSelectConstants {
CSV_OUTPUT_QUOTE_FIELDS, CSV_OUTPUT_QUOTE_FIELDS,
CSV_OUTPUT_RECORD_DELIMITER CSV_OUTPUT_RECORD_DELIMITER
)); ));
options.addAll(InternalConstants.STANDARD_OPENFILE_KEYS); options.addAll(InternalConstants.S3A_OPENFILE_KEYS);
SELECT_OPTIONS = Collections.unmodifiableSet(options); SELECT_OPTIONS = Collections.unmodifiableSet(options);
} }
} }

View File

@ -39,12 +39,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool; import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.OperationDuration; import org.apache.hadoop.util.OperationDuration;
import org.apache.hadoop.util.functional.FutureIO;
import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@ -261,7 +261,7 @@ public class SelectTool extends S3GuardTool {
FSDataInputStream stream; FSDataInputStream stream;
try(DurationInfo ignored = try(DurationInfo ignored =
new DurationInfo(LOG, "Selecting stream")) { new DurationInfo(LOG, "Selecting stream")) {
stream = FutureIOSupport.awaitFuture(builder.build()); stream = FutureIO.awaitFuture(builder.build());
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
// the source file is missing. // the source file is missing.
throw notFound(e); throw notFound(e);

View File

@ -188,4 +188,11 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
*/ */
DurationTracker initiateGetRequest(); DurationTracker initiateGetRequest();
/**
* Initiate a stream close/abort.
* @param abort was the stream aborted?
* @return duration tracker;
*/
DurationTracker initiateInnerStreamClose(boolean abort);
} }

View File

@ -337,6 +337,10 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
return stubDurationTracker(); return stubDurationTracker();
} }
@Override
public DurationTracker initiateInnerStreamClose(final boolean abort) {
return stubDurationTracker();
}
} }
/** /**

View File

@ -1014,6 +1014,14 @@ options are covered in [Testing](./testing.md).
any call to setReadahead() is made to an open stream.</description> any call to setReadahead() is made to an open stream.</description>
</property> </property>
<property>
<name>fs.s3a.input.async.drain.threshold</name>
<value>64K</value>
<description>Bytes to read ahead during a seek() before closing and
re-opening the S3 HTTP connection. This option will be overridden if
any call to setReadahead() is made to an open stream.</description>
</property>
<property> <property>
<name>fs.s3a.list.version</name> <name>fs.s3a.list.version</name>
<value>2</value> <value>2</value>

View File

@ -18,10 +18,22 @@
package org.apache.hadoop.fs.contract.s3a; package org.apache.hadoop.fs.contract.s3a;
import java.io.FileNotFoundException;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest; import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContract;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/** /**
* S3A contract tests opening files. * S3A contract tests opening files.
*/ */
@ -40,4 +52,59 @@ public class ITestS3AContractOpen extends AbstractContractOpenTest {
protected boolean areZeroByteFilesEncrypted() { protected boolean areZeroByteFilesEncrypted() {
return true; return true;
} }
@Test
public void testOpenFileApplyReadBadName() throws Throwable {
describe("use the apply sequence to read a whole file");
Path path = methodPath();
FileSystem fs = getFileSystem();
touch(fs, path);
FileStatus st = fs.getFileStatus(path);
// The final element of the path is different, so
// openFile must fail
FileStatus st2 = new FileStatus(
0, false,
st.getReplication(),
st.getBlockSize(),
st.getModificationTime(),
st.getAccessTime(),
st.getPermission(),
st.getOwner(),
st.getGroup(),
new Path("gopher:///localhost/something.txt"));
intercept(IllegalArgumentException.class, () ->
fs.openFile(path)
.withFileStatus(st2)
.build());
}
/**
* Pass in a directory reference and expect the openFile call
* to fail.
*/
@Test
public void testOpenFileDirectory() throws Throwable {
describe("Change the status to a directory");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
FileStatus st = fs.getFileStatus(path);
FileStatus st2 = new FileStatus(
len, true,
st.getReplication(),
st.getBlockSize(),
st.getModificationTime(),
st.getAccessTime(),
st.getPermission(),
st.getOwner(),
st.getGroup(),
path);
intercept(FileNotFoundException.class, () ->
fs.openFile(path)
.withFileStatus(st2)
.build());
}
} }

View File

@ -44,11 +44,11 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.Constants.SSL_CHANNEL_MODE; import static org.apache.hadoop.fs.s3a.Constants.SSL_CHANNEL_MODE;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE; import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
@ -87,9 +87,9 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> params() { public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{ return Arrays.asList(new Object[][]{
{INPUT_FADV_SEQUENTIAL, Default_JSSE}, {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Default_JSSE},
{INPUT_FADV_RANDOM, OpenSSL}, {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, OpenSSL},
{INPUT_FADV_NORMAL, Default_JSSE_with_GCM}, {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Default_JSSE_with_GCM},
}); });
} }
@ -215,7 +215,8 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
public void testReadPolicyInFS() throws Throwable { public void testReadPolicyInFS() throws Throwable {
describe("Verify the read policy is being consistently set"); describe("Verify the read policy is being consistently set");
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy()); assertEquals(S3AInputPolicy.getPolicy(seekPolicy, S3AInputPolicy.Normal),
fs.getInputPolicy());
} }
/** /**

View File

@ -70,6 +70,10 @@ public abstract class AbstractS3AMockTest {
// use minimum multipart size for faster triggering // use minimum multipart size for faster triggering
conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE); conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setInt(Constants.S3A_BUCKET_PROBE, 1); conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
// this is so stream draining is always blocking, allowing
// assertions to be safely made without worrying
// about any race conditions
conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
return conf; return conf;
} }

View File

@ -456,17 +456,6 @@ public class ITestS3AConfiguration {
tmp1.getParent(), tmp2.getParent()); tmp1.getParent(), tmp2.getParent());
} }
@Test
public void testReadAheadRange() throws Exception {
conf = new Configuration();
conf.set(Constants.READAHEAD_RANGE, "300K");
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
long readAheadRange = fs.getReadAheadRange();
assertNotNull(readAheadRange);
assertEquals("Read Ahead Range Incorrect.", 300 * 1024, readAheadRange);
}
@Test @Test
public void testUsernameFromUGI() throws Throwable { public void testUsernameFromUGI() throws Throwable {
final String alice = "alice"; final String alice = "alice";

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.FutureIO;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
@ -91,7 +92,6 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets; import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
@ -1257,7 +1257,7 @@ public final class S3ATestUtils {
.withFileStatus(status) .withFileStatus(status)
.build(); .build();
try (FSDataInputStream in = awaitFuture(future)) { try (FSDataInputStream in = FutureIO.awaitFuture(future)) {
byte[] buf = new byte[(int) status.getLen()]; byte[] buf = new byte[(int) status.getLen()];
in.readFully(0, buf); in.readFully(0, buf);
return new String(buf); return new String(buf);

View File

@ -22,6 +22,7 @@ import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException; import java.net.SocketException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import com.amazonaws.SdkClientException; import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
@ -34,9 +35,10 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.lang.Math.min; import static java.lang.Math.min;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -54,7 +56,6 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
@Test @Test
public void testInputStreamReadRetryForException() throws IOException { public void testInputStreamReadRetryForException() throws IOException {
S3AInputStream s3AInputStream = getMockedS3AInputStream(); S3AInputStream s3AInputStream = getMockedS3AInputStream();
assertEquals("'a' from the test input stream 'ab' should be the first " + assertEquals("'a' from the test input stream 'ab' should be the first " +
"character being read", INPUT.charAt(0), s3AInputStream.read()); "character being read", INPUT.charAt(0), s3AInputStream.read());
assertEquals("'b' from the test input stream 'ab' should be the second " + assertEquals("'b' from the test input stream 'ab' should be the second " +
@ -103,13 +104,14 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
INPUT.length()); INPUT.length());
S3AReadOpContext s3AReadOpContext = fs.createReadContext( S3AReadOpContext s3AReadOpContext = fs.createReadContext(
s3AFileStatus, S3AInputPolicy.Normal, s3AFileStatus,
ChangeDetectionPolicy.getPolicy(fs.getConf()), 100, NoopSpan.INSTANCE); NoopSpan.INSTANCE);
return new S3AInputStream( return new S3AInputStream(
s3AReadOpContext, s3AReadOpContext,
s3ObjectAttributes, s3ObjectAttributes,
getMockedInputStreamCallback()); getMockedInputStreamCallback(),
s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics());
} }
/** /**
@ -151,6 +153,11 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
return new GetObjectRequest(fs.getBucket(), key); return new GetObjectRequest(fs.getBucket(), key);
} }
@Override
public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
return eval(operation);
}
@Override @Override
public void close() { public void close() {
} }

View File

@ -33,6 +33,7 @@ import java.util.Date;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -59,6 +60,8 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
// Create mock S3ObjectInputStream and S3Object for open() // Create mock S3ObjectInputStream and S3Object for open()
S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class); S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class);
when(objectStream.read()).thenReturn(-1); when(objectStream.read()).thenReturn(-1);
when(objectStream.read(any(byte[].class))).thenReturn(-1);
when(objectStream.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
S3Object s3Object = mock(S3Object.class); S3Object s3Object = mock(S3Object.class);
when(s3Object.getObjectContent()).thenReturn(objectStream); when(s3Object.getObjectContent()).thenReturn(objectStream);
@ -67,7 +70,7 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
// Call read and then unbuffer // Call read and then unbuffer
FSDataInputStream stream = fs.open(path); FSDataInputStream stream = fs.open(path);
assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes assertEquals(-1, stream.read(new byte[8])); // mocks read 0 bytes
stream.unbuffer(); stream.unbuffer();
// Verify that unbuffer closed the object stream // Verify that unbuffer closed the object stream

View File

@ -79,7 +79,7 @@ public class TestStreamChangeTracker extends HadoopTestBase {
public void testVersionCheckingHandlingNoVersionsVersionRequired() public void testVersionCheckingHandlingNoVersionsVersionRequired()
throws Throwable { throws Throwable {
LOG.info("If an endpoint doesn't return versions but we are configured to" LOG.info("If an endpoint doesn't return versions but we are configured to"
+ "require them"); + " require them");
ChangeTracker tracker = newTracker( ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Client, ChangeDetectionPolicy.Mode.Client,
ChangeDetectionPolicy.Source.VersionId, ChangeDetectionPolicy.Source.VersionId,

View File

@ -0,0 +1,429 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.test.HadoopTestBase;
import static java.util.Collections.singleton;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit tests for {@link OpenFileSupport} and the associated
* seek policy lookup in {@link S3AInputPolicy}.
*/
public class TestOpenFileSupport extends HadoopTestBase {
private static final ChangeDetectionPolicy CHANGE_POLICY =
ChangeDetectionPolicy.createPolicy(
ChangeDetectionPolicy.Mode.Server,
ChangeDetectionPolicy.Source.None,
false);
private static final long READ_AHEAD_RANGE = 16;
private static final String USERNAME = "hadoop";
public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
public static final String TESTFILE = "s3a://bucket/name";
private static final Path TESTPATH = new Path(TESTFILE);
/**
* Create a OpenFileSupport instance.
*/
private static final OpenFileSupport PREPARE =
new OpenFileSupport(
CHANGE_POLICY,
READ_AHEAD_RANGE,
USERNAME,
IO_FILE_BUFFER_SIZE_DEFAULT,
DEFAULT_ASYNC_DRAIN_THRESHOLD,
INPUT_POLICY);
@Test
public void testSimpleFile() throws Throwable {
ObjectAssert<OpenFileSupport.OpenFileInformation>
asst = assertFileInfo(
PREPARE.openSimpleFile(1024));
asst.extracting(f -> f.getChangePolicy())
.isEqualTo(CHANGE_POLICY);
asst.extracting(f -> f.getInputPolicy())
.isEqualTo(INPUT_POLICY);
asst.extracting(f -> f.getReadAheadRange())
.isEqualTo(READ_AHEAD_RANGE);
}
/**
* Initiate an assert from an open file information instance.
* @param fi file info
* @return an assert stream.
*/
private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
final OpenFileSupport.OpenFileInformation fi) {
return Assertions.assertThat(fi)
.describedAs("File Information %s", fi);
}
/**
* Create an assertion about the openFile information from a configuration
* with the given key/value option.
* @param key key to set.
* @param option option value.
* @return the constructed OpenFileInformation.
*/
public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
final String key,
final String option) throws IOException {
return assertFileInfo(prepareToOpenFile(params(key, option)));
}
@Test
public void testUnknownMandatoryOption() throws Throwable {
String key = "unknown";
intercept(IllegalArgumentException.class, key, () ->
prepareToOpenFile(params(key, "undefined")));
}
@Test
public void testSeekRandomIOPolicy() throws Throwable {
// ask for random IO
String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
// is picked up
assertOpenFile(INPUT_FADVISE, option)
.extracting(f -> f.getInputPolicy())
.isEqualTo(S3AInputPolicy.Random);
// and as neither status nor length was set: no file status
assertOpenFile(INPUT_FADVISE, option)
.extracting(f -> f.getStatus())
.isNull();
}
/**
* There's a standard policy name. 'adaptive',
* meaning 'whatever this stream does to adapt to the client's use'.
* On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
*/
@Test
public void testSeekPolicyAdaptive() throws Throwable {
// when caller asks for adaptive, they get "normal"
assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
.extracting(f -> f.getInputPolicy())
.isEqualTo(S3AInputPolicy.Normal);
}
/**
* Verify that an unknown seek policy falls back to
* {@link S3AInputPolicy#Normal}.
*/
@Test
public void testUnknownSeekPolicyS3AOption() throws Throwable {
// fall back to the normal seek policy.
assertOpenFile(INPUT_FADVISE, "undefined")
.extracting(f -> f.getInputPolicy())
.isEqualTo(INPUT_POLICY);
}
/**
* The S3A option also supports a list of values.
*/
@Test
public void testSeekPolicyListS3AOption() throws Throwable {
// fall back to the second seek policy if the first is unknown
assertOpenFile(INPUT_FADVISE, "hbase, random")
.extracting(f -> f.getInputPolicy())
.isEqualTo(S3AInputPolicy.Random);
}
/**
* Verify that if a list of policies is supplied in a configuration,
* the first recognized policy will be adopted.
*/
@Test
public void testSeekPolicyExtractionFromList() throws Throwable {
String plist = "a, b, RandOm, other ";
Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
Collection<String> options = conf.getTrimmedStringCollection(
FS_OPTION_OPENFILE_READ_POLICY);
Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
.describedAs("Policy from " + plist)
.isEqualTo(S3AInputPolicy.Random);
}
@Test
public void testAdaptiveSeekPolicyRecognized() throws Throwable {
Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
.describedAs("adaptive")
.isEqualTo(S3AInputPolicy.Normal);
}
@Test
public void testUnknownSeekPolicyFallback() throws Throwable {
Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
.describedAs("unknown policy")
.isNull();
}
/**
* Test the mapping of the standard option names.
*/
@Test
public void testInputPolicyMapping() throws Throwable {
Object[][] policyMapping = {
{"normal", S3AInputPolicy.Normal},
{FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
{FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
{FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
{FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
};
for (Object[] mapping : policyMapping) {
String name = (String) mapping[0];
Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
.describedAs("Policy %s", name)
.isEqualTo(mapping[1]);
}
}
/**
* Verify readahead range is picked up.
*/
@Test
public void testReadahead() throws Throwable {
// readahead range option
assertOpenFile(READAHEAD_RANGE, "4096")
.extracting(f -> f.getReadAheadRange())
.isEqualTo(4096L);
}
/**
* Verify buffer size is picked up.
*/
@Test
public void testBufferSize() throws Throwable {
// readahead range option
assertOpenFile(FS_OPTION_OPENFILE_BUFFER_SIZE, "4096")
.extracting(f -> f.getBufferSize())
.isEqualTo(4096);
}
@Test
public void testStatusWithValidFilename() throws Throwable {
Path p = new Path("file:///tmp/" + TESTPATH.getName());
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "32")
.withStatus(status(p, 4096))));
asst.extracting(f -> f.getStatus().getVersionId())
.isEqualTo("version");
asst.extracting(f -> f.getStatus().getEtag())
.isEqualTo("etag");
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(4096L);
}
/**
* Verify S3ALocatedFileStatus is handled.
*/
@Test
public void testLocatedStatus() throws Throwable {
Path p = new Path("file:///tmp/" + TESTPATH.getName());
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(
prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "32")
.withStatus(
new S3ALocatedFileStatus(
status(p, 4096), null))));
asst.extracting(f -> f.getStatus().getVersionId())
.isEqualTo("version");
asst.extracting(f -> f.getStatus().getEtag())
.isEqualTo("etag");
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(4096L);
}
/**
* Callers cannot supply a directory status when opening a file.
*/
@Test
public void testDirectoryStatus() throws Throwable {
intercept(FileNotFoundException.class, TESTFILE, () ->
prepareToOpenFile(
params(INPUT_FADVISE, "normal")
.withStatus(new S3AFileStatus(true, TESTPATH, USERNAME))));
}
/**
* File name must match the path argument to openFile().
*/
@Test
public void testStatusWithInconsistentFilename() throws Throwable {
intercept(IllegalArgumentException.class, TESTFILE, () ->
prepareToOpenFile(params(INPUT_FADVISE, "normal")
.withStatus(new S3AFileStatus(true,
new Path(TESTFILE + "-"), USERNAME))));
}
/**
* Prepare to open a file with the set of parameters.
* @param parameters open a file
* @return
* @throws IOException
*/
public OpenFileSupport.OpenFileInformation prepareToOpenFile(
final OpenFileParameters parameters)
throws IOException {
return PREPARE.prepareToOpenFile(TESTPATH,
parameters,
IO_FILE_BUFFER_SIZE_DEFAULT
);
}
/**
* If a file length option is set, a file status
* is created.
*/
@Test
public void testFileLength() throws Throwable {
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "8192")
.withStatus(null)));
asst.extracting(f -> f.getStatus())
.isNotNull();
asst.extracting(f -> f.getStatus().getPath())
.isEqualTo(TESTPATH);
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(8192L);
}
/**
* Verify that setting the split end sets the length.
* By passing in a value greater than the size of an int,
* the test verifies that the long is passed everywhere.
*/
@Test
public void testSplitEndSetsLength() throws Throwable {
long bigFile = 2L ^ 34;
assertOpenFile(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(bigFile))
.matches(p -> p.getSplitEnd() == bigFile, "split end")
.matches(p -> p.getFileLength() == -1, "file length")
.matches(p -> p.getStatus() == null, "status");
}
/**
* Semantics of split and length. Split end can only be safely treated
* as a hint unless the codec is known (how?) that it will never
* read past it.
*/
@Test
public void testSplitEndAndLength() throws Throwable {
long splitEnd = 256;
long len = 8192;
Configuration conf = conf(FS_OPTION_OPENFILE_LENGTH,
Long.toString(len));
conf.setLong(FS_OPTION_OPENFILE_SPLIT_END, splitEnd);
conf.setLong(FS_OPTION_OPENFILE_SPLIT_START, 1024);
Set<String> s = new HashSet<>();
Collections.addAll(s,
FS_OPTION_OPENFILE_SPLIT_START,
FS_OPTION_OPENFILE_SPLIT_END,
FS_OPTION_OPENFILE_LENGTH);
assertFileInfo(prepareToOpenFile(
new OpenFileParameters()
.withMandatoryKeys(s)
.withOptions(conf)))
.matches(p -> p.getSplitStart() == 0, "split start")
.matches(p -> p.getSplitEnd() == splitEnd, "split end")
.matches(p -> p.getStatus().getLen() == len, "file length");
}
/**
* Create an S3A status entry with stub etag and versions, timestamp of 0.
* @param path status path
* @param length file length
* @return a status instance.
*/
private S3AFileStatus status(final Path path, final int length) {
return new S3AFileStatus(length, 0,
path, 0, "", "etag", "version");
}
/**
* Create an instance of {@link OpenFileParameters} with
* the key as a mandatory parameter.
* @param key mandatory key
* @param val value
* @return the instance.
*/
private OpenFileParameters params(final String key, final String val) {
return new OpenFileParameters()
.withMandatoryKeys(singleton(key))
.withOptions(conf(key, val));
}
/**
* Create a configuration with a single entry.
* @param key entry key
* @param val entry value
* @return a configuration
*/
private Configuration conf(String key, Object val) {
Configuration c = new Configuration(false);
c.set(key, val.toString());
return c;
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.io.EOFException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Cost of openFile().
*/
public class ITestS3AOpenCost extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3AOpenCost.class);
private Path testFile;
private FileStatus testFileStatus;
private long fileLength;
public ITestS3AOpenCost() {
super(true);
}
/**
* Setup creates a test file, saves is status and length
* to fields.
*/
@Override
public void setup() throws Exception {
super.setup();
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
writeTextFile(fs, testFile, "openfile", true);
testFileStatus = fs.getFileStatus(testFile);
fileLength = testFileStatus.getLen();
}
/**
* Test when openFile() performs GET requests when file status
* and length options are passed down.
* Note that the input streams only update the FS statistics
* in close(), so metrics cannot be verified until all operations
* on a stream are complete.
* This is slightly less than ideal.
*/
@Test
public void testOpenFileWithStatusOfOtherFS() throws Throwable {
describe("Test cost of openFile with/without status; raw only");
S3AFileSystem fs = getFileSystem();
// now read that file back in using the openFile call.
// with a new FileStatus and a different path.
// this verifies that any FileStatus class/subclass is used
// as a source of the file length.
FileStatus st2 = new FileStatus(
fileLength, false,
testFileStatus.getReplication(),
testFileStatus.getBlockSize(),
testFileStatus.getModificationTime(),
testFileStatus.getAccessTime(),
testFileStatus.getPermission(),
testFileStatus.getOwner(),
testFileStatus.getGroup(),
new Path("gopher:///localhost/" + testFile.getName()));
// no IO in open
FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile)
.withFileStatus(st2)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0));
// the stream gets opened during read
long readLen = verifyMetrics(() ->
readStream(in),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 1));
assertEquals("bytes read from file", fileLength, readLen);
}
@Test
public void testOpenFileShorterLength() throws Throwable {
// do a second read with the length declared as short.
// we now expect the bytes read to be shorter.
S3AFileSystem fs = getFileSystem();
S3ATestUtils.MetricDiff bytesDiscarded =
new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
int offset = 2;
long shortLen = fileLength - offset;
// open the file
FSDataInputStream in2 = verifyMetrics(() ->
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0));
// verify that the statistics are in range
IOStatistics ioStatistics = extractStatistics(in2);
Object statsString = demandStringifyIOStatistics(ioStatistics);
LOG.info("Statistics of open stream {}", statsString);
verifyStatisticCounterValue(ioStatistics, ACTION_FILE_OPENED, 1);
// no network IO happened, duration is 0. There's a very small
// risk of some delay making it positive just from scheduling delays
assertDurationRange(ioStatistics, ACTION_FILE_OPENED, 0, 0);
// now read it
long r2 = verifyMetrics(() ->
readStream(in2),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 1),
with(STREAM_READ_BYTES_READ_CLOSE, 0),
with(STREAM_READ_SEEK_BYTES_SKIPPED, 0));
LOG.info("Statistics of read stream {}", statsString);
assertEquals("bytes read from file", shortLen, r2);
// no bytes were discarded.
bytesDiscarded.assertDiffEquals(0);
}
@Test
public void testOpenFileLongerLength() throws Throwable {
// do a second read with the length declared as longer
// than it is.
// An EOF will be read on readFully(), -1 on a read()
S3AFileSystem fs = getFileSystem();
// set a length past the actual file length
long longLen = fileLength + 10;
FSDataInputStream in3 = verifyMetrics(() ->
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.must(FS_OPTION_OPENFILE_LENGTH, longLen)
.build()
.get(),
always(NO_HEAD_OR_LIST));
// assert behaviors of seeking/reading past the file length.
// there is no attempt at recovery.
verifyMetrics(() -> {
byte[] out = new byte[(int) longLen];
intercept(EOFException.class,
() -> in3.readFully(0, out));
in3.seek(longLen - 1);
assertEquals("read past real EOF on " + in3,
-1, in3.read());
in3.close();
return in3.toString();
},
// two GET calls were made, one for readFully,
// the second on the read() past the EOF
// the operation has got as far as S3
with(STREAM_READ_OPENED, 2));
}
}

View File

@ -62,6 +62,12 @@ public final class OperationCost {
public static final OperationCost NO_IO = public static final OperationCost NO_IO =
new OperationCost(0, 0); new OperationCost(0, 0);
/**
* More detailed description of the NO_IO cost.
*/
public static final OperationCost NO_HEAD_OR_LIST =
NO_IO;
/** A HEAD operation. */ /** A HEAD operation. */
public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0); public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0);

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
@ -41,7 +42,6 @@ import org.apache.hadoop.util.LineReader;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -50,6 +50,9 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
@ -57,20 +60,22 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatSt
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MIN;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/** /**
* Look at the performance of S3a operations. * Look at the performance of S3a Input Stream Reads.
*/ */
public class ITestS3AInputStreamPerformance extends S3AScaleTestBase { public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
ITestS3AInputStreamPerformance.class); ITestS3AInputStreamPerformance.class);
private static final int READAHEAD_128K = 128 * _1KB;
private S3AFileSystem s3aFS; private S3AFileSystem s3aFS;
private Path testData; private Path testData;
@ -128,14 +133,16 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
describe("cleanup"); describe("cleanup");
IOUtils.closeStream(in); IOUtils.closeStream(in);
if (in != null) { if (in != null) {
final IOStatistics stats = in.getIOStatistics();
LOG.info("Stream statistics {}", LOG.info("Stream statistics {}",
ioStatisticsSourceToString(in)); ioStatisticsToPrettyString(stats));
IOSTATS.aggregate(in.getIOStatistics()); IOSTATS.aggregate(stats);
} }
if (s3aFS != null) { if (s3aFS != null) {
final IOStatistics stats = s3aFS.getIOStatistics();
LOG.info("FileSystem statistics {}", LOG.info("FileSystem statistics {}",
ioStatisticsSourceToString(s3aFS)); ioStatisticsToPrettyString(stats));
FILESYSTEM_IOSTATS.aggregate(s3aFS.getIOStatistics()); FILESYSTEM_IOSTATS.aggregate(stats);
IOUtils.closeStream(s3aFS); IOUtils.closeStream(s3aFS);
} }
} }
@ -177,7 +184,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead) FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
throws IOException { throws IOException {
requireCSVTestData(); requireCSVTestData();
return openDataFile(s3aFS, this.testData, inputPolicy, readahead); return openDataFile(s3aFS, testData, inputPolicy, readahead, testDataStatus.getLen());
} }
/** /**
@ -187,27 +194,28 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
* @param path path to open * @param path path to open
* @param inputPolicy input policy to use * @param inputPolicy input policy to use
* @param readahead readahead/buffer size * @param readahead readahead/buffer size
* @param length
* @return the stream, wrapping an S3a one * @return the stream, wrapping an S3a one
* @throws IOException IO problems * @throws IOException IO problems
*/ */
private FSDataInputStream openDataFile(S3AFileSystem fs, private FSDataInputStream openDataFile(S3AFileSystem fs,
Path path, Path path,
S3AInputPolicy inputPolicy, S3AInputPolicy inputPolicy,
long readahead) throws IOException { long readahead,
final long length) throws IOException {
int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
DEFAULT_READ_BUFFER_SIZE); DEFAULT_READ_BUFFER_SIZE);
S3AInputPolicy policy = fs.getInputPolicy(); final FutureDataInputStreamBuilder builder = fs.openFile(path)
fs.setInputPolicy(inputPolicy); .opt(FS_OPTION_OPENFILE_READ_POLICY,
try { inputPolicy.toString())
FSDataInputStream stream = fs.open(path, bufferSize); .opt(FS_OPTION_OPENFILE_LENGTH, length)
if (readahead >= 0) { .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
stream.setReadahead(readahead); if (readahead > 0) {
} builder.opt(READAHEAD_RANGE, readahead);
streamStatistics = getInputStreamStatistics(stream);
return stream;
} finally {
fs.setInputPolicy(policy);
} }
FSDataInputStream stream = awaitFuture(builder.build());
streamStatistics = getInputStreamStatistics(stream);
return stream;
} }
/** /**
@ -293,8 +301,10 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
if (bandwidth(blockTimer, blockSize) < minimumBandwidth) { if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
LOG.warn("Bandwidth {} too low on block {}: resetting connection", LOG.warn("Bandwidth {} too low on block {}: resetting connection",
bw, blockId); bw, blockId);
Assert.assertTrue("Bandwidth of " + bw +" too low after " Assertions.assertThat(resetCount)
+ resetCount + " attempts", resetCount <= maxResetCount); .describedAs("Bandwidth of %s too low after %s attempts",
bw, resetCount)
.isLessThanOrEqualTo(maxResetCount);
resetCount++; resetCount++;
// reset the connection // reset the connection
getS3AInputStream(in).resetConnection(); getS3AInputStream(in).resetConnection();
@ -359,7 +369,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
public void testDecompressionSequential128K() throws Throwable { public void testDecompressionSequential128K() throws Throwable {
describe("Decompress with a 128K readahead"); describe("Decompress with a 128K readahead");
skipIfClientSideEncryption(); skipIfClientSideEncryption();
executeDecompression(128 * _1KB, S3AInputPolicy.Sequential); executeDecompression(READAHEAD_128K, S3AInputPolicy.Sequential);
assertStreamOpenedExactlyOnce(); assertStreamOpenedExactlyOnce();
} }
@ -558,7 +568,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
byte[] buffer = new byte[datasetLen]; byte[] buffer = new byte[datasetLen];
int readahead = _8K; int readahead = _8K;
int halfReadahead = _4K; int halfReadahead = _4K;
in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead); in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead, datasetLen);
LOG.info("Starting initial reads"); LOG.info("Starting initial reads");
S3AInputStream s3aStream = getS3aStream(); S3AInputStream s3aStream = getS3aStream();

View File

@ -60,11 +60,11 @@ import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES; import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
import static org.apache.hadoop.fs.s3a.select.SelectConstants.*; import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/** /**
* Superclass for S3 Select tests. * Superclass for S3 Select tests.

View File

@ -63,8 +63,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES; import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars; import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars;
@ -767,7 +767,8 @@ public class ITestS3Select extends AbstractS3SelectTest {
JobConf conf = createJobConf(); JobConf conf = createJobConf();
inputOpt(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_NONE); inputOpt(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_NONE);
inputMust(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE); inputMust(conf, CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE);
inputMust(conf, INPUT_FADVISE, INPUT_FADV_NORMAL); inputMust(conf, FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_DEFAULT);
inputMust(conf, SELECT_ERRORS_INCLUDE_SQL, "true"); inputMust(conf, SELECT_ERRORS_INCLUDE_SQL, "true");
verifySelectionCount(EVEN_ROWS_COUNT, verifySelectionCount(EVEN_ROWS_COUNT,
SELECT_EVEN_ROWS_NO_HEADER, SELECT_EVEN_ROWS_NO_HEADER,

View File

@ -30,7 +30,6 @@ import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount; import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.S3AUtils;
@ -44,6 +43,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.FutureIO;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -203,7 +203,7 @@ public class ITestS3SelectMRJob extends AbstractS3SelectTest {
private String readStringFromFile(Path path) throws IOException { private String readStringFromFile(Path path) throws IOException {
int bytesLen = (int)fs.getFileStatus(path).getLen(); int bytesLen = (int)fs.getFileStatus(path).getLen();
byte[] buffer = new byte[bytesLen]; byte[] buffer = new byte[bytesLen];
return FutureIOSupport.awaitFuture( return FutureIO.awaitFuture(
fs.openFile(path).build().thenApply(in -> { fs.openFile(path).build().thenApply(in -> {
try { try {
IOUtils.readFully(in, buffer, 0, bytesLen); IOUtils.readFully(in, buffer, 0, bytesLen);