HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs.

Contributed by Steve Loughran.
Steve Loughran 2018-08-30 14:49:53 +01:00
parent e8d138ca7c
commit 5a0babf765
19 changed files with 542 additions and 96 deletions

View File

@@ -57,8 +57,8 @@ public abstract class PathOutputCommitter extends OutputCommitter {
protected PathOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
- LOG.debug("Creating committer with output path {} and task context"
- + " {}", outputPath, context);
+ LOG.debug("Instantiating committer {} with output path {} and task context"
+ + " {}", this, outputPath, context);
}
/**
@@ -71,8 +71,8 @@ protected PathOutputCommitter(Path outputPath,
protected PathOutputCommitter(Path outputPath,
JobContext context) throws IOException {
this.context = Preconditions.checkNotNull(context, "Null context");
- LOG.debug("Creating committer with output path {} and job context"
- + " {}", outputPath, context);
+ LOG.debug("Instantiating committer {} with output path {} and job context"
+ + " {}", this, outputPath, context);
}
/**
@@ -103,6 +103,8 @@ public boolean hasOutputPath() {
@Override
public String toString() {
- return "PathOutputCommitter{context=" + context + '}';
+ return "PathOutputCommitter{context=" + context
+ + "; " + super.toString()
+ + '}';
}
}

View File

@@ -130,8 +130,9 @@ public static void once(String action, String path, VoidOperation operation)
}
/**
- * Execute an operation and ignore all raised IOExceptions; log at INFO.
- * @param log log to log at info.
+ * Execute an operation and ignore all raised IOExceptions; log at INFO;
+ * full stack only at DEBUG.
+ * @param log log to use.
* @param action action to include in log
* @param path optional path to include in log
* @param operation operation to execute
@@ -145,13 +146,17 @@ public static <T> void ignoreIOExceptions(
try {
once(action, path, operation);
} catch (IOException e) {
- log.info("{}: {}", toDescription(action, path), e.toString(), e);
+ String description = toDescription(action, path);
+ String error = e.toString();
+ log.info("{}: {}", description, error);
+ log.debug("{}", description, e);
}
}
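This pairing (one INFO line, the full stack trace only at DEBUG) is for best-effort cleanup where failures should be noted but never propagated. A hedged sketch of a caller, assuming the helpers live on the s3a `Invoker` class (the `deleteQuietly` method is invented for illustration):

```java
// Illustrative best-effort cleanup: on failure, logs one INFO line and keeps
// the stack trace at DEBUG only; the IOException never propagates.
private void deleteQuietly(FileSystem fs, Path path) {
  Invoker.ignoreIOExceptions(LOG, "delete", path.toString(),
      () -> { fs.delete(path, true); });
}
```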
/**
- * Execute an operation and ignore all raised IOExceptions; log at INFO.
- * @param log log to log at info.
+ * Execute an operation and ignore all raised IOExceptions; log at INFO;
+ * full stack only at DEBUG.
+ * @param log log to use.
* @param action action to include in log
* @param path optional path to include in log
* @param operation operation to execute

View File

@@ -292,7 +292,7 @@ public String toString() {
final StringBuilder sb = new StringBuilder(
"AbstractS3ACommitter{");
sb.append("role=").append(role);
- sb.append(", name").append(getName());
+ sb.append(", name=").append(getName());
sb.append(", outputPath=").append(getOutputPath());
sb.append(", workPath=").append(workPath);
sb.append('}');
@@ -532,8 +532,14 @@ protected void abortPendingUploadsInCleanup(
new DurationInfo(LOG, "Aborting all pending commits under %s",
dest)) {
CommitOperations ops = getCommitOperations();
- List<MultipartUpload> pending = ops
- .listPendingUploadsUnderPath(dest);
+ List<MultipartUpload> pending;
+ try {
+ pending = ops.listPendingUploadsUnderPath(dest);
+ } catch (IOException e) {
+ // raised if the listPendingUploads call failed.
+ maybeIgnore(suppressExceptions, "aborting pending uploads", e);
+ return;
+ }
Tasks.foreach(pending)
.executeWith(buildThreadPool(getJobContext()))
.suppressExceptions(suppressExceptions)
@@ -656,7 +662,7 @@ protected void maybeIgnore(
}
/**
- * Execute an operation; maybe suppress any raised IOException.
+ * Log or rethrow a caught IOException.
* @param suppress should raised IOEs be suppressed?
* @param action action (for logging when the IOE is suppressed).
* @param ex exception
@@ -667,7 +673,7 @@ protected void maybeIgnore(
String action,
IOException ex) throws IOException {
if (suppress) {
- LOG.info(action, ex);
+ LOG.debug(action, ex);
} else {
throw ex;
}
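The suppress-or-rethrow switch is what lets abort/cleanup paths tolerate individual failures while the commit path stays strict. A hedged sketch of the calling convention (`maybeIgnore()` and `getDestFS()` appear in this patch; the surrounding method and `getTempJobPath()` are invented):

```java
// Illustrative caller: cleanup passes suppressExceptions=true so a failed
// delete is logged at DEBUG and ignored; the commit path passes false so
// the IOException propagates and fails the job.
protected void cleanupStagingDirs(boolean suppressExceptions)
    throws IOException {
  try {
    getDestFS().delete(getTempJobPath(), true);
  } catch (IOException e) {
    maybeIgnore(suppressExceptions, "Deleting staging dirs", e);
  }
}
```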

View File

@@ -77,9 +77,20 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
outputPath,
context.getConfiguration());
- return factory != null ?
- factory.createTaskCommitter(fileSystem, outputPath, context)
- : createFileOutputCommitter(outputPath, context);
+ if (factory != null) {
+ PathOutputCommitter committer = factory.createTaskCommitter(
+ fileSystem, outputPath, context);
+ LOG.info("Using committer {} to output data to {}",
+ (committer instanceof AbstractS3ACommitter
+ ? ((AbstractS3ACommitter) committer).getName()
+ : committer.toString()),
+ outputPath);
+ return committer;
+ } else {
+ LOG.warn("Using standard FileOutputCommitter to commit work."
+ + " This is slow and potentially unsafe.");
+ return createFileOutputCommitter(outputPath, context);
+ }
}
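For reference, a hedged sketch of wiring a job to this factory; the property names match those in the documentation updated later in this commit, and the committer names ("file", "directory", "partitioned", "staging", "magic") match the cases in the switch in the next hunk:

```java
// Hypothetical job setup: route s3a:// output through the committer factory,
// then pick a committer by name.
static void bindS3ACommitter(org.apache.hadoop.mapreduce.Job job) {
  org.apache.hadoop.conf.Configuration conf = job.getConfiguration();
  conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
      "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
  conf.set("fs.s3a.committer.name", "magic");
}
```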
/**
@@ -104,6 +115,7 @@ private AbstractS3ACommitterFactory chooseCommitterFactory(
String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
+ LOG.debug("Committer option is {}", name);
switch (name) {
case COMMITTER_NAME_FILE:
factory = null;

View File

@@ -285,4 +285,11 @@ public Path getTempTaskAttemptPath(TaskAttemptContext context) {
return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "MagicCommitter{");
+ sb.append('}');
+ return sb.toString();
+ }
}

View File

@@ -27,13 +27,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
- import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/**
* This commits to a directory.
@@ -70,10 +68,8 @@ public void setupJob(JobContext context) throws IOException {
if (getConflictResolutionMode(context, fs.getConf())
== ConflictResolution.FAIL
&& fs.exists(outputPath)) {
- LOG.debug("Failing commit by task attempt {} to write"
- + " to existing output path {}",
- context.getJobID(), getOutputPath());
- throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
+ throw failDestinationExists(outputPath,
+ "Setting job as " + getRole());
}
}

View File

@@ -31,14 +31,12 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
- import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
/**
* Partitioned committer.
@@ -100,11 +98,8 @@ protected int commitTaskInternal(TaskAttemptContext context,
Path partitionPath = getFinalPath(partition + "/file",
context).getParent();
if (fs.exists(partitionPath)) {
- LOG.debug("Failing commit by task attempt {} to write"
- + " to existing path {} under {}",
- context.getTaskAttemptID(), partitionPath, getOutputPath());
- throw new PathExistsException(partitionPath.toString(),
- E_DEST_EXISTS);
+ throw failDestinationExists(partitionPath,
+ "Committing task " + context.getTaskAttemptID());
}
}
}

View File

@@ -167,13 +167,15 @@ public static Path getLocalTaskAttemptTempDir(final Configuration conf,
return FileSystem.getLocal(conf).makeQualified(
allocator.getLocalPathForWrite(uuid, conf));
});
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- } catch (UncheckedExecutionException e) {
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException) e.getCause();
+ } catch (ExecutionException | UncheckedExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
}
- throw new RuntimeException(e);
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException(e);
}
}
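The widened catch matters because Guava caches wrap loader failures: `Cache.get(key, callable)` surfaces a checked exception from the loader as `ExecutionException` and an unchecked one as `UncheckedExecutionException`, so both must be unwrapped before rethrowing. A self-contained sketch of the same pattern (all names invented):

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;

public final class CacheUnwrapDemo {
  private static final Cache<String, String> CACHE =
      CacheBuilder.newBuilder().build();

  // Loads via the cache, rethrowing the loader's own exception type.
  static String load(String key) throws IOException {
    try {
      // Cache.get wraps checked loader failures in ExecutionException,
      // unchecked ones in UncheckedExecutionException.
      return CACHE.get(key, () -> {
        if (key.isEmpty()) {
          throw new IOException("empty key");
        }
        return key.toUpperCase();
      });
    } catch (ExecutionException | UncheckedExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException(e);
    }
  }
}
```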

View File

@@ -36,6 +36,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.PathExistsException;
+ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -500,6 +502,10 @@ protected List<SinglePendingCommit> listPendingUploads(
listAndFilter(attemptFS,
wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER));
+ } catch (FileNotFoundException e) {
+ // this can mean the job was aborted early on, so don't confuse people
+ // with long stack traces that aren't the underlying problem.
+ maybeIgnore(suppressExceptions, "Pending upload directory not found", e);
} catch (IOException e) {
// unable to work with endpoint, if suppressing errors decide our actions
maybeIgnore(suppressExceptions, "Listing pending uploads", e);
@@ -565,13 +571,13 @@ protected void abortJobInternal(JobContext context,
}
/**
- * Delete the working paths of a job. Does not attempt to clean up
- * the work of the wrapped committer.
+ * Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
- * <li>$dest/__temporary</li>
+ * <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
+ * Does not attempt to clean up the work of the wrapped committer.
* @param context job context
* @throws IOException IO failure
*/
@@ -835,6 +841,44 @@ public final ConflictResolution getConflictResolutionMode(
return conflictResolution;
}
+ /**
+ * Generate a {@link PathExistsException} because the destination exists.
+ * Lists some of the child entries first, to help diagnose the problem.
+ * @param path path which exists
+ * @param description description (usually task/job ID)
+ * @return an exception to throw
+ */
+ protected PathExistsException failDestinationExists(final Path path,
+ final String description) {
+ LOG.error("{}: Failing commit by job {} to write"
+ + " to existing output path {}.",
+ description,
+ getJobContext().getJobID(), path);
+ // List the first 10 descendants, to give some details
+ // on what is wrong but not overload things if there are many files.
+ try {
+ int limit = 10;
+ RemoteIterator<LocatedFileStatus> lf
+ = getDestFS().listFiles(path, true);
+ LOG.info("Partial Directory listing");
+ while (limit > 0 && lf.hasNext()) {
+ limit--;
+ LocatedFileStatus status = lf.next();
+ LOG.info("{}: {}",
+ status.getPath(),
+ status.isDirectory()
+ ? " dir"
+ : ("file size " + status.getLen() + " bytes"));
+ }
+ } catch (IOException e) {
+ LOG.info("Discarding exception raised when listing {}: " + e, path);
+ LOG.debug("stack trace ", e);
+ }
+ return new PathExistsException(path.toString(),
+ description + ": " + InternalCommitterConstants.E_DEST_EXISTS);
+ }
/**
* Get the conflict mode option string.
* @param context context with the config

View File

@@ -230,7 +230,6 @@ None: directories are created on demand.
Rename task attempt path to task committed path.
```python
def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
return fs.exists(taskAttemptPath)
@@ -276,12 +275,12 @@ def commitJob(fs, jobAttemptDir, dest):
(See below for details on `mergePaths()`)
- A failure during job abort cannot be recovered from except by re-executing
+ A failure during job commit cannot be recovered from except by re-executing
the entire query:
```python
def isCommitJobRepeatable() :
- return True
+ return False
```
Accordingly, it is a failure point in the protocol. With a low number of files
@@ -307,12 +306,28 @@ def cleanupJob(fs, dest):
```
- ### Job Recovery
+ ### Job Recovery Before `commitJob()`
- 1. Data under task committed paths is retained
- 1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted.
+ For all committers, the recovery process takes place in the application
+ master.
+ 1. The job history file of the previous attempt is loaded and scanned
+ to determine which tasks were recorded as having succeeded.
+ 1. For each successful task, the job committer has its `recoverTask()` method
+ invoked with a `TaskAttemptContext` built from the previous attempt's details.
+ 1. If the method does not raise an exception, it is considered to have been
+ recovered, and not to be re-executed.
+ 1. All other tasks are queued for execution.
- Uncommitted/unexecuted tasks are (re)executed.
+ For the v1 committer, task recovery is straightforward.
+ The directory of the committed task from the previous attempt is
+ moved under the directory of the current application attempt.
+ ```python
+ def recoverTask(tac):
+ oldAttemptId = appAttemptId - 1
+ fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}',
+ '$dest/_temporary/appAttemptId/${tac.taskId}')
+ ```
This significantly improves time to recover from Job driver (here MR AM) failure.
The only lost work is that of all tasks in progress -those which had generated
@@ -330,6 +345,11 @@ failure simply by rerunning the entire job. This is implicitly the strategy
in Spark, which does not attempt to recover any in-progress jobs. The faster
your queries, the simpler your recovery strategy needs to be.
+ ### Job Recovery During `commitJob()`
+ This is not possible; a failure during job commit requires the entire job
+ to be re-executed after cleaning up the destination directory.
### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm
`mergePaths()` is the core algorithm to merge data; it is somewhat confusing
@@ -352,8 +372,7 @@ def mergePathsV1(fs, src, dest) :
fs.delete(dest, recursive = True)
fs.rename(src.getPath, dest)
else :
- # destination is directory, choose action on source type
- if src.isDirectory :
+ # src is directory, choose action on dest type
if not toStat is None :
if not toStat.isDirectory :
# Destination exists and is not a directory
@@ -369,7 +388,7 @@
fs.rename(src.getPath(), dest)
```
- ## v2 commit algorithm
+ ## The v2 Commit Algorithm
The v2 algorithm directly commits task output into the destination directory.
@@ -506,12 +525,31 @@ Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
As no data is written to the destination directory, a task can be cleaned up
by deleting the task attempt directory.
- ### v2 Job Recovery
+ ### v2 Job Recovery Before `commitJob()`
- Because the data has been renamed into the destination directory, it is nominally
- recoverable. However, this assumes that the number and name of generated
- files are constant on retried tasks.
+ Because the data has been renamed into the destination directory, all tasks
+ recorded as having been committed have no recovery needed at all:
+ ```python
+ def recoverTask(tac):
+ ```
+ All active and queued tasks are scheduled for execution.
+ There is a weakness here, the same one as on a failure during `commitTask()`:
+ it is only safe to repeat a task which failed during that commit operation
+ if the names of all generated files are constant across all task attempts.
+ If the Job AM fails while a task attempt has been instructed to commit,
+ and that commit is not recorded as having completed, the state of that
+ in-progress task is unknown...really it isn't safe to recover the
+ job at this point.
+ ### v2 Job Recovery During `commitJob()`
+ This is straightforward: `commitJob()` is re-invoked.
## How MapReduce uses the committer in a task
@@ -896,7 +934,7 @@ and metadata.
POST bucket.s3.aws.com/path?uploads
- An UploadId is returned
+ An `UploadId` is returned
1. Caller uploads one or more parts.
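A hedged sketch of that lifecycle, using the AWS SDK for Java v1 client that S3A wraps; bucket, key, and file names are invented:

```java
import java.io.File;
import java.util.Collections;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;

public class MultipartSketch {
  public static void main(String[] args) {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    // 1. Initiate: the POST ...?uploads call; an UploadId comes back.
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest("example-bucket", "dest/part-0000"))
        .getUploadId();

    // 2. Upload one or more parts against that UploadId.
    UploadPartResult part = s3.uploadPart(new UploadPartRequest()
        .withBucketName("example-bucket")
        .withKey("dest/part-0000")
        .withUploadId(uploadId)
        .withPartNumber(1)
        .withFile(new File("/tmp/block-0001")));

    // 3. Complete: only now does the object appear at the destination key.
    s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
        "example-bucket", "dest/part-0000", uploadId,
        Collections.singletonList(part.getPartETag())));
  }
}
```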
@@ -994,7 +1032,7 @@ Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPath`.
The single-directory and partitioned committers handle conflict resolution by
checking whether target paths exist in S3 before uploading any data.
- There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
+ There are three conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
* `fail`: Fail a task if an output directory or partition already exists. (Default)
* `append`: Upload data files without checking whether directories or partitions already exist.
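A hedged sketch of selecting a mode from job-setup code; the property name comes from the prose above, and `replace` is the third mode, outside this hunk's context window:

```java
// Hypothetical helper: choose how the staging committers react to existing
// destination directories/partitions ("fail", "append" or "replace").
static void setConflictMode(org.apache.hadoop.mapreduce.Job job, String mode) {
  job.getConfiguration().set("fs.s3a.committer.staging.conflict-mode", mode);
}
```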

View File

@@ -371,7 +371,7 @@ Put differently: start with the Directory Committer.
To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a`
must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory`.
- This is done in `core-default.xml`
+ This is done in `mapred-default.xml`
```xml
<property>

View File

@@ -173,6 +173,25 @@ public void setup() throws Exception {
}
}
+ /**
+ * Create a random Job ID using the fork ID as part of the number.
+ * @return fork ID string in a format parseable by Jobs
+ * @throws Exception failure
+ */
+ protected String randomJobId() throws Exception {
+ String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
+ int l = testUniqueForkId.length();
+ String trailingDigits = testUniqueForkId.substring(l - 4, l);
+ try {
+ int digitValue = Integer.valueOf(trailingDigits);
+ return String.format("20070712%04d_%04d",
+ (long)(Math.random() * 1000),
+ digitValue);
+ } catch (NumberFormatException e) {
+ throw new Exception("Failed to parse " + trailingDigits, e);
+ }
+ }
/**
* Teardown waits for the consistency delay and resets failure count, so
* FS is stable, before the superclass teardown is called. This

View File

@@ -38,7 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -266,9 +265,9 @@ public int getTestFileCount() {
/**
* Override point to let implementations tune the MR Job conf.
- * @param c configuration
+ * @param jobConf configuration
*/
- protected void applyCustomConfigOptions(Configuration c) {
+ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
}

View File

@@ -159,25 +159,6 @@ public void setup() throws Exception {
cleanupDestDir();
}
- /**
- * Create a random Job ID using the fork ID as part of the number.
- * @return fork ID string in a format parseable by Jobs
- * @throws Exception failure
- */
- private String randomJobId() throws Exception {
- String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
- int l = testUniqueForkId.length();
- String trailingDigits = testUniqueForkId.substring(l - 4, l);
- try {
- int digitValue = Integer.valueOf(trailingDigits);
- return String.format("20070712%04d_%04d",
- (long)(Math.random() * 1000),
- digitValue);
- } catch (NumberFormatException e) {
- throw new Exception("Failed to parse " + trailingDigits, e);
- }
- }
@Override
public void teardown() throws Exception {
describe("teardown");
@@ -765,6 +746,7 @@ public void testCommitLifecycle() throws Exception {
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
+ validateTaskAttemptWorkingDirectory(committer, tContext);
// write output
describe("1. Writing output");
@@ -1360,12 +1342,55 @@ public void testParallelJobsToAdjacentPaths() throws Throwable {
}
+ @Test
+ public void testS3ACommitterFactoryBinding() throws Throwable {
+ describe("Verify that the committer factory returns this "
+ + "committer when configured to do so");
+ Job job = newJob();
+ FileOutputFormat.setOutputPath(job, outDir);
+ Configuration conf = job.getConfiguration();
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+ taskAttempt0);
+ String name = getCommitterName();
+ S3ACommitterFactory factory = new S3ACommitterFactory();
+ assertEquals("Wrong committer from factory",
+ createCommitter(outDir, tContext).getClass(),
+ factory.createOutputCommitter(outDir, tContext).getClass());
+ }
/**
* Validate the path of a file being written to during the write
* itself.
* @param p path
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
}
/**
* Validate the path of a file being written to after the write
* operation has completed.
* @param p path
* @throws IOException IO failure
*/
protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
}
+ /**
+ * Perform any actions needed to validate the working directory of
+ * a committer.
+ * For example: filesystem, path attributes
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ protected void validateTaskAttemptWorkingDirectory(
+ AbstractS3ACommitter committer,
+ TaskAttemptContext context) throws IOException {
+ }
}

View File

@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.IOException;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
/**
* Tests for some aspects of the committer factory.
* All tests are grouped into one single test so that only one
* S3A FS client is set up and used for the entire run.
* Saves time and money.
*/
public class ITestS3ACommitterFactory extends AbstractCommitITest {
protected static final String INVALID_NAME = "invalid-name";
/**
* Counter to guarantee that even in parallel test runs, no job has the same
* ID.
*/
private String jobId;
// A random task attempt id for testing.
private String attempt0;
private TaskAttemptID taskAttempt0;
private Path outDir;
private S3ACommitterFactory factory;
private TaskAttemptContext tContext;
/**
* Parameterized list of bindings of committer name in config file to
* expected class instantiated.
*/
private static final Object[][] bindings = {
{COMMITTER_NAME_FILE, FileOutputCommitter.class},
{COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
{COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
{InternalCommitterConstants.COMMITTER_NAME_STAGING,
StagingCommitter.class},
{COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
};
/**
* This is a ref to the FS conf, so changes here are visible
* to callers querying the FS config.
*/
private Configuration filesystemConfRef;
private Configuration taskConfRef;
@Override
public void setup() throws Exception {
super.setup();
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
outDir = path(getMethodName());
factory = new S3ACommitterFactory();
Configuration conf = new Configuration();
conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
filesystemConfRef = getFileSystem().getConf();
tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
taskConfRef = tContext.getConfiguration();
}
@Test
public void testEverything() throws Throwable {
testImplicitFileBinding();
testBindingsInTask();
testBindingsInFSConfig();
testInvalidFileBinding();
testInvalidTaskBinding();
}
/**
* Verify that if all config options are unset, the FileOutputCommitter
* is returned.
*/
public void testImplicitFileBinding() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
}
/**
* Verify that task bindings are picked up.
*/
public void testBindingsInTask() throws Throwable {
// set this to an invalid value to be confident it is not
// being checked.
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
for (Object[] binding : bindings) {
taskConfRef.set(FS_S3A_COMMITTER_NAME,
(String) binding[0]);
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
}
}
/**
* Verify that FS bindings are picked up.
*/
public void testBindingsInFSConfig() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
for (Object[] binding : bindings) {
taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
assertFactoryCreatesExpectedCommitter((Class) binding[1]);
}
}
/**
* Create an invalid committer via the FS binding.
*/
public void testInvalidFileBinding() throws Throwable {
taskConfRef.unset(FS_S3A_COMMITTER_NAME);
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
() -> createCommitter());
}
/**
* Create an invalid committer via the task attempt.
*/
public void testInvalidTaskBinding() throws Throwable {
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
() -> createCommitter());
}
/**
* Assert that the factory creates the expected committer.
* @param expected expected committer class.
* @throws IOException IO failure.
*/
protected void assertFactoryCreatesExpectedCommitter(
final Class expected)
throws IOException {
assertEquals("Wrong Committer from factory",
expected,
createCommitter().getClass());
}
/**
* Create a committer.
* @return the committer
* @throws IOException IO failure.
*/
private PathOutputCommitter createCommitter() throws IOException {
return factory.createOutputCommitter(outDir, tContext);
}
}

View File

@@ -18,10 +18,10 @@
package org.apache.hadoop.fs.s3a.commit.magic;
- import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+ import org.apache.hadoop.mapred.JobConf;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -30,7 +30,7 @@
*
* There's no need to disable the committer setting for the filesystem here,
* because the committers are being instantiated in their own processes;
- * the settings in {@link #applyCustomConfigOptions(Configuration)} are
+ * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
* passed down to these processes.
*/
public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
@@ -54,7 +54,7 @@ protected String committerName() {
* @param conf configuration
*/
@Override
- protected void applyCustomConfigOptions(Configuration conf) {
+ protected void applyCustomConfigOptions(JobConf conf) {
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
}

View File

@@ -18,6 +18,9 @@
package org.apache.hadoop.fs.s3a.commit.magic;
+ import java.io.IOException;
+ import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -32,9 +35,8 @@
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import java.io.IOException;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+ import static org.hamcrest.CoreMatchers.containsString;
/**
* Test the magic committer's commit protocol.
@@ -115,6 +117,25 @@ protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
assertPathExists("pending file", pendingFile);
}
+ /**
+ * The magic committer paths are always on S3, and always have
+ * "__magic" in the path.
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ @Override
+ protected void validateTaskAttemptWorkingDirectory(
+ final AbstractS3ACommitter committer,
+ final TaskAttemptContext context) throws IOException {
+ URI wd = committer.getWorkPath().toUri();
+ assertEquals("Wrong schema for working dir " + wd
+ + " with committer " + committer,
+ "s3a", wd.getScheme());
+ assertThat(wd.getPath(),
+ containsString('/' + CommitConstants.MAGIC + '/'));
+ }
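For reference, a hedged illustration of the two properties this validation asserts on; the bucket, job ID, and layout under `__magic` are invented:

```java
// Hypothetical magic-committer work path: the "s3a" scheme plus a
// "/__magic/" path element are the only properties checked above.
Path example = new Path(
    "s3a://example-bucket/output/__magic/job_20070712_0001/part-m-00000");
assert "s3a".equals(example.toUri().getScheme());
assert example.toUri().getPath().contains("/__magic/");
```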
/**
* The class provides an overridden implementation of commitJobInternal which
* causes the commit to fail the first time, then succeed.

View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.test.LambdaTestUtils;
/**
* This is a test to verify that the committer will fail if the destination
* directory exists, and that this happens in job setup.
*/
public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob {
@Override
protected String committerName() {
return StagingCommitter.NAME;
}
/**
* Create the destination directory and expect a failure.
* @param conf configuration
*/
@Override
protected void applyCustomConfigOptions(JobConf conf) throws IOException {
// This is the destination in the S3 FS
String outdir = conf.get(FileOutputFormat.OUTDIR);
S3AFileSystem fs = getFileSystem();
Path outputPath = new Path(outdir);
fs.mkdirs(outputPath);
}
@Override
public void testMRJob() throws Exception {
LambdaTestUtils.intercept(FileAlreadyExistsException.class,
"Output directory",
super::testMRJob);
}
}

View File

@@ -117,6 +117,19 @@ protected FileSystem getLocalFS() throws IOException {
return FileSystem.getLocal(getConfiguration());
}
+ /**
+ * The staging committers always have the local FS for their work.
+ * @param committer committer instance
+ * @param context task attempt context
+ * @throws IOException IO failure
+ */
+ @Override
+ protected void validateTaskAttemptWorkingDirectory(final AbstractS3ACommitter committer,
+ final TaskAttemptContext context) throws IOException {
+ Path wd = context.getWorkingDirectory();
+ assertEquals("file", wd.toUri().getScheme());
+ }
/**
* The class provides an overridden implementation of commitJobInternal which
* causes the commit to fail the first time, then succeed.