HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163)

Add a new option:
fs.s3a.optimized.copy.from.local.enabled

This will enable (default) or disable the
optimized CopyFromLocalOperation upload operation
when copyFromLocalFile() is invoked.

When false the superclass implementation is used; duration
statistics are still collected, though audit span entries
in logs will be for the individual fs operations, not the
overall operation.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2023-11-06 16:00:56 +00:00 committed by GitHub
parent 077263d9f3
commit ef7fb64764
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 30 deletions

View File

@ -1347,4 +1347,17 @@ private Constants() {
*/
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;
/**
* Is the higher performance copy from local file to S3 enabled?
* This switch allows for it to be disabled if there are problems.
* Value: {@value}.
*/
public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";
/**
* Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
* Value: {@value}.
*/
public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
}

View File

@ -462,6 +462,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private String scheme = FS_S3A;
/**
* Flag to indicate that the higher performance copyFromLocalFile implementation
* should be used.
*/
private boolean optimizedCopyFromLocal;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -696,6 +702,9 @@ public void initialize(URI name, Configuration originalConf)
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
vectoredIOContext = populateVectoredIOContext(conf);
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
@ -4021,9 +4030,9 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
* If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
* the superclass implementation is used.
*
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
@ -4031,35 +4040,59 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws SdkException failure in the AWS SDK
*/
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
() -> new CopyFromLocalOperation(
LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
src, dst, delSrc, overwrite);
if (optimizedCopyFromLocal) {
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks()).execute());
createCopyFromLocalCallbacks(getActiveAuditSpan()))
.execute());
} else {
// call the superclass, but still count statistics.
// there is no overall span here, as each FS API call will
// be in its own span.
LOG.debug("Using base copyFromLocalFile implementation");
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
super.copyFromLocalFile(delSrc, overwrite, src, dst);
return null;
});
}
}
/**
* Create the CopyFromLocalCallbacks;
* protected to assist in mocking.
* @param span audit span.
* @return the callbacks
* @throws IOException failure to get the local fs.
*/
protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
LocalFileSystem local = getLocal(getConf());
return new CopyFromLocalCallbacksImpl(local);
return new CopyFromLocalCallbacksImpl(span, local);
}
protected final class CopyFromLocalCallbacksImpl implements
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
/** Span to use for all operations. */
private final AuditSpanS3A span;
private final LocalFileSystem local;
private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
LocalFileSystem local) {
this.span = span;
this.local = local;
}
@ -4081,18 +4114,16 @@ public boolean deleteLocal(Path path, boolean recursive) throws IOException {
@Override
public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
trackDurationAndSpan(
OBJECT_PUT_REQUESTS,
to,
() -> {
// the duration of the put is measured, but the active span is the
// constructor-supplied one -this ensures all audit log events are grouped correctly
span.activate();
trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
final String key = pathToKey(to);
Progressable progress = null;
PutObjectRequest.Builder putObjectRequestBuilder =
newPutObjectRequestBuilder(key, file.length(), false);
S3AFileSystem.this.invoker.retry("putObject(" + "" + ")", to.toString(), true,
() -> executePut(putObjectRequestBuilder.build(), progress, putOptionsForPath(to),
file));
final String dest = to.toString();
S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () ->
executePut(putObjectRequestBuilder.build(), null, putOptionsForPath(to), file));
return null;
});
}
@ -5399,6 +5430,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceCreation;
// is the optimized copy from local enabled.
case OPTIMIZED_COPY_FROM_LOCAL:
return optimizedCopyFromLocal;
default:
return super.hasPathCapability(p, cap);
}

View File

@ -1544,3 +1544,13 @@ software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP
When this happens, try to set `fs.s3a.connection.request.timeout` to a larger value or disable it
completely by setting it to `0`.
### <a name="debug-switches"></a> Debugging Switches
There are some switches which can be set to enable/disable features and assist
in isolating problems and at least make them "go away".
| Key | Default | Action |
|------|---------|----------|
| `fs.s3a.optimized.copy.from.local.enabled` | `true` | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. |

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
@ -26,18 +28,67 @@
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test copying files from the local filesystem to S3A.
* Parameterized on whether or not the optimized
* copyFromLocalFile is enabled.
*/
@RunWith(Parameterized.class)
public class ITestS3ACopyFromLocalFile extends
AbstractContractCopyFromLocalTest {
/**
* Parameterization.
*/
@Parameterized.Parameters(name = "enabled={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{true},
{false},
});
}
private final boolean enabled;
public ITestS3ACopyFromLocalFile(final boolean enabled) {
this.enabled = enabled;
}
@Override
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
OPTIMIZED_COPY_FROM_LOCAL);
conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
disableFilesystemCaching(conf);
return conf;
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Test
public void testOptionPropagation() throws Throwable {
Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
OPTIMIZED_COPY_FROM_LOCAL))
.describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
.isEqualTo(enabled);
}
@Test
public void testLocalFilesOnly() throws Throwable {
describe("Copying into other file systems must fail");