HADOOP-17483. Magic committer is enabled by default. (#2656)
* core-default.xml updated so that fs.s3a.committer.magic.enabled = true * CommitConstants updated to match * All tests which previously enabled the magic committer now rely on default settings. This helps make sure it is enabled. * Docs cover the switch, mention its enabled and explain why you may want to disable it. Note: this doesn't switch to using the committer -it just enables the path rewriting magic which it depends on. Contributed by Steve Loughran.
This commit is contained in:
parent
9628aa87bf
commit
0bb52a42e5
@ -1873,11 +1873,9 @@
|
|||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.committer.magic.enabled</name>
|
<name>fs.s3a.committer.magic.enabled</name>
|
||||||
<value>false</value>
|
<value>true</value>
|
||||||
<description>
|
<description>
|
||||||
Enable support in the filesystem for the S3 "Magic" committer.
|
Enable support in the S3A filesystem for the "Magic" committer.
|
||||||
When working with AWS S3, S3Guard must be enabled for the destination
|
|
||||||
bucket, as consistent metadata listings are required.
|
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -108,9 +108,9 @@ private CommitConstants() {
|
|||||||
= "s3a:magic.committer";
|
= "s3a:magic.committer";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is the committer enabled by default? No.
|
* Is the committer enabled by default: {@value}.
|
||||||
*/
|
*/
|
||||||
public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = false;
|
public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the "Pending" directory of the {@code FileOutputCommitter};
|
* This is the "Pending" directory of the {@code FileOutputCommitter};
|
||||||
|
@ -434,8 +434,6 @@ public void createSuccessMarker(Path outputPath,
|
|||||||
conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
|
conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
|
||||||
successData.addDiagnostic(AUTHORITATIVE_PATH,
|
successData.addDiagnostic(AUTHORITATIVE_PATH,
|
||||||
conf.getTrimmed(AUTHORITATIVE_PATH, ""));
|
conf.getTrimmed(AUTHORITATIVE_PATH, ""));
|
||||||
successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
|
|
||||||
conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));
|
|
||||||
|
|
||||||
// now write
|
// now write
|
||||||
Path markerPath = new Path(outputPath, _SUCCESS);
|
Path markerPath = new Path(outputPath, _SUCCESS);
|
||||||
|
@ -685,6 +685,27 @@ Conflict management is left to the execution engine itself.
|
|||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Disabling magic committer path rewriting
|
||||||
|
|
||||||
|
The magic committer recognizes when files are created under paths with `__magic/` as a parent directory
|
||||||
|
and redirects the upload to a different location, adding the information needed to complete the upload
|
||||||
|
in the job commit operation.
|
||||||
|
|
||||||
|
If, for some reason, you *do not* want these paths to be redirected and not manifest until later,
|
||||||
|
the feature can be disabled by setting `fs.s3a.committer.magic.enabled` to false.
|
||||||
|
|
||||||
|
By default it is true.
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.committer.magic.enabled</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>
|
||||||
|
Enable support in the S3A filesystem for the "Magic" committer.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
```
|
||||||
|
|
||||||
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
|
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
|
||||||
|
|
||||||
It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
|
It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
|
||||||
|
@ -26,7 +26,6 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
|
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
|
||||||
|
|
||||||
@ -71,8 +70,6 @@ public Configuration createConfiguration() {
|
|||||||
// test we don't issue request to AWS DynamoDB service.
|
// test we don't issue request to AWS DynamoDB service.
|
||||||
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
|
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
|
||||||
MetadataStore.class);
|
MetadataStore.class);
|
||||||
// FS is always magic
|
|
||||||
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
// use minimum multipart size for faster triggering
|
// use minimum multipart size for faster triggering
|
||||||
conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
||||||
conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
|
conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
|
||||||
|
@ -93,7 +93,6 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -628,9 +627,6 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {
|
|||||||
conf.set(HADOOP_TMP_DIR, tmpDir);
|
conf.set(HADOOP_TMP_DIR, tmpDir);
|
||||||
}
|
}
|
||||||
conf.set(BUFFER_DIR, tmpDir);
|
conf.set(BUFFER_DIR, tmpDir);
|
||||||
// add this so that even on tests where the FS is shared,
|
|
||||||
// the FS is always "magic"
|
|
||||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
|
|
||||||
// directory marker policy
|
// directory marker policy
|
||||||
String directoryRetention = getTestProperty(
|
String directoryRetention = getTestProperty(
|
||||||
|
@ -555,7 +555,6 @@ public void testAssumedRoleRetryHandler() throws Throwable {
|
|||||||
public void testRestrictedCommitActions() throws Throwable {
|
public void testRestrictedCommitActions() throws Throwable {
|
||||||
describe("Attempt commit operations against a path with restricted rights");
|
describe("Attempt commit operations against a path with restricted rights");
|
||||||
Configuration conf = createAssumedRoleConfig();
|
Configuration conf = createAssumedRoleConfig();
|
||||||
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
final int uploadPartSize = 5 * 1024 * 1024;
|
final int uploadPartSize = 5 * 1024 * 1024;
|
||||||
|
|
||||||
ProgressCounter progress = new ProgressCounter();
|
ProgressCounter progress = new ProgressCounter();
|
||||||
|
@ -117,7 +117,7 @@ protected Configuration createConfiguration() {
|
|||||||
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
|
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
|
||||||
FAST_UPLOAD_BUFFER);
|
FAST_UPLOAD_BUFFER);
|
||||||
|
|
||||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED);
|
||||||
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
|
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
|
||||||
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
||||||
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
|
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
|
||||||
|
@ -66,13 +66,6 @@ public boolean useInconsistentClient() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Configuration createConfiguration() {
|
|
||||||
Configuration conf = super.createConfiguration();
|
|
||||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getCommitterFactoryName() {
|
protected String getCommitterFactoryName() {
|
||||||
return CommitConstants.S3A_COMMITTER_FACTORY;
|
return CommitConstants.S3A_COMMITTER_FACTORY;
|
||||||
|
@ -26,7 +26,6 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -83,17 +82,6 @@ public String getTestSuiteName() {
|
|||||||
return "ITestS3AHugeMagicCommits";
|
return "ITestS3AHugeMagicCommits";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create the scale IO conf with the committer enabled.
|
|
||||||
* @return the configuration to use for the test FS.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected Configuration createScaleConfiguration() {
|
|
||||||
Configuration conf = super.createScaleConfiguration();
|
|
||||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
super.setup();
|
super.setup();
|
||||||
|
@ -54,7 +54,6 @@
|
|||||||
|
|
||||||
import static java.util.Optional.empty;
|
import static java.util.Optional.empty;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
|
||||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs Terasort against S3A.
|
* Runs Terasort against S3A.
|
||||||
@ -155,7 +154,6 @@ public void setup() throws Exception {
|
|||||||
@Override
|
@Override
|
||||||
protected void applyCustomConfigOptions(JobConf conf) {
|
protected void applyCustomConfigOptions(JobConf conf) {
|
||||||
// small sample size for faster runs
|
// small sample size for faster runs
|
||||||
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
|
|
||||||
conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
|
conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
|
||||||
getSampleSizeForEachPartition());
|
getSampleSizeForEachPartition());
|
||||||
conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
|
conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
|
||||||
|
@ -49,7 +49,6 @@
|
|||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -61,7 +60,6 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo.IS_MARKER_AWARE;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo.IS_MARKER_AWARE;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
|
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.INVALID_ARGUMENT;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.INVALID_ARGUMENT;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
|
||||||
@ -590,16 +588,8 @@ public void testProbeForMagic() throws Throwable {
|
|||||||
String name = fs.getUri().toString();
|
String name = fs.getUri().toString();
|
||||||
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
|
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
|
||||||
getConfiguration());
|
getConfiguration());
|
||||||
if (fs.hasPathCapability(fs.getWorkingDirectory(),
|
// this must always work
|
||||||
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
|
|
||||||
// if the FS is magic, expect this to work
|
|
||||||
exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
|
exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
|
||||||
} else {
|
|
||||||
// if the FS isn't magic, expect the probe to fail
|
|
||||||
assertExitCode(E_BAD_STATE,
|
|
||||||
intercept(ExitUtil.ExitException.class,
|
|
||||||
() -> exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user