HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. (#2399)

See also [SPARK-33402]: Jobs launched in same second have duplicate MapReduce JobIDs

Contributed by Steve Loughran.

Change-Id: Iae65333cddc84692997aae5d902ad8765b45772a
This commit is contained in:
Steve Loughran 2020-11-18 13:34:36 +00:00
parent c48c774d6c
commit 1eeb9d9d67
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
30 changed files with 1401 additions and 311 deletions

View File

@ -1925,20 +1925,13 @@
</property> </property>
<property> <property>
<name>fs.s3a.committer.staging.abort.pending.uploads</name> <name>fs.s3a.committer.abort.pending.uploads</name>
<value>true</value> <value>true</value>
<description> <description>
Should the staging committers abort all pending uploads to the destination Should the committers abort all pending uploads to the destination
directory? directory?
Changing this if more than one partitioned committer is Set to false if more than one job is writing to the same directory tree.
writing to the same destination tree simultaneously; otherwise
the first job to complete will cancel all outstanding uploads from the
others. However, it may lead to leaked outstanding uploads from failed
tasks. If disabled, configure the bucket lifecycle to remove uploads
after a time period, and/or set up a workflow to explicitly delete
entries. Otherwise there is a risk that uncommitted uploads may run up
bills.
</description> </description>
</property> </property>

View File

@ -180,6 +180,8 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations; import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
@ -314,9 +316,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Add any deprecated keys. */ /** Add any deprecated keys. */
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private static void addDeprecatedKeys() { private static void addDeprecatedKeys() {
// this is retained as a placeholder for when new deprecated keys
// need to be added.
Configuration.DeprecationDelta[] deltas = { Configuration.DeprecationDelta[] deltas = {
new Configuration.DeprecationDelta(
FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS)
}; };
if (deltas.length > 0) { if (deltas.length > 0) {
@ -4593,7 +4596,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
*/ */
@Retries.OnceRaw @Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) { void abortMultipartUpload(String destKey, String uploadId) {
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload( getAmazonS3Client().abortMultipartUpload(
new AbortMultipartUploadRequest(getBucket(), new AbortMultipartUploadRequest(getBucket(),
destKey, destKey,

View File

@ -131,6 +131,8 @@ protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
*/ */
void operationRetried(String text, Exception ex, int retries, void operationRetried(String text, Exception ex, int retries,
boolean idempotent) { boolean idempotent) {
LOG.info("{}: Retried {}: {}", text, retries, ex.toString());
LOG.debug("Stack", ex);
owner.operationRetried(text, ex, retries, idempotent); owner.operationRetried(text, ex, retries, idempotent);
} }
@ -323,7 +325,9 @@ public CompleteMultipartUploadResult completeMPUwithRetries(
public void abortMultipartUpload(String destKey, String uploadId, public void abortMultipartUpload(String destKey, String uploadId,
Retried retrying) Retried retrying)
throws IOException { throws IOException {
invoker.retry("Aborting multipart upload", destKey, true, invoker.retry("Aborting multipart upload ID " + uploadId,
destKey,
true,
retrying, retrying,
() -> owner.abortMultipartUpload( () -> owner.abortMultipartUpload(
destKey, destKey,
@ -585,7 +589,8 @@ public BulkOperationState initiateOperation(final Path path,
@Retries.RetryTranslated @Retries.RetryTranslated
public UploadPartResult uploadPart(UploadPartRequest request) public UploadPartResult uploadPart(UploadPartRequest request)
throws IOException { throws IOException {
return retry("upload part", return retry("upload part #" + request.getPartNumber()
+ " upload ID "+ request.getUploadId(),
request.getKey(), request.getKey(),
true, true,
() -> owner.uploadPart(request)); () -> owner.uploadPart(request));

View File

@ -20,13 +20,14 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.MultipartUpload; import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -35,6 +36,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -45,8 +48,10 @@
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData; import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
@ -58,6 +63,10 @@
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
/** /**
* Abstract base class for S3A committers; allows for any commonality * Abstract base class for S3A committers; allows for any commonality
@ -86,11 +95,40 @@
* committer was large enough for more all the parallel POST requests. * committer was large enough for more all the parallel POST requests.
*/ */
public abstract class AbstractS3ACommitter extends PathOutputCommitter { public abstract class AbstractS3ACommitter extends PathOutputCommitter {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AbstractS3ACommitter.class); LoggerFactory.getLogger(AbstractS3ACommitter.class);
public static final String THREAD_PREFIX = "s3a-committer-pool-"; public static final String THREAD_PREFIX = "s3a-committer-pool-";
/**
* Error string when task setup fails.
*/
@VisibleForTesting
public static final String E_SELF_GENERATED_JOB_UUID
= "has a self-generated job UUID";
/**
* Unique ID for a Job.
* In MapReduce Jobs the YARN JobID suffices.
* On Spark this only be the YARN JobID
* it is known to be creating strongly unique IDs
* (i.e. SPARK-33402 is on the branch).
*/
private final String uuid;
/**
* Source of the {@link #uuid} value.
*/
private final JobUUIDSource uuidSource;
/**
* Has this instance been used for job setup?
* If so then it is safe for a locally generated
* UUID to be used for task setup.
*/
private boolean jobSetup;
/** /**
* Thread pool for task execution. * Thread pool for task execution.
*/ */
@ -147,14 +185,19 @@ protected AbstractS3ACommitter(
this.jobContext = context; this.jobContext = context;
this.role = "Task committer " + context.getTaskAttemptID(); this.role = "Task committer " + context.getTaskAttemptID();
setConf(context.getConfiguration()); setConf(context.getConfiguration());
Pair<String, JobUUIDSource> id = buildJobUUID(
conf, context.getJobID());
this.uuid = id.getLeft();
this.uuidSource = id.getRight();
LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
initOutput(outputPath); initOutput(outputPath);
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath); role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS(); S3AFileSystem fs = getDestS3AFS();
createJobMarker = context.getConfiguration().getBoolean( this.createJobMarker = context.getConfiguration().getBoolean(
CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER); DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
commitOperations = new CommitOperations(fs); this.commitOperations = new CommitOperations(fs);
} }
/** /**
@ -202,7 +245,7 @@ protected final void setOutputPath(Path outputPath) {
* @return the working path. * @return the working path.
*/ */
@Override @Override
public Path getWorkPath() { public final Path getWorkPath() {
return workPath; return workPath;
} }
@ -210,16 +253,16 @@ public Path getWorkPath() {
* Set the work path for this committer. * Set the work path for this committer.
* @param workPath the work path to use. * @param workPath the work path to use.
*/ */
protected void setWorkPath(Path workPath) { protected final void setWorkPath(Path workPath) {
LOG.debug("Setting work path to {}", workPath); LOG.debug("Setting work path to {}", workPath);
this.workPath = workPath; this.workPath = workPath;
} }
public Configuration getConf() { public final Configuration getConf() {
return conf; return conf;
} }
protected void setConf(Configuration conf) { protected final void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
} }
@ -308,6 +351,24 @@ public Path getTaskAttemptPath(TaskAttemptContext context) {
*/ */
public abstract String getName(); public abstract String getName();
/**
* The Job UUID, as passed in or generated.
* @return the UUID for the job.
*/
@VisibleForTesting
public final String getUUID() {
return uuid;
}
/**
* Source of the UUID.
* @return how the job UUID was retrieved/generated.
*/
@VisibleForTesting
public final JobUUIDSource getUUIDSource() {
return uuidSource;
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
@ -316,6 +377,8 @@ public String toString() {
sb.append(", name=").append(getName()); sb.append(", name=").append(getName());
sb.append(", outputPath=").append(getOutputPath()); sb.append(", outputPath=").append(getOutputPath());
sb.append(", workPath=").append(workPath); sb.append(", workPath=").append(workPath);
sb.append(", uuid='").append(getUUID()).append('\'');
sb.append(", uuid source=").append(getUUIDSource());
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }
@ -394,6 +457,8 @@ protected void maybeCreateSuccessMarker(JobContext context,
// create a success data structure and then save it // create a success data structure and then save it
SuccessData successData = new SuccessData(); SuccessData successData = new SuccessData();
successData.setCommitter(getName()); successData.setCommitter(getName());
successData.setJobId(uuid);
successData.setJobIdSource(uuidSource.getText());
successData.setDescription(getRole()); successData.setDescription(getRole());
successData.setHostname(NetUtils.getLocalHostname()); successData.setHostname(NetUtils.getLocalHostname());
Date now = new Date(); Date now = new Date();
@ -411,26 +476,60 @@ protected void maybeCreateSuccessMarker(JobContext context,
* be deleted; creating it now ensures there is something at the end * be deleted; creating it now ensures there is something at the end
* while the job is in progress -and if nothing is created, that * while the job is in progress -and if nothing is created, that
* it is still there. * it is still there.
* <p>
* The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
* is set to the job UUID; if generated locally
* {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
* The field {@link #jobSetup} is set to true to note that
* this specific committer instance was used to set up a job.
* </p>
* @param context context * @param context context
* @throws IOException IO failure * @throws IOException IO failure
*/ */
@Override @Override
public void setupJob(JobContext context) throws IOException { public void setupJob(JobContext context) throws IOException {
try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) { try (DurationInfo d = new DurationInfo(LOG,
"Job %s setting up", getUUID())) {
// record that the job has been set up
jobSetup = true;
// patch job conf with the job UUID.
Configuration c = context.getConfiguration();
c.set(FS_S3A_COMMITTER_UUID, getUUID());
c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
Path dest = getOutputPath();
if (createJobMarker){ if (createJobMarker){
commitOperations.deleteSuccessMarker(getOutputPath()); commitOperations.deleteSuccessMarker(dest);
} }
getDestFS().mkdirs(getOutputPath()); getDestFS().mkdirs(dest);
// do a scan for surplus markers
warnOnActiveUploads(dest);
} }
} }
/**
* Task setup. Fails if the the UUID was generated locally, and
* the same committer wasn't used for job setup.
* {@inheritDoc}
* @throws PathCommitException if the task UUID options are unsatisfied.
*/
@Override @Override
public void setupTask(TaskAttemptContext context) throws IOException { public void setupTask(TaskAttemptContext context) throws IOException {
TaskAttemptID attemptID = context.getTaskAttemptID();
try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s",
context.getTaskAttemptID())) { attemptID)) {
// reject attempts to set up the task where the output won't be
// picked up
if (!jobSetup
&& getUUIDSource() == JobUUIDSource.GeneratedLocally) {
// on anything other than a test run, the context must not have been
// generated locally.
throw new PathCommitException(getOutputPath().toString(),
"Task attempt " + attemptID
+ " " + E_SELF_GENERATED_JOB_UUID);
}
Path taskAttemptPath = getTaskAttemptPath(context); Path taskAttemptPath = getTaskAttemptPath(context);
FileSystem fs = getTaskAttemptFilesystem(context); FileSystem fs = taskAttemptPath.getFileSystem(getConf());
fs.mkdirs(taskAttemptPath); fs.mkdirs(taskAttemptPath);
} }
} }
@ -474,12 +573,12 @@ protected void commitPendingUploads(
.stopOnFailure() .stopOnFailure()
.suppressExceptions(false) .suppressExceptions(false)
.executeWith(buildSubmitter(context)) .executeWith(buildSubmitter(context))
.abortWith(path -> .abortWith(status ->
loadAndAbort(commitContext, pending, path, true, false)) loadAndAbort(commitContext, pending, status, true, false))
.revertWith(path -> .revertWith(status ->
loadAndRevert(commitContext, pending, path)) loadAndRevert(commitContext, pending, status))
.run(path -> .run(status ->
loadAndCommit(commitContext, pending, path)); loadAndCommit(commitContext, pending, status));
} }
} }
@ -504,7 +603,7 @@ protected void precommitCheckPendingFiles(
.stopOnFailure() .stopOnFailure()
.suppressExceptions(false) .suppressExceptions(false)
.executeWith(buildSubmitter(context)) .executeWith(buildSubmitter(context))
.run(path -> PendingSet.load(sourceFS, path)); .run(status -> PendingSet.load(sourceFS, status));
} }
} }
@ -512,17 +611,26 @@ protected void precommitCheckPendingFiles(
* Load a pendingset file and commit all of its contents. * Load a pendingset file and commit all of its contents.
* @param commitContext context to commit through * @param commitContext context to commit through
* @param activeCommit commit state * @param activeCommit commit state
* @param path path to load * @param status file to load
* @throws IOException failure * @throws IOException failure
*/ */
private void loadAndCommit( private void loadAndCommit(
final CommitOperations.CommitContext commitContext, final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit, final ActiveCommit activeCommit,
final Path path) throws IOException { final FileStatus status) throws IOException {
final Path path = status.getPath();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) { new DurationInfo(LOG,
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); "Loading and committing files in pendingset %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
status);
String jobId = pendingSet.getJobId();
if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
throw new PathCommitException(path,
String.format("Mismatch in Job ID (%s) and commit job ID (%s)",
getUUID(), jobId));
}
Tasks.foreach(pendingSet.getCommits()) Tasks.foreach(pendingSet.getCommits())
.stopOnFailure() .stopOnFailure()
.suppressExceptions(false) .suppressExceptions(false)
@ -543,17 +651,19 @@ private void loadAndCommit(
* Load a pendingset file and revert all of its contents. * Load a pendingset file and revert all of its contents.
* @param commitContext context to commit through * @param commitContext context to commit through
* @param activeCommit commit state * @param activeCommit commit state
* @param path path to load * @param status status of file to load
* @throws IOException failure * @throws IOException failure
*/ */
private void loadAndRevert( private void loadAndRevert(
final CommitOperations.CommitContext commitContext, final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit, final ActiveCommit activeCommit,
final Path path) throws IOException { final FileStatus status) throws IOException {
final Path path = status.getPath();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) { new DurationInfo(LOG, false, "Committing %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
status);
Tasks.foreach(pendingSet.getCommits()) Tasks.foreach(pendingSet.getCommits())
.suppressExceptions(true) .suppressExceptions(true)
.run(commitContext::revertCommit); .run(commitContext::revertCommit);
@ -564,21 +674,22 @@ private void loadAndRevert(
* Load a pendingset file and abort all of its contents. * Load a pendingset file and abort all of its contents.
* @param commitContext context to commit through * @param commitContext context to commit through
* @param activeCommit commit state * @param activeCommit commit state
* @param path path to load * @param status status of file to load
* @param deleteRemoteFiles should remote files be deleted? * @param deleteRemoteFiles should remote files be deleted?
* @throws IOException failure * @throws IOException failure
*/ */
private void loadAndAbort( private void loadAndAbort(
final CommitOperations.CommitContext commitContext, final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit, final ActiveCommit activeCommit,
final Path path, final FileStatus status,
final boolean suppressExceptions, final boolean suppressExceptions,
final boolean deleteRemoteFiles) throws IOException { final boolean deleteRemoteFiles) throws IOException {
final Path path = status.getPath();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, false, "Aborting %s", path)) { new DurationInfo(LOG, false, "Aborting %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
path); status);
FileSystem fs = getDestFS(); FileSystem fs = getDestFS();
Tasks.foreach(pendingSet.getCommits()) Tasks.foreach(pendingSet.getCommits())
.executeWith(singleThreadSubmitter()) .executeWith(singleThreadSubmitter())
@ -659,6 +770,13 @@ protected void abortJobInternal(JobContext context,
*/ */
protected void abortPendingUploadsInCleanup( protected void abortPendingUploadsInCleanup(
boolean suppressExceptions) throws IOException { boolean suppressExceptions) throws IOException {
// return early if aborting is disabled.
if (!shouldAbortUploadsInCleanup()) {
LOG.debug("Not cleanup up pending uploads to {} as {} is false ",
getOutputPath(),
FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
return;
}
Path dest = getOutputPath(); Path dest = getOutputPath();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, "Aborting all pending commits under %s", new DurationInfo(LOG, "Aborting all pending commits under %s",
@ -674,14 +792,27 @@ protected void abortPendingUploadsInCleanup(
maybeIgnore(suppressExceptions, "aborting pending uploads", e); maybeIgnore(suppressExceptions, "aborting pending uploads", e);
return; return;
} }
Tasks.foreach(pending) if (!pending.isEmpty()) {
.executeWith(buildSubmitter(getJobContext())) LOG.warn("{} pending uploads were found -aborting", pending.size());
.suppressExceptions(suppressExceptions) LOG.warn("If other tasks/jobs are writing to {},"
.run(u -> commitContext.abortMultipartCommit( + "this action may cause them to fail", dest);
u.getKey(), u.getUploadId())); Tasks.foreach(pending)
.executeWith(buildSubmitter(getJobContext()))
.suppressExceptions(suppressExceptions)
.run(u -> commitContext.abortMultipartCommit(
u.getKey(), u.getUploadId()));
} else {
LOG.info("No pending uploads were found");
}
} }
} }
private boolean shouldAbortUploadsInCleanup() {
return getConf()
.getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS,
DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
}
/** /**
* Subclass-specific pre-Job-commit actions. * Subclass-specific pre-Job-commit actions.
* The staging committers all load the pending files to verify that * The staging committers all load the pending files to verify that
@ -1044,6 +1175,166 @@ protected void abortPendingUploads(
} }
} }
/**
* Scan for active uploads and list them along with a warning message.
* Errors are ignored.
* @param path output path of job.
*/
protected void warnOnActiveUploads(final Path path) {
List<MultipartUpload> pending;
try {
pending = getCommitOperations()
.listPendingUploadsUnderPath(path);
} catch (IOException e) {
LOG.debug("Failed to list uploads under {}",
path, e);
return;
}
if (!pending.isEmpty()) {
// log a warning
LOG.warn("{} active upload(s) in progress under {}",
pending.size(),
path);
LOG.warn("Either jobs are running concurrently"
+ " or failed jobs are not being cleaned up");
// and the paths + timestamps
DateFormat df = DateFormat.getDateTimeInstance();
pending.forEach(u ->
LOG.info("[{}] {}",
df.format(u.getInitiated()),
u.getKey()));
if (shouldAbortUploadsInCleanup()) {
LOG.warn("This committer will abort these uploads in job cleanup");
}
}
}
/**
* Build the job UUID.
*
* <p>
* In MapReduce jobs, the application ID is issued by YARN, and
* unique across all jobs.
* </p>
* <p>
* Spark will use a fake app ID based on the current time.
* This can lead to collisions on busy clusters unless
* the specific spark release has SPARK-33402 applied.
* This appends a random long value to the timestamp, so
* is unique enough that the risk of collision is almost
* nonexistent.
* </p>
* <p>
* The order of selection of a uuid is
* </p>
* <ol>
* <li>Value of
* {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
* <li>Value of
* {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
* <li>If enabled through
* {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}:
* Self-generated uuid.</li>
* <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
* is not set: Application ID</li>
* </ol>
* The UUID bonding takes place during construction;
* the staging committers use it to set up their wrapped
* committer to a path in the cluster FS which is unique to the
* job.
* <p>
* In MapReduce jobs, the application ID is issued by YARN, and
* unique across all jobs.
* </p>
* In {@link #setupJob(JobContext)} the job context's configuration
* will be patched
* be valid in all sequences where the job has been set up for the
* configuration passed in.
* <p>
* If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
* is set, then an external UUID MUST be passed in.
* This can be used to verify that the spark engine is reliably setting
* unique IDs for staging.
* </p>
* @param conf job/task configuration
* @param jobId job ID from YARN or spark.
* @return Job UUID and source of it.
* @throws PathCommitException no UUID was found and it was required
*/
public static Pair<String, JobUUIDSource>
buildJobUUID(Configuration conf, JobID jobId)
throws PathCommitException {
String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
if (!jobUUID.isEmpty()) {
return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
}
// there is no job UUID.
// look for one from spark
jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
if (!jobUUID.isEmpty()) {
return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
}
// there is no UUID configuration in the job/task config
// Check the job hasn't declared a requirement for the UUID.
// This allows or fail-fast validation of Spark behavior.
if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID,
DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) {
throw new PathCommitException("", E_NO_SPARK_UUID);
}
// see if the job can generate a random UUI`
if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID,
DEFAULT_S3A_COMMITTER_GENERATE_UUID)) {
// generate a random UUID. This is OK for a job, for a task
// it means that the data may not get picked up.
String newId = UUID.randomUUID().toString();
LOG.warn("No job ID in configuration; generating a random ID: {}",
newId);
return Pair.of(newId, JobUUIDSource.GeneratedLocally);
}
// if no other option was supplied, return the job ID.
// This is exactly what MR jobs expect, but is not what
// Spark jobs can do as there is a risk of jobID collision.
return Pair.of(jobId.toString(), JobUUIDSource.JobID);
}
/**
* Enumeration of Job UUID source.
*/
public enum JobUUIDSource {
SparkWriteUUID(SPARK_WRITE_UUID),
CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
JobID("JobID"),
GeneratedLocally("Generated Locally");
private final String text;
JobUUIDSource(final String text) {
this.text = text;
}
/**
* Source for messages.
* @return text
*/
public String getText() {
return text;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"JobUUIDSource{");
sb.append("text='").append(text).append('\'');
sb.append('}');
return sb.toString();
}
}
/** /**
* State of the active commit operation. * State of the active commit operation.
* *
@ -1071,7 +1362,7 @@ public static class ActiveCommit {
= new ActiveCommit(null, new ArrayList<>()); = new ActiveCommit(null, new ArrayList<>());
/** All pendingset files to iterate through. */ /** All pendingset files to iterate through. */
private final List<Path> sourceFiles; private final List<FileStatus> sourceFiles;
/** /**
* Filesystem for the source files. * Filesystem for the source files.
@ -1101,8 +1392,8 @@ public static class ActiveCommit {
*/ */
public ActiveCommit( public ActiveCommit(
final FileSystem sourceFS, final FileSystem sourceFS,
final List<Path> sourceFiles) { final List<? extends FileStatus> sourceFiles) {
this.sourceFiles = sourceFiles; this.sourceFiles = (List<FileStatus>) sourceFiles;
this.sourceFS = sourceFS; this.sourceFS = sourceFS;
} }
@ -1115,10 +1406,7 @@ public ActiveCommit(
public static ActiveCommit fromStatusList( public static ActiveCommit fromStatusList(
final FileSystem pendingFS, final FileSystem pendingFS,
final List<? extends FileStatus> statuses) { final List<? extends FileStatus> statuses) {
return new ActiveCommit(pendingFS, return new ActiveCommit(pendingFS, statuses);
statuses.stream()
.map(FileStatus::getPath)
.collect(Collectors.toList()));
} }
/** /**
@ -1129,7 +1417,7 @@ public static ActiveCommit empty() {
return EMPTY; return EMPTY;
} }
public List<Path> getSourceFiles() { public List<FileStatus> getSourceFiles() {
return sourceFiles; return sourceFiles;
} }
@ -1174,8 +1462,8 @@ public boolean isEmpty() {
return sourceFiles.isEmpty(); return sourceFiles.isEmpty();
} }
public void add(Path path) { public void add(FileStatus status) {
sourceFiles.add(path); sourceFiles.add(status);
} }
} }
} }

View File

@ -240,20 +240,39 @@ private CommitConstants() {
/** /**
* Should the staging committers abort all pending uploads to the destination * Should committers abort all pending uploads to the destination
* directory? Default: true. * directory?
* * <p>
* Changing this is if more than one partitioned committer is * Deprecated: switch to {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
*/
@Deprecated
public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
"fs.s3a.committer.staging.abort.pending.uploads";
/**
* Should committers abort all pending uploads to the destination
* directory?
* <p>
* Value: {@value}.
* <p>
* Change this is if more than one committer is
* writing to the same destination tree simultaneously; otherwise * writing to the same destination tree simultaneously; otherwise
* the first job to complete will cancel all outstanding uploads from the * the first job to complete will cancel all outstanding uploads from the
* others. However, it may lead to leaked outstanding uploads from failed * others. If disabled, configure the bucket lifecycle to remove uploads
* tasks. If disabled, configure the bucket lifecycle to remove uploads
* after a time period, and/or set up a workflow to explicitly delete * after a time period, and/or set up a workflow to explicitly delete
* entries. Otherwise there is a risk that uncommitted uploads may run up * entries. Otherwise there is a risk that uncommitted uploads may run up
* bills. * bills.
*/ */
public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS = public static final String FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
"fs.s3a.committer.staging.abort.pending.uploads"; "fs.s3a.committer.abort.pending.uploads";
/**
* Default configuration value for
* {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
* Value: {@value}.
*/
public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
true;
/** /**
* The limit to the number of committed objects tracked during * The limit to the number of committed objects tracked during
@ -264,4 +283,37 @@ private CommitConstants() {
/** Extra Data key for task attempt in pendingset files. */ /** Extra Data key for task attempt in pendingset files. */
public static final String TASK_ATTEMPT_ID = "task.attempt.id"; public static final String TASK_ATTEMPT_ID = "task.attempt.id";
/**
* Require the spark UUID to be passed down: {@value}.
* This is to verify that SPARK-33230 has been applied to spark, and that
* {@link InternalCommitterConstants#SPARK_WRITE_UUID} is set.
* <p>
* MUST ONLY BE SET WITH SPARK JOBS.
* </p>
*/
public static final String FS_S3A_COMMITTER_REQUIRE_UUID =
"fs.s3a.committer.require.uuid";
/**
* Default value for {@link #FS_S3A_COMMITTER_REQUIRE_UUID}: {@value}.
*/
public static final boolean DEFAULT_S3A_COMMITTER_REQUIRE_UUID =
false;
/**
* Generate a UUID in job setup rather than fall back to
* YARN Application attempt ID.
* <p>
* MUST ONLY BE SET WITH SPARK JOBS.
* </p>
*/
public static final String FS_S3A_COMMITTER_GENERATE_UUID =
"fs.s3a.committer.generate.uuid";
/**
* Default value for {@link #FS_S3A_COMMITTER_GENERATE_UUID}: {@value}.
*/
public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
false;
} }

View File

@ -159,7 +159,11 @@ private MaybeIOE commit(
LOG.debug("Committing single commit {}", commit); LOG.debug("Committing single commit {}", commit);
MaybeIOE outcome; MaybeIOE outcome;
String destKey = "unknown destination"; String destKey = "unknown destination";
try { try (DurationInfo d = new DurationInfo(LOG,
"Committing file %s size %s",
commit.getDestinationKey(),
commit.getLength())) {
commit.validate(); commit.validate();
destKey = commit.getDestinationKey(); destKey = commit.getDestinationKey();
long l = innerCommit(commit, operationState); long l = innerCommit(commit, operationState);
@ -273,7 +277,7 @@ private void abortSingleCommit(SinglePendingCommit commit)
? (" defined in " + commit.getFilename()) ? (" defined in " + commit.getFilename())
: ""; : "";
String uploadId = commit.getUploadId(); String uploadId = commit.getUploadId();
LOG.info("Aborting commit to object {}{}", destKey, origin); LOG.info("Aborting commit ID {} to object {}{}", uploadId, destKey, origin);
abortMultipartCommit(destKey, uploadId); abortMultipartCommit(destKey, uploadId);
} }
@ -287,7 +291,8 @@ private void abortSingleCommit(SinglePendingCommit commit)
*/ */
private void abortMultipartCommit(String destKey, String uploadId) private void abortMultipartCommit(String destKey, String uploadId)
throws IOException { throws IOException {
try { try (DurationInfo d = new DurationInfo(LOG,
"Aborting commit ID %s to path %s", uploadId, destKey)) {
writeOperations.abortMultipartCommit(destKey, uploadId); writeOperations.abortMultipartCommit(destKey, uploadId);
} finally { } finally {
statistics.commitAborted(); statistics.commitAborted();
@ -462,7 +467,11 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
String uploadId = null; String uploadId = null;
boolean threw = true; boolean threw = true;
try { try (DurationInfo d = new DurationInfo(LOG,
"Upload staged file from %s to %s",
localFile.getAbsolutePath(),
destPath)) {
statistics.commitCreated(); statistics.commitCreated();
uploadId = writeOperations.initiateMultiPartUpload(destKey); uploadId = writeOperations.initiateMultiPartUpload(destKey);
long length = localFile.length(); long length = localFile.length();

View File

@ -58,7 +58,7 @@ public static Path getMagicJobAttemptsPath(Path out) {
/** /**
* Get the Application Attempt ID for this job. * Get the Application Attempt ID for this job.
* @param context the context to look in * @param context the context to look in
* @return the Application Attempt ID for a given job. * @return the Application Attempt ID for a given job, or 0
*/ */
public static int getAppAttemptId(JobContext context) { public static int getAppAttemptId(JobContext context) {
return context.getConfiguration().getInt( return context.getConfiguration().getInt(
@ -67,33 +67,32 @@ public static int getAppAttemptId(JobContext context) {
/** /**
* Compute the "magic" path for a job attempt. * Compute the "magic" path for a job attempt.
* @param appAttemptId the ID of the application attempt for this job. * @param jobUUID unique Job ID.
* @param dest the final output directory * @param dest the final output directory
* @return the path to store job attempt data. * @return the path to store job attempt data.
*/ */
public static Path getMagicJobAttemptPath(int appAttemptId, Path dest) { public static Path getMagicJobAttemptPath(String jobUUID, Path dest) {
return new Path(getMagicJobAttemptsPath(dest), return new Path(getMagicJobAttemptsPath(dest),
formatAppAttemptDir(appAttemptId)); formatAppAttemptDir(jobUUID));
} }
/** /**
* Format the application attempt directory. * Format the application attempt directory.
* @param attemptId attempt ID * @param jobUUID unique Job ID.
* @return the directory name for the application attempt * @return the directory name for the application attempt
*/ */
public static String formatAppAttemptDir(int attemptId) { public static String formatAppAttemptDir(String jobUUID) {
return String.format("app-attempt-%04d", attemptId); return String.format("job-%s", jobUUID);
} }
/** /**
* Compute the path where the output of magic task attempts are stored. * Compute the path where the output of magic task attempts are stored.
* @param context the context of the job with magic tasks. * @param jobUUID unique Job ID.
* @param dest destination of work * @param dest destination of work
* @return the path where the output of magic task attempts are stored. * @return the path where the output of magic task attempts are stored.
*/ */
public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) { public static Path getMagicTaskAttemptsPath(String jobUUID, Path dest) {
return new Path(getMagicJobAttemptPath( return new Path(getMagicJobAttemptPath(jobUUID, dest), "tasks");
getAppAttemptId(context), dest), "tasks");
} }
/** /**
@ -102,48 +101,56 @@ public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) {
* This path is marked as a base path for relocations, so subdirectory * This path is marked as a base path for relocations, so subdirectory
* information is preserved. * information is preserved.
* @param context the context of the task attempt. * @param context the context of the task attempt.
* @param jobUUID unique Job ID.
* @param dest The output path to commit work into * @param dest The output path to commit work into
* @return the path where a task attempt should be stored. * @return the path where a task attempt should be stored.
*/ */
public static Path getMagicTaskAttemptPath(TaskAttemptContext context, public static Path getMagicTaskAttemptPath(TaskAttemptContext context,
String jobUUID,
Path dest) { Path dest) {
return new Path(getBaseMagicTaskAttemptPath(context, dest), BASE); return new Path(getBaseMagicTaskAttemptPath(context, jobUUID, dest),
BASE);
} }
/** /**
* Get the base Magic attempt path, without any annotations to mark relative * Get the base Magic attempt path, without any annotations to mark relative
* references. * references.
* @param context task context. * @param context task context.
* @param jobUUID unique Job ID.
* @param dest The output path to commit work into * @param dest The output path to commit work into
* @return the path under which all attempts go * @return the path under which all attempts go
*/ */
public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context, public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context,
String jobUUID,
Path dest) { Path dest) {
return new Path(getMagicTaskAttemptsPath(context, dest), return new Path(getMagicTaskAttemptsPath(jobUUID, dest),
String.valueOf(context.getTaskAttemptID())); String.valueOf(context.getTaskAttemptID()));
} }
/** /**
* Compute a path for temporary data associated with a job. * Compute a path for temporary data associated with a job.
* This data is <i>not magic</i> * This data is <i>not magic</i>
* @param appAttemptId the ID of the application attempt for this job. * @param jobUUID unique Job ID.
* @param out output directory of job * @param out output directory of job
* @return the path to store temporary job attempt data. * @return the path to store temporary job attempt data.
*/ */
public static Path getTempJobAttemptPath(int appAttemptId, Path out) { public static Path getTempJobAttemptPath(String jobUUID,
Path out) {
return new Path(new Path(out, TEMP_DATA), return new Path(new Path(out, TEMP_DATA),
formatAppAttemptDir(appAttemptId)); formatAppAttemptDir(jobUUID));
} }
/** /**
* Compute the path where the output of a given job attempt will be placed. * Compute the path where the output of a given task attempt will be placed.
* @param context task context * @param context task context
* @param jobUUID unique Job ID.
* @param out output directory of job * @param out output directory of job
* @return the path to store temporary job attempt data. * @return the path to store temporary job attempt data.
*/ */
public static Path getTempTaskAttemptPath(TaskAttemptContext context, public static Path getTempTaskAttemptPath(TaskAttemptContext context,
Path out) { final String jobUUID, Path out) {
return new Path(getTempJobAttemptPath(getAppAttemptId(context), out), return new Path(
getTempJobAttemptPath(jobUUID, out),
String.valueOf(context.getTaskAttemptID())); String.valueOf(context.getTaskAttemptID()));
} }

View File

@ -46,8 +46,14 @@ private InternalCommitterConstants() {
/** /**
* A unique identifier to use for this work: {@value}. * A unique identifier to use for this work: {@value}.
*/ */
public static final String FS_S3A_COMMITTER_STAGING_UUID = public static final String FS_S3A_COMMITTER_UUID =
"fs.s3a.committer.staging.uuid"; "fs.s3a.committer.uuid";
/**
* Where did the UUID come from? {@value}.
*/
public static final String FS_S3A_COMMITTER_UUID_SOURCE =
"fs.s3a.committer.uuid.source";
/** /**
* Directory committer factory: {@value}. * Directory committer factory: {@value}.
@ -97,4 +103,25 @@ private InternalCommitterConstants() {
/** Error message for a path without a magic element in the list: {@value}. */ /** Error message for a path without a magic element in the list: {@value}. */
public static final String E_NO_MAGIC_PATH_ELEMENT public static final String E_NO_MAGIC_PATH_ELEMENT
= "No " + MAGIC + " element in path"; = "No " + MAGIC + " element in path";
/**
* The UUID for jobs: {@value}.
* This was historically created in Spark 1.x's SQL queries, but "went away".
*/
public static final String SPARK_WRITE_UUID =
"spark.sql.sources.writeJobUUID";
/**
* Java temp dir: {@value}.
*/
public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
/**
* Incoming Job/task configuration didn't contain any option
* {@link #SPARK_WRITE_UUID}.
*/
public static final String E_NO_SPARK_UUID =
"Job/task context does not contain a unique ID in "
+ SPARK_WRITE_UUID;
} }

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.ValidationFailure; import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
@ -56,7 +57,7 @@ public class PendingSet extends PersistentCommitData {
* If this is changed the value of {@link #serialVersionUID} will change, * If this is changed the value of {@link #serialVersionUID} will change,
* to avoid deserialization problems. * to avoid deserialization problems.
*/ */
public static final int VERSION = 1; public static final int VERSION = 2;
/** /**
* Serialization ID: {@value}. * Serialization ID: {@value}.
@ -67,6 +68,9 @@ public class PendingSet extends PersistentCommitData {
/** Version marker. */ /** Version marker. */
private int version = VERSION; private int version = VERSION;
/** Job ID, if known. */
private String jobId = "";
/** /**
* Commit list. * Commit list.
*/ */
@ -110,6 +114,19 @@ public static PendingSet load(FileSystem fs, Path path)
return instance; return instance;
} }
/**
* Load an instance from a file, then validate it.
* @param fs filesystem
* @param status status of file to load
* @return the loaded instance
* @throws IOException IO failure
* @throws ValidationFailure if the data is invalid
*/
public static PendingSet load(FileSystem fs, FileStatus status)
throws IOException {
return load(fs, status.getPath());
}
/** /**
* Add a commit. * Add a commit.
* @param commit the single commit * @param commit the single commit
@ -198,4 +215,14 @@ public void setCommits(List<SinglePendingCommit> commits) {
public void putExtraData(String key, String value) { public void putExtraData(String key, String value) {
extraData.put(key, value); extraData.put(key, value);
} }
/** @return Job ID, if known. */
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
} }

View File

@ -40,7 +40,7 @@ public abstract class PersistentCommitData implements Serializable {
* If this is changed the value of {@code serialVersionUID} will change, * If this is changed the value of {@code serialVersionUID} will change,
* to avoid deserialization problems. * to avoid deserialization problems.
*/ */
public static final int VERSION = 1; public static final int VERSION = 2;
/** /**
* Validate the data: those fields which must be non empty, must be set. * Validate the data: those fields which must be non empty, must be set.

View File

@ -207,7 +207,7 @@ public void validate() throws ValidationFailure {
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
"DelayedCompleteData{"); "SinglePendingCommit{");
sb.append("version=").append(version); sb.append("version=").append(version);
sb.append(", uri='").append(uri).append('\''); sb.append(", uri='").append(uri).append('\'');
sb.append(", destination='").append(destinationKey).append('\''); sb.append(", destination='").append(destinationKey).append('\'');

View File

@ -68,7 +68,7 @@ public class SuccessData extends PersistentCommitData {
/** /**
* Serialization ID: {@value}. * Serialization ID: {@value}.
*/ */
private static final long serialVersionUID = 507133045258460084L; private static final long serialVersionUID = 507133045258460083L + VERSION;
/** /**
* Name to include in persisted data, so as to differentiate from * Name to include in persisted data, so as to differentiate from
@ -103,6 +103,14 @@ public class SuccessData extends PersistentCommitData {
*/ */
private String description; private String description;
/** Job ID, if known. */
private String jobId = "";
/**
* Source of the job ID.
*/
private String jobIdSource = "";
/** /**
* Metrics. * Metrics.
*/ */
@ -325,4 +333,21 @@ public void setDiagnostics(Map<String, String> diagnostics) {
public void addDiagnostic(String key, String value) { public void addDiagnostic(String key, String value) {
diagnostics.put(key, value); diagnostics.put(key, value);
} }
/** @return Job ID, if known. */
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getJobIdSource() {
return jobIdSource;
}
public void setJobIdSource(final String jobIdSource) {
this.jobIdSource = jobIdSource;
}
} }

View File

@ -97,6 +97,7 @@ protected boolean requiresDelayedCommitOutputInFileSystem() {
public void setupJob(JobContext context) throws IOException { public void setupJob(JobContext context) throws IOException {
try (DurationInfo d = new DurationInfo(LOG, try (DurationInfo d = new DurationInfo(LOG,
"Setup Job %s", jobIdString(context))) { "Setup Job %s", jobIdString(context))) {
super.setupJob(context);
Path jobAttemptPath = getJobAttemptPath(context); Path jobAttemptPath = getJobAttemptPath(context);
getDestinationFS(jobAttemptPath, getDestinationFS(jobAttemptPath,
context.getConfiguration()).mkdirs(jobAttemptPath); context.getConfiguration()).mkdirs(jobAttemptPath);
@ -131,16 +132,6 @@ public void cleanupStagingDirs() {
} }
} }
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"Setup Task %s", context.getTaskAttemptID())) {
Path taskAttemptPath = getTaskAttemptPath(context);
FileSystem fs = taskAttemptPath.getFileSystem(getConf());
fs.mkdirs(taskAttemptPath);
}
}
/** /**
* Did this task write any files in the work directory? * Did this task write any files in the work directory?
* Probes for a task existing by looking to see if the attempt dir exists. * Probes for a task existing by looking to see if the attempt dir exists.
@ -208,13 +199,14 @@ private PendingSet innerCommitTask(
throw failures.get(0).getValue(); throw failures.get(0).getValue();
} }
// patch in IDs // patch in IDs
String jobId = String.valueOf(context.getJobID()); String jobId = getUUID();
String taskId = String.valueOf(context.getTaskAttemptID()); String taskId = String.valueOf(context.getTaskAttemptID());
for (SinglePendingCommit commit : pendingSet.getCommits()) { for (SinglePendingCommit commit : pendingSet.getCommits()) {
commit.setJobId(jobId); commit.setJobId(jobId);
commit.setTaskId(taskId); commit.setTaskId(taskId);
} }
pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId); pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
pendingSet.setJobId(jobId);
Path jobAttemptPath = getJobAttemptPath(context); Path jobAttemptPath = getJobAttemptPath(context);
TaskAttemptID taskAttemptID = context.getTaskAttemptID(); TaskAttemptID taskAttemptID = context.getTaskAttemptID();
Path taskOutcomePath = new Path(jobAttemptPath, Path taskOutcomePath = new Path(jobAttemptPath,
@ -259,11 +251,12 @@ public void abortTask(TaskAttemptContext context) throws IOException {
/** /**
* Compute the path where the output of a given job attempt will be placed. * Compute the path where the output of a given job attempt will be placed.
* For the magic committer, the path includes the job UUID.
* @param appAttemptId the ID of the application attempt for this job. * @param appAttemptId the ID of the application attempt for this job.
* @return the path to store job attempt data. * @return the path to store job attempt data.
*/ */
protected Path getJobAttemptPath(int appAttemptId) { protected Path getJobAttemptPath(int appAttemptId) {
return getMagicJobAttemptPath(appAttemptId, getOutputPath()); return getMagicJobAttemptPath(getUUID(), getOutputPath());
} }
/** /**
@ -274,12 +267,12 @@ protected Path getJobAttemptPath(int appAttemptId) {
* @return the path where a task attempt should be stored. * @return the path where a task attempt should be stored.
*/ */
public Path getTaskAttemptPath(TaskAttemptContext context) { public Path getTaskAttemptPath(TaskAttemptContext context) {
return getMagicTaskAttemptPath(context, getOutputPath()); return getMagicTaskAttemptPath(context, getUUID(), getOutputPath());
} }
@Override @Override
protected Path getBaseTaskAttemptPath(TaskAttemptContext context) { protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
return getBaseMagicTaskAttemptPath(context, getOutputPath()); return getBaseMagicTaskAttemptPath(context, getUUID(), getOutputPath());
} }
/** /**
@ -289,13 +282,16 @@ protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
* @return a path for temporary data. * @return a path for temporary data.
*/ */
public Path getTempTaskAttemptPath(TaskAttemptContext context) { public Path getTempTaskAttemptPath(TaskAttemptContext context) {
return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath()); return CommitUtilsWithMR.getTempTaskAttemptPath(context,
getUUID(),
getOutputPath());
} }
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
"MagicCommitter{"); "MagicCommitter{");
sb.append(super.toString());
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }

View File

@ -198,8 +198,9 @@ private void replacePartitions(
.stopOnFailure() .stopOnFailure()
.suppressExceptions(false) .suppressExceptions(false)
.executeWith(submitter) .executeWith(submitter)
.run(path -> { .run(status -> {
PendingSet pendingSet = PendingSet.load(sourceFS, path); PendingSet pendingSet = PendingSet.load(sourceFS,
status);
Path lastParent = null; Path lastParent = null;
for (SinglePendingCommit commit : pendingSet.getCommits()) { for (SinglePendingCommit commit : pendingSet.getCommits()) {
Path parent = commit.destinationPath().getParent(); Path parent = commit.destinationPath().getParent();

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JAVA_IO_TMPDIR;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
/** /**
@ -141,14 +142,18 @@ public static Path path(Path parent, String... child) {
} }
/** /**
* A cache of temporary folders. There's a risk here that the cache * A cache of temporary folders, using a generated ID which must be unique for
* gets too big * each active task attempt.
*/ */
private static Cache<TaskAttemptID, Path> tempFolders = CacheBuilder private static Cache<String, Path> tempFolders = CacheBuilder
.newBuilder().build(); .newBuilder().build();
/** /**
* Get the task attempt temporary directory in the local filesystem. * Get the task attempt temporary directory in the local filesystem.
* This must be unique to all tasks on all jobs running on all processes
* on this host.
* It's constructed as uuid+task-attempt-ID, relying on UUID to be unique
* for each job.
* @param conf configuration * @param conf configuration
* @param uuid some UUID, such as a job UUID * @param uuid some UUID, such as a job UUID
* @param attemptID attempt ID * @param attemptID attempt ID
@ -162,10 +167,11 @@ public static Path getLocalTaskAttemptTempDir(final Configuration conf,
try { try {
final LocalDirAllocator allocator = final LocalDirAllocator allocator =
new LocalDirAllocator(Constants.BUFFER_DIR); new LocalDirAllocator(Constants.BUFFER_DIR);
return tempFolders.get(attemptID, String name = uuid + "-" + attemptID;
return tempFolders.get(name,
() -> { () -> {
return FileSystem.getLocal(conf).makeQualified( return FileSystem.getLocal(conf).makeQualified(
allocator.getLocalPathForWrite(uuid, conf)); allocator.getLocalPathForWrite(name, conf));
}); });
} catch (ExecutionException | UncheckedExecutionException e) { } catch (ExecutionException | UncheckedExecutionException e) {
Throwable cause = e.getCause(); Throwable cause = e.getCause();

View File

@ -26,7 +26,6 @@
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,7 +45,6 @@
import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
@ -55,7 +53,6 @@
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Invoker.*; import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*; import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@ -95,7 +92,6 @@ public class StagingCommitter extends AbstractS3ACommitter {
public static final String NAME = "staging"; public static final String NAME = "staging";
private final Path constructorOutputPath; private final Path constructorOutputPath;
private final long uploadPartSize; private final long uploadPartSize;
private final String uuid;
private final boolean uniqueFilenames; private final boolean uniqueFilenames;
private final FileOutputCommitter wrappedCommitter; private final FileOutputCommitter wrappedCommitter;
@ -118,15 +114,14 @@ public StagingCommitter(Path outputPath,
Configuration conf = getConf(); Configuration conf = getConf();
this.uploadPartSize = conf.getLongBytes( this.uploadPartSize = conf.getLongBytes(
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
this.uuid = getUploadUUID(conf, context.getJobID());
this.uniqueFilenames = conf.getBoolean( this.uniqueFilenames = conf.getBoolean(
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES); DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
setWorkPath(buildWorkPath(context, uuid)); setWorkPath(buildWorkPath(context, getUUID()));
this.wrappedCommitter = createWrappedCommitter(context, conf); this.wrappedCommitter = createWrappedCommitter(context, conf);
setOutputPath(constructorOutputPath); setOutputPath(constructorOutputPath);
Path finalOutputPath = getOutputPath(); Path finalOutputPath = getOutputPath();
Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null"); checkNotNull(finalOutputPath, "Output path cannot be null");
S3AFileSystem fs = getS3AFileSystem(finalOutputPath, S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
context.getConfiguration(), false); context.getConfiguration(), false);
s3KeyPrefix = fs.pathToKey(finalOutputPath); s3KeyPrefix = fs.pathToKey(finalOutputPath);
@ -156,7 +151,8 @@ protected FileOutputCommitter createWrappedCommitter(JobContext context,
// explicitly choose commit algorithm // explicitly choose commit algorithm
initFileOutputCommitterOptions(context); initFileOutputCommitterOptions(context);
commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid); commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf,
getUUID());
return new FileOutputCommitter(commitsDirectory, context); return new FileOutputCommitter(commitsDirectory, context);
} }
@ -175,7 +171,10 @@ protected void initFileOutputCommitterOptions(JobContext context) {
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("StagingCommitter{"); final StringBuilder sb = new StringBuilder("StagingCommitter{");
sb.append(super.toString()); sb.append(super.toString());
sb.append(", commitsDirectory=").append(commitsDirectory);
sb.append(", uniqueFilenames=").append(uniqueFilenames);
sb.append(", conflictResolution=").append(conflictResolution); sb.append(", conflictResolution=").append(conflictResolution);
sb.append(", uploadPartSize=").append(uploadPartSize);
if (wrappedCommitter != null) { if (wrappedCommitter != null) {
sb.append(", wrappedCommitter=").append(wrappedCommitter); sb.append(", wrappedCommitter=").append(wrappedCommitter);
} }
@ -183,40 +182,6 @@ public String toString() {
return sb.toString(); return sb.toString();
} }
/**
* Get the UUID of an upload; may be the job ID.
* Spark will use a fake app ID based on the current minute and job ID 0.
* To avoid collisions, the key policy is:
* <ol>
* <li>Value of {@link InternalCommitterConstants#FS_S3A_COMMITTER_STAGING_UUID}.</li>
* <li>Value of {@code "spark.sql.sources.writeJobUUID"}.</li>
* <li>Value of {@code "spark.app.id"}.</li>
* <li>JobId passed in.</li>
* </ol>
* The staging UUID is set in in {@link #setupJob(JobContext)} and so will
* be valid in all sequences where the job has been set up for the
* configuration passed in.
* @param conf job/task configuration
* @param jobId Job ID
* @return an ID for use in paths.
*/
public static String getUploadUUID(Configuration conf, String jobId) {
return conf.getTrimmed(
InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
conf.getTrimmed(SPARK_WRITE_UUID,
conf.getTrimmed(SPARK_APP_ID, jobId)));
}
/**
* Get the UUID of a Job.
* @param conf job/task configuration
* @param jobId Job ID
* @return an ID for use in paths.
*/
public static String getUploadUUID(Configuration conf, JobID jobId) {
return getUploadUUID(conf, jobId.toString());
}
/** /**
* Get the work path for a task. * Get the work path for a task.
* @param context job/task complex * @param context job/task complex
@ -309,7 +274,7 @@ public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
* @return the location of pending job attempts. * @return the location of pending job attempts.
*/ */
private static Path getPendingJobAttemptsPath(Path out) { private static Path getPendingJobAttemptsPath(Path out) {
Preconditions.checkNotNull(out, "Null 'out' path"); checkNotNull(out, "Null 'out' path");
return new Path(out, TEMPORARY); return new Path(out, TEMPORARY);
} }
@ -330,12 +295,12 @@ public Path getCommittedTaskPath(TaskAttemptContext context) {
* @param context task context * @param context task context
*/ */
private static void validateContext(TaskAttemptContext context) { private static void validateContext(TaskAttemptContext context) {
Preconditions.checkNotNull(context, "null context"); checkNotNull(context, "null context");
Preconditions.checkNotNull(context.getTaskAttemptID(), checkNotNull(context.getTaskAttemptID(),
"null task attempt ID"); "null task attempt ID");
Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(), checkNotNull(context.getTaskAttemptID().getTaskID(),
"null task ID"); "null task ID");
Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(), checkNotNull(context.getTaskAttemptID().getJobID(),
"null job ID"); "null job ID");
} }
@ -377,7 +342,7 @@ protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
// get files on the local FS in the attempt path // get files on the local FS in the attempt path
Path attemptPath = getTaskAttemptPath(context); Path attemptPath = getTaskAttemptPath(context);
Preconditions.checkNotNull(attemptPath, checkNotNull(attemptPath,
"No attemptPath path in {}", this); "No attemptPath path in {}", this);
LOG.debug("Scanning {} for files to commit", attemptPath); LOG.debug("Scanning {} for files to commit", attemptPath);
@ -401,7 +366,8 @@ protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
*/ */
protected String getFinalKey(String relative, JobContext context) { protected String getFinalKey(String relative, JobContext context) {
if (uniqueFilenames) { if (uniqueFilenames) {
return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid); return getS3KeyPrefix(context) + "/"
+ Paths.addUUID(relative, getUUID());
} else { } else {
return getS3KeyPrefix(context) + "/" + relative; return getS3KeyPrefix(context) + "/" + relative;
} }
@ -452,11 +418,8 @@ public Path getJobAttemptPath(JobContext context) {
*/ */
@Override @Override
public void setupJob(JobContext context) throws IOException { public void setupJob(JobContext context) throws IOException {
LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context));
context.getConfiguration().set(
InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
wrappedCommitter.setupJob(context);
super.setupJob(context); super.setupJob(context);
wrappedCommitter.setupJob(context);
} }
/** /**
@ -539,19 +502,6 @@ protected void cleanup(JobContext context,
super.cleanup(context, suppressExceptions); super.cleanup(context, suppressExceptions);
} }
@Override
protected void abortPendingUploadsInCleanup(boolean suppressExceptions)
throws IOException {
if (getConf()
.getBoolean(FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, true)) {
super.abortPendingUploadsInCleanup(suppressExceptions);
} else {
LOG.info("Not cleanup up pending uploads to {} as {} is false ",
getOutputPath(),
FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS);
}
}
@Override @Override
protected void abortJobInternal(JobContext context, protected void abortJobInternal(JobContext context,
boolean suppressExceptions) throws IOException { boolean suppressExceptions) throws IOException {
@ -608,8 +558,7 @@ public void setupTask(TaskAttemptContext context) throws IOException {
Path taskAttemptPath = getTaskAttemptPath(context); Path taskAttemptPath = getTaskAttemptPath(context);
try (DurationInfo d = new DurationInfo(LOG, try (DurationInfo d = new DurationInfo(LOG,
"%s: setup task attempt path %s ", getRole(), taskAttemptPath)) { "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
// create the local FS super.setupTask(context);
taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath);
wrappedCommitter.setupTask(context); wrappedCommitter.setupTask(context);
} }
} }
@ -832,15 +781,6 @@ private String getS3KeyPrefix(JobContext context) {
return s3KeyPrefix; return s3KeyPrefix;
} }
/**
* A UUID for this upload, as calculated with.
* {@link #getUploadUUID(Configuration, String)}
* @return the UUID for files
*/
protected String getUUID() {
return uuid;
}
/** /**
* Returns the {@link ConflictResolution} mode for this commit. * Returns the {@link ConflictResolution} mode for this commit.
* *

View File

@ -46,19 +46,4 @@ private StagingCommitterConstants() {
*/ */
public static final String STAGING_UPLOADS = "staging-uploads"; public static final String STAGING_UPLOADS = "staging-uploads";
// Spark configuration keys
/**
* The UUID for jobs: {@value}.
*/
public static final String SPARK_WRITE_UUID =
"spark.sql.sources.writeJobUUID";
/**
* The App ID for jobs.
*/
public static final String SPARK_APP_ID = "spark.app.id";
public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
} }

View File

@ -530,26 +530,33 @@ Amazon S3, that means S3Guard must *always* be enabled.
Conflict management is left to the execution engine itself. Conflict management is left to the execution engine itself.
## Committer Configuration Options ## Common Committer Options
| Option | Magic | Directory | Partitioned | Meaning | Default | | Option | Meaning | Default |
|--------|-------|-----------|-------------|---------|---------| |--------|---------|---------|
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file at the end of each job | `true` | | `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Write a `_SUCCESS` file on the successful completion of the job. | `true` |
| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 | | `fs.s3a.buffer.dir` | Local filesystem directory for data being written and/or staged. | `${hadoop.tmp.dir}/s3a` |
| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` | | `fs.s3a.committer.magic.enabled` | Enable "magic committer" support in the filesystem. | `false` |
| `fs.s3a.committer.staging.unique-filenames` | | X | X | Generate unique filenames | `true` | | `fs.s3a.committer.abort.pending.uploads` | list and abort all pending uploads under the destination path when the job is committed or aborted. | `true` |
| `fs.s3a.committer.magic.enabled` | X | | | Enable "magic committer" support in the filesystem | `false` | | `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files. | 8 |
| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
| `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from Spark | `false` |
## Staging committer (Directory and Partitioned) options
| Option | Magic | Directory | Partitioned | Meaning | Default | | Option | Meaning | Default |
|--------|-------|-----------|-------------|---------|---------| |--------|---------|---------|
| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being written and/or staged. | | | `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append` or `replace`| `append` |
| `fs.s3a.committer.staging.tmp.path` | | X | X | Path in the cluster filesystem for temporary data | `tmp/staging` | | `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` |
| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` |
| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | `(false)` |
### Common Committer Options
```xml ```xml
<property> <property>
<name>fs.s3a.committer.name</name> <name>fs.s3a.committer.name</name>
@ -579,6 +586,60 @@ Conflict management is left to the execution engine itself.
</description> </description>
</property> </property>
<property>
<name>fs.s3a.committer.abort.pending.uploads</name>
<value>true</value>
<description>
Should the committers abort all pending uploads to the destination
directory?
Set to false if more than one job is writing to the same directory tree.
Was: "fs.s3a.committer.staging.abort.pending.uploads" when only used
by the staging committers.
</description>
</property>
<property>
<name>mapreduce.outputcommitter.factory.scheme.s3a</name>
<value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
<description>
The committer factory to use when writing data to S3A filesystems.
If mapreduce.outputcommitter.factory.class is set, it will
override this property.
(This property is set in mapred-default.xml)
</description>
</property>
<property>
<name>fs.s3a.committer.require.uuid</name>
<value>false</value>
<description>
Should the committer fail to initialize if a unique ID isn't set in
"spark.sql.sources.writeJobUUID" or fs.s3a.committer.staging.uuid
This helps guarantee that unique IDs for jobs are being
passed down in spark applications.
Setting this option outside of spark will stop the S3A committer
in job setup. In MapReduce workloads the job attempt ID is unique
and so no unique ID need be passed down.
</description>
</property>
<property>
<name>fs.s3a.committer.generate.uuid</name>
<value>false</value>
<description>
Generate a Job UUID if none is passed down from Spark.
This uuid is only generated if the fs.s3a.committer.require.uuid flag
is false.
</description>
</property>
```
### Staging Committer Options
```xml
<property> <property>
<name>fs.s3a.committer.staging.tmp.path</name> <name>fs.s3a.committer.staging.tmp.path</name>
<value>tmp/staging</value> <value>tmp/staging</value>
@ -613,38 +674,45 @@ Conflict management is left to the execution engine itself.
</description> </description>
</property> </property>
<property>
<name>s.s3a.committer.staging.abort.pending.uploads</name>
<value>true</value>
<description>
Should the staging committers abort all pending uploads to the destination
directory?
Changing this if more than one partitioned committer is
writing to the same destination tree simultaneously; otherwise
the first job to complete will cancel all outstanding uploads from the
others. However, it may lead to leaked outstanding uploads from failed
tasks. If disabled, configure the bucket lifecycle to remove uploads
after a time period, and/or set up a workflow to explicitly delete
entries. Otherwise there is a risk that uncommitted uploads may run up
bills.
</description>
</property>
<property>
<name>mapreduce.outputcommitter.factory.scheme.s3a</name>
<value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
<description>
The committer factory to use when writing data to S3A filesystems.
If mapreduce.outputcommitter.factory.class is set, it will
override this property.
(This property is set in mapred-default.xml)
</description>
</property>
``` ```
## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination
It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
Before attempting this, the committers must be set to not delete all incomplete uploads on job commit,
by setting `fs.s3a.committer.abort.pending.uploads` to `false`
```xml
<property>
<name>fs.s3a.committer.abort.pending.uploads</name>
<value>false</value>
</property>
```
If more than one job is writing to the same destination path then every task MUST
be creating files with paths/filenames unique to the specific job.
It is not enough for them to be unique by task `part-00000.snappy.parquet`,
because each job will have tasks with the same name, so generate files with conflicting operations.
For the staging committers, setting `fs.s3a.committer.staging.unique-filenames` to ensure unique names are
generated during the upload. Otherwise, use what configuration options are available in the specific `FileOutputFormat`.
Note: by default, the option `mapreduce.output.basename` sets the base name for files;
changing that from the default `part` value to something unique for each job may achieve this.
For example, for any job executed through Hadoop MapReduce, the Job ID can be used in the filename.
```xml
<property>
<name>mapreduce.output.basename</name>
<value>part-${mapreduce.job.id}</value>
</property>
```
Even with these settings, the outcome of concurrent jobs to the same destination is
inherently nondeterministic -use with caution.
## Troubleshooting ## Troubleshooting
@ -700,7 +768,7 @@ Delegation token support is disabled
Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds
``` ```
## Error message: "File being created has a magic path, but the filesystem has magic file support disabled: ### Error message: "File being created has a magic path, but the filesystem has magic file support disabled"
A file is being written to a path which is used for "magic" files, A file is being written to a path which is used for "magic" files,
files which are actually written to a different destination than their stated path files which are actually written to a different destination than their stated path
@ -781,7 +849,7 @@ If you have subclassed `FileOutputCommitter` and want to move to the
factory model, please get in touch. factory model, please get in touch.
## Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail" ### Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail"
This surfaces when either of two conditions are met. This surfaces when either of two conditions are met.
@ -795,7 +863,7 @@ during task commit, which will cause the entire job to fail.
If you are trying to write data and want write conflicts to be rejected, this is the correct If you are trying to write data and want write conflicts to be rejected, this is the correct
behavior: there was data at the destination so the job was aborted. behavior: there was data at the destination so the job was aborted.
## Staging committer task fails with IOException: No space left on device ### Staging committer task fails with IOException: No space left on device
There's not enough space on the local hard disk (real or virtual) There's not enough space on the local hard disk (real or virtual)
to store all the uncommitted data of the active tasks on that host. to store all the uncommitted data of the active tasks on that host.
@ -821,3 +889,169 @@ generating less data each.
1. Use the magic committer. This only needs enough disk storage to buffer 1. Use the magic committer. This only needs enough disk storage to buffer
blocks of the currently being written file during their upload process, blocks of the currently being written file during their upload process,
so can use a lot less disk space. so can use a lot less disk space.
### Jobs run with directory/partitioned committers complete but the output is empty.
Make sure that `fs.s3a.committer.staging.tmp.path` is set to a path on the shared cluster
filesystem (usually HDFS). It MUST NOT be set to a local directory, as then the job committer,
running on a different host *will not see the lists of pending uploads to commit*.
### Magic output committer task fails "The specified upload does not exist" "Error Code: NoSuchUpload"
The magic committer is being used and a task writing data to the S3 store fails
with an error message about the upload not existing.
```
java.io.FileNotFoundException: upload part #1 upload
YWHTRqBaxlsutujKYS3eZHfdp6INCNXbk0JVtydX_qzL5fZcoznxRbbBZRfswOjomddy3ghRyguOqywJTfGG1Eq6wOW2gitP4fqWrBYMroasAygkmXNYF7XmUyFHYzja
on test/ITestMagicCommitProtocol-testParallelJobsToSameDestPaths/part-m-00000:
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not
exist. The upload ID may be invalid, or the upload may have been aborted or
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:286)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:154)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:590)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.lambda$uploadBlockAsync$0(S3ABlockOutputStream.java:652)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception:
The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: EBE6A0C9F8213AC3; S3 Extended Request ID:
cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=),
S3 Extended Request ID: cQFm2N+666V/1HehZYRPTHX9tFK3ppvHSX2a8Oy3qVDyTpOFlJZQqJpSixMVyMI1D0dZkHHOI+E=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3715)
at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3700)
at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:2343)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:594)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
... 15 more
```
The block write failed because the previously created upload was aborted before the data could be written.
Causes
1. Another job has written to the same directory tree with an S3A committer
-and when that job was committed, all incomplete uploads were aborted.
1. The `hadoop s3guard uploads --abort` command has being called on/above the directory.
1. Some other program is cancelling uploads to that bucket/path under it.
1. The job is lasting over 24h and a bucket lifecycle policy is aborting the uploads.
The `_SUCCESS` file from the previous job may provide diagnostics.
If the cause is Concurrent Jobs, see [Concurrent Jobs writing to the same destination](#concurrent-jobs).
### Job commit fails "java.io.FileNotFoundException: Completing multipart upload" "The specified upload does not exist"
The job commit fails with an error about the specified upload not existing.
```
java.io.FileNotFoundException: Completing multipart upload on
test/DELAY_LISTING_ME/ITestDirectoryCommitProtocol-testParallelJobsToSameDestPaths/part-m-00001:
com.amazonaws.services.s3.model.AmazonS3Exception:
The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload;
Request ID: 8E6173241D2970CB; S3 Extended Request ID:
Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
S3 Extended Request ID:
Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:259)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:112)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:315)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:407)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:311)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:261)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.commitUpload(WriteOperationHelper.java:549)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.innerCommit(CommitOperations.java:199)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.commit(CommitOperations.java:168)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.commitOrFail(CommitOperations.java:144)
at org.apache.hadoop.fs.s3a.commit.CommitOperations.access$100(CommitOperations.java:74)
at org.apache.hadoop.fs.s3a.commit.CommitOperations$CommitContext.commitOrFail(CommitOperations.java:612)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$loadAndCommit$5(AbstractS3ACommitter.java:535)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.runSingleThreaded(Tasks.java:164)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder.run(Tasks.java:149)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.loadAndCommit(AbstractS3ACommitter.java:534)
at org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.lambda$commitPendingUploads$2(AbstractS3ACommitter.java:482)
at org.apache.hadoop.fs.s3a.commit.Tasks$Builder$1.run(Tasks.java:253)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist.
The upload ID may be invalid, or the upload may have been aborted or completed.
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 8E6173241D2970CB;
S3 Extended Request ID: Pg6x75Q60UrbSJgfShCFX7czFTZAHR1Cy7W0Kh+o1uj60CG9jw7hL40tSa+wa7BRLbaz3rhX8Ds=),
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4920)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4866)
at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3464)
at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:267)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:110)
```
The problem is likely to be that of the previous one: concurrent jobs are writing the same output directory,
or another program has cancelled all pending uploads.
See [Concurrent Jobs writing to the same destination](#concurrent-jobs).
### Job commit fails `java.io.FileNotFoundException` "File hdfs://.../staging-uploads/_temporary/0 does not exist"
The Staging committer will fail in job commit if the intermediate directory on the cluster FS is missing during job commit.
This is possible if another job used the same staging upload directory and,
after committing its work, it deleted the directory.
A unique Job ID is required for each spark job run by a specific user.
Spark generates job IDs for its committers using the current timestamp,
and if two jobs/stages are started in the same second, they will have the same job ID.
See [SPARK-33230](https://issues.apache.org/jira/browse/SPARK-33230).
This is fixed in all spark releases which have the patch applied.
You can set the property `fs.s3a.committer.staging.require.uuid` to fail
the staging committers fast if a unique Job ID isn't found in
`spark.sql.sources.writeJobUUID`.
### Job setup fails `Job/task context does not contain a unique ID in spark.sql.sources.writeJobUUID`
This will surface in job setup if the option `fs.s3a.committer.require.uuid` is `true`, and
one of the following conditions are met
1. The committer is being used in a Hadoop MapReduce job, whose job attempt ID is unique
-there is no need to add this requirement.
Fix: unset `fs.s3a.committer.require.uuid`.
1. The committer is being used in spark, and the version of spark being used does not
set the `spark.sql.sources.writeJobUUID` property.
Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true.

View File

@ -248,6 +248,57 @@ As an example, the endpoint for S3 Frankfurt is `s3.eu-central-1.amazonaws.com`:
</property> </property>
``` ```
### `Class does not implement AWSCredentialsProvider`
A credential provider listed in `fs.s3a.aws.credentials.provider` does not implement
the interface `com.amazonaws.auth.AWSCredentialsProvider`.
```
Cause: java.lang.RuntimeException: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:686)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:621)
at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:219)
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:126)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:306)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:433)
...
Cause: java.io.IOException: Class class com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement AWSCredentialsProvider
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:722)
at org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:687)
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:620)
at org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:673)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:414)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3462)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:171)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3522)
at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3496)
at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:591)
```
There's two main causes
1. A class listed there is not an implementation of the interface.
Fix: review the settings and correct as appropriate.
1. A class listed there does implement the interface, but it has been loaded in a different
classloader, so the JVM does not consider it to be an implementation.
Fix: learn the entire JVM classloader model and see if you can then debug it.
Tip: having both the AWS Shaded SDK and individual AWS SDK modules on your classpath
may be a cause of this.
If you see this and you are trying to use the S3A connector with Spark, then the cause can
be that the isolated classloader used to load Hive classes is interfering with the S3A
connector's dynamic loading of `com.amazonaws` classes. To fix this, declare that that
the classes in the aws SDK are loaded from the same classloader which instantiated
the S3A FileSystem instance:
```
spark.sql.hive.metastore.sharedPrefixes com.amazonaws.
```
## <a name="access_denied"></a> "The security token included in the request is invalid" ## <a name="access_denied"></a> "The security token included in the request is invalid"
You are trying to use session/temporary credentials and the session token You are trying to use session/temporary credentials and the session token
@ -1262,11 +1313,11 @@ Number of parts in multipart upload exceeded
``` ```
org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y
at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432) at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627) at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532) at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316) at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316)
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301) at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301)
``` ```
This is a known issue where upload fails if number of parts This is a known issue where upload fails if number of parts

View File

@ -359,11 +359,13 @@ private String pathToPrefix(Path path) {
* and that it can be loaded. * and that it can be loaded.
* The contents will be logged and returned. * The contents will be logged and returned.
* @param dir directory to scan * @param dir directory to scan
* @param jobId job ID, only verified if non-empty
* @return the loaded success data * @return the loaded success data
* @throws IOException IO Failure * @throws IOException IO Failure
*/ */
protected SuccessData verifySuccessMarker(Path dir) throws IOException { protected SuccessData verifySuccessMarker(Path dir, String jobId)
return validateSuccessFile(dir, "", getFileSystem(), "query", 0); throws IOException {
return validateSuccessFile(dir, "", getFileSystem(), "query", 0, jobId);
} }
/** /**
@ -442,6 +444,7 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
* @param fs filesystem * @param fs filesystem
* @param origin origin (e.g. "teragen" for messages) * @param origin origin (e.g. "teragen" for messages)
* @param minimumFileCount minimum number of files to have been created * @param minimumFileCount minimum number of files to have been created
* @param jobId job ID, only verified if non-empty
* @return the success data * @return the success data
* @throws IOException IO failure * @throws IOException IO failure
*/ */
@ -449,7 +452,8 @@ public static SuccessData validateSuccessFile(final Path outputPath,
final String committerName, final String committerName,
final S3AFileSystem fs, final S3AFileSystem fs,
final String origin, final String origin,
final int minimumFileCount) throws IOException { final int minimumFileCount,
final String jobId) throws IOException {
SuccessData successData = loadSuccessFile(fs, outputPath, origin); SuccessData successData = loadSuccessFile(fs, outputPath, origin);
String commitDetails = successData.toString(); String commitDetails = successData.toString();
LOG.info("Committer name " + committerName + "\n{}", LOG.info("Committer name " + committerName + "\n{}",
@ -463,8 +467,13 @@ public static SuccessData validateSuccessFile(final Path outputPath,
committerName, successData.getCommitter()); committerName, successData.getCommitter());
} }
Assertions.assertThat(successData.getFilenames()) Assertions.assertThat(successData.getFilenames())
.describedAs("Files committed") .describedAs("Files committed in " + commitDetails)
.hasSizeGreaterThanOrEqualTo(minimumFileCount); .hasSizeGreaterThanOrEqualTo(minimumFileCount);
if (StringUtils.isNotEmpty(jobId)) {
Assertions.assertThat(successData.getJobId())
.describedAs("JobID in " + commitDetails)
.isEqualTo(jobId);
}
return successData; return successData;
} }

View File

@ -23,6 +23,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -41,6 +42,7 @@
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData; import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.MapFile;
@ -69,7 +71,12 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.test.LambdaTestUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.*;
/** /**
@ -377,6 +384,7 @@ public static class JobData {
private final TaskAttemptContext tContext; private final TaskAttemptContext tContext;
private final AbstractS3ACommitter committer; private final AbstractS3ACommitter committer;
private final Configuration conf; private final Configuration conf;
private Path writtenTextPath; // null if not written to
public JobData(Job job, public JobData(Job job,
JobContext jContext, JobContext jContext,
@ -467,7 +475,7 @@ protected JobData startJob(CommitterFactory factory, boolean writeText)
if (writeText) { if (writeText) {
// write output // write output
writeTextOutput(tContext); jobData.writtenTextPath = writeTextOutput(tContext);
} }
return jobData; return jobData;
} }
@ -659,12 +667,14 @@ protected void assertJobAttemptPathDoesNotExist(
* file existence and contents, as well as optionally, the success marker. * file existence and contents, as well as optionally, the success marker.
* @param dir directory to scan. * @param dir directory to scan.
* @param expectSuccessMarker check the success marker? * @param expectSuccessMarker check the success marker?
* @param expectedJobId job ID, verified if non-empty and success data loaded
* @throws Exception failure. * @throws Exception failure.
*/ */
private void validateContent(Path dir, boolean expectSuccessMarker) private void validateContent(Path dir,
throws Exception { boolean expectSuccessMarker,
String expectedJobId) throws Exception {
if (expectSuccessMarker) { if (expectSuccessMarker) {
verifySuccessMarker(dir); SuccessData successData = verifySuccessMarker(dir, expectedJobId);
} }
Path expectedFile = getPart0000(dir); Path expectedFile = getPart0000(dir);
log().debug("Validating content in {}", expectedFile); log().debug("Validating content in {}", expectedFile);
@ -793,7 +803,8 @@ public void testCommitLifecycle() throws Exception {
// validate output // validate output
describe("4. Validating content"); describe("4. Validating content");
validateContent(outDir, shouldExpectSuccessMarker()); validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir); assertNoMultipartUploadsPending(outDir);
} }
@ -810,7 +821,8 @@ public void testCommitterWithDuplicatedCommit() throws Exception {
commit(committer, jContext, tContext); commit(committer, jContext, tContext);
// validate output // validate output
validateContent(outDir, shouldExpectSuccessMarker()); validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir); assertNoMultipartUploadsPending(outDir);
@ -875,7 +887,8 @@ public void testTwoTaskAttemptsCommit() throws Exception {
// validate output // validate output
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1); SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1,
"");
Assertions.assertThat(successData.getFilenames()) Assertions.assertThat(successData.getFilenames())
.describedAs("Files committed") .describedAs("Files committed")
.hasSize(1); .hasSize(1);
@ -911,7 +924,8 @@ public void testCommitterWithFailure() throws Exception {
commitJob(committer, jContext); commitJob(committer, jContext);
// but the data got there, due to the order of operations. // but the data got there, due to the order of operations.
validateContent(outDir, shouldExpectSuccessMarker()); validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
expectJobCommitToFail(jContext, committer); expectJobCommitToFail(jContext, committer);
} }
@ -1007,7 +1021,7 @@ public void testMapFileOutputCommitter() throws Exception {
describe("\nvalidating"); describe("\nvalidating");
// validate output // validate output
verifySuccessMarker(outDir); verifySuccessMarker(outDir, committer.getUUID());
describe("validate output of %s", outDir); describe("validate output of %s", outDir);
validateMapFileOutputContent(fs, outDir); validateMapFileOutputContent(fs, outDir);
@ -1269,7 +1283,7 @@ public Path getDefaultWorkFile(
// validate output // validate output
// There's no success marker in the subdirectory // There's no success marker in the subdirectory
validateContent(outSubDir, false); validateContent(outSubDir, false, "");
} }
/** /**
@ -1327,7 +1341,7 @@ public void testOutputFormatIntegration() throws Throwable {
commitTask(committer, tContext); commitTask(committer, tContext);
commitJob(committer, jContext); commitJob(committer, jContext);
// validate output // validate output
verifySuccessMarker(outDir); verifySuccessMarker(outDir, committer.getUUID());
} }
/** /**
@ -1387,7 +1401,9 @@ public void testParallelJobsToAdjacentPaths() throws Throwable {
assertNotEquals(job1Dest, job2Dest); assertNotEquals(job1Dest, job2Dest);
// create the second job // create the second job
Job job2 = newJob(job2Dest, new JobConf(getConfiguration()), attempt20); Job job2 = newJob(job2Dest,
unsetUUIDOptions(new JobConf(getConfiguration())),
attempt20);
Configuration conf2 = job2.getConfiguration(); Configuration conf2 = job2.getConfiguration();
conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
try { try {
@ -1400,7 +1416,13 @@ public void testParallelJobsToAdjacentPaths() throws Throwable {
setup(jobData2); setup(jobData2);
abortInTeardown(jobData2); abortInTeardown(jobData2);
// make sure the directories are different // make sure the directories are different
assertEquals(job2Dest, committer2.getOutputPath()); assertNotEquals("Committer output paths",
committer1.getOutputPath(),
committer2.getOutputPath());
assertNotEquals("job UUIDs",
committer1.getUUID(),
committer2.getUUID());
// job2 setup, write some data there // job2 setup, write some data there
writeTextOutput(tContext2); writeTextOutput(tContext2);
@ -1430,6 +1452,259 @@ public void testParallelJobsToAdjacentPaths() throws Throwable {
} }
/**
* Run two jobs with the same destination and different output paths.
* <p></p>
* This only works if the jobs are set to NOT delete all outstanding
* uploads under the destination path.
* <p></p>
* See HADOOP-17318.
*/
@Test
public void testParallelJobsToSameDestination() throws Throwable {
describe("Run two jobs to the same destination, assert they both complete");
Configuration conf = getConfiguration();
conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);
// this job has a job ID generated and set as the spark UUID;
// the config is also set to require it.
// This mimics the Spark setup process.
String stage1Id = UUID.randomUUID().toString();
conf.set(SPARK_WRITE_UUID, stage1Id);
conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
// create the job and write data in its task attempt
JobData jobData = startJob(true);
Job job1 = jobData.job;
AbstractS3ACommitter committer1 = jobData.committer;
JobContext jContext1 = jobData.jContext;
TaskAttemptContext tContext1 = jobData.tContext;
Path job1TaskOutputFile = jobData.writtenTextPath;
// the write path
Assertions.assertThat(committer1.getWorkPath().toString())
.describedAs("Work path path of %s", committer1)
.contains(stage1Id);
// now build up a second job
String jobId2 = randomJobId();
// second job will use same ID
String attempt2 = taskAttempt0.toString();
TaskAttemptID taskAttempt2 = taskAttempt0;
// create the second job
Configuration c2 = unsetUUIDOptions(new JobConf(conf));
c2.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
Job job2 = newJob(outDir,
c2,
attempt2);
Configuration jobConf2 = job2.getConfiguration();
jobConf2.set("mapreduce.output.basename", "task2");
String stage2Id = UUID.randomUUID().toString();
jobConf2.set(SPARK_WRITE_UUID,
stage2Id);
JobContext jContext2 = new JobContextImpl(jobConf2,
taskAttempt2.getJobID());
TaskAttemptContext tContext2 =
new TaskAttemptContextImpl(jobConf2, taskAttempt2);
AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
.describedAs("Job attempt path of %s", committer2)
.isNotEqualTo(committer1.getJobAttemptPath(jContext1));
Assertions.assertThat(committer2.getTaskAttemptPath(tContext2))
.describedAs("Task attempt path of %s", committer2)
.isNotEqualTo(committer1.getTaskAttemptPath(tContext1));
Assertions.assertThat(committer2.getWorkPath().toString())
.describedAs("Work path path of %s", committer2)
.isNotEqualTo(committer1.getWorkPath().toString())
.contains(stage2Id);
Assertions.assertThat(committer2.getUUIDSource())
.describedAs("UUID source of %s", committer2)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID);
JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
setup(jobData2);
abortInTeardown(jobData2);
// the sequence is designed to ensure that job2 has active multipart
// uploads during/after job1's work
// if the committer is a magic committer, MPUs start in the write,
// otherwise in task commit.
boolean multipartInitiatedInWrite =
committer2 instanceof MagicS3GuardCommitter;
// job2. Here we start writing a file and have that write in progress
// when job 1 commits.
LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
recordWriter2 = new LoggingTextOutputFormat<>().getRecordWriter(
tContext2);
LOG.info("Commit Task 1");
commitTask(committer1, tContext1);
if (multipartInitiatedInWrite) {
// magic committer runs -commit job1 while a job2 TA has an open
// writer (and hence: open MP Upload)
LOG.info("With Multipart Initiated In Write: Commit Job 1");
commitJob(committer1, jContext1);
}
// job2/task writes its output to the destination and
// closes the file
writeOutput(recordWriter2, tContext2);
// get the output file
Path job2TaskOutputFile = recordWriter2.getDest();
// commit the second task
LOG.info("Commit Task 2");
commitTask(committer2, tContext2);
if (!multipartInitiatedInWrite) {
// if not a magic committer, commit the job now. Because at
// this point the staging committer tasks from job2 will be pending
LOG.info("With Multipart NOT Initiated In Write: Commit Job 1");
assertJobAttemptPathExists(committer1, jContext1);
commitJob(committer1, jContext1);
}
// run the warning scan code, which will find output.
// this can be manually reviewed in the logs to verify
// readability
committer2.warnOnActiveUploads(outDir);
// and second job
LOG.info("Commit Job 2");
assertJobAttemptPathExists(committer2, jContext2);
commitJob(committer2, jContext2);
// validate the output
Path job1Output = new Path(outDir, job1TaskOutputFile.getName());
Path job2Output = new Path(outDir, job2TaskOutputFile.getName());
assertNotEquals("Job output file filenames must be different",
job1Output, job2Output);
// job1 output must be there
assertPathExists("job 1 output", job1Output);
// job 2 file is there
assertPathExists("job 2 output", job2Output);
// and nothing is pending
assertNoMultipartUploadsPending(outDir);
}
/**
* Verify self-generated UUID logic.
* A committer used for job setup can also use it for task setup,
* but a committer which generated a job ID but was only
* used for task setup -that is rejected.
* Task abort will still work.
*/
@Test
public void testSelfGeneratedUUID() throws Throwable {
describe("Run two jobs to the same destination, assert they both complete");
Configuration conf = getConfiguration();
unsetUUIDOptions(conf);
// job is set to generate UUIDs
conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
// create the job. don't write anything
JobData jobData = startJob(false);
AbstractS3ACommitter committer = jobData.committer;
String uuid = committer.getUUID();
Assertions.assertThat(committer.getUUIDSource())
.describedAs("UUID source of %s", committer)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
// examine the job configuration and verify that it has been updated
Configuration jobConf = jobData.conf;
Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null))
.describedAs("Config option " + FS_S3A_COMMITTER_UUID)
.isEqualTo(uuid);
Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null))
.describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally
.getText());
// because the task was set up in the job, it can have task
// setup called, even though it had a random ID.
committer.setupTask(jobData.tContext);
// but a new committer will not be set up
TaskAttemptContext tContext2 =
new TaskAttemptContextImpl(conf, taskAttempt1);
AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
Assertions.assertThat(committer2.getUUIDSource())
.describedAs("UUID source of %s", committer2)
.isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);
assertNotEquals("job UUIDs",
committer.getUUID(),
committer2.getUUID());
// Task setup MUST fail.
intercept(PathCommitException.class,
E_SELF_GENERATED_JOB_UUID, () -> {
committer2.setupTask(tContext2);
return committer2;
});
// task abort with the self-generated option is fine.
committer2.abortTask(tContext2);
}
/**
* Verify the option to require a UUID applies and
* when a committer is instantiated without those options,
* it fails early.
*/
@Test
public void testRequirePropagatedUUID() throws Throwable {
Configuration conf = getConfiguration();
unsetUUIDOptions(conf);
conf.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
// create the job, expect a failure, even if UUID generation
// is enabled.
intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
startJob(false));
}
/**
* Strip staging/spark UUID options.
* @param conf config
* @return the patched config
*/
protected Configuration unsetUUIDOptions(final Configuration conf) {
conf.unset(SPARK_WRITE_UUID);
conf.unset(FS_S3A_COMMITTER_UUID);
conf.unset(FS_S3A_COMMITTER_GENERATE_UUID);
conf.unset(FS_S3A_COMMITTER_REQUIRE_UUID);
return conf;
}
/**
* Assert that a committer's job attempt path exists.
* For the staging committers, this is in the cluster FS.
* @param committer committer
* @param jobContext job context
* @throws IOException failure
*/
protected void assertJobAttemptPathExists(
final AbstractS3ACommitter committer,
final JobContext jobContext) throws IOException {
Path attemptPath = committer.getJobAttemptPath(jobContext);
ContractTestUtils.assertIsDirectory(
attemptPath.getFileSystem(committer.getConf()),
attemptPath);
}
@Test @Test
public void testS3ACommitterFactoryBinding() throws Throwable { public void testS3ACommitterFactoryBinding() throws Throwable {
describe("Verify that the committer factory returns this " describe("Verify that the committer factory returns this "

View File

@ -66,7 +66,7 @@ public LoggingLineRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
} }
Path file = getDefaultWorkFile(job, extension); Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf); FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false); FSDataOutputStream fileOut = fs.create(file, true);
LOG.debug("Creating LineRecordWriter with destination {}", file); LOG.debug("Creating LineRecordWriter with destination {}", file);
if (isCompressed) { if (isCompressed) {
return new LoggingLineRecordWriter<>( return new LoggingLineRecordWriter<>(

View File

@ -77,7 +77,7 @@
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -254,7 +254,7 @@ public void test_200_execute() throws Exception {
jobConf.set("mock-results-file", committerPath); jobConf.set("mock-results-file", committerPath);
// setting up staging options is harmless for other committers // setting up staging options is harmless for other committers
jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); jobConf.set(FS_S3A_COMMITTER_UUID, commitUUID);
mrJob.setInputFormatClass(TextInputFormat.class); mrJob.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(mrJob, FileInputFormat.addInputPath(mrJob,
@ -310,7 +310,8 @@ public void test_200_execute() throws Exception {
committerName(), committerName(),
fs, fs,
"MR job " + jobID, "MR job " + jobID,
1); 1,
"");
String commitData = successData.toString(); String commitData = successData.toString();
FileStatus[] results = fs.listStatus(outputPath, FileStatus[] results = fs.listStatus(outputPath,

View File

@ -21,6 +21,9 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -34,6 +37,7 @@
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
@ -91,14 +95,14 @@ public void assertJobAbortCleanedUp(JobData jobData)
} }
@Override @Override
protected AbstractS3ACommitter createCommitter( protected MagicS3GuardCommitter createCommitter(
Path outputPath, Path outputPath,
TaskAttemptContext context) TaskAttemptContext context)
throws IOException { throws IOException {
return new MagicS3GuardCommitter(outputPath, context); return new MagicS3GuardCommitter(outputPath, context);
} }
public AbstractS3ACommitter createFailingCommitter( public MagicS3GuardCommitter createFailingCommitter(
TaskAttemptContext tContext) throws IOException { TaskAttemptContext tContext) throws IOException {
return new CommitterWithFailedThenSucceed(getOutDir(), tContext); return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
} }
@ -136,6 +140,41 @@ protected void validateTaskAttemptWorkingDirectory(
containsString('/' + CommitConstants.MAGIC + '/')); containsString('/' + CommitConstants.MAGIC + '/'));
} }
/**
* Verify that the __magic path for the application/tasks use the
* committer UUID to ensure uniqueness in the case of more than
* one job writing to the same destination path.
*/
@Test
public void testCommittersPathsHaveUUID() throws Throwable {
TaskAttemptContext tContext = new TaskAttemptContextImpl(
getConfiguration(),
getTaskAttempt0());
MagicS3GuardCommitter committer = createCommitter(getOutDir(), tContext);
String ta0 = getTaskAttempt0().toString();
// magic path for the task attempt
Path taskAttemptPath = committer.getTaskAttemptPath(tContext);
Assertions.assertThat(taskAttemptPath.toString())
.describedAs("task path of %s", committer)
.contains(committer.getUUID())
.contains(MAGIC)
.doesNotContain(TEMP_DATA)
.endsWith(BASE)
.contains(ta0);
// temp path for files which the TA will create with an absolute path
// and which need renaming into place.
Path tempTaskAttemptPath = committer.getTempTaskAttemptPath(tContext);
Assertions.assertThat(tempTaskAttemptPath.toString())
.describedAs("Temp task path of %s", committer)
.contains(committer.getUUID())
.contains(TEMP_DATA)
.doesNotContain(MAGIC)
.doesNotContain(BASE)
.contains(ta0);
}
/** /**
* The class provides a overridden implementation of commitJobInternal which * The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed. * causes the commit failed for the first time then succeed.

View File

@ -341,7 +341,7 @@ public void setupJob() throws Exception {
protected JobConf createJobConf() { protected JobConf createJobConf() {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_UUID,
UUID.randomUUID().toString()); UUID.randomUUID().toString());
conf.setBoolean( conf.setBoolean(
CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
@ -401,7 +401,7 @@ public void setupTask() throws Exception {
// get the task's configuration copy so modifications take effect // get the task's configuration copy so modifications take effect
String tmp = System.getProperty( String tmp = System.getProperty(
StagingCommitterConstants.JAVA_IO_TMPDIR); InternalCommitterConstants.JAVA_IO_TMPDIR);
tempDir = new File(tmp); tempDir = new File(tmp);
tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer"); tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer");
tac.getConfiguration().set( tac.getConfiguration().set(

View File

@ -54,6 +54,8 @@
import org.apache.hadoop.fs.s3a.AWSClientIOException; import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.fs.s3a.MockS3AFileSystem; import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
@ -84,8 +86,13 @@
public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
private static final JobID JOB_ID = new JobID("job", 1); private static final JobID JOB_ID = new JobID("job", 1);
public static final TaskID TASK_ID = new TaskID(JOB_ID, TaskType.REDUCE, 2);
private static final TaskAttemptID AID = new TaskAttemptID( private static final TaskAttemptID AID = new TaskAttemptID(
new TaskID(JOB_ID, TaskType.REDUCE, 2), 3); TASK_ID, 1);
private static final TaskAttemptID AID2 = new TaskAttemptID(
TASK_ID, 2);
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestStagingCommitter.class); LoggerFactory.getLogger(TestStagingCommitter.class);
@ -141,8 +148,8 @@ public void setupCommitter() throws Exception {
jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads); jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads);
jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
uniqueFilenames); uniqueFilenames);
jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, jobConf.set(FS_S3A_COMMITTER_UUID,
UUID.randomUUID().toString()); uuid());
jobConf.set(RETRY_INTERVAL, "100ms"); jobConf.set(RETRY_INTERVAL, "100ms");
jobConf.setInt(RETRY_LIMIT, 1); jobConf.setInt(RETRY_LIMIT, 1);
@ -190,36 +197,137 @@ public void cleanup() {
} }
} }
@Test private Configuration newConfig() {
public void testUUIDPropagation() throws Exception { return new Configuration(false);
Configuration config = new Configuration();
String jobUUID = addUUID(config);
assertEquals("Upload UUID", jobUUID,
StagingCommitter.getUploadUUID(config, JOB_ID));
} }
@Test
public void testUUIDPropagation() throws Exception {
Configuration config = newConfig();
String uuid = uuid();
config.set(SPARK_WRITE_UUID, uuid);
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
.buildJobUUID(config, JOB_ID);
assertEquals("Job UUID", uuid, t3.getLeft());
assertEquals("Job UUID source: " + t3,
AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
t3.getRight());
}
/**
* If the Spark UUID is required, then binding will fail
* if a UUID did not get passed in.
*/
@Test
public void testUUIDValidation() throws Exception {
Configuration config = newConfig();
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
AbstractS3ACommitter.buildJobUUID(config, JOB_ID));
}
/**
* Validate ordering of UUID retrieval.
*/
@Test
public void testUUIDLoadOrdering() throws Exception {
Configuration config = newConfig();
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
String uuid = uuid();
// MUST be picked up
config.set(FS_S3A_COMMITTER_UUID, uuid);
config.set(SPARK_WRITE_UUID, "something");
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
.buildJobUUID(config, JOB_ID);
assertEquals("Job UUID", uuid, t3.getLeft());
assertEquals("Job UUID source: " + t3,
AbstractS3ACommitter.JobUUIDSource.CommitterUUIDProperty,
t3.getRight());
}
/**
* Verify that unless the config enables self-generation, JobIDs
* are used.
*/
@Test
public void testJobIDIsUUID() throws Exception {
Configuration config = newConfig();
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
.buildJobUUID(config, JOB_ID);
assertEquals("Job UUID source: " + t3,
AbstractS3ACommitter.JobUUIDSource.JobID,
t3.getRight());
// parse it as a JobID
JobID.forName(t3.getLeft());
}
/**
* Verify self-generated UUIDs are supported when enabled,
* and come before JobID.
*/
@Test
public void testSelfGeneratedUUID() throws Exception {
Configuration config = newConfig();
config.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
.buildJobUUID(config, JOB_ID);
assertEquals("Job UUID source: " + t3,
AbstractS3ACommitter.JobUUIDSource.GeneratedLocally,
t3.getRight());
// parse it
UUID.fromString(t3.getLeft());
}
/**
* Create a UUID and add it as the staging UUID.
* @param config config to patch
* @return the UUID
*/
private String addUUID(Configuration config) { private String addUUID(Configuration config) {
String jobUUID = UUID.randomUUID().toString(); String jobUUID = uuid();
config.set(FS_S3A_COMMITTER_STAGING_UUID, jobUUID); config.set(FS_S3A_COMMITTER_UUID, jobUUID);
return jobUUID; return jobUUID;
} }
/**
* Create a new UUID.
* @return a uuid as a string.
*/
private String uuid() {
return UUID.randomUUID().toString();
}
@Test @Test
public void testAttemptPathConstructionNoSchema() throws Exception { public void testAttemptPathConstructionNoSchema() throws Exception {
Configuration config = new Configuration(); Configuration config = newConfig();
final String jobUUID = addUUID(config); final String jobUUID = addUUID(config);
config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1"); config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1");
String commonPath = "file:/tmp/mr-local-"; String commonPath = "file:/tmp/mr-local-";
Assertions.assertThat(getLocalTaskAttemptTempDir(config,
jobUUID, tac.getTaskAttemptID()).toString())
.describedAs("Missing scheme should produce local file paths")
.startsWith(commonPath)
.contains(jobUUID);
}
assertThat("Missing scheme should produce local file paths", @Test
getLocalTaskAttemptTempDir(config, public void testAttemptPathsDifferentByTaskAttempt() throws Exception {
jobUUID, tac.getTaskAttemptID()).toString(), Configuration config = newConfig();
StringStartsWith.startsWith(commonPath)); final String jobUUID = addUUID(config);
config.set(BUFFER_DIR, "file:/tmp/mr-local-0");
String attempt1Path = getLocalTaskAttemptTempDir(config,
jobUUID, AID).toString();
String attempt2Path = getLocalTaskAttemptTempDir(config,
jobUUID, AID2).toString();
Assertions.assertThat(attempt2Path)
.describedAs("local task attempt dir of TA1 must not match that of TA2")
.isNotEqualTo(attempt1Path);
} }
@Test @Test
public void testAttemptPathConstructionWithSchema() throws Exception { public void testAttemptPathConstructionWithSchema() throws Exception {
Configuration config = new Configuration(); Configuration config = newConfig();
final String jobUUID = addUUID(config); final String jobUUID = addUUID(config);
String commonPath = "file:/tmp/mr-local-"; String commonPath = "file:/tmp/mr-local-";
@ -234,7 +342,7 @@ public void testAttemptPathConstructionWithSchema() throws Exception {
@Test @Test
public void testAttemptPathConstructionWrongSchema() throws Exception { public void testAttemptPathConstructionWrongSchema() throws Exception {
Configuration config = new Configuration(); Configuration config = newConfig();
final String jobUUID = addUUID(config); final String jobUUID = addUUID(config);
config.set(BUFFER_DIR, config.set(BUFFER_DIR,
"hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1"); "hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1");
@ -270,7 +378,7 @@ public void testSingleTaskCommit() throws Exception {
assertEquals("Should name the commits file with the task ID: " + results, assertEquals("Should name the commits file with the task ID: " + results,
"task_job_0001_r_000002", stats[0].getPath().getName()); "task_job_0001_r_000002", stats[0].getPath().getName());
PendingSet pending = PendingSet.load(dfs, stats[0].getPath()); PendingSet pending = PendingSet.load(dfs, stats[0]);
assertEquals("Should have one pending commit", 1, pending.size()); assertEquals("Should have one pending commit", 1, pending.size());
SinglePendingCommit commit = pending.getCommits().get(0); SinglePendingCommit commit = pending.getCommits().get(0);
assertEquals("Should write to the correct bucket:" + results, assertEquals("Should write to the correct bucket:" + results,
@ -310,8 +418,7 @@ public void testSingleTaskEmptyFileCommit() throws Exception {
assertEquals("Should name the commits file with the task ID", assertEquals("Should name the commits file with the task ID",
"task_job_0001_r_000002", stats[0].getPath().getName()); "task_job_0001_r_000002", stats[0].getPath().getName());
PendingSet pending = PendingSet.load(dfs, PendingSet pending = PendingSet.load(dfs, stats[0]);
stats[0].getPath());
assertEquals("Should have one pending commit", 1, pending.size()); assertEquals("Should have one pending commit", 1, pending.size());
} }
@ -334,7 +441,7 @@ public void testSingleTaskMultiFileCommit() throws Exception {
"task_job_0001_r_000002", stats[0].getPath().getName()); "task_job_0001_r_000002", stats[0].getPath().getName());
List<SinglePendingCommit> pending = List<SinglePendingCommit> pending =
PendingSet.load(dfs, stats[0].getPath()).getCommits(); PendingSet.load(dfs, stats[0]).getCommits();
assertEquals("Should have correct number of pending commits", assertEquals("Should have correct number of pending commits",
files.size(), pending.size()); files.size(), pending.size());

View File

@ -110,7 +110,7 @@ protected ActiveCommit listPendingUploadsToCommit(
file.deleteOnExit(); file.deleteOnExit();
Path path = new Path(file.toURI()); Path path = new Path(file.toURI());
pendingSet.save(localFS, path, true); pendingSet.save(localFS, path, true);
activeCommit.add(path); activeCommit.add(localFS.getFileStatus(path));
} }
return activeCommit; return activeCommit;
} }

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.fs.s3a.commit.staging.integration; package org.apache.hadoop.fs.s3a.commit.staging.integration;
import java.io.IOException; import java.io.IOException;
import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -34,6 +36,7 @@
import org.apache.hadoop.fs.s3a.commit.staging.Paths; import org.apache.hadoop.fs.s3a.commit.staging.Paths;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -68,8 +71,15 @@ public void setup() throws Exception {
// identify working dir for staging and delete // identify working dir for staging and delete
Configuration conf = getConfiguration(); Configuration conf = getConfiguration();
String uuid = StagingCommitter.getUploadUUID(conf, String uuid = UUID.randomUUID().toString();
getTaskAttempt0().getJobID()); conf.set(InternalCommitterConstants.SPARK_WRITE_UUID,
uuid);
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
.buildJobUUID(conf, JobID.forName("job_" + getJobId()));
assertEquals("Job UUID", uuid, t3.getLeft());
assertEquals("Job UUID source: " + t3,
AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
t3.getRight());
Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid, Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid,
getTaskAttempt0()); getTaskAttempt0());
rmdir(tempDir, conf); rmdir(tempDir, conf);

View File

@ -242,7 +242,7 @@ private void executeStage(
+ "(" + StringUtils.join(", ", args) + ")" + "(" + StringUtils.join(", ", args) + ")"
+ " failed", 0, result); + " failed", 0, result);
validateSuccessFile(dest, committerName(), getFileSystem(), stage, validateSuccessFile(dest, committerName(), getFileSystem(), stage,
minimumFileCount); minimumFileCount, "");
completedStage(stage, d); completedStage(stage, d);
} }

View File

@ -103,7 +103,7 @@ protected Configuration createScaleConfiguration() {
KEY_HUGE_PARTITION_SIZE, KEY_HUGE_PARTITION_SIZE,
DEFAULT_HUGE_PARTITION_SIZE); DEFAULT_HUGE_PARTITION_SIZE);
assertTrue("Partition size too small: " + partitionSize, assertTrue("Partition size too small: " + partitionSize,
partitionSize > MULTIPART_MIN_SIZE); partitionSize >= MULTIPART_MIN_SIZE);
conf.setLong(SOCKET_SEND_BUFFER, _1MB); conf.setLong(SOCKET_SEND_BUFFER, _1MB);
conf.setLong(SOCKET_RECV_BUFFER, _1MB); conf.setLong(SOCKET_RECV_BUFFER, _1MB);
conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize); conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);