diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e4c0245f25..910667c0bc 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -230,6 +230,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5157. Bring back old sampler related code so that we can support binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv) + MAPREDUCE-5222. Bring back some methods and constants in Jobclient for + binary compatibility with mapred in 1.x. (Karthik Kambatla via vinodkv) + OPTIMIZATIONS MAPREDUCE-4974. Optimising the LineRecordReader initialize() method diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java index 6fa12da95d..6f6e31b062 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java @@ -17,9 +17,13 @@ */ package org.apache.hadoop.mapred; +import java.io.File; +import java.io.IOException; import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; @@ -27,6 +31,9 @@ import org.junit.Test; public class TestJobClient { + final static String TEST_DIR = new File(System.getProperty("test.build.data", + "/tmp")).getAbsolutePath(); + @Test public void testGetClusterStatusWithLocalJobRunner() throws Exception { Configuration conf = new Configuration(); @@ -43,4 +50,32 @@ public void testGetClusterStatusWithLocalJobRunner() throws Exception { .getBlackListedTrackersInfo(); Assert.assertEquals(0, blackListedTrackersInfo.size()); } + + @Test(timeout = 1000) + public void testIsJobDirValid() throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Path testDir = new Path(TEST_DIR); + Assert.assertFalse(JobClient.isJobDirValid(testDir, fs)); + + Path jobconf = new Path(testDir, "job.xml"); + Path jobsplit = new Path(testDir, "job.split"); + fs.create(jobconf); + fs.create(jobsplit); + Assert.assertTrue(JobClient.isJobDirValid(testDir, fs)); + + fs.delete(jobconf, true); + fs.delete(jobsplit, true); + } + + @Test(timeout = 1000) + public void testGetStagingAreaDir() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + JobClient client = new JobClient(conf); + + Assert.assertTrue( + "Mismatch in paths", + client.getClusterHandle().getStagingAreaDir().toString() + .equals(client.getStagingAreaDir().toString())); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index a4ba42f6b1..598ef99501 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -136,6 +137,20 @@ @InterfaceAudience.Public @InterfaceStability.Stable public class JobClient extends CLI { + + @InterfaceAudience.Private + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY = + "mapreduce.jobclient.retry.policy.enabled"; + @InterfaceAudience.Private + public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = + false; + @InterfaceAudience.Private + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY = + "mapreduce.jobclient.retry.policy.spec"; + @InterfaceAudience.Private + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT = + "10000,6,60000,10"; // t1,n1,t2,n2,... + public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; @@ -525,6 +540,12 @@ public RunningJob submitJob(String jobFile) throws FileNotFoundException, */ public RunningJob submitJob(final JobConf conf) throws FileNotFoundException, IOException { + return submitJobInternal(conf); + } + + @InterfaceAudience.Private + public RunningJob submitJobInternal(final JobConf conf) + throws FileNotFoundException, IOException { try { conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false); @@ -958,6 +979,50 @@ public Path run() throws IOException, InterruptedException { } } + /** + * Checks if the job directory is clean and has all the required components + * for (re) starting the job + */ + public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) + throws IOException { + FileStatus[] contents = fs.listStatus(jobDirPath); + int matchCount = 0; + if (contents != null && contents.length >= 2) { + for (FileStatus status : contents) { + if ("job.xml".equals(status.getPath().getName())) { + ++matchCount; + } + if ("job.split".equals(status.getPath().getName())) { + ++matchCount; + } + } + if (matchCount == 2) { + return true; + } + } + return false; + } + + /** + * Fetch the staging area directory for the application + * + * @return path to staging area directory + * @throws IOException + */ + public Path getStagingAreaDir() throws IOException { + try { + return clientUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Path run() throws IOException, InterruptedException { + return cluster.getStagingAreaDir(); + } + }); + } catch (InterruptedException ie) { + // throw RuntimeException instead for compatibility reasons + throw new RuntimeException(ie); + } + } + private JobQueueInfo getJobQueueInfo(QueueInfo queue) { JobQueueInfo ret = new JobQueueInfo(queue); // make sure to convert any children