diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 008f190843..f591bc8d92 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -283,6 +283,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl. (devaraj via ozawa) + MAPREDUCE-4431. mapred command should print the reason on killing already + completed jobs. (devaraj via ozawa) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index 0d74d9fa3a..37ba5b7119 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -296,9 +296,24 @@ public int run(String[] argv) throws Exception { if (job == null) { System.out.println("Could not find job " + jobid); } else { - job.killJob(); - System.out.println("Killed job " + jobid); - exitCode = 0; + JobStatus jobStatus = job.getStatus(); + if (jobStatus.getState() == JobStatus.State.FAILED) { + System.out.println("Could not mark the job " + jobid + + " as killed, as it has already failed."); + exitCode = -1; + } else if (jobStatus.getState() == JobStatus.State.KILLED) { + System.out + .println("The job " + jobid + " has already been killed."); + exitCode = -1; + } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) { + System.out.println("Could not kill the job " + jobid + + ", as it has already succeeded."); + exitCode = -1; + } else { + job.killJob(); + System.out.println("Killed job " + jobid); + exitCode = 0; + } } } else if (setJobPriority) { Job job = cluster.getJob(JobID.forName(jobid)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java index 0060e8579b..fdc916ebb5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java @@ -19,11 +19,15 @@ import static org.junit.Assert.*; +import java.io.IOException; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.JobStatus.State; import org.junit.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -104,4 +108,45 @@ public void testListAttemptIdsWithInvalidInputs() throws Exception { private TaskReport[] getTaskReports(JobID jobId, TaskType type) { return new TaskReport[] { new TaskReport(), new TaskReport() }; } + + @Test + public void testJobKIll() throws Exception { + Cluster mockCluster = mock(Cluster.class); + CLI cli = spy(new CLI()); + doReturn(mockCluster).when(cli).createCluster(); + String jobId1 = "job_1234654654_001"; + String jobId2 = "job_1234654654_002"; + String jobId3 = "job_1234654654_003"; + String jobId4 = "job_1234654654_004"; + Job mockJob1 = mockJob(mockCluster, jobId1, State.RUNNING); + Job mockJob2 = mockJob(mockCluster, jobId2, State.KILLED); + Job mockJob3 = mockJob(mockCluster, jobId3, State.FAILED); + Job mockJob4 = mockJob(mockCluster, jobId4, State.PREP); + + int exitCode1 = cli.run(new String[] { "-kill", jobId1 }); + assertEquals(0, exitCode1); + verify(mockJob1, times(1)).killJob(); + + int exitCode2 = cli.run(new String[] { "-kill", jobId2 }); + assertEquals(-1, exitCode2); + verify(mockJob2, times(0)).killJob(); + + int exitCode3 = cli.run(new String[] { "-kill", jobId3 }); + assertEquals(-1, exitCode3); + verify(mockJob3, times(0)).killJob(); + + int exitCode4 = cli.run(new String[] { "-kill", jobId4 }); + assertEquals(0, exitCode4); + verify(mockJob4, times(1)).killJob(); + } + + private Job mockJob(Cluster mockCluster, String jobId, State jobState) + throws IOException, InterruptedException { + Job mockJob = mock(Job.class); + when(mockCluster.getJob(JobID.forName(jobId))).thenReturn(mockJob); + JobStatus status = new JobStatus(null, 0, 0, 0, 0, jobState, + JobPriority.HIGH, null, null, null, null); + when(mockJob.getStatus()).thenReturn(status); + return mockJob; + } }