HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)

Contributed by Dongjoon Hyun and Steve Loughran

Change-Id: Ibaf8082e60eff5298ff4e6513edc386c5bae0274
This commit is contained in:
Dongjoon Hyun 2020-10-12 13:39:15 +01:00 committed by Steve Loughran
parent 7cf5bdeec0
commit 5032f8abba
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
6 changed files with 111 additions and 5 deletions

View File

@ -261,4 +261,7 @@ private CommitConstants() {
*/
public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
/** Extra Data key for task attempt in pendingset files. */
public static final String TASK_ATTEMPT_ID = "task.attempt.id";
}

View File

@ -189,4 +189,13 @@ public List<SinglePendingCommit> getCommits() {
public void setCommits(List<SinglePendingCommit> commits) {
this.commits = commits;
}
/**
* Set/Update an extra data entry.
* @param key key
* @param value value
*/
public void putExtraData(String key, String value) {
extraData.put(key, value);
}
}

View File

@ -418,6 +418,15 @@ public void setExtraData(Map<String, String> extraData) {
this.extraData = extraData;
}
/**
* Set/Update an extra data entry.
* @param key key
* @param value value
*/
public void putExtraData(String key, String value) {
extraData.put(key, value);
}
/**
* Destination file size.
* @return size of destination object
@ -429,4 +438,5 @@ public long getLength() {
public void setLength(long length) {
this.length = length;
}
}

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@ -213,7 +214,7 @@ private PendingSet innerCommitTask(
commit.setJobId(jobId);
commit.setTaskId(taskId);
}
pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
Path jobAttemptPath = getJobAttemptPath(context);
TaskAttemptID taskAttemptID = context.getTaskAttemptID();
Path taskOutcomePath = new Path(jobAttemptPath,
@ -221,7 +222,8 @@ private PendingSet innerCommitTask(
CommitConstants.PENDINGSET_SUFFIX);
LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
try {
pendingSet.save(getDestFS(), taskOutcomePath, false);
// We will overwrite if there exists a pendingSet file already
pendingSet.save(getDestFS(), taskOutcomePath, true);
} catch (IOException e) {
LOG.warn("Failed to save task commit data to {} ",
taskOutcomePath, e);

View File

@ -695,6 +695,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
context.progress();
PendingSet pendingCommits = new PendingSet(commitCount);
pendingCommits.putExtraData(TASK_ATTEMPT_ID,
context.getTaskAttemptID().toString());
try {
Tasks.foreach(taskOutput)
.stopOnFailure()

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
@ -307,14 +308,19 @@ public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
* @param context task
* @throws IOException IO failure
* @throws InterruptedException write interrupted
* @return the path written to
*/
protected void writeTextOutput(TaskAttemptContext context)
protected Path writeTextOutput(TaskAttemptContext context)
throws IOException, InterruptedException {
describe("write output");
try (DurationInfo d = new DurationInfo(LOG,
"Writing Text output for task %s", context.getTaskAttemptID())) {
writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
context);
writeOutput(recordWriter,
context);
return recordWriter.getDest();
}
}
@ -480,11 +486,17 @@ protected void setup(JobData jobData) throws IOException {
"setup job %s", jContext.getJobID())) {
committer.setupJob(jContext);
}
setupCommitter(committer, tContext);
describe("setup complete\n");
}
private void setupCommitter(
final AbstractS3ACommitter committer,
final TaskAttemptContext tContext) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"setup task %s", tContext.getTaskAttemptID())) {
committer.setupTask(tContext);
}
describe("setup complete\n");
}
/**
@ -806,6 +818,74 @@ public void testCommitterWithDuplicatedCommit() throws Exception {
expectFNFEonTaskCommit(committer, tContext);
}
/**
* HADOOP-17258. If a second task attempt is committed, it
* must succeed, and the output of the first TA, even if already
* committed, MUST NOT be visible in the final output.
* <p></p>
* What's important is not just that only one TA must succeed,
* but it must be the last one executed. Why? because that's
* the one
*/
@Test
public void testTwoTaskAttemptsCommit() throws Exception {
describe("Commit two task attempts;" +
" expect the second attempt to succeed.");
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
// do commit
describe("\ncommitting task");
// write output for TA 1,
Path outputTA1 = writeTextOutput(tContext);
// speculatively execute committer 2.
// jobconf with a different base to its parts.
Configuration conf2 = jobData.conf;
conf2.set("mapreduce.output.basename", "attempt2");
String attempt2 = "attempt_" + jobId + "_m_000000_1";
TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
conf2, ta2);
AbstractS3ACommitter committer2 = standardCommitterFactory
.createCommitter(tContext2);
setupCommitter(committer2, tContext2);
// write output for TA 2,
Path outputTA2 = writeTextOutput(tContext2);
// verify the names are different.
String name1 = outputTA1.getName();
String name2 = outputTA2.getName();
Assertions.assertThat(name1)
.describedAs("name of task attempt output %s", outputTA1)
.isNotEqualTo(name2);
// commit task 1
committer.commitTask(tContext);
// then pretend that task1 didn't respond, so
// commit task 2
committer2.commitTask(tContext2);
// and the job
committer2.commitJob(tContext);
// validate output
S3AFileSystem fs = getFileSystem();
SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
Assertions.assertThat(successData.getFilenames())
.describedAs("Files committed")
.hasSize(1);
assertPathExists("attempt2 output", new Path(outDir, name2));
assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
assertNoMultipartUploadsPending(outDir);
}
protected boolean shouldExpectSuccessMarker() {
return true;
}