Revert "HADOOP-18402. S3A committer NPE in spark job abort (#4735)"

(managed to commit through the github ui before I'd got the message done)

This reverts commit ad83e95046.
This commit is contained in:
Steve Loughran 2022-08-15 11:19:56 +01:00
parent ad83e95046
commit eee59a8372
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 4 additions and 20 deletions

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.fs.audit.CommonAuditContext; import org.apache.hadoop.fs.audit.CommonAuditContext;
import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
@ -50,17 +49,12 @@ public final class AuditContextUpdater {
* @param jobContext job/task context. * @param jobContext job/task context.
*/ */
public AuditContextUpdater(final JobContext jobContext) { public AuditContextUpdater(final JobContext jobContext) {
JobID contextJobID = jobContext.getJobID(); this.jobId = jobContext.getJobID().toString();
this.jobId = contextJobID != null
? contextJobID.toString()
: null;
if (jobContext instanceof TaskAttemptContext) { if (jobContext instanceof TaskAttemptContext) {
// it's a task, extract info for auditing // it's a task, extract info for auditing
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID(); final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
this.taskAttemptId = tid != null this.taskAttemptId = tid.toString();
? tid.toString()
: null;
} else { } else {
this.taskAttemptId = null; this.taskAttemptId = null;
} }
@ -76,11 +70,7 @@ public AuditContextUpdater(String jobId) {
*/ */
public void updateCurrentAuditContext() { public void updateCurrentAuditContext() {
final CommonAuditContext auditCtx = currentAuditContext(); final CommonAuditContext auditCtx = currentAuditContext();
if (jobId != null) {
auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId); auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
} else {
currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
}
if (taskAttemptId != null) { if (taskAttemptId != null) {
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId); auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
} else { } else {

View File

@ -40,7 +40,6 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.JsonSerialization; import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
@ -157,12 +156,7 @@ public CommitContext(
this.commitOperations = commitOperations; this.commitOperations = commitOperations;
this.jobContext = jobContext; this.jobContext = jobContext;
this.conf = jobContext.getConfiguration(); this.conf = jobContext.getConfiguration();
JobID contextJobID = jobContext.getJobID(); this.jobId = jobContext.getJobID().toString();
// either the job ID or make one up as it will be
// used for the filename of any reports.
this.jobId = contextJobID != null
? contextJobID.toString()
: ("job-without-id-at-" + System.currentTimeMillis());
this.collectIOStatistics = conf.getBoolean( this.collectIOStatistics = conf.getBoolean(
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT); S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);