MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce apis.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1220996 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-12-19 23:07:17 +00:00
parent dfba847eb6
commit a61a18cc09
4 changed files with 201 additions and 4 deletions

View File

@ -331,7 +331,11 @@ Release 0.23.1 - Unreleased
before the job started, so that it works properly with oozie throughout before the job started, so that it works properly with oozie throughout
the job execution. (Robert Joseph Evans via vinodkv) the job execution. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh) MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url
without a port. (atm via harsh)
MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce
apis. (acmurthy)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@ -52,11 +53,13 @@
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
/** Implements MapReduce locally, in-process, for debugging. */ /** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -304,11 +307,44 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
return executor; return executor;
} }
private org.apache.hadoop.mapreduce.OutputCommitter
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
LOG.info("OutputCommitter set in config "
+ conf.get("mapred.output.committer.class"));
if (newApiCommitter) {
org.apache.hadoop.mapreduce.TaskID taskId =
new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new TaskAttemptContextImpl(conf, taskAttemptID);
OutputFormat outputFormat =
ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
committer = outputFormat.getOutputCommitter(taskContext);
} else {
committer = ReflectionUtils.newInstance(conf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), conf);
}
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
}
@Override @Override
public void run() { public void run() {
JobID jobId = profile.getJobID(); JobID jobId = profile.getJobID();
JobContext jContext = new JobContextImpl(job, jobId); JobContext jContext = new JobContextImpl(job, jobId);
OutputCommitter outputCommitter = job.getOutputCommitter();
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
try {
outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
} catch (Exception e) {
LOG.info("Failed to createOutputCommitter", e);
return;
}
try { try {
TaskSplitMetaInfo[] taskSplitMetaInfos = TaskSplitMetaInfo[] taskSplitMetaInfos =

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestLocalModeWithNewApis {
public static final Log LOG =
LogFactory.getLog(TestLocalModeWithNewApis.class);
Configuration conf;
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testNewApis() throws Exception {
Random r = new Random(System.currentTimeMillis());
Path tmpBaseDir = new Path("/tmp/wc-" + r.nextInt());
final Path inDir = new Path(tmpBaseDir, "input");
final Path outDir = new Path(tmpBaseDir, "output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
FileSystem inFs = inDir.getFileSystem(conf);
FileSystem outFs = outDir.getFileSystem(conf);
outFs.delete(outDir, true);
if (!inFs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
{
DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(TestLocalModeWithNewApis.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
assertEquals(job.waitForCompletion(true), true);
String output = readOutput(outDir, conf);
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", output);
outFs.delete(tmpBaseDir, true);
}
static String readOutput(Path outDir, Configuration conf)
throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
StringBuffer result = new StringBuffer();
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outputFile : fileList) {
LOG.info("Path" + ": "+ outputFile);
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(outputFile)));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
return result.toString();
}
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}

View File

@ -233,7 +233,7 @@ public void commitTask(TaskAttemptContext context)
" directory of task: " + attemptId + " - " + workPath); " directory of task: " + attemptId + " - " + workPath);
} }
LOG.info("Saved output of task '" + attemptId + "' to " + LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath); jobOutputPath);
} }
} }
} }