From e1acb1222dd6fdb8fa688c815cbca6ae4193745d Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Sun, 18 Sep 2011 02:50:30 +0000 Subject: [PATCH] MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf. (Arun Murthy via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172171 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../org/apache/hadoop/mapred/JobClient.java | 2 +- .../org/apache/hadoop/mapreduce/Cluster.java | 4 +- .../java/org/apache/hadoop/mapreduce/Job.java | 141 ++++++++++++++---- .../apache/hadoop/mapreduce/JobSubmitter.java | 1 - .../hadoop/mapreduce/task/JobContextImpl.java | 7 +- .../apache/hadoop/mapreduce/tools/CLI.java | 2 +- .../mapreduce/TestJobMonitorAndPrint.java | 2 +- .../org/apache/hadoop/mapred/YARNRunner.java | 13 -- .../hadoop/mapred/TestClientRedirect.java | 4 +- .../examples/terasort/TeraChecksum.java | 2 +- .../hadoop/examples/terasort/TeraGen.java | 2 +- .../hadoop/examples/terasort/TeraSort.java | 2 +- .../examples/terasort/TeraValidate.java | 2 +- .../hadoop/mapred/TestClusterStatus.java | 2 +- .../apache/hadoop/mapred/TestJobCounters.java | 2 +- .../hadoop/mapreduce/TestMapCollection.java | 10 +- .../TestTrackerDistributedCacheManager.java | 8 +- 18 files changed, 145 insertions(+), 64 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bedeeb86c6..38c3412696 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1352,6 +1352,9 @@ Release 0.22.0 - Unreleased MAPREDUCE-2994. Fixed a bug in ApplicationID parsing that affects RM UI. (Devaraj K via vinodkv) + MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf. + (Arun Murthy via mahadev) + NEW FEATURES MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 8a45bef3e6..9382dc4a97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -535,7 +535,7 @@ public RunningJob submitJob(JobConf conf) throws FileNotFoundException, try { conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false); - Job job = Job.getInstance(cluster, conf); + Job job = Job.getInstance(conf); job.submit(); return new NetworkedJob(job); } catch (InterruptedException ie) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index 3a508ce665..5112c86e7b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -112,7 +112,7 @@ public synchronized void close() throws IOException { private Job[] getJobs(JobStatus[] stats) throws IOException { List jobs = new ArrayList(); for (JobStatus stat : stats) { - jobs.add(new Job(this, stat, new JobConf(stat.getJobFile()))); + jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile()))); } return jobs.toArray(new Job[0]); } @@ -152,7 +152,7 @@ public FileSystem run() throws IOException, InterruptedException { public Job getJob(JobID jobId) throws IOException, InterruptedException { JobStatus status = client.getJobStatus(jobId); if (status != null) { - return new Job(this, status, new JobConf(status.getJobFile())); + return Job.getInstance(this, status, new JobConf(status.getJobFile())); } return null; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index e59bbc9bdf..c30216e066 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -31,22 +31,22 @@ import java.net.URI; import java.security.PrivilegedExceptionAction; -import javax.security.auth.login.LoginException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.util.ConfigUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; /** @@ -130,7 +130,7 @@ public Job() throws IOException { @Deprecated public Job(Configuration conf) throws IOException { - this(new Cluster(conf), conf); + this(new JobConf(conf)); } @Deprecated @@ -139,18 +139,13 @@ public Job(Configuration conf, String jobName) throws IOException { setJobName(jobName); } - Job(Cluster cluster) throws IOException { - this(cluster, new Configuration()); - } - - Job(Cluster cluster, Configuration conf) throws IOException { + Job(JobConf conf) throws IOException { super(conf, null); - this.cluster = cluster; + this.cluster = null; } - Job(Cluster cluster, JobStatus status, - Configuration conf) throws IOException { - this(cluster, conf); + Job(JobStatus status, JobConf conf) throws IOException { + this(conf); setJobID(status.getJobID()); this.status = status; state = JobState.RUNNING; @@ -170,7 +165,13 @@ public static Job getInstance() throws IOException { } /** - * Creates a new {@link Job} with no particular {@link Cluster} . + * Creates a new {@link Job} with no particular {@link Cluster} and a + * given {@link Configuration}. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. + * * A Cluster will be created from the conf parameter only when it's needed. * * @param conf the configuration @@ -179,13 +180,18 @@ public static Job getInstance() throws IOException { */ public static Job getInstance(Configuration conf) throws IOException { // create with a null Cluster - return new Job(null, conf); + JobConf jobConf = new JobConf(conf); + return new Job(jobConf); } /** * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName. * A Cluster will be created from the conf parameter only when it's needed. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. * * @param conf the configuration * @return the {@link Job} , with no connection to a cluster yet. @@ -194,25 +200,92 @@ public static Job getInstance(Configuration conf) throws IOException { public static Job getInstance(Configuration conf, String jobName) throws IOException { // create with a null Cluster - Job result = new Job(null, conf); + Job result = getInstance(conf); result.setJobName(jobName); return result; } - public static Job getInstance(Cluster cluster) throws IOException { - return new Job(cluster); + /** + * Creates a new {@link Job} with no particular {@link Cluster} and given + * {@link Configuration} and {@link JobStatus}. + * A Cluster will be created from the conf parameter only when it's needed. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. + * + * @param status job status + * @param conf job configuration + * @return the {@link Job} , with no connection to a cluster yet. + * @throws IOException + */ + public static Job getInstance(JobStatus status, Configuration conf) + throws IOException { + return new Job(status, new JobConf(conf)); + } + + /** + * Creates a new {@link Job} with no particular {@link Cluster}. + * A Cluster will be created from the conf parameter only when it's needed. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. + * + * @param ignored + * @return the {@link Job} , with no connection to a cluster yet. + * @throws IOException + * @deprecated Use {@link #getInstance()} + */ + @Deprecated + public static Job getInstance(Cluster ignored) throws IOException { + return getInstance(); } - public static Job getInstance(Cluster cluster, Configuration conf) + /** + * Creates a new {@link Job} with no particular {@link Cluster} and given + * {@link Configuration}. + * A Cluster will be created from the conf parameter only when it's needed. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. + * + * @param ignored + * @param conf job configuration + * @return the {@link Job} , with no connection to a cluster yet. + * @throws IOException + * @deprecated Use {@link #getInstance(Configuration)} + */ + @Deprecated + public static Job getInstance(Cluster ignored, Configuration conf) throws IOException { - return new Job(cluster, conf); + return getInstance(conf); } + /** + * Creates a new {@link Job} with no particular {@link Cluster} and given + * {@link Configuration} and {@link JobStatus}. + * A Cluster will be created from the conf parameter only when it's needed. + * + * The Job makes a copy of the Configuration so + * that any necessary internal modifications do not reflect on the incoming + * parameter. + * + * @param cluster cluster + * @param status job status + * @param conf job configuration + * @return the {@link Job} , with no connection to a cluster yet. + * @throws IOException + */ + @Private public static Job getInstance(Cluster cluster, JobStatus status, Configuration conf) throws IOException { - return new Job(cluster, status, conf); + Job job = getInstance(status, conf); + job.setCluster(cluster); + return job; } - + private void ensureState(JobState state) throws IllegalStateException { if (state != this.state) { throw new IllegalStateException("Job in state "+ this.state + @@ -254,6 +327,10 @@ public JobStatus getStatus() throws IOException, InterruptedException { updateStatus(); return status; } + + private void setStatus(JobStatus status) { + this.status = status; + } /** * Returns the current state of the Job. @@ -354,6 +431,12 @@ public boolean isRetired() throws IOException, InterruptedException { return status.isRetired(); } + /** Only for mocks in unit tests. */ + @Private + private void setCluster(Cluster cluster) { + this.cluster = cluster; + } + /** * Dump stats to screen. */ @@ -1055,6 +1138,12 @@ boolean isConnected() { return cluster != null; } + /** Only for mocking via unit tests. */ + @Private + public JobSubmitter getJobSubmitter(FileSystem fs, + ClientProtocol submitClient) throws IOException { + return new JobSubmitter(fs, submitClient); + } /** * Submit the job to the cluster and return immediately. * @throws IOException @@ -1064,8 +1153,8 @@ public void submit() ensureState(JobState.DEFINE); setUseNewAPI(); connect(); - final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(), - cluster.getClient()); + final JobSubmitter submitter = + getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { @@ -1114,7 +1203,7 @@ public boolean monitorAndPrintJob() throws IOException, InterruptedException { String lastReport = null; Job.TaskStatusFilter filter; - Configuration clientConf = cluster.getConf(); + Configuration clientConf = getConfiguration(); filter = Job.getTaskOutputFilter(clientConf); JobID jobId = getJobID(); LOG.info("Running job: " + jobId); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 2c268f176f..2224cb967f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -319,7 +319,6 @@ private void copyAndConfigureFiles(Job job, Path jobSubmitDir) * @throws InterruptedException * @throws IOException */ - @SuppressWarnings("unchecked") JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java index 85cdaf07be..fb48ad900a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -60,7 +61,11 @@ public class JobContextImpl implements JobContext { protected final Credentials credentials; public JobContextImpl(Configuration conf, JobID jobId) { - this.conf = new org.apache.hadoop.mapred.JobConf(conf); + if (conf instanceof JobConf) { + this.conf = (JobConf)conf; + } else { + this.conf = new JobConf(conf); + } this.jobId = jobId; this.credentials = this.conf.getCredentials(); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index 518bc4025d..464161bb93 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -215,7 +215,7 @@ public int run(String[] argv) throws Exception { // Submit the request try { if (submitJobFile != null) { - Job job = Job.getInstance(cluster, new JobConf(submitJobFile)); + Job job = Job.getInstance(new JobConf(submitJobFile)); job.submit(); System.out.println("Created job " + job.getJobID()); exitCode = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java index 2098cd8d1b..f18cbe3318 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java @@ -64,7 +64,7 @@ public void setUp() throws IOException { when(cluster.getClient()).thenReturn(clientProtocol); JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url"); - job = new Job(cluster, jobStatus, conf); + job = Job.getInstance(cluster, jobStatus, conf); job = spy(job); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 3751646010..ff59b0bb44 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -223,23 +223,10 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throw new YarnException(e); } - // XXX Remove - Path submitJobDir = new Path(jobSubmitDir); - FileContext defaultFS = FileContext.getFileContext(conf); - Path submitJobFile = - defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir)); - FSDataInputStream in = defaultFS.open(submitJobFile); - conf.addResource(in); - // --- - // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); - // XXX Remove - in.close(); - // --- - // Submit to ResourceManager ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 492ecc8780..928f25288d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -139,8 +139,8 @@ public void testRedirect() throws Exception { Cluster cluster = new Cluster(conf); org.apache.hadoop.mapreduce.JobID jobID = new org.apache.hadoop.mapred.JobID("201103121733", 1); - org.apache.hadoop.mapreduce.Counters counters = cluster.getJob(jobID) - .getCounters(); + org.apache.hadoop.mapreduce.Counters counters = + cluster.getJob(jobID).getCounters(); validateCounters(counters); Assert.assertTrue(amContact); diff --git a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java index 1664742c25..652f32c00b 100644 --- a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java +++ b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java @@ -74,7 +74,7 @@ private static void usage() throws IOException { } public int run(String[] args) throws Exception { - Job job = Job.getInstance(new Cluster(getConf()), getConf()); + Job job = Job.getInstance(getConf()); if (args.length != 2) { usage(); return 2; diff --git a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java index ad513b83b5..ec99c19bc4 100644 --- a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java +++ b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java @@ -280,7 +280,7 @@ private static long parseHumanLong(String str) { */ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException { - Job job = Job.getInstance(new Cluster(getConf()), getConf()); + Job job = Job.getInstance(getConf()); if (args.length != 2) { usage(); return 2; diff --git a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java index 167ab4e539..2ce8155b47 100644 --- a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java +++ b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java @@ -280,7 +280,7 @@ public static void setOutputReplication(Job job, int value) { public int run(String[] args) throws Exception { LOG.info("starting"); - Job job = Job.getInstance(new Cluster(getConf()), getConf()); + Job job = Job.getInstance(getConf()); Path inputDir = new Path(args[0]); Path outputDir = new Path(args[1]); boolean useSimplePartitioner = getUseSimplePartitioner(job); diff --git a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java index c0539520e9..6cff25f5de 100644 --- a/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java +++ b/hadoop-mapreduce-project/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java @@ -157,7 +157,7 @@ private static void usage() throws IOException { } public int run(String[] args) throws Exception { - Job job = Job.getInstance(new Cluster(getConf()), getConf()); + Job job = Job.getInstance(getConf()); if (args.length != 2) { usage(); return 1; diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java index 86cbf31984..eb0f77c586 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java @@ -208,7 +208,7 @@ public void testReservedSlots() throws Exception { Configuration conf = mr.createJobConf(); conf.setInt(JobContext.NUM_MAPS, 1); - Job job = Job.getInstance(cluster, conf); + Job job = Job.getInstance(conf); job.setNumReduceTasks(1); job.setSpeculativeExecution(false); job.setJobSetupCleanupNeeded(false); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java index 5d7e21af26..88b5fa295f 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java @@ -199,7 +199,7 @@ public static JobConf createConfiguration() throws IOException { public static Job createJob() throws IOException { final Configuration conf = new Configuration(); - final Job baseJob = Job.getInstance(new Cluster(conf), conf); + final Job baseJob = Job.getInstance(conf); baseJob.setOutputKeyClass(Text.class); baseJob.setOutputValueClass(IntWritable.class); baseJob.setMapperClass(NewMapTokenizer.class); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java index 342f549233..05b434808e 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java @@ -298,7 +298,7 @@ private static void runTest(String name, int keylen, int vallen, throws Exception { Configuration conf = new Configuration(); conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100); - Job job = Job.getInstance(new Cluster(conf), conf); + Job job = Job.getInstance(conf); conf = job.getConfiguration(); conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMB); conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer)); @@ -409,7 +409,7 @@ public void testPostSpillMeta() throws Exception { // no writes into the serialization buffer Configuration conf = new Configuration(); conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100); - Job job = Job.getInstance(new Cluster(conf), conf); + Job job = Job.getInstance(conf); conf = job.getConfiguration(); conf.setInt(MRJobConfig.IO_SORT_MB, 1); // 2^20 * spill = 14336 bytes available post-spill, at most 896 meta @@ -427,7 +427,7 @@ public void testPostSpillMeta() throws Exception { public void testLargeRecConcurrent() throws Exception { Configuration conf = new Configuration(); conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100); - Job job = Job.getInstance(new Cluster(conf), conf); + Job job = Job.getInstance(conf); conf = job.getConfiguration(); conf.setInt(MRJobConfig.IO_SORT_MB, 1); conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f)); @@ -496,7 +496,7 @@ public int valLen(int i) { public void testRandom() throws Exception { Configuration conf = new Configuration(); conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100); - Job job = Job.getInstance(new Cluster(conf), conf); + Job job = Job.getInstance(conf); conf = job.getConfiguration(); conf.setInt(MRJobConfig.IO_SORT_MB, 1); conf.setClass("test.mapcollection.class", RandomFactory.class, @@ -517,7 +517,7 @@ public void testRandom() throws Exception { public void testRandomCompress() throws Exception { Configuration conf = new Configuration(); conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100); - Job job = Job.getInstance(new Cluster(conf), conf); + Job job = Job.getInstance(conf); conf = job.getConfiguration(); conf.setInt(MRJobConfig.IO_SORT_MB, 1); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java index 5ff804a5ad..8f44754f5e 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java @@ -234,12 +234,11 @@ public void testReferenceCount() throws IOException, LoginException, } TrackerDistributedCacheManager manager = new FakeTrackerDistributedCacheManager(conf); - Cluster cluster = new Cluster(conf); String userName = getJobOwnerName(); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); // Configures a job with a regular file - Job job1 = Job.getInstance(cluster, conf); + Job job1 = Job.getInstance(conf); job1.setUser(userName); job1.addCacheFile(secondCacheFile.toUri()); Configuration conf1 = job1.getConfiguration(); @@ -262,7 +261,7 @@ public void testReferenceCount() throws IOException, LoginException, createPrivateTempFile(thirdCacheFile); // Configures another job with three regular files. - Job job2 = Job.getInstance(cluster, conf); + Job job2 = Job.getInstance(conf); job2.setUser(userName); // add a file that would get failed to localize job2.addCacheFile(firstCacheFile.toUri()); @@ -366,7 +365,6 @@ private Path checkLocalizedPath(boolean visibility) throws IOException, LoginException, InterruptedException { TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); - Cluster cluster = new Cluster(conf); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile"); @@ -376,7 +374,7 @@ private Path checkLocalizedPath(boolean visibility) createPrivateTempFile(cacheFile); } - Job job1 = Job.getInstance(cluster, conf); + Job job1 = Job.getInstance(conf); job1.setUser(userName); job1.addCacheFile(cacheFile.toUri()); Configuration conf1 = job1.getConfiguration();