diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java index 20024ba601..17f63e6dff 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.audit.CommonAuditContext; import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -49,12 +50,17 @@ public final class AuditContextUpdater { * @param jobContext job/task context. */ public AuditContextUpdater(final JobContext jobContext) { - this.jobId = jobContext.getJobID().toString(); + JobID contextJobID = jobContext.getJobID(); + this.jobId = contextJobID != null + ? contextJobID.toString() + : null; if (jobContext instanceof TaskAttemptContext) { // it's a task, extract info for auditing final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID(); - this.taskAttemptId = tid.toString(); + this.taskAttemptId = tid != null + ? tid.toString() + : null; } else { this.taskAttemptId = null; } @@ -70,7 +76,11 @@ public AuditContextUpdater(String jobId) { */ public void updateCurrentAuditContext() { final CommonAuditContext auditCtx = currentAuditContext(); - auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId); + if (jobId != null) { + auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId); + } else { + currentAuditContext().remove(AuditConstants.PARAM_JOB_ID); + } if (taskAttemptId != null) { auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId); } else { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java index 8ac3dcb231..c93d2d8f73 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.JsonSerialization; import org.apache.hadoop.util.Preconditions; @@ -156,7 +157,12 @@ public CommitContext( this.commitOperations = commitOperations; this.jobContext = jobContext; this.conf = jobContext.getConfiguration(); - this.jobId = jobContext.getJobID().toString(); + JobID contextJobID = jobContext.getJobID(); + // either the job ID or make one up as it will be + // used for the filename of any reports. + this.jobId = contextJobID != null + ? contextJobID.toString() + : ("job-without-id-at-" + System.currentTimeMillis()); this.collectIOStatistics = conf.getBoolean( S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);