From 5a0babf76550f63dad4c17173c4da2bf335c6532 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 30 Aug 2018 14:49:53 +0100 Subject: [PATCH] HADOOP-15107. Stabilize/tune S3A committers; review correctness & docs. Contributed by Steve Loughran. --- .../lib/output/PathOutputCommitter.java | 12 +- .../org/apache/hadoop/fs/s3a/Invoker.java | 15 +- .../fs/s3a/commit/AbstractS3ACommitter.java | 16 +- .../fs/s3a/commit/S3ACommitterFactory.java | 18 +- .../commit/magic/MagicS3GuardCommitter.java | 7 + .../staging/DirectoryStagingCommitter.java | 8 +- .../staging/PartitionedStagingCommitter.java | 9 +- .../hadoop/fs/s3a/commit/staging/Paths.java | 14 +- .../s3a/commit/staging/StagingCommitter.java | 50 ++++- .../hadoop-aws/committer_architecture.md | 94 +++++--- .../markdown/tools/hadoop-aws/committers.md | 2 +- .../fs/s3a/commit/AbstractCommitITest.java | 19 ++ .../fs/s3a/commit/AbstractITCommitMRJob.java | 5 +- .../s3a/commit/AbstractITCommitProtocol.java | 63 ++++-- .../s3a/commit/ITestS3ACommitterFactory.java | 200 ++++++++++++++++++ .../s3a/commit/magic/ITMagicCommitMRJob.java | 6 +- .../magic/ITestMagicCommitProtocol.java | 25 ++- .../ITStagingCommitMRJobBadDest.java | 62 ++++++ .../ITestStagingCommitProtocol.java | 13 ++ 19 files changed, 542 insertions(+), 96 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java index 3679d9f4d6..5e25f50fef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java @@ -57,8 +57,8 @@ public abstract class PathOutputCommitter extends OutputCommitter { protected PathOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { this.context = Preconditions.checkNotNull(context, "Null context"); - LOG.debug("Creating committer with output path {} and task context" - + " {}", outputPath, context); + LOG.debug("Instantiating committer {} with output path {} and task context" + + " {}", this, outputPath, context); } /** @@ -71,8 +71,8 @@ protected PathOutputCommitter(Path outputPath, protected PathOutputCommitter(Path outputPath, JobContext context) throws IOException { this.context = Preconditions.checkNotNull(context, "Null context"); - LOG.debug("Creating committer with output path {} and job context" - + " {}", outputPath, context); + LOG.debug("Instantiating committer {} with output path {} and job context" + + " {}", this, outputPath, context); } /** @@ -103,6 +103,8 @@ public boolean hasOutputPath() { @Override public String toString() { - return "PathOutputCommitter{context=" + context + '}'; + return "PathOutputCommitter{context=" + context + + "; " + super.toString() + + '}'; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index a007ba156a..45912a0ac3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -130,8 +130,9 @@ public static void once(String action, String path, VoidOperation operation) } /** - * Execute an operation and ignore all raised IOExceptions; log at INFO. - * @param log log to log at info. + * Execute an operation and ignore all raised IOExceptions; log at INFO; + * full stack only at DEBUG. + * @param log log to use. * @param action action to include in log * @param path optional path to include in log * @param operation operation to execute @@ -145,13 +146,17 @@ public static void ignoreIOExceptions( try { once(action, path, operation); } catch (IOException e) { - log.info("{}: {}", toDescription(action, path), e.toString(), e); + String description = toDescription(action, path); + String error = e.toString(); + log.info("{}: {}", description, error); + log.debug("{}", description, e); } } /** - * Execute an operation and ignore all raised IOExceptions; log at INFO. - * @param log log to log at info. + * Execute an operation and ignore all raised IOExceptions; log at INFO; + * full stack only at DEBUG. + * @param log log to use. * @param action action to include in log * @param path optional path to include in log * @param operation operation to execute diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 5f1ddfa6fc..d2501da6aa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -292,7 +292,7 @@ public String toString() { final StringBuilder sb = new StringBuilder( "AbstractS3ACommitter{"); sb.append("role=").append(role); - sb.append(", name").append(getName()); + sb.append(", name=").append(getName()); sb.append(", outputPath=").append(getOutputPath()); sb.append(", workPath=").append(workPath); sb.append('}'); @@ -532,8 +532,14 @@ protected void abortPendingUploadsInCleanup( new DurationInfo(LOG, "Aborting all pending commits under %s", dest)) { CommitOperations ops = getCommitOperations(); - List pending = ops - .listPendingUploadsUnderPath(dest); + List pending; + try { + pending = ops.listPendingUploadsUnderPath(dest); + } catch (IOException e) { + // raised if the listPendingUploads call failed. + maybeIgnore(suppressExceptions, "aborting pending uploads", e); + return; + } Tasks.foreach(pending) .executeWith(buildThreadPool(getJobContext())) .suppressExceptions(suppressExceptions) @@ -656,7 +662,7 @@ protected void maybeIgnore( } /** - * Execute an operation; maybe suppress any raised IOException. + * Log or rethrow a caught IOException. * @param suppress should raised IOEs be suppressed? * @param action action (for logging when the IOE is suppressed. * @param ex exception @@ -667,7 +673,7 @@ protected void maybeIgnore( String action, IOException ex) throws IOException { if (suppress) { - LOG.info(action, ex); + LOG.debug(action, ex); } else { throw ex; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java index 6b170f9ef4..36d0af187d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java @@ -77,9 +77,20 @@ public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem, AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem, outputPath, context.getConfiguration()); - return factory != null ? - factory.createTaskCommitter(fileSystem, outputPath, context) - : createFileOutputCommitter(outputPath, context); + if (factory != null) { + PathOutputCommitter committer = factory.createTaskCommitter( + fileSystem, outputPath, context); + LOG.info("Using committer {} to output data to {}", + (committer instanceof AbstractS3ACommitter + ? ((AbstractS3ACommitter) committer).getName() + : committer.toString()), + outputPath); + return committer; + } else { + LOG.warn("Using standard FileOutputCommitter to commit work." + + " This is slow and potentially unsafe."); + return createFileOutputCommitter(outputPath, context); + } } /** @@ -104,6 +115,7 @@ private AbstractS3ACommitterFactory chooseCommitterFactory( String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name); + LOG.debug("Committer option is {}", name); switch (name) { case COMMITTER_NAME_FILE: factory = null; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index c3051416fe..c956a98064 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -285,4 +285,11 @@ public Path getTempTaskAttemptPath(TaskAttemptContext context) { return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath()); } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "MagicCommitter{"); + sb.append('}'); + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java index 3eda24fbe2..23bb06bd92 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java @@ -27,13 +27,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; /** * This commits to a directory. @@ -70,10 +68,8 @@ public void setupJob(JobContext context) throws IOException { if (getConflictResolutionMode(context, fs.getConf()) == ConflictResolution.FAIL && fs.exists(outputPath)) { - LOG.debug("Failing commit by task attempt {} to write" - + " to existing output path {}", - context.getJobID(), getOutputPath()); - throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS); + throw failDestinationExists(outputPath, + "Setting job as " + getRole()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java index bfaf4434d1..b51bcb5f9c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java @@ -31,14 +31,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; import org.apache.hadoop.fs.s3a.commit.PathCommitException; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; /** * Partitioned committer. @@ -100,11 +98,8 @@ protected int commitTaskInternal(TaskAttemptContext context, Path partitionPath = getFinalPath(partition + "/file", context).getParent(); if (fs.exists(partitionPath)) { - LOG.debug("Failing commit by task attempt {} to write" - + " to existing path {} under {}", - context.getTaskAttemptID(), partitionPath, getOutputPath()); - throw new PathExistsException(partitionPath.toString(), - E_DEST_EXISTS); + throw failDestinationExists(partitionPath, + "Committing task " + context.getTaskAttemptID()); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java index d5d256aefb..a941572f1e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java @@ -167,13 +167,15 @@ public static Path getLocalTaskAttemptTempDir(final Configuration conf, return FileSystem.getLocal(conf).makeQualified( allocator.getLocalPathForWrite(uuid, conf)); }); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } catch (UncheckedExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); + } catch (ExecutionException | UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; } - throw new RuntimeException(e); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException(e); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 2182eaa2dd..6d02e866ed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; import org.apache.hadoop.fs.s3a.commit.CommitConstants; @@ -500,6 +502,10 @@ protected List listPendingUploads( listAndFilter(attemptFS, wrappedJobAttemptPath, false, HIDDEN_FILE_FILTER)); + } catch (FileNotFoundException e) { + // this can mean the job was aborted early on, so don't confuse people + // with long stack traces that aren't the underlying problem. + maybeIgnore(suppressExceptions, "Pending upload directory not found", e); } catch (IOException e) { // unable to work with endpoint, if suppressing errors decide our actions maybeIgnore(suppressExceptions, "Listing pending uploads", e); @@ -565,13 +571,13 @@ protected void abortJobInternal(JobContext context, } /** - * Delete the working paths of a job. Does not attempt to clean up - * the work of the wrapped committer. + * Delete the working paths of a job. *
    *
  1. The job attempt path
  2. - *
  3. $dest/__temporary
  4. + *
  5. {@code $dest/__temporary}
  6. *
  7. the local working directory for staged files
  8. *
+ * Does not attempt to clean up the work of the wrapped committer. * @param context job context * @throws IOException IO failure */ @@ -835,6 +841,44 @@ public final ConflictResolution getConflictResolutionMode( return conflictResolution; } + /** + * Generate a {@link PathExistsException} because the destination exists. + * Lists some of the child entries first, to help diagnose the problem. + * @param path path which exists + * @param description description (usually task/job ID) + * @return an exception to throw + */ + protected PathExistsException failDestinationExists(final Path path, + final String description) { + + LOG.error("{}: Failing commit by job {} to write" + + " to existing output path {}.", + description, + getJobContext().getJobID(), path); + // List the first 10 descendants, to give some details + // on what is wrong but not overload things if there are many files. + try { + int limit = 10; + RemoteIterator lf + = getDestFS().listFiles(path, true); + LOG.info("Partial Directory listing"); + while (limit > 0 && lf.hasNext()) { + limit--; + LocatedFileStatus status = lf.next(); + LOG.info("{}: {}", + status.getPath(), + status.isDirectory() + ? " dir" + : ("file size " + status.getLen() + " bytes")); + } + } catch (IOException e) { + LOG.info("Discarding exception raised when listing {}: " + e, path); + LOG.debug("stack trace ", e); + } + return new PathExistsException(path.toString(), + description + ": " + InternalCommitterConstants.E_DEST_EXISTS); + } + /** * Get the conflict mode option string. * @param context context with the config diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md index e4ba75d98c..3071754836 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md @@ -230,7 +230,6 @@ None: directories are created on demand. Rename task attempt path to task committed path. ```python - def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest): return fs.exists(taskAttemptPath) @@ -276,12 +275,12 @@ def commitJob(fs, jobAttemptDir, dest): (See below for details on `mergePaths()`) -A failure during job abort cannot be recovered from except by re-executing +A failure during job commit cannot be recovered from except by re-executing the entire query: ```python def isCommitJobRepeatable() : - return True + return False ``` Accordingly, it is a failure point in the protocol. With a low number of files @@ -307,12 +306,28 @@ def cleanupJob(fs, dest): ``` -### Job Recovery +### Job Recovery Before `commitJob()` -1. Data under task committed paths is retained -1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted. +For all committers, the recovery process takes place in the application +master. +1. The job history file of the previous attempt is loaded and scanned +to determine which tasks were recorded as having succeeded. +1. For each successful task, the job committer has its `recoverTask()` method +invoked with a `TaskAttemptContext` built from the previous attempt's details. +1. If the method does not raise an exception, it is considered to have been +recovered, and not to be re-executed. +1. All other tasks are queued for execution. -Uncommitted/unexecuted tasks are (re)executed. +For the v1 committer, task recovery is straightforward. +The directory of the committed task from the previous attempt is +moved under the directory of the current application attempt. + +```python +def recoverTask(tac): + oldAttemptId = appAttemptId - 1 + fs.rename('$dest/_temporary/oldAttemptId/${tac.taskId}', + '$dest/_temporary/appAttemptId/${tac.taskId}') +``` This significantly improves time to recover from Job driver (here MR AM) failure. The only lost work is that of all tasks in progress -those which had generated @@ -330,6 +345,11 @@ failure simply by rerunning the entire job. This is implicitly the strategy in Spark, which does not attempt to recover any in-progress jobs. The faster your queries, the simpler your recovery strategy needs to be. +### Job Recovery During `commitJob()` + +This is not possible; a failure during job commit requires the entire job +to be re-executed after cleaning up the destination directory. + ### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm `mergePaths()` is the core algorithm to merge data; it is somewhat confusing @@ -352,24 +372,23 @@ def mergePathsV1(fs, src, dest) : fs.delete(dest, recursive = True) fs.rename(src.getPath, dest) else : - # destination is directory, choose action on source type - if src.isDirectory : - if not toStat is None : - if not toStat.isDirectory : - # Destination exists and is not a directory - fs.delete(dest) - fs.rename(src.getPath(), dest) - else : - # Destination exists and is a directory - # merge all children under destination directory - for child in fs.listStatus(src.getPath) : - mergePathsV1(fs, child, dest + child.getName) - else : - # destination does not exist + # src is directory, choose action on dest type + if not toStat is None : + if not toStat.isDirectory : + # Destination exists and is not a directory + fs.delete(dest) fs.rename(src.getPath(), dest) + else : + # Destination exists and is a directory + # merge all children under destination directory + for child in fs.listStatus(src.getPath) : + mergePathsV1(fs, child, dest + child.getName) + else : + # destination does not exist + fs.rename(src.getPath(), dest) ``` -## v2 commit algorithm +## The v2 Commit Algorithm The v2 algorithm directly commits task output into the destination directory. @@ -506,12 +525,31 @@ Cost: `O(1)` for normal filesystems, `O(files)` for object stores. As no data is written to the destination directory, a task can be cleaned up by deleting the task attempt directory. -### v2 Job Recovery +### v2 Job Recovery Before `commitJob()` -Because the data has been renamed into the destination directory, it is nominally -recoverable. However, this assumes that the number and name of generated -files are constant on retried tasks. +Because the data has been renamed into the destination directory, all tasks +recorded as having being committed have no recovery needed at all: + +```python +def recoverTask(tac): +``` + +All active and queued tasks are scheduled for execution. + +There is a weakness here, the same one on a failure during `commitTask()`: +it is only safe to repeat a task which failed during that commit operation +if the name of all generated files are constant across all task attempts. + +If the Job AM fails while a task attempt has been instructed to commit, +and that commit is not recorded as having completed, the state of that +in-progress task is unknown...really it isn't be safe to recover the +job at this point. + + +### v2 Job Recovery During `commitJob()` + +This is straightforward: `commitJob()` is re-invoked. ## How MapReduce uses the committer in a task @@ -896,7 +934,7 @@ and metadata. POST bucket.s3.aws.com/path?uploads - An UploadId is returned + An `UploadId` is returned 1. Caller uploads one or more parts. @@ -994,7 +1032,7 @@ Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPa The single-directory and partitioned committers handle conflict resolution by checking whether target paths exist in S3 before uploading any data. -There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`: +There are three conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`: * `fail`: Fail a task if an output directory or partition already exists. (Default) * `append`: Upload data files without checking whether directories or partitions already exist. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index 392cde2f80..09e123d6ed 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -371,7 +371,7 @@ Put differently: start with the Directory Committer. To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a` must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.staging.S3ACommitterFactory`. -This is done in `core-default.xml` +This is done in `mapred-default.xml` ```xml diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 90e88945b3..246bf9d613 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -173,6 +173,25 @@ public void setup() throws Exception { } } + /** + * Create a random Job ID using the fork ID as part of the number. + * @return fork ID string in a format parseable by Jobs + * @throws Exception failure + */ + protected String randomJobId() throws Exception { + String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001"); + int l = testUniqueForkId.length(); + String trailingDigits = testUniqueForkId.substring(l - 4, l); + try { + int digitValue = Integer.valueOf(trailingDigits); + return String.format("20070712%04d_%04d", + (long)(Math.random() * 1000), + digitValue); + } catch (NumberFormatException e) { + throw new Exception("Failed to parse " + trailingDigits, e); + } + } + /** * Teardown waits for the consistency delay and resets failure count, so * FS is stable, before the superclass teardown is called. This diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java index 13dfd831b3..161db8521d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java @@ -38,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -266,9 +265,9 @@ public int getTestFileCount() { /** * Override point to let implementations tune the MR Job conf. - * @param c configuration + * @param jobConf configuration */ - protected void applyCustomConfigOptions(Configuration c) { + protected void applyCustomConfigOptions(JobConf jobConf) throws IOException { } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 4d7f524d39..5ae8f54522 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -159,25 +159,6 @@ public void setup() throws Exception { cleanupDestDir(); } - /** - * Create a random Job ID using the fork ID as part of the number. - * @return fork ID string in a format parseable by Jobs - * @throws Exception failure - */ - private String randomJobId() throws Exception { - String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001"); - int l = testUniqueForkId.length(); - String trailingDigits = testUniqueForkId.substring(l - 4, l); - try { - int digitValue = Integer.valueOf(trailingDigits); - return String.format("20070712%04d_%04d", - (long)(Math.random() * 1000), - digitValue); - } catch (NumberFormatException e) { - throw new Exception("Failed to parse " + trailingDigits, e); - } - } - @Override public void teardown() throws Exception { describe("teardown"); @@ -765,6 +746,7 @@ public void testCommitLifecycle() throws Exception { JobContext jContext = jobData.jContext; TaskAttemptContext tContext = jobData.tContext; AbstractS3ACommitter committer = jobData.committer; + validateTaskAttemptWorkingDirectory(committer, tContext); // write output describe("1. Writing output"); @@ -1360,12 +1342,55 @@ public void testParallelJobsToAdjacentPaths() throws Throwable { } + @Test + public void testS3ACommitterFactoryBinding() throws Throwable { + describe("Verify that the committer factory returns this " + + "committer when configured to do so"); + Job job = newJob(); + FileOutputFormat.setOutputPath(job, outDir); + Configuration conf = job.getConfiguration(); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, + taskAttempt0); + String name = getCommitterName(); + S3ACommitterFactory factory = new S3ACommitterFactory(); + assertEquals("Wrong committer from factory", + createCommitter(outDir, tContext).getClass(), + factory.createOutputCommitter(outDir, tContext).getClass()); + } + + /** + * Validate the path of a file being written to during the write + * itself. + * @param p path + * @throws IOException IO failure + */ protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { } + /** + * Validate the path of a file being written to after the write + * operation has completed. + * @param p path + * @throws IOException IO failure + */ protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { } + /** + * Perform any actions needed to validate the working directory of + * a committer. + * For example: filesystem, path attributes + * @param committer committer instance + * @param context task attempt context + * @throws IOException IO failure + */ + protected void validateTaskAttemptWorkingDirectory( + AbstractS3ACommitter committer, + TaskAttemptContext context) throws IOException { + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java new file mode 100644 index 0000000000..a8547d6728 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** + * Tests for some aspects of the committer factory. + * All tests are grouped into one single test so that only one + * S3A FS client is set up and used for the entire run. + * Saves time and money. + */ +public class ITestS3ACommitterFactory extends AbstractCommitITest { + + + protected static final String INVALID_NAME = "invalid-name"; + + /** + * Counter to guarantee that even in parallel test runs, no job has the same + * ID. + */ + + private String jobId; + + // A random task attempt id for testing. + private String attempt0; + + private TaskAttemptID taskAttempt0; + + private Path outDir; + + private S3ACommitterFactory factory; + + private TaskAttemptContext tContext; + + /** + * Parameterized list of bindings of committer name in config file to + * expected class instantiated. + */ + private static final Object[][] bindings = { + {COMMITTER_NAME_FILE, FileOutputCommitter.class}, + {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class}, + {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class}, + {InternalCommitterConstants.COMMITTER_NAME_STAGING, + StagingCommitter.class}, + {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class} + }; + + /** + * This is a ref to the FS conf, so changes here are visible + * to callers querying the FS config. + */ + private Configuration filesystemConfRef; + + private Configuration taskConfRef; + + @Override + public void setup() throws Exception { + super.setup(); + jobId = randomJobId(); + attempt0 = "attempt_" + jobId + "_m_000000_0"; + taskAttempt0 = TaskAttemptID.forName(attempt0); + + outDir = path(getMethodName()); + factory = new S3ACommitterFactory(); + Configuration conf = new Configuration(); + conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString()); + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + filesystemConfRef = getFileSystem().getConf(); + tContext = new TaskAttemptContextImpl(conf, taskAttempt0); + taskConfRef = tContext.getConfiguration(); + } + + @Test + public void testEverything() throws Throwable { + testImplicitFileBinding(); + testBindingsInTask(); + testBindingsInFSConfig(); + testInvalidFileBinding(); + testInvalidTaskBinding(); + } + + /** + * Verify that if all config options are unset, the FileOutputCommitter + * + * is returned. + */ + public void testImplicitFileBinding() throws Throwable { + taskConfRef.unset(FS_S3A_COMMITTER_NAME); + filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); + assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); + } + + /** + * Verify that task bindings are picked up. + */ + public void testBindingsInTask() throws Throwable { + // set this to an invalid value to be confident it is not + // being checked. + filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID"); + taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); + assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); + for (Object[] binding : bindings) { + taskConfRef.set(FS_S3A_COMMITTER_NAME, + (String) binding[0]); + assertFactoryCreatesExpectedCommitter((Class) binding[1]); + } + } + + /** + * Verify that FS bindings are picked up. + */ + public void testBindingsInFSConfig() throws Throwable { + taskConfRef.unset(FS_S3A_COMMITTER_NAME); + filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); + assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); + for (Object[] binding : bindings) { + taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]); + assertFactoryCreatesExpectedCommitter((Class) binding[1]); + } + } + + /** + * Create an invalid committer via the FS binding, + */ + public void testInvalidFileBinding() throws Throwable { + taskConfRef.unset(FS_S3A_COMMITTER_NAME); + filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); + LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, + () -> createCommitter()); + } + + /** + * Create an invalid committer via the task attempt. + */ + public void testInvalidTaskBinding() throws Throwable { + filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); + taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); + LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, + () -> createCommitter()); + } + + /** + * Assert that the factory creates the expected committer. + * @param expected expected committer class. + * @throws IOException IO failure. + */ + protected void assertFactoryCreatesExpectedCommitter( + final Class expected) + throws IOException { + assertEquals("Wrong Committer from factory", + expected, + createCommitter().getClass()); + } + + /** + * Create a committer. + * @return the committer + * @throws IOException IO failure. + */ + private PathOutputCommitter createCommitter() throws IOException { + return factory.createOutputCommitter(outDir, tContext); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java index 57eb8b226f..b7be17ad5e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java @@ -18,10 +18,10 @@ package org.apache.hadoop.fs.s3a.commit.magic; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.mapred.JobConf; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; @@ -30,7 +30,7 @@ * * There's no need to disable the committer setting for the filesystem here, * because the committers are being instantiated in their own processes; - * the settings in {@link #applyCustomConfigOptions(Configuration)} are + * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are * passed down to these processes. */ public class ITMagicCommitMRJob extends AbstractITCommitMRJob { @@ -54,7 +54,7 @@ protected String committerName() { * @param conf configuration */ @Override - protected void applyCustomConfigOptions(Configuration conf) { + protected void applyCustomConfigOptions(JobConf conf) { conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index 74c1d9de4e..057adf5341 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -18,6 +18,9 @@ package org.apache.hadoop.fs.s3a.commit.magic; +import java.io.IOException; +import java.net.URI; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -32,9 +35,8 @@ import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.hamcrest.CoreMatchers.containsString; /** * Test the magic committer's commit protocol. @@ -115,6 +117,25 @@ protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { assertPathExists("pending file", pendingFile); } + /** + * The magic committer paths are always on S3, and always have + * "__magic" in the path. + * @param committer committer instance + * @param context task attempt context + * @throws IOException IO failure + */ + @Override + protected void validateTaskAttemptWorkingDirectory( + final AbstractS3ACommitter committer, + final TaskAttemptContext context) throws IOException { + URI wd = committer.getWorkPath().toUri(); + assertEquals("Wrong schema for working dir " + wd + + " with committer " + committer, + "s3a", wd.getScheme()); + assertThat(wd.getPath(), + containsString('/' + CommitConstants.MAGIC + '/')); + } + /** * The class provides a overridden implementation of commitJobInternal which * causes the commit failed for the first time then succeed. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java new file mode 100644 index 0000000000..be477a7de6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJobBadDest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.test.LambdaTestUtils; + +/** + * This is a test to verify that the committer will fail if the destination + * directory exists, and that this happens in job setup. + */ +public class ITStagingCommitMRJobBadDest extends AbstractITCommitMRJob { + + @Override + protected String committerName() { + return StagingCommitter.NAME; + } + + /** + * create the destination directory and expect a failure. + * @param conf configuration + */ + @Override + protected void applyCustomConfigOptions(JobConf conf) throws IOException { + // This is the destination in the S3 FS + String outdir = conf.get(FileOutputFormat.OUTDIR); + S3AFileSystem fs = getFileSystem(); + Path outputPath = new Path(outdir); + fs.mkdirs(outputPath); + } + + @Override + public void testMRJob() throws Exception { + LambdaTestUtils.intercept(FileAlreadyExistsException.class, + "Output directory", + super::testMRJob); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java index 08c572ef6c..180e743522 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -117,6 +117,19 @@ protected FileSystem getLocalFS() throws IOException { return FileSystem.getLocal(getConfiguration()); } + /** + * The staging committers always have the local FS for their work. + * @param committer committer instance + * @param context task attempt context + * @throws IOException IO failure + */ + @Override + protected void validateTaskAttemptWorkingDirectory(final AbstractS3ACommitter committer, + final TaskAttemptContext context) throws IOException { + Path wd = context.getWorkingDirectory(); + assertEquals("file", wd.toUri().getScheme()); + } + /** * The class provides a overridden implementation of commitJobInternal which * causes the commit failed for the first time then succeed.