MAPREDUCE-7474. Improve Manifest committer resilience (#6716)

Improve task commit resilience everywhere
and add an option to reduce delete IO requests on
job cleanup (relevant for ABFS and HDFS).

Task Commit Resilience
----------------------

Task manifest saving is re-attempted on failure; the number of 
attempts made is configurable with the option:

  mapreduce.manifest.committer.manifest.save.attempts

* The default is 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt.
* Move from classic rename() to commitFile() to rename the file,
  after calling getFileStatus() to get its length and possibly etag.
  This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach
  the ResilientCommitByRename callbacks in abfs, which report on
  the outcome to the caller...which is then logged at WARN.
* New statistic task_stage_save_summary_file to distinguish from
  other saving operations (job success/report file).
  This is only saved to the manifest on task commit retries, and
  provides statistics on all previous unsuccessful attempts to save
  the manifests
+ test changes to match the codepath changes, including improvements
  in fault injection.

Directory size for deletion
---------------------------

New option

  mapreduce.manifest.committer.cleanup.parallel.delete.base.first

This attempts an initial attempt at deleting the base dir, only falling
back to parallel deletes if there's a timeout.

This option is disabled by default; Consider enabling it for abfs to
reduce IO load. Consult the documentation for more details.

Success file printing
---------------------

The command to print a JSON _SUCCESS file from this committer and
any S3A committer is now something which can be invoked from
the mapred command:

  mapred successfile <path to file>

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2024-05-13 21:12:34 +01:00 committed by GitHub
parent 12e0ca6b24
commit c9270600b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1437 additions and 261 deletions

View File

@ -37,6 +37,7 @@ function hadoop_usage
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "minicluster" client "CLI MiniCluster"
hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
}
@ -102,6 +103,9 @@ function mapredcmd_case
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
successfile)
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
;;
minicluster)
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*'

View File

