From 72fb517469e42963b3b92383a2f80e0eba0cdcd5 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Thu, 5 Jul 2012 16:29:09 +0000 Subject: [PATCH] MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1357723 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../org/apache/hadoop/mapred/JobClient.java | 8 +++ .../org/apache/hadoop/mapred/RunningJob.java | 12 +++- .../hadoop/mapred/TestNetworkedJob.java | 70 ++++++++++++++++++- 4 files changed, 87 insertions(+), 5 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fb561e5c2e..dcebdfa2b7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -132,6 +132,8 @@ Branch-2 ( Unreleased changes ) NEW FEATURES + MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) + IMPROVEMENTS MAPREDUCE-4146. Support limits on task status string length and number of diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 5d7919dbea..d221c9892f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -441,6 +441,14 @@ public String getFailureInfo() throws IOException { } } + @Override + public JobStatus getJobStatus() throws IOException { + try { + return JobStatus.downgrade(job.getStatus()); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java index 5a11fa876e..5873ace4de 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java @@ -149,8 +149,16 @@ public interface RunningJob { public int getJobState() throws IOException; /** - * Kill the running job. Blocks until all job tasks have been - * killed as well. If the job is no longer running, it simply returns. + * Returns a snapshot of the current status, {@link JobStatus}, of the Job. + * Need to call again for latest information. + * + * @throws IOException + */ + public JobStatus getJobStatus() throws IOException; + + /** + * Kill the running job. Blocks until all job tasks have been killed as well. + * If the job is no longer running, it simply returns. * * @throws IOException */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java index b6565e2895..37aa7b24a3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java @@ -18,16 +18,31 @@ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.util.List; +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; -import static org.mockito.Mockito.*; public class TestNetworkedJob { + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); + private static Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local"); + private static Path inFile = new Path(testDir, "in"); + private static Path outDir = new Path(testDir, "out"); @SuppressWarnings("deprecation") @Test @@ -41,4 +56,53 @@ public void testGetNullCounters() throws Exception { //verification verify(mockJob).getCounters(); } + + @Test + public void testGetJobStatus() throws IOException, InterruptedException, + ClassNotFoundException { + MiniMRClientCluster mr = null; + FileSystem fileSys = null; + + try { + mr = MiniMRClientClusterFactory.create(this.getClass(), 2, + new Configuration()); + + JobConf job = new JobConf(mr.getConfig()); + + fileSys = FileSystem.get(job); + fileSys.delete(testDir, true); + FSDataOutputStream out = fileSys.create(inFile, true); + out.writeBytes("This is a test file"); + out.close(); + + FileInputFormat.setInputPaths(job, inFile); + FileOutputFormat.setOutputPath(job, outDir); + + job.setInputFormat(TextInputFormat.class); + job.setOutputFormat(TextOutputFormat.class); + + job.setMapperClass(IdentityMapper.class); + job.setReducerClass(IdentityReducer.class); + job.setNumReduceTasks(0); + + JobClient client = new JobClient(mr.getConfig()); + RunningJob rj = client.submitJob(job); + JobID jobId = rj.getID(); + + // The following asserts read JobStatus twice and ensure the returned + // JobStatus objects correspond to the same Job. + assertEquals("Expected matching JobIDs", jobId, client.getJob(jobId) + .getJobStatus().getJobID()); + assertEquals("Expected matching startTimes", rj.getJobStatus() + .getStartTime(), client.getJob(jobId).getJobStatus() + .getStartTime()); + } finally { + if (fileSys != null) { + fileSys.delete(testDir, true); + } + if (mr != null) { + mr.stop(); + } + } + } }