diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 1eb93e6f5f..55d2442e5a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -451,6 +451,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5870. Support for passing Job priority through Application
     Submission Context in Mapreduce Side (Sunil G via jlowe)
 
+    MAPREDUCE-6566. Add retry support to mapreduce CLI tool.
+    (Varun Vasudev via xgong)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
index 59f5adacd4..db87d9dda1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
@@ -26,6 +26,7 @@
 import java.util.HashSet;
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -268,7 +270,7 @@ public int run(String[] argv) throws Exception {
         System.out.println("Created job " + job.getJobID());
         exitCode = 0;
       } else if (getStatus) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -283,7 +285,7 @@ public int run(String[] argv) throws Exception {
           exitCode = 0;
         }
       } else if (getCounter) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -299,7 +301,7 @@ public int run(String[] argv) throws Exception {
           }
         }
       } else if (killJob) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -323,7 +325,7 @@ public int run(String[] argv) throws Exception {
           }
         }
       } else if (setJobPriority) {
-        Job job = cluster.getJob(JobID.forName(jobid));
+        Job job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -339,7 +341,7 @@ public int run(String[] argv) throws Exception {
         viewHistory(historyFile, viewAllHistory);
         exitCode = 0;
       } else if (listEvents) {
-        listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
+        listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
         exitCode = 0;
       } else if (listJobs) {
         listJobs(cluster);
@@ -354,11 +356,11 @@ public int run(String[] argv) throws Exception {
         listBlacklistedTrackers(cluster);
         exitCode = 0;
       } else if (displayTasks) {
-        displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+        displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
         exitCode = 0;
       } else if(killTask) {
         TaskAttemptID taskID = TaskAttemptID.forName(taskid);
-        Job job = cluster.getJob(taskID.getJobID());
+        Job job = getJob(taskID.getJobID());
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else if (job.killTask(taskID, false)) {
@@ -370,7 +372,7 @@ public int run(String[] argv) throws Exception {
         }
       } else if(failTask) {
         TaskAttemptID taskID = TaskAttemptID.forName(taskid);
-        Job job = cluster.getJob(taskID.getJobID());
+        Job job = getJob(taskID.getJobID());
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else if(job.killTask(taskID, true)) {
@@ -531,6 +533,29 @@ private void listEvents(Job job, int fromEventId, int numEvents)
   protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
     return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
   }
+
+  @VisibleForTesting
+  Job getJob(JobID jobid) throws IOException, InterruptedException {
+
+    int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
+    long retryInterval = getConf()
+        .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
+            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
+    Job job = cluster.getJob(jobid);
+
+    for (int i = 0; i < maxRetry; ++i) {
+      if (job != null) {
+        return job;
+      }
+      LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
+          + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
+          + " seconds and retrying.");
+      Thread.sleep(retryInterval);
+      job = cluster.getJob(jobid);
+    }
+    return job;
+  }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
index fdc916ebb5..73f57d5d7d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java
@@ -20,14 +20,19 @@
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
 import org.junit.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -44,7 +49,7 @@ public void testListAttemptIdsWithValidInput() throws Exception {
     JobID jobId = JobID.forName(jobIdStr);
     Cluster mockCluster = mock(Cluster.class);
     Job job = mock(Job.class);
-    CLI cli = spy(new CLI());
+    CLI cli = spy(new CLI(new Configuration()));
     doReturn(mockCluster).when(cli).createCluster();
 
     when(job.getTaskReports(TaskType.MAP)).thenReturn(
@@ -112,7 +117,7 @@ private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
   @Test
   public void testJobKIll() throws Exception {
     Cluster mockCluster = mock(Cluster.class);
-    CLI cli = spy(new CLI());
+    CLI cli = spy(new CLI(new Configuration()));
     doReturn(mockCluster).when(cli).createCluster();
     String jobId1 = "job_1234654654_001";
     String jobId2 = "job_1234654654_002";
@@ -149,4 +154,26 @@ private Job mockJob(Cluster mockCluster, String jobId, State jobState)
     when(mockJob.getStatus()).thenReturn(status);
     return mockJob;
   }
+
+  @Test
+  public void testGetJob() throws Exception {
+    Configuration conf = new Configuration();
+    long sleepTime = 100;
+    conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, sleepTime);
+    Cluster mockCluster = mock(Cluster.class);
+    JobID jobId1 = JobID.forName("job_1234654654_001");
+    when(mockCluster.getJob(jobId1)).thenReturn(null);
+
+    for (int i = 0; i < 2; ++i) {
+      conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, i);
+      CLI cli = spy(new CLI(conf));
+      cli.cluster = mockCluster;
+      doReturn(mockCluster).when(cli).createCluster();
+      long start = Time.monotonicNow();
+      cli.getJob(jobId1);
+      long end = Time.monotonicNow();
+      Assert.assertTrue(end - start > (i * sleepTime));
+      Assert.assertTrue(end - start < ((i + 1) * sleepTime));
+    }
+  }
 }
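
Usage sketch (not part of the patch): CLI#getJob() above re-polls cluster.getJob() up to maxRetry extra times, sleeping retryInterval milliseconds between attempts, so the worst-case added latency when a job never appears is maxRetry * retryInterval. The two MRJobConfig constants it reads are defined outside this excerpt. Below is a minimal, hypothetical driver showing how a client could enable the retries, assuming those constants and the standard Tool/ToolRunner plumbing; the retry count, interval, and job id are illustrative values, not defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.ToolRunner;

public class JobStatusWithRetries {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Ask CLI.getJob() to re-poll up to 5 times, 2000 ms apart, before the
    // tool reports "Could not find job". With max-retries set to 0 the loop
    // body in getJob() never runs and behavior matches the old single lookup.
    conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 5);
    conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, 2000L);

    // Equivalent to: mapred job -status <job-id>
    // (job_1234654654_0001 is a placeholder id)
    int exitCode = ToolRunner.run(conf, new CLI(),
        new String[] {"-status", "job_1234654654_0001"});
    System.exit(exitCode);
  }
}

This mirrors what testGetJob() verifies: with max-retries set to i, a lookup of a missing job takes a bit more than i * retry-interval milliseconds before getJob() returns null.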