From 906ae5138e772e4bd57fbb90a202c7f26679057a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 15 Aug 2022 11:18:47 +0100 Subject: [PATCH] HADOOP-18402. S3A committer NPE in spark job abort (#4735) JobID.toString() and TaskID.toString() to only be called when the IDs are not null. This doesn't surface in MapReduce, but Spark SQL can trigger in job abort, where it may invok abortJob() with an incomplete TaskContext. This patch MUST be applied to branches containing HADOOP-17833. "Improve Magic Committer Performance." Contributed by Steve Loughran. --- .../fs/s3a/commit/impl/AuditContextUpdater.java | 16 +++++++++++++--- .../hadoop/fs/s3a/commit/impl/CommitContext.java | 8 +++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java index 20024ba601..17f63e6dff 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.audit.AuditConstants; import org.apache.hadoop.fs.audit.CommonAuditContext; import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -49,12 +50,17 @@ public final class AuditContextUpdater { * @param jobContext job/task context. */ public AuditContextUpdater(final JobContext jobContext) { - this.jobId = jobContext.getJobID().toString(); + JobID contextJobID = jobContext.getJobID(); + this.jobId = contextJobID != null + ? contextJobID.toString() + : null; if (jobContext instanceof TaskAttemptContext) { // it's a task, extract info for auditing final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID(); - this.taskAttemptId = tid.toString(); + this.taskAttemptId = tid != null + ? tid.toString() + : null; } else { this.taskAttemptId = null; } @@ -70,7 +76,11 @@ public final class AuditContextUpdater { */ public void updateCurrentAuditContext() { final CommonAuditContext auditCtx = currentAuditContext(); - auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId); + if (jobId != null) { + auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId); + } else { + currentAuditContext().remove(AuditConstants.PARAM_JOB_ID); + } if (taskAttemptId != null) { auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId); } else { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java index 8ac3dcb231..c93d2d8f73 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.JsonSerialization; import org.apache.hadoop.util.Preconditions; @@ -156,7 +157,12 @@ public final class CommitContext implements Closeable { this.commitOperations = commitOperations; this.jobContext = jobContext; this.conf = jobContext.getConfiguration(); - this.jobId = jobContext.getJobID().toString(); + JobID contextJobID = jobContext.getJobID(); + // either the job ID or make one up as it will be + // used for the filename of any reports. + this.jobId = contextJobID != null + ? contextJobID.toString() + : ("job-without-id-at-" + System.currentTimeMillis()); this.collectIOStatistics = conf.getBoolean( S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);