MAPREDUCE-5222. Bring back some methods and constants in Jobclient for binary compatibility with mapred in 1.x. Contributed by Karthik Kambatla.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1482208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a119f87b4
commit
febf951220
@ -230,6 +230,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
MAPREDUCE-5157. Bring back old sampler related code so that we can support
|
||||
binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
|
||||
|
||||
MAPREDUCE-5222. Bring back some methods and constants in Jobclient for
|
||||
binary compatibility with mapred in 1.x. (Karthik Kambatla via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||
|
@ -17,9 +17,13 @@
|
||||
*/
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||
@ -27,6 +31,9 @@
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestJobClient {
|
||||
final static String TEST_DIR = new File(System.getProperty("test.build.data",
|
||||
"/tmp")).getAbsolutePath();
|
||||
|
||||
@Test
|
||||
public void testGetClusterStatusWithLocalJobRunner() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
@ -43,4 +50,32 @@ public void testGetClusterStatusWithLocalJobRunner() throws Exception {
|
||||
.getBlackListedTrackersInfo();
|
||||
Assert.assertEquals(0, blackListedTrackersInfo.size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testIsJobDirValid() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
Path testDir = new Path(TEST_DIR);
|
||||
Assert.assertFalse(JobClient.isJobDirValid(testDir, fs));
|
||||
|
||||
Path jobconf = new Path(testDir, "job.xml");
|
||||
Path jobsplit = new Path(testDir, "job.split");
|
||||
fs.create(jobconf);
|
||||
fs.create(jobsplit);
|
||||
Assert.assertTrue(JobClient.isJobDirValid(testDir, fs));
|
||||
|
||||
fs.delete(jobconf, true);
|
||||
fs.delete(jobsplit, true);
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testGetStagingAreaDir() throws IOException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
JobClient client = new JobClient(conf);
|
||||
|
||||
Assert.assertTrue(
|
||||
"Mismatch in paths",
|
||||
client.getClusterHandle().getStagingAreaDir().toString()
|
||||
.equals(client.getStagingAreaDir().toString()));
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -136,6 +137,20 @@
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class JobClient extends CLI {
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
|
||||
"mapreduce.jobclient.retry.policy.enabled";
|
||||
@InterfaceAudience.Private
|
||||
public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
|
||||
false;
|
||||
@InterfaceAudience.Private
|
||||
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
|
||||
"mapreduce.jobclient.retry.policy.spec";
|
||||
@InterfaceAudience.Private
|
||||
public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||
"10000,6,60000,10"; // t1,n1,t2,n2,...
|
||||
|
||||
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
|
||||
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
|
||||
|
||||
@ -525,6 +540,12 @@ public RunningJob submitJob(String jobFile) throws FileNotFoundException,
|
||||
*/
|
||||
public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
|
||||
IOException {
|
||||
return submitJobInternal(conf);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public RunningJob submitJobInternal(final JobConf conf)
|
||||
throws FileNotFoundException, IOException {
|
||||
try {
|
||||
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
|
||||
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
|
||||
@ -958,6 +979,50 @@ public Path run() throws IOException, InterruptedException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the job directory is clean and has all the required components
|
||||
* for (re) starting the job
|
||||
*/
|
||||
public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
|
||||
throws IOException {
|
||||
FileStatus[] contents = fs.listStatus(jobDirPath);
|
||||
int matchCount = 0;
|
||||
if (contents != null && contents.length >= 2) {
|
||||
for (FileStatus status : contents) {
|
||||
if ("job.xml".equals(status.getPath().getName())) {
|
||||
++matchCount;
|
||||
}
|
||||
if ("job.split".equals(status.getPath().getName())) {
|
||||
++matchCount;
|
||||
}
|
||||
}
|
||||
if (matchCount == 2) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the staging area directory for the application
|
||||
*
|
||||
* @return path to staging area directory
|
||||
* @throws IOException
|
||||
*/
|
||||
public Path getStagingAreaDir() throws IOException {
|
||||
try {
|
||||
return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
|
||||
@Override
|
||||
public Path run() throws IOException, InterruptedException {
|
||||
return cluster.getStagingAreaDir();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
// throw RuntimeException instead for compatibility reasons
|
||||
throw new RuntimeException(ie);
|
||||
}
|
||||
}
|
||||
|
||||
private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
|
||||
JobQueueInfo ret = new JobQueueInfo(queue);
|
||||
// make sure to convert any children
|
||||
|
Loading…
Reference in New Issue
Block a user