HADOOP-17483. Magic committer is enabled by default. (#2656)
* core-default.xml updated so that fs.s3a.committer.magic.enabled = true
* CommitConstants updated to match
* All tests which previously enabled the magic committer now rely on the default settings. This helps make sure it is enabled.
* Docs cover the switch, mention that it is enabled, and explain why you may want to disable it.

Note: this doesn't switch to using the committer; it just enables the path rewriting magic which it depends on.

Contributed by Steve Loughran.
parent 3e1eb16837
commit 2d124f2f5e
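As the commit message notes, `fs.s3a.committer.magic.enabled` only switches on the `__magic/` path rewriting in the S3A filesystem; a job must still select the magic committer itself. A minimal, illustrative sketch of the two settings side by side using the standard `Configuration` API (the snippet is an example, not part of this patch):

```java
import org.apache.hadoop.conf.Configuration;

public class MagicCommitterSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The switch changed by this patch: it enables __magic path rewriting
    // support in the S3A filesystem, nothing more.
    conf.setBoolean("fs.s3a.committer.magic.enabled", true);
    // Actually using the magic committer for a job is a separate choice.
    conf.set("fs.s3a.committer.name", "magic");
    System.out.println(conf.get("fs.s3a.committer.name"));
  }
}
```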
@@ -1873,11 +1873,9 @@
 <property>
   <name>fs.s3a.committer.magic.enabled</name>
-  <value>false</value>
+  <value>true</value>
   <description>
-    Enable support in the filesystem for the S3 "Magic" committer.
-    When working with AWS S3, S3Guard must be enabled for the destination
-    bucket, as consistent metadata listings are required.
+    Enable support in the S3A filesystem for the "Magic" committer.
   </description>
 </property>
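With the default now carried by core-default.xml, a plain `Configuration` picks up `true` without any site-specific settings. A small sketch of how that surfaces, plus a hedged example of a per-bucket override for a deployment that wants the feature off for one bucket only ("example-bucket" is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;

public class MagicEnabledDefault {
  public static void main(String[] args) {
    // Loads core-default.xml (plus any core-site.xml) from the classpath,
    // so with this change the fallback value below should no longer be needed.
    Configuration conf = new Configuration();
    boolean enabled = conf.getBoolean("fs.s3a.committer.magic.enabled", false);
    System.out.println("magic path rewriting enabled: " + enabled);

    // S3A options can also be overridden per bucket; "example-bucket" is a
    // placeholder for a bucket where the feature should stay disabled.
    conf.setBoolean("fs.s3a.bucket.example-bucket.committer.magic.enabled", false);
  }
}
```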
@@ -108,9 +108,9 @@ private CommitConstants() {
       = "s3a:magic.committer";

   /**
-   * Is the committer enabled by default? No.
+   * Is the committer enabled by default: {@value}.
    */
-  public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = false;
+  public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = true;

   /**
    * This is the "Pending" directory of the {@code FileOutputCommitter};
@@ -434,8 +434,6 @@ public void createSuccessMarker(Path outputPath,
         conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
     successData.addDiagnostic(AUTHORITATIVE_PATH,
         conf.getTrimmed(AUTHORITATIVE_PATH, ""));
-    successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
-        conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));

     // now write
     Path markerPath = new Path(outputPath, _SUCCESS);
@@ -685,6 +685,27 @@ Conflict management is left to the execution engine itself.
 ```

+### Disabling magic committer path rewriting
+
+The magic committer recognizes when files are created under paths with `__magic/` as a parent directory
+and redirects the upload to a different location, adding the information needed to complete the upload
+in the job commit operation.
+
+If, for some reason, you *do not* want these paths to be redirected, with the data only manifesting
+at job commit, the feature can be disabled by setting `fs.s3a.committer.magic.enabled` to false.
+
+By default it is true.
+
+```xml
+<property>
+  <name>fs.s3a.committer.magic.enabled</name>
+  <value>true</value>
+  <description>
+    Enable support in the S3A filesystem for the "Magic" committer.
+  </description>
+</property>
+```
+
 ## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination

 It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
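The new documentation covers turning the path rewriting off; the complementary question is whether a given store reports the capability at all, which is what the `s3guard bucket-info` magic probe and the updated `testProbeForMagic` further down check. A hedged sketch of the same probe in code (the destination path is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;

public class MagicCapabilityProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder destination; any path in the target bucket will do.
    Path dest = new Path("s3a://example-bucket/output");
    FileSystem fs = dest.getFileSystem(conf);
    // Same capability constant the bucket-info probe and testProbeForMagic use.
    boolean magic = fs.hasPathCapability(dest,
        CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER);
    System.out.println("magic committer supported: " + magic);
  }
}
```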
@@ -26,7 +26,6 @@
 import java.net.URI;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
@@ -71,8 +70,6 @@ public Configuration createConfiguration() {
     // test we don't issue request to AWS DynamoDB service.
     conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
         MetadataStore.class);
-    // FS is always magic
-    conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
     // use minimum multipart size for faster triggering
     conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
@@ -93,7 +93,6 @@
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 import static org.junit.Assert.*;

 /**
@@ -628,9 +627,6 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {
       conf.set(HADOOP_TMP_DIR, tmpDir);
     }
     conf.set(BUFFER_DIR, tmpDir);
-    // add this so that even on tests where the FS is shared,
-    // the FS is always "magic"
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);

     // directory marker policy
     String directoryRetention = getTestProperty(
@@ -555,7 +555,6 @@ public void testAssumedRoleRetryHandler() throws Throwable {
   public void testRestrictedCommitActions() throws Throwable {
     describe("Attempt commit operations against a path with restricted rights");
     Configuration conf = createAssumedRoleConfig();
-    conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
     final int uploadPartSize = 5 * 1024 * 1024;

     ProgressCounter progress = new ProgressCounter();
@@ -117,7 +117,7 @@ protected Configuration createConfiguration() {
         FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
         FAST_UPLOAD_BUFFER);

-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
     conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
@@ -66,13 +66,6 @@ public boolean useInconsistentClient() {
     return false;
   }

-  @Override
-  protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
-    return conf;
-  }
-
   @Override
   protected String getCommitterFactoryName() {
     return CommitConstants.S3A_COMMITTER_FACTORY;
@@ -26,7 +26,6 @@
 import org.slf4j.LoggerFactory;

 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -83,17 +82,6 @@ public String getTestSuiteName() {
     return "ITestS3AHugeMagicCommits";
   }

-  /**
-   * Create the scale IO conf with the committer enabled.
-   * @return the configuration to use for the test FS.
-   */
-  @Override
-  protected Configuration createScaleConfiguration() {
-    Configuration conf = super.createScaleConfiguration();
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
-    return conf;
-  }
-
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -54,7 +54,6 @@

 import static java.util.Optional.empty;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;

 /**
  * Runs Terasort against S3A.
@@ -155,7 +154,6 @@ public void setup() throws Exception {
   @Override
   protected void applyCustomConfigOptions(JobConf conf) {
     // small sample size for faster runs
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
         getSampleSizeForEachPartition());
     conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
@@ -49,7 +49,6 @@
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
@@ -61,7 +60,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
 import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo.IS_MARKER_AWARE;
-import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.INVALID_ARGUMENT;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
@@ -590,16 +588,8 @@ public void testProbeForMagic() throws Throwable {
     String name = fs.getUri().toString();
     S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
         getConfiguration());
-    if (fs.hasPathCapability(fs.getWorkingDirectory(),
-        CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
-      // if the FS is magic, expect this to work
+    // this must always work
     exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
-    } else {
-      // if the FS isn't magic, expect the probe to fail
-      assertExitCode(E_BAD_STATE,
-          intercept(ExitUtil.ExitException.class,
-              () -> exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name)));
-    }
   }

   /**