@ -21,6 +21,9 @@ package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.IOException;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -51,6 +54,9 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.Man
*/
public final class ManifestCommitterConfig implements IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(
ManifestCommitterConfig.class);
/**
* Final destination of work.
* This is <i>unqualified</i>.
@ -153,6 +159,12 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
*/
private final int writerQueueCapacity;
/**
* How many attempts to save a task manifest by save and rename
* before giving up.
*/
private final int saveManifestAttempts;
/**
* Constructor.
* @param outputPath destination path of the job.
@ -198,6 +210,14 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
this.writerQueueCapacity = conf.getInt(
OPT_WRITER_QUEUE_CAPACITY,
DEFAULT_WRITER_QUEUE_CAPACITY);
int attempts = conf.getInt(OPT_MANIFEST_SAVE_ATTEMPTS,
OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT);
if (attempts < 1) {
LOG.warn("Invalid value for {}: {}",
OPT_MANIFEST_SAVE_ATTEMPTS, attempts);
attempts = 1;
}
this.saveManifestAttempts = attempts;
// if constructed with a task attempt, build the task ID and path.
if (context instanceof TaskAttemptContext) {
@ -332,6 +352,10 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
return name;
}
public int getSaveManifestAttempts() {
return saveManifestAttempts;
}
/**
* Get writer queue capacity.
* @return the queue capacity

View File

@ -132,7 +132,9 @@ public final class ManifestCommitterConstants {
* Should dir cleanup do parallel deletion of task attempt dirs
* before trying to delete the toplevel dirs.
* For GCS this may deliver speedup, while on ABFS it may avoid
* timeouts in certain deployments.
* timeouts in certain deployments, something
* {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}
* can alleviate.
* Value: {@value}.
*/
public static final String OPT_CLEANUP_PARALLEL_DELETE =
@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
*/
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;
/**
* Should parallel cleanup try to delete the base first?
* Best for azure as it skips the task attempt deletions unless
* the toplevel delete fails.
* Value: {@value}.
*/
public static final String OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST =
OPT_PREFIX + "cleanup.parallel.delete.base.first";
/**
* Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}.
*/
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = false;
/**
* Threads to use for IO.
*/
@ -260,6 +276,19 @@ public final class ManifestCommitterConstants {
*/
public static final int DEFAULT_WRITER_QUEUE_CAPACITY = OPT_IO_PROCESSORS_DEFAULT;
/**
* How many attempts to save a task manifest by save and rename
* before giving up.
* Value: {@value}.
*/
public static final String OPT_MANIFEST_SAVE_ATTEMPTS =
OPT_PREFIX + "manifest.save.attempts";
/**
* Default value of {@link #OPT_MANIFEST_SAVE_ATTEMPTS}: {@value}.
*/
public static final int OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT = 5;
private ManifestCommitterConstants() {
}

View File

@ -187,6 +187,12 @@ public final class ManifestCommitterStatisticNames {
public static final String OP_SAVE_TASK_MANIFEST =
"task_stage_save_task_manifest";
/**
* Save a summary file: {@value}.
*/
public static final String OP_SAVE_SUMMARY_FILE =
"task_stage_save_summary_file";
/**
* Task abort: {@value}.
*/
@ -259,6 +265,9 @@ public final class ManifestCommitterStatisticNames {
public static final String OP_STAGE_TASK_SCAN_DIRECTORY
= "task_stage_scan_directory";
/** Delete a directory: {@value}. */
public static final String OP_DELETE_DIR = "op_delete_dir";
private ManifestCommitterStatisticNames() {
}
}

View File

@ -36,7 +36,7 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsTo
*/
public class ManifestPrinter extends Configured implements Tool {
private static final String USAGE = "ManifestPrinter <success-file>";
private static final String USAGE = "successfile <success-file>";
/**
* Output for printing.
@ -88,7 +88,7 @@ public class ManifestPrinter extends Configured implements Tool {
return success;
}
private void printManifest(ManifestSuccessData success) {
public void printManifest(ManifestSuccessData success) {
field("succeeded", success.getSuccess());
field("created", success.getDate());
field("committer", success.getCommitter());

View File

@ -73,6 +73,7 @@ public final class InternalConstants {
OP_CREATE_ONE_DIRECTORY,
OP_DIRECTORY_SCAN,
OP_DELETE,
OP_DELETE_DIR,
OP_DELETE_FILE_UNDER_DESTINATION,
OP_GET_FILE_STATUS,
OP_IS_DIRECTORY,
@ -85,6 +86,7 @@ public final class InternalConstants {
OP_MSYNC,
OP_PREPARE_DIR_ANCESTORS,
OP_RENAME_FILE,
OP_SAVE_SUMMARY_FILE,
OP_SAVE_TASK_MANIFEST,
OBJECT_LIST_REQUEST,
@ -127,4 +129,11 @@ public final class InternalConstants {
/** Schemas of filesystems we know to not work with this committer. */
public static final Set<String> UNSUPPORTED_FS_SCHEMAS =
ImmutableSet.of("s3a", "wasb");
/**
* Interval in milliseconds between save retries.
* Value {@value} milliseconds.
*/
public static final int SAVE_SLEEP_INTERVAL = 500;
}

View File

@ -97,6 +97,35 @@ public abstract class ManifestStoreOperations implements Closeable {
public abstract boolean delete(Path path, boolean recursive)
throws IOException;
/**
* Forward to {@code delete(Path, true)}
* unless overridden.
* <p>
* If it returns without an error: there is no file at
* the end of the path.
* @param path path
* @return outcome
* @throws IOException failure.
*/
public boolean deleteFile(Path path)
throws IOException {
return delete(path, false);
}
/**
* Call {@code FileSystem#delete(Path, true)} or equivalent.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path
* @return outcome
* @throws IOException failure.
*/
public boolean deleteRecursive(Path path)
throws IOException {
return delete(path, true);
}
/**
* Forward to {@link FileSystem#mkdirs(Path)}.
* Usual "what does 'false' mean" ambiguity.

View File

@ -108,6 +108,11 @@ public class ManifestStoreOperationsThroughFileSystem extends ManifestStoreOpera
return fileSystem.delete(path, recursive);
}
@Override
public boolean deleteRecursive(final Path path) throws IOException {
return fileSystem.delete(path, true);
}
@Override
public boolean mkdirs(Path path)
throws IOException {

View File

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK;
/**
@ -55,7 +56,11 @@ public class AbortTaskStage extends
final Path dir = getTaskAttemptDir();
if (dir != null) {
LOG.info("{}: Deleting task attempt directory {}", getName(), dir);
deleteDir(dir, suppressExceptions);
if (suppressExceptions) {
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR);
} else {
deleteRecursive(dir, OP_DELETE_DIR);
}
}
return dir;
}

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +35,7 @@ import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
@ -53,14 +56,18 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.SAVE_SLEEP_INTERVAL;
/**
* A Stage in Task/Job Commit.
@ -366,6 +373,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
*/
protected final void progress() {
if (stageConfig.getProgressable() != null) {
LOG.trace("{}: Progressing", getName());
stageConfig.getProgressable().progress();
}
}
@ -424,7 +432,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
* @return status or null
* @throws IOException IO Failure.
*/
protected final boolean delete(
public final boolean delete(
final Path path,
final boolean recursive)
throws IOException {
@ -440,14 +448,34 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
* @return status or null
* @throws IOException IO Failure.
*/
protected Boolean delete(
public Boolean delete(
final Path path,
final boolean recursive,
final String statistic)
throws IOException {
return trackDuration(getIOStatistics(), statistic, () -> {
return operations.delete(path, recursive);
});
if (recursive) {
return deleteRecursive(path, statistic);
} else {
return deleteFile(path, statistic);
}
}
/**
* Delete a file at a path.
* <p>
* If it returns without an error: there is nothing at
* the end of the path.
* @param path path
* @param statistic statistic to update
* @return outcome.
* @throws IOException IO Failure.
*/
public boolean deleteFile(
final Path path,
final String statistic)
throws IOException {
return trackDuration(getIOStatistics(), statistic, () ->
operations.deleteFile(path));
}
/**
@ -457,7 +485,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
* @return true if the directory was created/exists.
* @throws IOException IO Failure.
*/
protected final boolean mkdirs(
public final boolean mkdirs(
final Path path,
final boolean escalateFailure)
throws IOException {
@ -494,7 +522,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
* @return the manifest.
* @throws IOException IO Failure.
*/
protected final TaskManifest loadManifest(
public final TaskManifest loadManifest(
final FileStatus status)
throws IOException {
LOG.trace("{}: loadManifest('{}')", getName(), status);
@ -582,19 +610,123 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
* Save a task manifest or summary. This will be done by
* writing to a temp path and then renaming.
* If the destination path exists: Delete it.
* This will retry so that a rename failure from abfs load or IO errors
* will not fail the task.
* @param manifestData the manifest/success file
* @param tempPath temp path for the initial save
* @param finalPath final path for rename.
* @throws IOException failure to load/parse
* @return the manifest saved.
* @throws IOException failure to rename after retries.
*/
@SuppressWarnings("unchecked")
protected final <T extends AbstractManifestData> void save(T manifestData,
protected final <T extends AbstractManifestData> T save(
final T manifestData,
final Path tempPath,
final Path finalPath) throws IOException {
LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath);
trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
operations.save(manifestData, tempPath, true));
renameFile(tempPath, finalPath);
return saveManifest(() -> manifestData, tempPath, finalPath, OP_SAVE_TASK_MANIFEST);
}
/**
* Generate and save a task manifest or summary file.
* This is be done by writing to a temp path and then renaming.
* <p>
* If the destination path exists: Delete it before the rename.
* <p>
* This will retry so that a rename failure from abfs load or IO errors
* such as delete or save failure will not fail the task.
* <p>
* The {@code manifestSource} supplier is invoked to get the manifest data
* on every attempt.
* This permits statistics to be updated, <i>including those of failures</i>.
* @param manifestSource supplier the manifest/success file
* @param tempPath temp path for the initial save
* @param finalPath final path for rename.
* @param statistic statistic to use for timing
* @return the manifest saved.
* @throws IOException failure to save/delete/rename after retries.
*/
@SuppressWarnings("unchecked")
protected final <T extends AbstractManifestData> T saveManifest(
final Supplier<T> manifestSource,
final Path tempPath,
final Path finalPath,
String statistic) throws IOException {
int retryCount = 0;
RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
getStageConfig().getManifestSaveAttempts(),
SAVE_SLEEP_INTERVAL,
TimeUnit.MILLISECONDS);
boolean success = false;
T savedManifest = null;
// loop until returning a value or raising an exception
while (!success) {
try {
// get the latest manifest, which may include updated statistics
final T manifestData = requireNonNull(manifestSource.get());
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
getName(), tempPath, finalPath, retryCount);
trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
// delete temp path.
// even though this is written with overwrite=true, this extra recursive
// delete also handles a directory being there.
// this should not happen as no part of the commit protocol creates a directory
// -this is just a little bit of due diligence.
deleteRecursive(tempPath, OP_DELETE);
// save the temp file.
operations.save(manifestData, tempPath, true);
// get the length and etag.
final FileStatus st = getFileStatus(tempPath);
// commit rename of temporary file to the final path; deleting the destination first.
final CommitOutcome outcome = commitFile(
new FileEntry(tempPath, finalPath, st.getLen(), getEtag(st)),
true);
if (outcome.recovered) {
LOG.warn("Task manifest file {} committed using rename recovery",
manifestData);
}
});
// success: save the manifest and declare success
savedManifest = manifestData;
success = true;
} catch (IOException e) {
// failure.
// log then decide whether to sleep and retry or give up.
LOG.warn("{}: Failed to save and commit file {} renamed to {}; retry count={}",
getName(), tempPath, finalPath, retryCount, e);
// increment that count.
retryCount++;
RetryPolicy.RetryAction retryAction;
try {
retryAction = retryPolicy.shouldRetry(e, retryCount, 0, true);
} catch (Exception ex) {
// it's not clear why this probe can raise an exception; it is just
// caught and mapped to a fail.
LOG.debug("Failure in retry policy", ex);
retryAction = RetryPolicy.RetryAction.FAIL;
}
LOG.debug("{}: Retry action: {}", getName(), retryAction.action);
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
// too many failures: escalate.
throw e;
}
// else, sleep
try {
LOG.info("{}: Sleeping for {} ms before retrying",
getName(), retryAction.delayMillis);
Thread.sleep(retryAction.delayMillis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
// success: return the manifest which was saved.
return savedManifest;
}
/**
@ -609,8 +741,10 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
}
/**
* Rename a file from source to dest; if the underlying FS API call
* returned false that's escalated to an IOE.
* Rename a file from source to dest.
* <p>
* The destination is always deleted through a call to
* {@link #maybeDeleteDest(boolean, Path)}.
* @param source source file.
* @param dest dest file
* @throws IOException failure
@ -618,7 +752,6 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
*/
protected final void renameFile(final Path source, final Path dest)
throws IOException {
maybeDeleteDest(true, dest);
executeRenamingOperation("renameFile", source, dest,
OP_RENAME_FILE, () ->
operations.renameFile(source, dest));
@ -637,7 +770,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
maybeDeleteDest(true, dest);
executeRenamingOperation("renameDir", source, dest,
OP_RENAME_FILE, () ->
OP_RENAME_DIR, () ->
operations.renameDir(source, dest)
);
}
@ -669,13 +802,14 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
// note any delay which took place
noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime());
}
return new CommitOutcome(result.recovered());
} else {
// commit with a simple rename; failures will be escalated.
executeRenamingOperation("renameFile", source, dest,
OP_COMMIT_FILE_RENAME, () ->
operations.renameFile(source, dest));
return new CommitOutcome(false);
}
return new CommitOutcome();
}
/**
@ -696,12 +830,15 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
*/
private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException {
if (deleteDest && getFileStatusOrNull(dest) != null) {
boolean deleted = delete(dest, true);
// log the outcome in case of emergency diagnostics traces
// being needed.
LOG.debug("{}: delete('{}') returned {}'", getName(), dest, deleted);
if (deleteDest) {
final FileStatus st = getFileStatusOrNull(dest);
if (st != null) {
if (st.isDirectory()) {
deleteRecursive(dest, OP_DELETE_DIR);
} else {
deleteFile(dest, OP_DELETE);
}
}
}
}
@ -792,6 +929,14 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
*/
public static final class CommitOutcome {
/**
* Dit the commit recover from a failure?
*/
public final boolean recovered;
public CommitOutcome(final boolean recovered) {
this.recovered = recovered;
}
}
/**
@ -866,7 +1011,7 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
}
/**
* Get the task attemptDir; raise an NPE
* Get the task attemptDir and raise an NPE
* if it is null.
* @return a non-null task attempt dir.
*/
@ -915,26 +1060,35 @@ public abstract class AbstractJobOrTaskStage<IN, OUT>
}
/**
* Delete a directory, possibly suppressing exceptions.
* Delete a directory (or a file).
* @param dir directory.
* @param suppressExceptions should exceptions be suppressed?
* @param statistic statistic to use
* @return true if the path is no longer present.
* @throws IOException exceptions raised in delete if not suppressed.
* @return any exception caught and suppressed
*/
protected IOException deleteDir(
protected boolean deleteRecursive(
final Path dir,
final Boolean suppressExceptions)
final String statistic)
throws IOException {
return trackDuration(getIOStatistics(), statistic, () ->
operations.deleteRecursive(dir));
}
/**
* Delete a directory or file, catching exceptions.
* @param dir directory.
* @param statistic statistic to use
* @return any exception caught.
*/
protected IOException deleteRecursiveSuppressingExceptions(
final Path dir,
final String statistic) {
try {
delete(dir, true);
deleteRecursive(dir, statistic);
return null;
} catch (IOException ex) {
LOG.info("Error deleting {}: {}", dir, ex.toString());
if (!suppressExceptions) {
throw ex;
} else {
return ex;
}
return ex;
}
}

View File

@ -40,7 +40,10 @@ import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUT
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
/**
@ -49,7 +52,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.Manifest
* Returns: the outcome of the overall operation
* The result is detailed purely for the benefit of tests, which need
* to make assertions about error handling and fallbacks.
*
* <p>
* There's a few known issues with the azure and GCS stores which
* this stage tries to address.
* - Google GCS directory deletion is O(entries), so is slower for big jobs.
@ -57,19 +60,28 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.Manifest
* when not the store owner triggers a scan down the tree to verify the
* caller has the permission to delete each subdir.
* If this scan takes over 90s, the operation can time out.
*
* <p>
* The main solution for both of these is that task attempts are
* deleted in parallel, in different threads.
* This will speed up GCS cleanup and reduce the risk of
* abfs related timeouts.
* Exceptions during cleanup can be suppressed,
* so that these do not cause the job to fail.
*
* <p>
* There is one weakness of this design: the number of delete operations
* is 1 + number of task attempts, which, on ABFS can generate excessive
* load.
* For this reason, there is an option to attempt to delete the base directory
* first; if this does not time out then, on Azure ADLS Gen2 storage,
* this is the most efficient cleanup.
* Only if that attempt fails for any reason then the parallel delete
* phase takes place.
* <p>
* Also, some users want to be able to run multiple independent jobs
* targeting the same output directory simultaneously.
* If one job deletes the directory `__temporary` all the others
* will fail.
*
* <p>
* This can be addressed by disabling cleanup entirely.
*
*/
@ -128,7 +140,7 @@ public class CleanupJobStage extends
stageName = getStageName(args);
// this is $dest/_temporary
final Path baseDir = requireNonNull(getStageConfig().getOutputTempSubDir());
LOG.debug("{}: Cleaup of directory {} with {}", getName(), baseDir, args);
LOG.debug("{}: Cleanup of directory {} with {}", getName(), baseDir, args);
if (!args.enabled) {
LOG.info("{}: Cleanup of {} disabled", getName(), baseDir);
return new Result(Outcome.DISABLED, baseDir,
@ -142,64 +154,105 @@ public class CleanupJobStage extends
}
Outcome outcome = null;
IOException exception;
IOException exception = null;
boolean baseDirDeleted = false;
// to delete.
LOG.info("{}: Deleting job directory {}", getName(), baseDir);
final long directoryCount = args.directoryCount;
if (directoryCount > 0) {
// log the expected directory count, which drives duration in GCS
// and may cause timeouts on azure if the count is too high for a
// timely permissions tree scan.
LOG.info("{}: Expected directory count: {}", getName(), directoryCount);
}
progress();
// check and maybe execute parallel delete of task attempt dirs.
if (args.deleteTaskAttemptDirsInParallel) {
// Attempt to do a parallel delete of task attempt dirs;
// don't overreact if a delete fails, but stop trying
// to delete the others, and fall back to deleting the
// job dir.
Path taskSubDir
= getStageConfig().getJobAttemptTaskSubDir();
try (DurationInfo info = new DurationInfo(LOG,
"parallel deletion of task attempts in %s",
taskSubDir)) {
RemoteIterator<FileStatus> dirs =
RemoteIterators.filteringRemoteIterator(
listStatusIterator(taskSubDir),
FileStatus::isDirectory);
TaskPool.foreach(dirs)
.executeWith(getIOProcessors())
.stopOnFailure()
.suppressExceptions(false)
.run(this::rmTaskAttemptDir);
getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
if (getLastDeleteException() != null) {
// one of the task attempts failed.
throw getLastDeleteException();
if (args.parallelDeleteAttemptBaseDeleteFirst) {
// attempt to delete the base dir first.
// This can reduce ABFS delete load but may time out
// (which the fallback to parallel delete will handle).
// on GCS it is slow.
try (DurationInfo info = new DurationInfo(LOG, true,
"Initial delete of %s", baseDir)) {
exception = deleteOneDir(baseDir);
if (exception == null) {
// success: record this as the outcome,
outcome = Outcome.DELETED;
// and flag that the the parallel delete should be skipped because the
// base directory is alredy deleted.
baseDirDeleted = true;
} else {
// failure: log and continue
LOG.warn("{}: Exception on initial attempt at deleting base dir {}"
+ " with directory count {}. Falling back to parallel delete",
getName(), baseDir, directoryCount, exception);
}
}
}
if (!baseDirDeleted) {
// no base delete attempted or it failed.
// Attempt to do a parallel delete of task attempt dirs;
// don't overreact if a delete fails, but stop trying
// to delete the others, and fall back to deleting the
// job dir.
Path taskSubDir
= getStageConfig().getJobAttemptTaskSubDir();
try (DurationInfo info = new DurationInfo(LOG, true,
"parallel deletion of task attempts in %s",
taskSubDir)) {
RemoteIterator<FileStatus> dirs =
RemoteIterators.filteringRemoteIterator(
listStatusIterator(taskSubDir),
FileStatus::isDirectory);
TaskPool.foreach(dirs)
.executeWith(getIOProcessors())
.stopOnFailure()
.suppressExceptions(false)
.run(this::rmTaskAttemptDir);
getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
if (getLastDeleteException() != null) {
// one of the task attempts failed.
throw getLastDeleteException();
} else {
// success: record this as the outcome.
outcome = Outcome.PARALLEL_DELETE;
}
} catch (FileNotFoundException ex) {
// not a problem if there's no dir to list.
LOG.debug("{}: Task attempt dir {} not found", getName(), taskSubDir);
outcome = Outcome.DELETED;
} catch (IOException ex) {
// failure. Log and continue
LOG.info(
"{}: Exception while listing/deleting task attempts under {}; continuing",
getName(),
taskSubDir, ex);
}
// success: record this as the outcome.
outcome = Outcome.PARALLEL_DELETE;
} catch (FileNotFoundException ex) {
// not a problem if there's no dir to list.
LOG.debug("{}: Task attempt dir {} not found", getName(), taskSubDir);
outcome = Outcome.DELETED;
} catch (IOException ex) {
// failure. Log and continue
LOG.info(
"{}: Exception while listing/deleting task attempts under {}; continuing",
getName(),
taskSubDir, ex);
// not overreacting here as the base delete will still get executing
outcome = Outcome.DELETED;
}
}
// Now the top-level deletion; exception gets saved
exception = deleteOneDir(baseDir);
if (exception != null) {
// failure, report and continue
// assume failure.
outcome = Outcome.FAILURE;
} else {
// if the outcome isn't already recorded as parallel delete,
// mark is a simple delete.
if (outcome == null) {
outcome = Outcome.DELETED;
// Now the top-level deletion if not already executed; exception gets saved
if (!baseDirDeleted) {
exception = deleteOneDir(baseDir);
if (exception != null) {
// failure, report and continue
LOG.warn("{}: Exception on final attempt at deleting base dir {}"
+ " with directory count {}",
getName(), baseDir, directoryCount, exception);
// assume failure.
outcome = Outcome.FAILURE;
} else {
// if the outcome isn't already recorded as parallel delete,
// mark is a simple delete.
if (outcome == null) {
outcome = Outcome.DELETED;
}
}
}
@ -235,7 +288,7 @@ public class CleanupJobStage extends
}
/**
* Delete a directory.
* Delete a directory suppressing exceptions.
* The {@link #deleteFailureCount} counter.
* is incremented on every failure.
* @param dir directory
@ -246,21 +299,22 @@ public class CleanupJobStage extends
throws IOException {
deleteDirCount.incrementAndGet();
IOException ex = deleteDir(dir, true);
if (ex != null) {
deleteFailure(ex);
}
return ex;
return noteAnyDeleteFailure(
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR));
}
/**
* Note a failure.
* Note a failure if the exception is not null.
* @param ex exception
* @return the exception
*/
private synchronized void deleteFailure(IOException ex) {
// excaption: add the count
deleteFailureCount.incrementAndGet();
lastDeleteException = ex;
private synchronized IOException noteAnyDeleteFailure(IOException ex) {
if (ex != null) {
// exception: add the count
deleteFailureCount.incrementAndGet();
lastDeleteException = ex;
}
return ex;
}
/**
@ -287,26 +341,47 @@ public class CleanupJobStage extends
/** Attempt parallel delete of task attempt dirs? */
private final boolean deleteTaskAttemptDirsInParallel;
/**
* Make an initial attempt to delete the base directory.
* This will reduce IO load on abfs. If it times out, the
* parallel delete will be the fallback.
*/
private final boolean parallelDeleteAttemptBaseDeleteFirst;
/** Ignore failures? */
private final boolean suppressExceptions;
/**
* Non-final count of directories.
* Default value, "0", means "unknown".
* This can be dynamically updated during job commit.
*/
private long directoryCount;
/**
* Arguments to the stage.
* @param statisticName stage name to report
* @param enabled is the stage enabled?
* @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
* parallel?
* @param parallelDeleteAttemptBaseDeleteFirst Make an initial attempt to
* delete the base directory in a parallel delete?
* @param suppressExceptions suppress exceptions?
* @param directoryCount directories under job dir; 0 means unknown.
*/
public Arguments(
final String statisticName,
final boolean enabled,
final boolean deleteTaskAttemptDirsInParallel,
final boolean suppressExceptions) {
final boolean parallelDeleteAttemptBaseDeleteFirst,
final boolean suppressExceptions,
long directoryCount) {
this.statisticName = statisticName;
this.enabled = enabled;
this.deleteTaskAttemptDirsInParallel = deleteTaskAttemptDirsInParallel;
this.suppressExceptions = suppressExceptions;
this.parallelDeleteAttemptBaseDeleteFirst = parallelDeleteAttemptBaseDeleteFirst;
this.directoryCount = directoryCount;
}
public String getStatisticName() {
@ -325,6 +400,18 @@ public class CleanupJobStage extends
return suppressExceptions;
}
public boolean isParallelDeleteAttemptBaseDeleteFirst() {
return parallelDeleteAttemptBaseDeleteFirst;
}
public long getDirectoryCount() {
return directoryCount;
}
public void setDirectoryCount(final long directoryCount) {
this.directoryCount = directoryCount;
}
@Override
public String toString() {
return "Arguments{" +
@ -332,6 +419,7 @@ public class CleanupJobStage extends
+ ", enabled=" + enabled
+ ", deleteTaskAttemptDirsInParallel="
+ deleteTaskAttemptDirsInParallel
+ ", parallelDeleteAttemptBaseDeleteFirst=" + parallelDeleteAttemptBaseDeleteFirst
+ ", suppressExceptions=" + suppressExceptions
+ '}';
}
@ -343,8 +431,9 @@ public class CleanupJobStage extends
public static final Arguments DISABLED = new Arguments(OP_STAGE_JOB_CLEANUP,
false,
false,
false
);
false,
false,
0);
/**
* Build an options argument from a configuration, using the
@ -364,12 +453,16 @@ public class CleanupJobStage extends
boolean deleteTaskAttemptDirsInParallel = conf.getBoolean(
OPT_CLEANUP_PARALLEL_DELETE,
OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT);
boolean parallelDeleteAttemptBaseDeleteFirst = conf.getBoolean(
OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST,
OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT);
return new Arguments(
statisticName,
enabled,
deleteTaskAttemptDirsInParallel,
suppressExceptions
);
parallelDeleteAttemptBaseDeleteFirst,
suppressExceptions,
0);
}
/**

View File

@ -37,6 +37,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
@ -161,7 +162,12 @@ public class CommitJobStage extends
}
// optional cleanup
new CleanupJobStage(stageConfig).apply(arguments.getCleanupArguments());
final CleanupJobStage.Arguments cleanupArguments = arguments.getCleanupArguments();
// determine the directory count
cleanupArguments.setDirectoryCount(iostats.counters()
.getOrDefault(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, 0L));
new CleanupJobStage(stageConfig).apply(cleanupArguments);
// and then, after everything else: optionally validate.
if (arguments.isValidateOutput()) {

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
@ -69,19 +70,21 @@ public class CommitTaskStage extends
// the saving, but ...
scanStage.addExecutionDurationToStatistics(getIOStatistics(), OP_STAGE_TASK_COMMIT);
// save a snapshot of the IO Statistics
final IOStatisticsSnapshot manifestStats = snapshotIOStatistics();
manifestStats.aggregate(getIOStatistics());
manifest.setIOStatistics(manifestStats);
// Now save with rename
Path manifestPath = new SaveTaskManifestStage(getStageConfig())
.apply(manifest);
return new CommitTaskStage.Result(manifestPath, manifest);
// Now save with retry, updating the statistics on every attempt.
Pair<Path, TaskManifest> p = new SaveTaskManifestStage(getStageConfig())
.apply(() -> {
/* save a snapshot of the IO Statistics */
final IOStatisticsSnapshot manifestStats = snapshotIOStatistics();
manifestStats.aggregate(getIOStatistics());
manifest.setIOStatistics(manifestStats);
return manifest;
});
return new CommitTaskStage.Result(p.getLeft(), p.getRight());
}
/**
* Result of the stage.
* Result of the stage: the path the manifest was saved to
* and the manifest which was successfully saved.
*/
public static final class Result {
/** The path the manifest was saved to. */
@ -111,5 +114,9 @@ public class CommitTaskStage extends
return taskManifest;
}
@Override
public String toString() {
return "Result{path=" + path + '}';
}
}
}

View File

@ -105,7 +105,7 @@ public class CreateOutputDirectoriesStage extends
throws IOException {
final List<Path> directories = createAllDirectories(manifestDirs);
LOG.debug("{}: Created {} directories", getName(), directories.size());
LOG.info("{}: Created {} directories", getName(), directories.size());
return new Result(new HashSet<>(directories), dirMap);
}
@ -163,8 +163,9 @@ public class CreateOutputDirectoriesStage extends
// Now the real work.
final int createCount = leaves.size();
LOG.info("Preparing {} directory/directories; {} parent dirs implicitly created",
createCount, parents.size());
LOG.info("Preparing {} directory/directories; {} parent dirs implicitly created."
+ " Files deleted: {}",
createCount, parents.size(), filesToDelete.size());
// now probe for and create the leaf dirs, which are those at the
// bottom level
@ -232,7 +233,7 @@ public class CreateOutputDirectoriesStage extends
// report progress back
progress();
LOG.info("{}: Deleting file {}", getName(), dir);
delete(dir, false, OP_DELETE);
deleteFile(dir, OP_DELETE);
// note its final state
addToDirectoryMap(dir, DirMapState.fileNowDeleted);
}
@ -323,7 +324,7 @@ public class CreateOutputDirectoriesStage extends
// is bad: delete a file
LOG.info("{}: Deleting file where a directory should go: {}",
getName(), st);
delete(path, false, OP_DELETE_FILE_UNDER_DESTINATION);
deleteFile(path, OP_DELETE_FILE_UNDER_DESTINATION);
} else {
// is good.
LOG.warn("{}: Even though mkdirs({}) failed, there is now a directory there",

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestS
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.TMP_SUFFIX;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_SUMMARY_FILE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_SAVE_SUCCESS;
@ -72,7 +73,7 @@ public class SaveSuccessFileStage extends
LOG.debug("{}: Saving _SUCCESS file to {} via {}", successFile,
getName(),
successTempFile);
save(successData, successTempFile, successFile);
saveManifest(() -> successData, successTempFile, successFile, OP_SAVE_SUMMARY_FILE);
return successFile;
}

View File

@ -19,13 +19,16 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import java.io.IOException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SAVE_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt;
@ -38,16 +41,36 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.Man
* Uses both the task ID and task attempt ID to determine the temp filename;
* Before the rename of (temp, final-path), any file at the final path
* is deleted.
* <p>
* This is so that when this stage is invoked in a task commit, its output
* overwrites any of the first commit.
* When it succeeds, therefore, unless there is any subsequent commit of
* another task, the task manifest at the final path is from this
* operation.
*
* Returns the path where the manifest was saved.
* <p>
* If the save and rename fails, there are a limited number of retries, with no sleep
* interval.
* This is to briefly try recover from any transient rename() failure, including a
* race condition with any other task commit.
* <ol>
* <li>If the previous task commit has already succeeded, this rename will overwrite it.
* Both task attempts will report success.</li>
* <li>If after, writing, another task attempt overwrites it, again, both
* task attempts will report success.</li>
* <li>If another task commits between the delete() and rename() operations, the retry will
* attempt to recover by repeating the manifest write, and then report success.</li>
* </ol>
* This means that multiple task attempts may report success, but only one will have it actual
* manifest saved.
* The mapreduce and spark committers only schedule a second task commit attempt if the first
* task attempt's commit operation fails <i>or fails to report success in the allocated time</i>.
* The overwrite with retry loop is an attempt to ensure that the second attempt will report
* success, if a partitioned cluster means that the original TA commit is still in progress.
* <p>
* Returns (the path where the manifest was saved, the manifest).
*/
public class SaveTaskManifestStage extends
AbstractJobOrTaskStage<TaskManifest, Path> {
AbstractJobOrTaskStage<Supplier<TaskManifest>, Pair<Path, TaskManifest>> {
private static final Logger LOG = LoggerFactory.getLogger(
SaveTaskManifestStage.class);
@ -57,14 +80,16 @@ public class SaveTaskManifestStage extends
}
/**
* Save the manifest to a temp file and rename to the final
* Generate and save a manifest to a temp file and rename to the final
* manifest destination.
* @param manifest manifest
* The manifest is generated on each retried attempt.
* @param manifestSource supplier the manifest/success file
*
* @return the path to the final entry
* @throws IOException IO failure.
*/
@Override
protected Path executeStage(final TaskManifest manifest)
protected Pair<Path, TaskManifest> executeStage(Supplier<TaskManifest> manifestSource)
throws IOException {
final Path manifestDir = getTaskManifestDir();
@ -74,8 +99,9 @@ public class SaveTaskManifestStage extends
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
getRequiredTaskAttemptId());
LOG.info("{}: Saving manifest file to {}", getName(), manifestFile);
save(manifest, manifestTempFile, manifestFile);
return manifestFile;
final TaskManifest manifest =
saveManifest(manifestSource, manifestTempFile, manifestFile, OP_SAVE_TASK_MANIFEST);
return Pair.of(manifestFile, manifest);
}
}

View File

@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_SETUP;
/**
@ -55,7 +56,7 @@ public class SetupJobStage extends
createNewDirectory("Creating task manifest dir", getTaskManifestDir());
// delete any success marker if so instructed.
if (deleteMarker) {
delete(getStageConfig().getJobSuccessMarkerPath(), false);
deleteFile(getStageConfig().getJobSuccessMarkerPath(), OP_DELETE);
}
return path;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT;
/**
* Stage Config.
@ -172,6 +173,12 @@ public class StageConfig {
*/
private int successMarkerFileLimit = SUCCESS_MARKER_FILE_LIMIT;
/**
* How many attempts to save a manifest by save and rename
* before giving up: {@value}.
*/
private int manifestSaveAttempts = OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT;
public StageConfig() {
}
@ -604,6 +611,21 @@ public class StageConfig {
return successMarkerFileLimit;
}
public int getManifestSaveAttempts() {
return manifestSaveAttempts;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public StageConfig withManifestSaveAttempts(final int value) {
checkOpen();
manifestSaveAttempts = value;
return this;
}
/**
* Enter the stage; calls back to
* {@link #enterStageEventHandler} if non-null.

View File

@ -134,6 +134,11 @@ Usage: `mapred envvars`
Display computed Hadoop environment variables.
# `successfile`
Load and print a JSON `_SUCCESS` file from a [Manifest Committer](manifest_committer.html) or an S3A Committer,
Administration Commands
-----------------------

View File

@ -15,14 +15,16 @@
# The Manifest Committer for Azure and Google Cloud Storage
This document how to use the _Manifest Committer_.
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
This documents how to use the _Manifest Committer_.
The _Manifest_ committer is a committer for work which provides
performance on ABFS for "real world" queries,
and performance and correctness on GCS.
It also works with other filesystems, including HDFS.
However, the design is optimized for object stores where
listing operatons are slow and expensive.
listing operations are slow and expensive.
The architecture and implementation of the committer is covered in
[Manifest Committer Architecture](manifest_committer_architecture.html).
@ -31,10 +33,16 @@ The architecture and implementation of the committer is covered in
The protocol and its correctness are covered in
[Manifest Committer Protocol](manifest_committer_protocol.html).
It was added in March 2022, and should be considered unstable
in early releases.
It was added in March 2022.
As of April 2024, the problems which surfaced have been
* Memory use at scale.
* Directory deletion scalability.
* Resilience to task commit to rename failures.
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
That is: the core algorithms is correct, but task commit
robustness was insufficient to some failure conditions.
And scale is always a challenge, even with components tested through
large TPC-DS test runs.
## Problem:
@ -70,10 +78,13 @@ This committer uses the extension point which came in for the S3A committers.
Users can declare a new committer factory for abfs:// and gcs:// URLs.
A suitably configured spark deployment will pick up the new committer.
Directory performance issues in job cleanup can be addressed by two options
Directory performance issues in job cleanup can be addressed by some options
1. The committer will parallelize deletion of task attempt directories before
deleting the `_temporary` directory.
1. Cleanup can be disabled. .
2. An initial attempt to delete the `_temporary` directory before the parallel
attempt is made.
3. Exceptions can be supressed, so that cleanup failures do not fail the job
4. Cleanup can be disabled.
The committer can be used with any filesystem client which has a "real" file rename()
operation.
@ -112,8 +123,8 @@ These can be done in `core-site.xml`, if it is not defined in the `mapred-defaul
## Binding to the manifest committer in Spark.
In Apache Spark, the configuration can be done either with command line options (after the '--conf') or by using the `spark-defaults.conf` file. The following is an example of using `spark-defaults.conf` also including the configuration for Parquet with a subclass of the parquet
committer which uses the factory mechansim internally.
In Apache Spark, the configuration can be done either with command line options (after the `--conf`) or by using the `spark-defaults.conf` file.
The following is an example of using `spark-defaults.conf` also including the configuration for Parquet with a subclass of the parquet committer which uses the factory mechanism internally.
```
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
@ -184,6 +195,7 @@ Here are the main configuration options of the committer.
| `mapreduce.manifest.committer.io.threads` | Thread count for parallel operations | `64` |
| `mapreduce.manifest.committer.summary.report.directory` | directory to save reports. | `""` |
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete temporary directories in parallel | `true` |
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` |
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` |
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Create a `_SUCCESS` marker file on successful completion. (and delete any existing one in job setup) | `true` |
@ -238,37 +250,6 @@ Caveats
are made against the store. The rate throttling option
`mapreduce.manifest.committer.io.rate` can help avoid this.
### `mapreduce.manifest.committer.writer.queue.capacity`
This is a secondary scale option.
It controls the size of the queue for storing lists of files to rename from
the manifests loaded from the target filesystem, manifests loaded
from a pool of worker threads, and the single thread which saves
the entries from each manifest to an intermediate file in the local filesystem.
Once the queue is full, all manifest loading threads will block.
```xml
<property>
<name>mapreduce.manifest.committer.writer.queue.capacity</name>
<value>32</value>
</property>
```
As the local filesystem is usually much faster to write to than any cloud store,
this queue size should not be a limit on manifest load performance.
It can help limit the amount of memory consumed during manifest load during
job commit.
The maximum number of loaded manifests will be:
```
mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads
```
## <a name="deleting"></a> Optional: deleting target files in Job Commit
The classic `FileOutputCommitter` deletes files at the destination paths
@ -403,6 +384,153 @@ hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestP
This works for the files saved at the base of an output directory, and
any reports saved to a report directory.
Example from a run of the `ITestAbfsTerasort` MapReduce terasort.
```
bin/mapred successfile abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS
Manifest file: abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS
succeeded: true
created: 2024-04-18T18:34:34.003+01:00[Europe/London]
committer: org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter
hostname: pi5
jobId: job_1713461587013_0003
jobIdSource: JobID
Diagnostics
mapreduce.manifest.committer.io.threads = 192
principal = alice
stage = committer_commit_job
Statistics:
counters=((commit_file_rename=1)
(committer_bytes_committed=21)
(committer_commit_job=1)
(committer_files_committed=1)
(committer_task_directory_depth=2)
(committer_task_file_count=2)
(committer_task_file_size=21)
(committer_task_manifest_file_size=37157)
(job_stage_cleanup=1)
(job_stage_create_target_dirs=1)
(job_stage_load_manifests=1)
(job_stage_optional_validate_output=1)
(job_stage_rename_files=1)
(job_stage_save_success_marker=1)
(job_stage_setup=1)
(op_create_directories=1)
(op_delete=3)
(op_delete_dir=1)
(op_get_file_status=9)
(op_get_file_status.failures=6)
(op_list_status=3)
(op_load_all_manifests=1)
(op_load_manifest=2)
(op_mkdirs=4)
(op_msync=1)
(op_rename=2)
(op_rename.failures=1)
(task_stage_commit=2)
(task_stage_save_task_manifest=1)
(task_stage_scan_directory=2)
(task_stage_setup=2));
gauges=();
minimums=((commit_file_rename.min=141)
(committer_commit_job.min=2306)
(committer_task_directory_count=0)
(committer_task_directory_depth=1)
(committer_task_file_count=0)
(committer_task_file_size=0)
(committer_task_manifest_file_size=18402)
(job_stage_cleanup.min=196)
(job_stage_create_target_dirs.min=2)
(job_stage_load_manifests.min=687)
(job_stage_optional_validate_output.min=66)
(job_stage_rename_files.min=161)
(job_stage_save_success_marker.min=653)
(job_stage_setup.min=571)
(op_create_directories.min=1)
(op_delete.min=57)
(op_delete_dir.min=129)
(op_get_file_status.failures.min=57)
(op_get_file_status.min=55)
(op_list_status.min=202)
(op_load_all_manifests.min=445)
(op_load_manifest.min=171)
(op_mkdirs.min=67)
(op_msync.min=0)
(op_rename.failures.min=266)
(op_rename.min=139)
(task_stage_commit.min=206)
(task_stage_save_task_manifest.min=651)
(task_stage_scan_directory.min=206)
(task_stage_setup.min=127));
maximums=((commit_file_rename.max=141)
(committer_commit_job.max=2306)
(committer_task_directory_count=0)
(committer_task_directory_depth=1)
(committer_task_file_count=1)
(committer_task_file_size=21)
(committer_task_manifest_file_size=18755)
(job_stage_cleanup.max=196)
(job_stage_create_target_dirs.max=2)
(job_stage_load_manifests.max=687)
(job_stage_optional_validate_output.max=66)
(job_stage_rename_files.max=161)
(job_stage_save_success_marker.max=653)
(job_stage_setup.max=571)
(op_create_directories.max=1)
(op_delete.max=113)
(op_delete_dir.max=129)
(op_get_file_status.failures.max=231)
(op_get_file_status.max=61)
(op_list_status.max=300)
(op_load_all_manifests.max=445)
(op_load_manifest.max=436)
(op_mkdirs.max=123)
(op_msync.max=0)
(op_rename.failures.max=266)
(op_rename.max=139)
(task_stage_commit.max=302)
(task_stage_save_task_manifest.max=651)
(task_stage_scan_directory.max=302)
(task_stage_setup.max=157));
means=((commit_file_rename.mean=(samples=1, sum=141, mean=141.0000))
(committer_commit_job.mean=(samples=1, sum=2306, mean=2306.0000))
(committer_task_directory_count=(samples=4, sum=0, mean=0.0000))
(committer_task_directory_depth=(samples=2, sum=2, mean=1.0000))
(committer_task_file_count=(samples=4, sum=2, mean=0.5000))
(committer_task_file_size=(samples=2, sum=21, mean=10.5000))
(committer_task_manifest_file_size=(samples=2, sum=37157, mean=18578.5000))
(job_stage_cleanup.mean=(samples=1, sum=196, mean=196.0000))
(job_stage_create_target_dirs.mean=(samples=1, sum=2, mean=2.0000))
(job_stage_load_manifests.mean=(samples=1, sum=687, mean=687.0000))
(job_stage_optional_validate_output.mean=(samples=1, sum=66, mean=66.0000))
(job_stage_rename_files.mean=(samples=1, sum=161, mean=161.0000))
(job_stage_save_success_marker.mean=(samples=1, sum=653, mean=653.0000))
(job_stage_setup.mean=(samples=1, sum=571, mean=571.0000))
(op_create_directories.mean=(samples=1, sum=1, mean=1.0000))
(op_delete.mean=(samples=3, sum=240, mean=80.0000))
(op_delete_dir.mean=(samples=1, sum=129, mean=129.0000))
(op_get_file_status.failures.mean=(samples=6, sum=614, mean=102.3333))
(op_get_file_status.mean=(samples=3, sum=175, mean=58.3333))
(op_list_status.mean=(samples=3, sum=671, mean=223.6667))
(op_load_all_manifests.mean=(samples=1, sum=445, mean=445.0000))
(op_load_manifest.mean=(samples=2, sum=607, mean=303.5000))
(op_mkdirs.mean=(samples=4, sum=361, mean=90.2500))
(op_msync.mean=(samples=1, sum=0, mean=0.0000))
(op_rename.failures.mean=(samples=1, sum=266, mean=266.0000))
(op_rename.mean=(samples=1, sum=139, mean=139.0000))
(task_stage_commit.mean=(samples=2, sum=508, mean=254.0000))
(task_stage_save_task_manifest.mean=(samples=1, sum=651, mean=651.0000))
(task_stage_scan_directory.mean=(samples=2, sum=508, mean=254.0000))
(task_stage_setup.mean=(samples=2, sum=284, mean=142.0000)));
```
## <a name="summaries"></a> Collecting Job Summaries `mapreduce.manifest.committer.summary.report.directory`
The committer can be configured to save the `_SUCCESS` summary files to a report directory,
@ -431,46 +559,62 @@ This allows for the statistics of jobs to be collected irrespective of their out
saving the `_SUCCESS` marker is enabled, and without problems caused by a chain of queries
overwriting the markers.
The `mapred successfile` operation can be used to print these reports.
# <a name="cleanup"></a> Cleanup
Job cleanup is convoluted as it is designed to address a number of issues which
may surface in cloud storage.
* Slow performance for deletion of directories.
* Timeout when deleting very deep and wide directory trees.
* Slow performance for deletion of directories (GCS).
* Timeout when deleting very deep and wide directory trees (Azure).
* General resilience to cleanup issues escalating to job failures.
| Option | Meaning | Default Value |
|--------|---------|---------------|
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` |
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` |
| Option | Meaning | Default Value |
|-------------------------------------------------------------------|--------------------------------------------------------------------|---------------|
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory | `false` |
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` |
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` |
The algorithm is:
```
if `mapreduce.fileoutputcommitter.cleanup.skipped`:
```python
if "mapreduce.fileoutputcommitter.cleanup.skipped":
return
if `mapreduce.manifest.committer.cleanup.parallel.delete`:
attempt parallel delete of task directories; catch any exception
if not `mapreduce.fileoutputcommitter.cleanup.skipped`:
delete(`_temporary`); catch any exception
if caught-exception and not `mapreduce.fileoutputcommitter.cleanup-failures.ignored`:
throw caught-exception
if "mapreduce.manifest.committer.cleanup.parallel.delete":
if "mapreduce.manifest.committer.cleanup.parallel.delete.base.first" :
if delete("_temporary"):
return
delete(list("$task-directories")) catch any exception
if not "mapreduce.fileoutputcommitter.cleanup.skipped":
delete("_temporary"); catch any exception
if caught-exception and not "mapreduce.fileoutputcommitter.cleanup-failures.ignored":
raise caught-exception
```
It's a bit complicated, but the goal is to perform a fast/scalable delete and
throw a meaningful exception if that didn't work.
When working with ABFS and GCS, these settings should normally be left alone.
If somehow errors surface during cleanup, enabling the option to
ignore failures will ensure the job still completes.
For ABFS set `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` to `true`
which should normally result in less network IO and a faster cleanup.
```
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true
```
For GCS, setting `mapreduce.manifest.committer.cleanup.parallel.delete.base.first`
to `false` may speed up cleanup.
If somehow errors surface during cleanup, ignoring failures will ensure the job
is still considered a success.
`mapreduce.fileoutputcommitter.cleanup-failures.ignored = true`
Disabling cleanup even avoids the overhead of cleanup, but
requires a workflow or manual operation to clean up all
`_temporary` directories on a regular basis.
`_temporary` directories on a regular basis:
`mapreduce.fileoutputcommitter.cleanup.skipped = true`.
# <a name="abfs"></a> Working with Azure ADLS Gen2 Storage
@ -504,9 +648,15 @@ The core set of Azure-optimized options becomes
</property>
<property>
<name>spark.hadoop.fs.azure.io.rate.limit</name>
<value>10000</value>
<name>fs.azure.io.rate.limit</name>
<value>1000</value>
</property>
<property>
<name>mapreduce.manifest.committer.cleanup.parallel.delete.base.first</name>
<value>true</value>
</property>
```
And optional settings for debugging/performance analysis
@ -514,7 +664,7 @@ And optional settings for debugging/performance analysis
```xml
<property>
<name>mapreduce.manifest.committer.summary.report.directory</name>
<value>abfs:// Path within same store/separate store</value>
<value>Path within same store/separate store</value>
<description>Optional: path to where job summaries are saved</description>
</property>
```
@ -523,14 +673,15 @@ And optional settings for debugging/performance analysis
```
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.fs.azure.io.rate.limit 10000
spark.hadoop.fs.azure.io.rate.limit 1000
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
```
## Experimental: ABFS Rename Rate Limiting `fs.azure.io.rate.limit`
## <a name="abfs-rate-limit"></a> ABFS Rename Rate Limiting `fs.azure.io.rate.limit`
To avoid triggering store throttling and backoff delays, as well as other
throttling-related failure conditions file renames during job commit
@ -544,13 +695,12 @@ may issue.
Set the option to `0` remove all rate limiting.
The default value of this is set to 10000, which is the default IO capacity for
an ADLS storage account.
The default value of this is set to 1000.
```xml
<property>
<name>fs.azure.io.rate.limit</name>
<value>10000</value>
<value>1000</value>
<description>maximum number of renames attempted per second</description>
</property>
```
@ -569,7 +719,7 @@ If server-side throttling took place, signs of this can be seen in
* The store service's logs and their throttling status codes (usually 503 or 500).
* The job statistic `commit_file_rename_recovered`. This statistic indicates that
ADLS throttling manifested as failures in renames, failures which were recovered
from in the comitter.
from in the committer.
If these are seen -or other applications running at the same time experience
throttling/throttling-triggered problems, consider reducing the value of
@ -598,13 +748,14 @@ The Spark settings to switch to this committer are
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first false
spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
```
The store's directory delete operations are `O(files)` so the value
of `mapreduce.manifest.committer.cleanup.parallel.delete`
SHOULD be left at the default of `true`.
SHOULD be left at the default of `true`, but
`mapreduce.manifest.committer.cleanup.parallel.delete.base.first` changed to `false`
For mapreduce, declare the binding in `core-site.xml`or `mapred-site.xml`
```xml
@ -639,19 +790,33 @@ spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOut
spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries)
```
# <a name="advanced"></a> Advanced Topics
## Advanced Configuration options
# <a name="advanced"></a> Advanced Configuration options
There are some advanced options which are intended for development and testing,
rather than production use.
| Option | Meaning | Default Value |
|--------|----------------------------------------------|---------------|
| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` |
| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` |
| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` |
| Option | Meaning | Default Value |
|-----------------------------------------------------------|-------------------------------------------------------------|---------------|
| `mapreduce.manifest.committer.manifest.save.attempts` | How many attempts should be made to commit a task manifest? | `5` |
| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` |
| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` |
| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` |
### `mapreduce.manifest.committer.manifest.save.attempts`
The number of attempts which should be made to save a task attempt manifest, which is done by
1. Writing the file to a temporary file in the job attempt directory.
2. Deleting any existing task manifest
3. Renaming the temporary file to the final filename.
This may fail for unrecoverable reasons (permissions, permanent loss of network, service down,...) or it may be
a transient problem which may not reoccur if another attempt is made to write the data.
The number of attempts to make is set by `mapreduce.manifest.committer.manifest.save.attempts`;
the sleep time increases with each attempt.
Consider increasing the default value if task attempts fail to commit their work
and fail to recover from network problems.
### Validating output `mapreduce.manifest.committer.validate.output`
@ -691,6 +856,34 @@ There is no need to alter these values, except when writing new implementations
something which is only needed if the store provides extra integration support for the
committer.
### `mapreduce.manifest.committer.writer.queue.capacity`
This is a secondary scale option.
It controls the size of the queue for storing lists of files to rename from
the manifests loaded from the target filesystem, manifests loaded
from a pool of worker threads, and the single thread which saves
the entries from each manifest to an intermediate file in the local filesystem.
Once the queue is full, all manifest loading threads will block.
```xml
<property>
<name>mapreduce.manifest.committer.writer.queue.capacity</name>
<value>32</value>
</property>
```
As the local filesystem is usually much faster to write to than any cloud store,
this queue size should not be a limit on manifest load performance.
It can help limit the amount of memory consumed during manifest load during
job commit.
The maximum number of loaded manifests will be:
```
mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads
```
## <a name="concurrent"></a> Support for concurrent jobs to the same directory
It *may* be possible to run multiple jobs targeting the same directory tree.

View File

@ -19,6 +19,7 @@ This document describes the architecture and other implementation/correctness
aspects of the [Manifest Committer](manifest_committer.html)
The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html).
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
@ -278,6 +279,11 @@ The manifest committer assumes that the amount of data being stored in memory is
because there is no longer the need to store an etag for every block of every
file being committed.
This assumption turned out not to hold for some jobs:
[MAPREDUCE-7435. ManifestCommitter OOM on azure job](https://issues.apache.org/jira/browse/MAPREDUCE-7435)
The strategy here was to read in all manifests and stream their entries to a local file, as Hadoop
Writable objects -hence with lower marshalling overhead than JSON.
#### Duplicate creation of directories in the dest dir

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestS
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SaveTaskManifestStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
@ -167,6 +168,12 @@ public abstract class AbstractManifestCommitterTest
private static final int MAX_LEN = 64_000;
/**
* How many attempts to save manifests before giving up.
* Kept small to reduce sleep times and network delays.
*/
public static final int SAVE_ATTEMPTS = 4;
/**
* Submitter for tasks; may be null.
*/
@ -771,6 +778,9 @@ public abstract class AbstractManifestCommitterTest
/**
* Create the stage config for job or task but don't finalize it.
* Uses {@link #TASK_IDS} for job/task ID.
* The store operations is extracted from
* {@link #getStoreOperations()}, which is how fault injection
* can be set up.
* @param jobAttemptNumber job attempt number
* @param taskIndex task attempt index; -1 for job attempt only.
* @param taskAttemptNumber task attempt number
@ -796,6 +806,7 @@ public abstract class AbstractManifestCommitterTest
.withJobAttemptNumber(jobAttemptNumber)
.withJobDirectories(attemptDirs)
.withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
.withManifestSaveAttempts(SAVE_ATTEMPTS)
.withOperations(getStoreOperations())
.withProgressable(getProgressCounter())
.withSuccessMarkerFileLimit(100_000)
@ -924,7 +935,7 @@ public abstract class AbstractManifestCommitterTest
}
// save the manifest for this stage.
new SaveTaskManifestStage(taskStageConfig).apply(manifest);
new SaveTaskManifestStage(taskStageConfig).apply(() -> manifest);
return manifest;
}
@ -998,7 +1009,9 @@ public abstract class AbstractManifestCommitterTest
* Create and execute a cleanup stage.
* @param enabled is the stage enabled?
* @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
* parallel?
* parallel?
* @param attemptBaseDeleteFirst Make an initial attempt to
* delete the base directory
* @param suppressExceptions suppress exceptions?
* @param outcome expected outcome.
* @param expectedDirsDeleted #of directories deleted. -1 for no checks
@ -1008,13 +1021,18 @@ public abstract class AbstractManifestCommitterTest
protected CleanupJobStage.Result cleanup(
final boolean enabled,
final boolean deleteTaskAttemptDirsInParallel,
boolean attemptBaseDeleteFirst,
final boolean suppressExceptions,
final CleanupJobStage.Outcome outcome,
final int expectedDirsDeleted) throws IOException {
StageConfig stageConfig = getJobStageConfig();
CleanupJobStage.Result result = new CleanupJobStage(stageConfig)
.apply(new CleanupJobStage.Arguments(OP_STAGE_JOB_CLEANUP,
enabled, deleteTaskAttemptDirsInParallel, suppressExceptions));
enabled,
deleteTaskAttemptDirsInParallel,
attemptBaseDeleteFirst,
suppressExceptions,
0));
assertCleanupResult(result, outcome, expectedDirsDeleted);
return result;
}
@ -1038,6 +1056,24 @@ public abstract class AbstractManifestCommitterTest
StandardCharsets.UTF_8);
}
/**
* Make the store operations unreliable.
* If it already was then reset the failure options.
* @return the store operations
*/
protected UnreliableManifestStoreOperations makeStoreOperationsUnreliable() {
UnreliableManifestStoreOperations failures;
final ManifestStoreOperations wrappedOperations = getStoreOperations();
if (wrappedOperations instanceof UnreliableManifestStoreOperations) {
failures = (UnreliableManifestStoreOperations) wrappedOperations;
failures.reset();
} else {
failures = new UnreliableManifestStoreOperations(wrappedOperations);
setStoreOperations(failures);
}
return failures;
}
/**
* Counter.
*/

View File

@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
@ -314,6 +315,21 @@ public final class ManifestCommitterTestSupport {
.isEqualTo(type);
}
/**
* Assert that none of the named statistics have any failure counts,
* which may be from being null or 0.
* @param iostats statistics
* @param names base name of the statistics (i.e. without ".failures" suffix)
*/
public static void assertNoFailureStatistics(IOStatistics iostats, String... names) {
final Map<String, Long> counters = iostats.counters();
for (String name : names) {
Assertions.assertThat(counters.get(name + ".failures"))
.describedAs("Failure count of %s", name)
.matches(f -> f == null || f == 0);
}
}
/**
* Save a manifest to an entry file; returning the loaded manifest data.
* Caller MUST clean up the temp file.

View File

@ -80,17 +80,25 @@ public class TestCleanupStage extends AbstractManifestCommitterTest {
@Test
public void testCleanupInParallelHealthy() throws Throwable {
describe("parallel cleanup of TA dirs.");
cleanup(true, true, false,
cleanup(true, true, false, false,
CleanupJobStage.Outcome.PARALLEL_DELETE,
PARALLEL_DELETE_COUNT);
verifyJobDirsCleanedUp();
}
@Test
public void testCleanupInParallelHealthyBaseFirst() throws Throwable {
describe("parallel cleanup of TA dirs with base first: one operation");
cleanup(true, true, true, false,
CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT);
verifyJobDirsCleanedUp();
}
@Test
public void testCleanupSingletonHealthy() throws Throwable {
describe("Cleanup with a single delete. Not the default; would be best on HDFS");
cleanup(true, false, false,
cleanup(true, false, false, false,
CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT);
verifyJobDirsCleanedUp();
}
@ -99,31 +107,69 @@ public class TestCleanupStage extends AbstractManifestCommitterTest {
public void testCleanupNoDir() throws Throwable {
describe("parallel cleanup MUST not fail if there's no dir");
// first do the cleanup
cleanup(true, true, false,
cleanup(true, true, false, false,
CleanupJobStage.Outcome.PARALLEL_DELETE, PARALLEL_DELETE_COUNT);
// now expect cleanup by single delete still works
// the delete count is 0 as pre check skips it
cleanup(true, false, false,
cleanup(true, false, false, false,
CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0);
cleanup(true, true, true, false,
CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0);
// if skipped, that happens first
cleanup(false, true, false,
cleanup(false, true, false, false,
CleanupJobStage.Outcome.DISABLED, 0);
}
@Test
public void testFailureInParallelDelete() throws Throwable {
describe("A parallel delete fails, but the base delete works");
describe("A parallel delete fails, but the fallback base delete works");
// pick one of the manifests
TaskManifest manifest = manifests.get(4);
Path taPath = new Path(manifest.getTaskAttemptDir());
failures.addDeletePathToFail(taPath);
cleanup(true, true, false,
failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir()));
cleanup(true, true, false, false,
CleanupJobStage.Outcome.DELETED, PARALLEL_DELETE_COUNT);
}
@Test
public void testFailureInParallelBaseDelete() throws Throwable {
describe("A parallel delete fails in the base delete; the parallel stage works");
// base path will timeout on first delete; the parallel delete will take place
failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir());
failures.setFailureLimit(1);
cleanup(true, true, false, false,
CleanupJobStage.Outcome.PARALLEL_DELETE, PARALLEL_DELETE_COUNT);
}
@Test
public void testDoubleFailureInParallelBaseDelete() throws Throwable {
describe("A parallel delete fails with the base delete and a task attempt dir");
// base path will timeout on first delete; the parallel delete will take place
failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir());
TaskManifest manifest = manifests.get(4);
failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir()));
failures.setFailureLimit(2);
cleanup(true, true, true, false,
CleanupJobStage.Outcome.DELETED, PARALLEL_DELETE_COUNT + 1);
}
@Test
public void testTripleFailureInParallelBaseDelete() throws Throwable {
describe("All delete phases will fail");
// base path will timeout on first delete; the parallel delete will take place
failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir());
TaskManifest manifest = manifests.get(4);
failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir()));
failures.setFailureLimit(4);
cleanup(true, true, true, true,
CleanupJobStage.Outcome.FAILURE, PARALLEL_DELETE_COUNT + 1);
}
/**
* If there's no job task attempt subdir then the list of it will raise
* and FNFE; this MUST be caught and the base delete executed.
@ -135,7 +181,7 @@ public class TestCleanupStage extends AbstractManifestCommitterTest {
StageConfig stageConfig = getJobStageConfig();
// TA dir doesn't exist, so listing will fail.
failures.addPathNotFound(stageConfig.getJobAttemptTaskSubDir());
cleanup(true, true, false,
cleanup(true, true, false, false,
CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT);
}

View File

@ -19,13 +19,21 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.FileNotFoundException;
import java.net.SocketTimeoutException;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage;
@ -33,14 +41,27 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJob
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.E_TIMEOUT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.generatedErrorMessage;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test committing a task.
* Test committing a task, with lots of fault injection to validate
* resilience to transient failures.
*/
public class TestCommitTaskStage extends AbstractManifestCommitterTest {
public static final String TASK1 = String.format("task_%03d", 1);
public static final String TASK1_ATTEMPT1 = String.format("%s_%02d",
TASK1, 1);
@Override
public void setup() throws Exception {
super.setup();
@ -51,6 +72,15 @@ public class TestCommitTaskStage extends AbstractManifestCommitterTest {
new SetupJobStage(stageConfig).apply(true);
}
/**
* Create a stage config for job 1 task1 attempt 1.
* @return a task stage configuration.
*/
private StageConfig createStageConfig() {
return createTaskStageConfig(JOB1, TASK1, TASK1_ATTEMPT1);
}
@Test
public void testCommitMissingDirectory() throws Throwable {
@ -108,8 +138,9 @@ public class TestCommitTaskStage extends AbstractManifestCommitterTest {
OP_STAGE_JOB_CLEANUP,
true,
true,
false
)));
false,
false,
0)));
// review success file
final Path successPath = outcome.getSuccessPath();
@ -123,4 +154,283 @@ public class TestCommitTaskStage extends AbstractManifestCommitterTest {
.isEmpty();
}
@Test
public void testManifestSaveFailures() throws Throwable {
describe("Test recovery of manifest save/rename failures");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
// final manifest file is by task ID
Path manifestFile = manifestPathForTask(manifestDir,
stageConfig.getTaskId());
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// manifest save will fail but recover before the task gives up.
failures.addSaveToFail(manifestTempFile);
// will fail because too many attempts failed.
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(PathIOException.class, generatedErrorMessage("save"), () ->
new CommitTaskStage(stageConfig).apply(null));
// will succeed because the failure limit is set lower
failures.setFailureLimit(SAVE_ATTEMPTS - 1);
new CommitTaskStage(stageConfig).apply(null);
describe("Testing timeouts on rename operations.");
// now do it for the renames, which will fail after the rename
failures.reset();
failures.addTimeoutBeforeRename(manifestTempFile);
// first verify that if too many attempts fail, the task will fail
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(SocketTimeoutException.class, E_TIMEOUT, () ->
new CommitTaskStage(stageConfig).apply(null));
// reduce the limit and expect the stage to succeed.
failures.setFailureLimit(SAVE_ATTEMPTS - 1);
new CommitTaskStage(stageConfig).apply(null);
}
/**
* Save with renaming failing before the rename; the source file
* will be present on the next attempt.
* The successfully saved manifest file is loaded and its statistics
* examined to verify that the failure count is updated.
*/
@Test
public void testManifestRenameEarlyTimeouts() throws Throwable {
describe("Testing timeouts on rename operations.");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
// final manifest file is by task ID
Path manifestFile = manifestPathForTask(manifestDir,
stageConfig.getTaskId());
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// configure for which will fail after the rename
failures.addTimeoutBeforeRename(manifestTempFile);
// first verify that if too many attempts fail, the task will fail
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(SocketTimeoutException.class, E_TIMEOUT, () ->
new CommitTaskStage(stageConfig).apply(null));
// and that the IO stats are updated
final IOStatisticsStore iostats = stageConfig.getIOStatistics();
assertThatStatisticCounter(iostats, OP_SAVE_TASK_MANIFEST + ".failures")
.isEqualTo(SAVE_ATTEMPTS);
// reduce the limit and expect the stage to succeed.
iostats.reset();
failures.setFailureLimit(SAVE_ATTEMPTS);
final CommitTaskStage.Result r = new CommitTaskStage(stageConfig).apply(null);
// load in the manifest
final TaskManifest loadedManifest = TaskManifest.load(getFileSystem(), r.getPath());
final IOStatisticsSnapshot loadedIOStats = loadedManifest.getIOStatistics();
LOG.info("Statistics of file successfully saved:\nD {}",
ioStatisticsToPrettyString(loadedIOStats));
assertThatStatisticCounter(loadedIOStats, OP_SAVE_TASK_MANIFEST + ".failures")
.isEqualTo(SAVE_ATTEMPTS - 1);
}
@Test
public void testManifestRenameLateTimeoutsFailure() throws Throwable {
describe("Testing timeouts on rename operations.");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
failures.addTimeoutAfterRename(manifestTempFile);
// if too many attempts fail, the task will fail
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(SocketTimeoutException.class, E_TIMEOUT, () ->
new CommitTaskStage(stageConfig).apply(null));
}
@Test
public void testManifestRenameLateTimeoutsRecovery() throws Throwable {
describe("Testing recovery from late timeouts on rename operations.");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
failures.addTimeoutAfterRename(manifestTempFile);
// reduce the limit and expect the stage to succeed.
failures.setFailureLimit(SAVE_ATTEMPTS);
stageConfig.getIOStatistics().reset();
new CommitTaskStage(stageConfig).apply(null);
final CommitTaskStage.Result r = new CommitTaskStage(stageConfig).apply(null);
// load in the manifest
final TaskManifest loadedManifest = TaskManifest.load(getFileSystem(), r.getPath());
final IOStatisticsSnapshot loadedIOStats = loadedManifest.getIOStatistics();
LOG.info("Statistics of file successfully saved:\n{}",
ioStatisticsToPrettyString(loadedIOStats));
// the failure event is one less than the limit.
assertThatStatisticCounter(loadedIOStats, OP_SAVE_TASK_MANIFEST + ".failures")
.isEqualTo(SAVE_ATTEMPTS - 1);
}
@Test
public void testFailureToDeleteManifestPath() throws Throwable {
describe("Testing failure in the delete call made before renaming the manifest");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
// final manifest file is by task ID
Path manifestFile = manifestPathForTask(manifestDir,
stageConfig.getTaskId());
// put a file in as there is a check for it before the delete
ContractTestUtils.touch(getFileSystem(), manifestFile);
/* and the delete shall fail */
failures.addDeletePathToFail(manifestFile);
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// first verify that if too many attempts fail, the task will fail
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(PathIOException.class, () ->
new CommitTaskStage(stageConfig).apply(null));
// reduce the limit and expect the stage to succeed.
failures.setFailureLimit(SAVE_ATTEMPTS - 1);
new CommitTaskStage(stageConfig).apply(null);
}
/**
* Failure of delete before saving the manifest to a temporary path.
*/
@Test
public void testFailureOfDeleteBeforeSavingTemporaryFile() throws Throwable {
describe("Testing failure in the delete call made before rename");
UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable();
StageConfig stageConfig = createStageConfig();
new SetupTaskStage(stageConfig).apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// delete will fail
failures.addDeletePathToFail(manifestTempFile);
// first verify that if too many attempts fail, the task will fail
failures.setFailureLimit(SAVE_ATTEMPTS + 1);
intercept(PathIOException.class, () ->
new CommitTaskStage(stageConfig).apply(null));
// reduce the limit and expect the stage to succeed.
failures.setFailureLimit(SAVE_ATTEMPTS - 1);
new CommitTaskStage(stageConfig).apply(null);
}
/**
* Rename target is a directory.
*/
@Test
public void testRenameTargetIsDir() throws Throwable {
describe("Rename target is a directory");
final ManifestStoreOperations operations = getStoreOperations();
StageConfig stageConfig = createStageConfig();
final SetupTaskStage setup = new SetupTaskStage(stageConfig);
setup.apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
// final manifest file is by task ID
Path manifestFile = manifestPathForTask(manifestDir,
stageConfig.getTaskId());
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// add a directory where the manifest file is to go
setup.mkdirs(manifestFile, true);
ContractTestUtils.assertIsDirectory(getFileSystem(), manifestFile);
new CommitTaskStage(stageConfig).apply(null);
// this must be a file.
final FileStatus st = operations.getFileStatus(manifestFile);
Assertions.assertThat(st)
.describedAs("File status of %s", manifestFile)
.matches(FileStatus::isFile, "is a file");
// and it must load.
final TaskManifest manifest = setup.loadManifest(st);
Assertions.assertThat(manifest)
.matches(m -> m.getTaskID().equals(TASK1))
.matches(m -> m.getTaskAttemptID().equals(TASK1_ATTEMPT1));
}
/**
* Manifest temp file path is a directory.
*/
@Test
public void testManifestTempFileIsDir() throws Throwable {
describe("Manifest temp file path is a directory");
final ManifestStoreOperations operations = getStoreOperations();
StageConfig stageConfig = createStageConfig();
final SetupTaskStage setup = new SetupTaskStage(stageConfig);
setup.apply("setup");
final Path manifestDir = stageConfig.getTaskManifestDir();
// final manifest file is by task ID
Path manifestFile = manifestPathForTask(manifestDir,
stageConfig.getTaskId());
Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir,
stageConfig.getTaskAttemptId());
// add a directory where the manifest file is to go
setup.mkdirs(manifestTempFile, true);
new CommitTaskStage(stageConfig).apply(null);
final TaskManifest manifest = setup.loadManifest(
operations.getFileStatus(manifestFile));
Assertions.assertThat(manifest)
.matches(m -> m.getTaskID().equals(TASK1))
.matches(m -> m.getTaskAttemptID().equals(TASK1_ATTEMPT1));
}
}

