MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors. (aajisaka)
parent c559df2219
commit bd69fb2d44
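The change is mechanical: every call to a deprecated org.apache.hadoop.mapreduce.Job constructor is replaced with the equivalent static Job#getInstance factory. A minimal sketch of the pattern follows; the class name and job name are illustrative only, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class GetInstanceExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Deprecated constructor; compiles with a deprecation warning:
        //   Job job = new Job(conf, "example");

        // Preferred factory method, as applied throughout this commit:
        Job job = Job.getInstance(conf, "example");

        job.setJarByClass(GetInstanceExample.class);
        // ...configure mapper/reducer and I/O paths as usual, then:
        // job.waitForCompletion(true);
      }
    }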
@@ -271,6 +271,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6143. add configuration for mapreduce speculative execution in
     MR2 (zxu via rkanter)
 
+    MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
+    (aajisaka)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item
@@ -51,7 +51,7 @@ public class Job extends ControlledJob {
    */
   @SuppressWarnings("unchecked")
   public Job(JobConf jobConf, ArrayList<?> dependingJobs) throws IOException {
-    super(new org.apache.hadoop.mapreduce.Job(jobConf),
+    super(org.apache.hadoop.mapreduce.Job.getInstance(jobConf),
           (List<ControlledJob>) dependingJobs);
   }

@@ -93,7 +93,7 @@ public synchronized JobConf getJobConf() {
    */
   public synchronized void setJobConf(JobConf jobConf) {
     try {
-      super.setJob(new org.apache.hadoop.mapreduce.Job(jobConf));
+      super.setJob(org.apache.hadoop.mapreduce.Job.getInstance(jobConf));
     } catch (IOException ioe) {
       LOG.info("Exception" + ioe);
     }
@@ -73,7 +73,7 @@ public CombineFileInputFormat() {
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
-      super.getSplits(new Job(job));
+      super.getSplits(Job.getInstance(job));
     InputSplit[] ret = new InputSplit[newStyleSplits.size()];
     for(int pos = 0; pos < newStyleSplits.size(); ++pos) {
       org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit =
@@ -129,7 +129,7 @@ public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(
    * @throws IOException if zero items.
    */
   protected FileStatus[] listStatus(JobConf job) throws IOException {
-    List<FileStatus> result = super.listStatus(new Job(job));
+    List<FileStatus> result = super.listStatus(Job.getInstance(job));
     return result.toArray(new FileStatus[result.size()]);
   }

@@ -46,7 +46,7 @@ public InputSampler(JobConf conf) {

   public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
       throws IOException, ClassNotFoundException, InterruptedException {
-    writePartitionFile(new Job(job), sampler);
+    writePartitionFile(Job.getInstance(job), sampler);
   }
   /**
    * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
@@ -177,7 +177,7 @@ public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
   /** {@inheritDoc} */
   public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
     List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
-      super.getSplits(new Job(job));
+      super.getSplits(Job.getInstance(job));
     InputSplit[] ret = new InputSplit[newSplits.size()];
     int i = 0;
     for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
@@ -120,7 +120,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
    */
   @Deprecated
   public Job() throws IOException {
-    this(new Configuration());
+    this(new JobConf(new Configuration()));
   }

   /**
@@ -136,7 +136,7 @@ public Job(Configuration conf) throws IOException {
    */
   @Deprecated
   public Job(Configuration conf, String jobName) throws IOException {
-    this(conf);
+    this(new JobConf(conf));
     setJobName(jobName);
   }

@@ -164,7 +164,7 @@ public static Job createValueAggregatorJob(Configuration conf, String args[])
       conf.set(MRJobConfig.JAR, userJarFile);
     }

-    Job theJob = new Job(conf);
+    Job theJob = Job.getInstance(conf);
     if (userJarFile == null) {
       theJob.setJarByClass(ValueAggregator.class);
     }
@@ -53,7 +53,7 @@ public class DelegatingInputFormat<K, V> extends InputFormat<K, V> {
   public List<InputSplit> getSplits(JobContext job)
       throws IOException, InterruptedException {
     Configuration conf = job.getConfiguration();
-    Job jobCopy =new Job(conf);
+    Job jobCopy = Job.getInstance(conf);
     List<InputSplit> splits = new ArrayList<InputSplit>();
     Map<Path, InputFormat> formatMap =
       MultipleInputs.getInputFormatMap(job);
@@ -84,7 +84,7 @@ public ControlledJob(Job job, List<ControlledJob> dependingJobs)
    * @throws IOException
    */
   public ControlledJob(Configuration conf) throws IOException {
-    this(new Job(conf), null);
+    this(Job.getInstance(conf), null);
   }

   @Override
@@ -323,7 +323,7 @@ public void parse(List<Token> ll, Configuration conf) throws IOException {
   }

   private Configuration getConf(Configuration jconf) throws IOException {
-    Job job = new Job(jconf);
+    Job job = Job.getInstance(jconf);
     FileInputFormat.setInputPaths(job, indir);
     return job.getConfiguration();
   }
@@ -503,7 +503,7 @@ private TaskAttemptContext getContext(String nameOutput) throws IOException {

     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    Job job = new Job(context.getConfiguration());
+    Job job = Job.getInstance(context.getConfiguration());
     job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
     job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
     job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
@@ -348,7 +348,7 @@ public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
   * Configures a JobConf instance and calls {@link #writePartitionFile}.
   */
   public int run(String[] args) throws Exception {
-    Job job = new Job(getConf());
+    Job job = Job.getInstance(getConf());
     ArrayList<String> otherArgs = new ArrayList<String>();
     Sampler<K,V> sampler = null;
     for(int i=0; i < args.length; ++i) {
@@ -83,7 +83,7 @@ public void setConf(Configuration conf) {
           ? FileSystem.getLocal(conf)   // assume in DistributedCache
           : partFile.getFileSystem(conf);

-      Job job = new Job(conf);
+      Job job = Job.getInstance(conf);
       Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
       K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
       if (splitPoints.length != job.getNumReduceTasks() - 1) {
@@ -65,7 +65,7 @@ public Job createJob(Configuration conf) throws IOException {
     }
     conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     job.setJarByClass(RandomTextWriterJob.class);
     job.setJobName("random-text-writer");
@@ -185,7 +185,7 @@ public void launchTest(JobConf conf,

     // Launch job with default option for temp dir.
     // i.e. temp dir is ./tmp
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.addFileToClassPath(APP_JAR);
     job.setJarByClass(TestMiniMRChildTask.class);
     job.setMaxMapAttempts(1); // speed up failures
@@ -537,7 +537,7 @@ void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs)
     conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts);
     conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.addFileToClassPath(APP_JAR);
     job.setJarByClass(TestMiniMRChildTask.class);
     job.setMaxMapAttempts(1); // speed up failures
@@ -187,7 +187,7 @@ private static void createFile(Path inFile, Configuration conf)
   }

   public static Job createJob() throws IOException {
-    final Job baseJob = new Job(mrCluster.getConfig());
+    final Job baseJob = Job.getInstance(mrCluster.getConfig());
     baseJob.setOutputKeyClass(Text.class);
     baseJob.setOutputValueClass(IntWritable.class);
     baseJob.setMapperClass(MyMapper.class);
@@ -231,8 +231,7 @@ public int run(String[] args) throws Exception {
     conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
     conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");

-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.setJarByClass(LargeSorter.class);
     job.setJobName("large-sorter");
     FileOutputFormat.setOutputPath(job, outDir);
@@ -195,7 +195,7 @@ public int run(String[] args) throws Exception {
     }
     conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     job.setJarByClass(RandomTextWriter.class);
     job.setJobName("random-text-writer");
@@ -261,7 +261,7 @@ public int run(String[] args) throws Exception {
     }
     conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     job.setJarByClass(RandomWriter.class);
     job.setJobName("random-writer");
@@ -547,7 +547,7 @@ protected Job runFailingMapperJob()
     myConf.setInt(MRJobConfig.NUM_MAPS, 1);
     myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts

-    Job job = new Job(myConf);
+    Job job = Job.getInstance(myConf);

     job.setJarByClass(FailingMapper.class);
     job.setJobName("failmapper");
@@ -77,7 +77,7 @@ public static void main(String [] args) throws Exception
     {
       Path outDir = new Path("output");
       Configuration conf = new Configuration();
-      Job job = new Job(conf, "user name check");
+      Job job = Job.getInstance(conf, "user name check");


       job.setJarByClass(UserNamePermission.class);
@@ -315,7 +315,7 @@ public void close() {
   /** Create and setup a job */
   private static Job createJob(String name, Configuration conf
       ) throws IOException {
-    final Job job = new Job(conf, NAME + "_" + name);
+    final Job job = Job.getInstance(conf, NAME + "_" + name);
     final Configuration jobconf = job.getConfiguration();
     job.setJarByClass(BaileyBorweinPlouffe.class);

@@ -56,7 +56,7 @@ public int run(String[] args) throws Exception {
     if (args.length == 4)
       conf.set(RegexMapper.GROUP, args[3]);

-    Job grepJob = new Job(conf);
+    Job grepJob = Job.getInstance(conf);

     try {

@@ -77,7 +77,7 @@ public int run(String[] args) throws Exception {

       grepJob.waitForCompletion(true);

-      Job sortJob = new Job(conf);
+      Job sortJob = Job.getInstance(conf);
       sortJob.setJobName("grep-sort");
       sortJob.setJarByClass(Grep.class);

@@ -89,7 +89,7 @@ public int run(String[] args) throws Exception {
       num_reduces = cluster.getTaskTrackers() *
                       Integer.parseInt(join_reduces);
     }
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.setJobName("join");
     job.setJarByClass(Sort.class);

@@ -229,7 +229,7 @@ public int run(String[] args) throws Exception {
       return 2;
     }

-    Job job = new Job(getConf());
+    Job job = Job.getInstance(getConf());
     job.setJobName("MultiFileWordCount");
     job.setJarByClass(MultiFileWordCount.class);

@@ -248,7 +248,7 @@ public void cleanup(Context context) throws IOException {
   public static BigDecimal estimatePi(int numMaps, long numPoints,
       Path tmpDir, Configuration conf
       ) throws IOException, ClassNotFoundException, InterruptedException {
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     //setup job conf
     job.setJobName(QuasiMonteCarlo.class.getSimpleName());
     job.setJarByClass(QuasiMonteCarlo.class);
@@ -195,7 +195,7 @@ public int run(String[] args) throws Exception {
     }
     conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     job.setJarByClass(RandomTextWriter.class);
     job.setJobName("random-text-writer");
@@ -261,7 +261,7 @@ public int run(String[] args) throws Exception {
     }
     conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     job.setJarByClass(RandomWriter.class);
     job.setJobName("random-writer");
@@ -214,7 +214,7 @@ public static void main(String[] args) throws Exception {
       System.err.println("Usage: secondarysort <in> <out>");
       System.exit(2);
     }
-    Job job = new Job(conf, "secondary sort");
+    Job job = Job.getInstance(conf, "secondary sort");
     job.setJarByClass(SecondarySort.class);
     job.setMapperClass(MapClass.class);
     job.setReducerClass(Reduce.class);
@@ -132,7 +132,7 @@ public int run(String[] args) throws Exception {
       }
     }
     // Set user-supplied (possibly default) job configs
-    job = new Job(conf);
+    job = Job.getInstance(conf);
     job.setJobName("sorter");
     job.setJarByClass(Sort.class);

@@ -72,7 +72,7 @@ public static void main(String[] args) throws Exception {
       System.err.println("Usage: wordcount <in> [<in>...] <out>");
       System.exit(2);
     }
-    Job job = new Job(conf, "word count");
+    Job job = Job.getInstance(conf, "word count");
     job.setJarByClass(WordCount.class);
     job.setMapperClass(TokenizerMapper.class);
     job.setCombinerClass(IntSumReducer.class);
@@ -172,8 +172,7 @@ public int run(String[] args) throws Exception {

     Configuration conf = getConf();

-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word mean");
+    Job job = Job.getInstance(conf, "word mean");
     job.setJarByClass(WordMean.class);
     job.setMapperClass(WordMeanMapper.class);
     job.setCombinerClass(WordMeanReducer.class);
@@ -181,8 +181,7 @@ public int run(String[] args) throws Exception {
     setConf(new Configuration());
     Configuration conf = getConf();

-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word median");
+    Job job = Job.getInstance(conf, "word median");
     job.setJarByClass(WordMedian.class);
     job.setMapperClass(WordMedianMapper.class);
     job.setCombinerClass(WordMedianReducer.class);
@@ -189,8 +189,7 @@ public int run(String[] args) throws Exception {

     Configuration conf = getConf();

-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word stddev");
+    Job job = Job.getInstance(conf, "word stddev");
     job.setJarByClass(WordStandardDeviation.class);
     job.setMapperClass(WordStandardDeviationMapper.class);
     job.setCombinerClass(WordStandardDeviationReducer.class);
@@ -198,7 +198,7 @@ public int run(String[] args) throws Exception {
     Path input = new Path(output + "_input");
     FileSystem fileSys = FileSystem.get(conf);
     try {
-      Job job = new Job(conf);
+      Job job = Job.getInstance(conf);
       FileInputFormat.setInputPaths(job, input);
       FileOutputFormat.setOutputPath(job, output);
       job.setJarByClass(PentMap.class);
@@ -432,7 +432,8 @@ private Machine chooseMachine(Configuration conf) throws IOException {

   /** Create a job */
   private Job createJob(String name, Summation sigma) throws IOException {
-    final Job job = new Job(getConf(), parameters.remoteDir + "/" + name);
+    final Job job = Job.getInstance(getConf(), parameters.remoteDir + "/" +
+        name);
     final Configuration jobconf = job.getConfiguration();
     job.setJarByClass(DistSum.class);
     jobconf.setInt(N_PARTS, parameters.nParts);
@@ -114,8 +114,8 @@ public Job run() throws IOException {
         String jobId = null == jobdesc.getJobID()
           ? "<unknown>"
          : jobdesc.getJobID().toString();
-        Job ret = new Job(conf,
-            nameFormat.get().format("%06d", seq).toString());
+        Job ret = Job.getInstance(conf, nameFormat.get().format("%06d", seq)
+            .toString());
         ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);

         ret.getConfiguration().set(Gridmix.ORIGINAL_JOB_ID, jobId);
@@ -343,7 +343,7 @@ protected GridmixJob(final Configuration conf, long submissionMillis,
     try {
       job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
         public Job run() throws IOException {
-          Job ret = new Job(conf, name);
+          Job ret = Job.getInstance(conf, name);
           ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
           setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
           return ret;
@@ -157,7 +157,7 @@ private static void runDataGenJob(Configuration conf, Path tempDir)
     // get the local job runner
     conf.setInt(MRJobConfig.NUM_MAPS, 1);

-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);

     CompressionEmulationUtil.configure(job);
     job.setInputFormatClass(CustomInputFormat.class);
@@ -109,7 +109,7 @@ public void createInput() throws IOException {
   @Test
   public void testStreamXmlRecordReader() throws Exception {

-    Job job = new Job();
+    Job job = Job.getInstance();
     Configuration conf = job.getConfiguration();
     job.setJarByClass(TestStreamXmlRecordReader.class);
     job.setMapperClass(Mapper.class);
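For the old-API (org.apache.hadoop.mapred) call sites above, the same substitution works because JobConf extends Configuration, so a JobConf can be passed straight to Job.getInstance. A hedged sketch of that bridge; the class and method names are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;

    public class JobConfBridge {
      static Job wrap(JobConf jobConf) throws IOException {
        // Before: new org.apache.hadoop.mapreduce.Job(jobConf)  (deprecated)
        return Job.getInstance(jobConf);  // same configuration, no warning
      }
    }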