MAPREDUCE-3004. Fix ReduceTask to not assume 'local' mode in YARN. Contributed by Hitesh Shah.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172893 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e16f8a9fdf
commit
482d840bcf
@ -1360,6 +1360,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-3038. job history server not starting because conf() missing
|
MAPREDUCE-3038. job history server not starting because conf() missing
|
||||||
HsController (Jeffrey Naisbitt via mahadev)
|
HsController (Jeffrey Naisbitt via mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-3004. Fix ReduceTask to not assume 'local' mode in YARN. (Hitesh
|
||||||
|
Shah via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -340,7 +340,14 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
|
|||||||
// Initialize the codec
|
// Initialize the codec
|
||||||
codec = initCodec();
|
codec = initCodec();
|
||||||
RawKeyValueIterator rIter = null;
|
RawKeyValueIterator rIter = null;
|
||||||
boolean isLocal = "local".equals(job.get(MRConfig.MASTER_ADDRESS, "local"));
|
|
||||||
|
boolean isLocal = false;
|
||||||
|
// local iff framework == classic && master address == local
|
||||||
|
String framework = job.get(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
|
||||||
|
if (framework.equals(MRConfig.CLASSIC_FRAMEWORK_NAME)) {
|
||||||
|
isLocal = "local".equals(job.get(MRConfig.MASTER_ADDRESS, "local"));
|
||||||
|
}
|
||||||
|
|
||||||
if (!isLocal) {
|
if (!isLocal) {
|
||||||
Class combinerClass = conf.getCombinerClass();
|
Class combinerClass = conf.getCombinerClass();
|
||||||
CombineOutputCollector combineCollector =
|
CombineOutputCollector combineCollector =
|
||||||
|
@ -65,6 +65,9 @@ public interface MRConfig {
|
|||||||
"mapreduce.jobtracker.kerberos.principal";
|
"mapreduce.jobtracker.kerberos.principal";
|
||||||
|
|
||||||
public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
|
public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
|
||||||
|
public static final String CLASSIC_FRAMEWORK_NAME = "classic";
|
||||||
|
public static final String YARN_FRAMEWORK_NAME = "yarn";
|
||||||
|
|
||||||
public static final String TASK_LOCAL_OUTPUT_CLASS =
|
public static final String TASK_LOCAL_OUTPUT_CLASS =
|
||||||
"mapreduce.task.local.output.class";
|
"mapreduce.task.local.output.class";
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ public class YarnClientProtocolProvider extends ClientProtocolProvider {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientProtocol create(Configuration conf) throws IOException {
|
public ClientProtocol create(Configuration conf) throws IOException {
|
||||||
if ("yarn".equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
|
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
|
||||||
return new YARNRunner(new YarnConfiguration(conf));
|
return new YARNRunner(new YarnConfiguration(conf));
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -119,7 +119,7 @@ public class TestClientRedirect {
|
|||||||
public void testRedirect() throws Exception {
|
public void testRedirect() throws Exception {
|
||||||
|
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||||
conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
|
conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
|
||||||
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
|
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
|
||||||
RMService rmService = new RMService("test");
|
RMService rmService = new RMService("test");
|
||||||
|
@ -59,7 +59,7 @@ public MiniMRYarnCluster(String testName) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||||
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
|
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
|
||||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
|
||||||
"apps_staging_dir/${user.name}/").getAbsolutePath());
|
"apps_staging_dir/${user.name}/").getAbsolutePath());
|
||||||
|
@ -52,6 +52,7 @@
|
|||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.JobStatus;
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
@ -133,10 +134,14 @@ public void testSleepJob() throws IOException, InterruptedException,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SleepJob sleepJob = new SleepJob();
|
Configuration sleepConf = new Configuration(mrCluster.getConfig());
|
||||||
sleepJob.setConf(mrCluster.getConfig());
|
// set master address to local to test that local mode applied iff framework == classic and master_address == local
|
||||||
|
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
|
||||||
|
|
||||||
int numReduces = mrCluster.getConfig().getInt("TestMRJobs.testSleepJob.reduces", 2); // or mrCluster.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2);
|
SleepJob sleepJob = new SleepJob();
|
||||||
|
sleepJob.setConf(sleepConf);
|
||||||
|
|
||||||
|
int numReduces = sleepConf.getInt("TestMRJobs.testSleepJob.reduces", 2); // or sleepConf.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2);
|
||||||
|
|
||||||
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
|
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
|
||||||
Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
|
Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
|
||||||
|
Loading…
Reference in New Issue
Block a user