View File

@ -247,7 +247,7 @@ public class TestCreateOutputDirectoriesStage extends AbstractManifestCommitterT
CreateOutputDirectoriesStage attempt2 =
new CreateOutputDirectoriesStage(
createStageConfigForJob(JOB1, destDir)
.withDeleteTargetPaths(true));
.withDeleteTargetPaths(false));
// attempt will fail because one of the entries marked as
// a file to delete is now a non-empty directory
LOG.info("Executing failing attempt to create the directories");

View File

@ -598,7 +598,8 @@ public class TestJobThroughManifestCommitter
public void test_0900_cleanupJob() throws Throwable {
describe("Cleanup job");
CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments(
OP_STAGE_JOB_CLEANUP, true, true, false);
OP_STAGE_JOB_CLEANUP, true, true,
false, false, 0);
// the first run will list the three task attempt dirs and delete each
// one before the toplevel dir.
CleanupJobStage.Result result = new CleanupJobStage(
@ -615,7 +616,7 @@ public class TestJobThroughManifestCommitter
* Needed to clean up the shared test root, as test case teardown
* does not do it.
*/
//@Test
@Test
public void test_9999_cleanupTestDir() throws Throwable {
if (shouldDeleteTestRootAtEndOfTestRun()) {
deleteSharedTestRoot();

View File

@ -176,7 +176,7 @@ public class TestLoadManifestsStage extends AbstractManifestCommitterTest {
// and skipping the rename stage (which is going to fail),
// go straight to cleanup
new CleanupJobStage(stageConfig).apply(
new CleanupJobStage.Arguments("", true, true, false));
new CleanupJobStage.Arguments("", true, true, false, false, 0));
heapinfo(heapInfo, "cleanup");
ManifestSuccessData success = createManifestOutcome(stageConfig, OP_STAGE_JOB_COMMIT);

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,8 +49,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.Int
* This is for testing. It could be implemented via
* Mockito 2 spy code but is not so that:
* 1. It can be backported to Hadoop versions using Mockito 1.x.
* 2. It can be extended to use in production. This is why it is in
* the production module -to allow for downstream tests to adopt it.
* 2. It can be extended to use in production.
* 3. You can actually debug what's going on.
*/
@InterfaceAudience.Private
@ -69,6 +70,12 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
*/
public static final String SIMULATED_FAILURE = "Simulated failure";
/**
* Default failure limit.
* Set to a large enough value that most tests don't hit it.
*/
private static final int DEFAULT_FAILURE_LIMIT = Integer.MAX_VALUE;
/**
* Underlying store operations to wrap.
*/
@ -110,6 +117,16 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
*/
private final Set<Path> renameDestDirsToFail = new HashSet<>();
/**
* Source paths of rename operations to time out before the rename request is issued.
*/
private final Set<Path> renamePathsToTimeoutBeforeRename = new HashSet<>();
/**
* Source paths of rename operations to time out after the rename request has succeeded.
*/
private final Set<Path> renamePathsToTimeoutAfterRename = new HashSet<>();
/**
* Path of save() to fail.
*/
@ -125,6 +142,11 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
*/
private boolean renameToFailWithException = true;
/**
* How many failures before an operation is passed through.
*/
private final AtomicInteger failureLimit = new AtomicInteger(DEFAULT_FAILURE_LIMIT);
/**
* Constructor.
* @param wrappedOperations operations to wrap.
@ -133,16 +155,19 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
this.wrappedOperations = wrappedOperations;
}
/**
* Reset everything.
*/
public void reset() {
deletePathsToFail.clear();
deletePathsToTimeOut.clear();
failureLimit.set(DEFAULT_FAILURE_LIMIT);
pathNotFound.clear();
renameSourceFilesToFail.clear();
renameDestDirsToFail.clear();
renamePathsToTimeoutBeforeRename.clear();
renamePathsToTimeoutAfterRename.clear();
saveToFail.clear();
timeoutSleepTimeMillis = 0;
}
@ -219,6 +244,21 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
renameDestDirsToFail.add(requireNonNull(path));
}
/**
* Add a source path to timeout before the rename.
* @param path path to add.
*/
public void addTimeoutBeforeRename(Path path) {
renamePathsToTimeoutBeforeRename.add(requireNonNull(path));
}
/**
* Add a source path to timeout after the rename.
* @param path path to add.
*/
public void addTimeoutAfterRename(Path path) {
renamePathsToTimeoutAfterRename.add(requireNonNull(path));
}
/**
* Add a path to the list of paths where save will fail.
* @param path path to add.
@ -228,7 +268,16 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
}
/**
* Raise an exception if the path is in the set of target paths.
* Set the failure limit.
* @param limit limit
*/
public void setFailureLimit(int limit) {
failureLimit.set(limit);
}
/**
* Raise an exception if the path is in the set of target paths
* and the failure limit is not exceeded.
* @param operation operation which failed.
* @param path path to check
* @param paths paths to probe for {@code path} being in.
@ -236,20 +285,56 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
*/
private void maybeRaiseIOE(String operation, Path path, Set<Path> paths)
throws IOException {
if (paths.contains(path) && decrementAndCheckFailureLimit()) {
// hand off to the inner check.
maybeRaiseIOENoFailureLimitCheck(operation, path, paths);
}
}
/**
* Raise an exception if the path is in the set of target paths.
* No checks on failure count are performed.
* @param operation operation which failed.
* @param path path to check
* @param paths paths to probe for {@code path} being in.
* @throws IOException simulated failure
*/
private void maybeRaiseIOENoFailureLimitCheck(String operation, Path path, Set<Path> paths)
throws IOException {
if (paths.contains(path)) {
LOG.info("Simulating failure of {} with {}", operation, path);
throw new PathIOException(path.toString(),
SIMULATED_FAILURE + " of " + operation);
generatedErrorMessage(operation));
}
}
/**
* Given an operation, return the error message which is used for the simulated
* {@link PathIOException}.
* @param operation operation name
* @return error text
*/
public static String generatedErrorMessage(final String operation) {
return SIMULATED_FAILURE + " of " + operation;
}
/**
* Check if the failure limit is exceeded.
* Call this after any other trigger checks, as it decrements the counter.
*
* @return true if the limit is not exceeded.
*/
private boolean decrementAndCheckFailureLimit() {
return failureLimit.decrementAndGet() > 0;
}
/**
* Verify that a path is not on the file not found list.
* @param path path
* @throws FileNotFoundException if configured to fail.
*/
private void verifyExists(Path path) throws FileNotFoundException {
if (pathNotFound.contains(path)) {
if (pathNotFound.contains(path) && decrementAndCheckFailureLimit()) {
throw new FileNotFoundException(path.toString());
}
}
@ -260,11 +345,12 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
* @param operation operation which failed.
* @param path path to check
* @param paths paths to probe for {@code path} being in.
* @throws IOException simulated timeout
* @throws SocketTimeoutException simulated timeout
* @throws InterruptedIOException if the sleep is interrupted.
*/
private void maybeTimeout(String operation, Path path, Set<Path> paths)
throws IOException {
if (paths.contains(path)) {
throws SocketTimeoutException, InterruptedIOException {
if (paths.contains(path) && decrementAndCheckFailureLimit()) {
LOG.info("Simulating timeout of {} with {}", operation, path);
try {
if (timeoutSleepTimeMillis > 0) {
@ -273,14 +359,16 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
throw new PathIOException(path.toString(),
"ErrorCode=" + OPERATION_TIMED_OUT
throw new SocketTimeoutException(
path.toString() + ": " + operation
+ " ErrorCode=" + OPERATION_TIMED_OUT
+ " ErrorMessage=" + E_TIMEOUT);
}
}
@Override
public FileStatus getFileStatus(final Path path) throws IOException {
maybeTimeout("getFileStatus()", path, pathNotFound);
verifyExists(path);
return wrappedOperations.getFileStatus(path);
}
@ -304,17 +392,23 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
public boolean renameFile(final Path source, final Path dest)
throws IOException {
String op = "rename";
maybeTimeout(op, source, renamePathsToTimeoutBeforeRename);
if (renameToFailWithException) {
maybeRaiseIOE(op, source, renameSourceFilesToFail);
maybeRaiseIOE(op, dest.getParent(), renameDestDirsToFail);
} else {
if (renameSourceFilesToFail.contains(source)
|| renameDestDirsToFail.contains(dest.getParent())) {
// logic to determine whether rename should just return false.
if ((renameSourceFilesToFail.contains(source)
|| renameDestDirsToFail.contains(dest.getParent())
&& decrementAndCheckFailureLimit())) {
LOG.info("Failing rename({}, {})", source, dest);
return false;
}
}
return wrappedOperations.renameFile(source, dest);
final boolean b = wrappedOperations.renameFile(source, dest);
// post rename timeout.
maybeTimeout(op, source, renamePathsToTimeoutAfterRename);
return b;
}
@Override
@ -358,13 +452,19 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations {
@Override
public CommitFileResult commitFile(final FileEntry entry)
throws IOException {
final String op = "commitFile";
final Path source = entry.getSourcePath();
maybeTimeout(op, source, renamePathsToTimeoutBeforeRename);
if (renameToFailWithException) {
maybeRaiseIOE("commitFile",
entry.getSourcePath(), renameSourceFilesToFail);
maybeRaiseIOE("commitFile",
maybeRaiseIOE(op,
source, renameSourceFilesToFail);
maybeRaiseIOE(op,
entry.getDestPath().getParent(), renameDestDirsToFail);
}
return wrappedOperations.commitFile(entry);
final CommitFileResult result = wrappedOperations.commitFile(entry);
// post rename timeout.
maybeTimeout(op, source, renamePathsToTimeoutAfterRename);
return result;
}
@Override

View File

@ -17,3 +17,5 @@ log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop.mapreduce.lib.output.committer.manifest=DEBUG

View File

@ -45,6 +45,8 @@
<fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>10</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>1000</fs.azure.scale.test.list.performance.files>
<!-- http connection pool size; passed down as a system property -->
<http.maxConnections>100</http.maxConnections>
</properties>
<build>
@ -400,7 +402,8 @@
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections> </systemPropertyVariables>
<includes>
<include>**/azure/Test*.java</include>
<include>**/azure/**/Test*.java</include>
@ -431,6 +434,8 @@
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections>
</systemPropertyVariables>
<includes>
<include>**/azure/**/TestRollingWindowAverage*.java</include>
@ -604,6 +609,8 @@
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections>
</systemPropertyVariables>
<includes>
@ -792,6 +799,8 @@
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections>
</systemPropertyVariables>
<!-- Some tests cannot run in parallel. Tests that cover -->
<!-- access to the root directory must run in isolation -->
@ -842,7 +851,8 @@
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections> </systemPropertyVariables>
<includes>
<include>**/ITestWasbAbfsCompatibility.java</include>
<include>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</include>
@ -891,6 +901,8 @@
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<!-- http connection pool size -->
<http.maxConnections>${http.maxConnections}</http.maxConnections>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<trimStackTrace>false</trimStackTrace>

View File

@ -159,7 +159,7 @@ public final class FileSystemConfigurations {
/**
* IO rate limit. Value: {@value}
*/
public static final int RATE_LIMIT_DEFAULT = 10_000;
public static final int RATE_LIMIT_DEFAULT = 1_000;
private FileSystemConfigurations() {}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
/**
@ -51,9 +52,10 @@ final class AbfsCommitTestHelper {
final String size = Integer.toString(192);
conf.setIfUnset(ManifestCommitterConstants.OPT_IO_PROCESSORS, size);
conf.setIfUnset(ManifestCommitterConstants.OPT_WRITER_QUEUE_CAPACITY, size);
// no need for parallel delete here as we aren't at the scale where unified delete
// is going to time out
conf.setBooleanIfUnset(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, false);
// enable parallel delete but ask for base deletion first,
// which is now our recommended azure option
conf.setBoolean(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, true);
conf.setBoolean(OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST, true);
return conf;
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapred.JobConf;
@ -52,6 +53,9 @@ import org.apache.hadoop.util.functional.RemoteIterators;
import static java.util.Optional.empty;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertNoFailureStatistics;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
@ -95,6 +99,11 @@ public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
protected static final IOStatisticsSnapshot JOB_IOSTATS =
snapshotIOStatistics();
/**
* Map of stage -> success file.
*/
private static final Map<String, ManifestSuccessData> SUCCESS_FILES = new HashMap<>();
/** Base path for all the terasort input and output paths. */
private Path terasortPath;
@ -188,9 +197,10 @@ public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
* @param tool tool to run.
* @param args args for the tool.
* @param minimumFileCount minimum number of files to have been created
* @return the job success file.
* @throws Exception any failure
*/
private void executeStage(
private ManifestSuccessData executeStage(
final String stage,
final JobConf jobConf,
final Path dest,
@ -213,9 +223,20 @@ public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
+ " failed", 0, result);
final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
minimumFileCount, "");
JOB_IOSTATS.aggregate(successFile.getIOStatistics());
final IOStatistics iostats = successFile.getIOStatistics();
JOB_IOSTATS.aggregate(iostats);
SUCCESS_FILES.put(stage, successFile);
completedStage(stage, d);
// now assert there were no failures recorded in the IO statistics
// for critical functions.
// these include collected statistics from manifest save
// operations.
assertNoFailureStatistics(iostats,
stage,
OP_SAVE_TASK_MANIFEST,
OP_RENAME_FILE);
return successFile;
}
/**
@ -319,6 +340,7 @@ public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
File resultsFile = File.createTempFile("results", ".csv");
FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
LOG.info("Results are in {}\n{}", resultsFile, text);
LOG.info("Report directory {}", getReportDir());
}
/**