diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e4e086cb80..f64af4a4a2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -34,6 +34,9 @@ Trunk (unreleased changes) MAPREDUCE-3149. Add a test to verify that TokenCache handles file system uri with no authority. (John George via jitendra) + MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides + client APIs cross MR1 and MR2. (Ahmed via tucu) + BUG FIXES MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null. (amarrk) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java new file mode 100644 index 0000000000..dc4687b580 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/* + * A simple interface for a client MR cluster used for testing. This interface + * provides basic methods which are independent of the underlying Mini Cluster ( + * either through MR1 or MR2). + */ +public interface MiniMRClientCluster { + + public void start() throws IOException; + + public void stop() throws IOException; + + public Configuration getConfig() throws IOException; + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java new file mode 100644 index 0000000000..3e4b34b556 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; + +/** + * A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster + * interface around the MiniMRYarnCluster. While in MR1, it provides such + * wrapper around MiniMRCluster. This factory should be used in tests to provide + * an easy migration of tests across MR1 and MR2. + */ +public class MiniMRClientClusterFactory { + + public static MiniMRClientCluster create(Class caller, int noOfNMs, + Configuration conf) throws IOException { + + if (conf == null) { + conf = new Configuration(); + } + + FileSystem fs = FileSystem.get(conf); + + Path testRootDir = new Path("target", caller.getName() + "-tmpDir") + .makeQualified(fs); + Path appJar = new Path(testRootDir, "MRAppJar.jar"); + + // Copy MRAppJar and make it private. + Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR); + + fs.copyFromLocalFile(appMasterJar, appJar); + fs.setPermission(appJar, new FsPermission("700")); + + Job job = Job.getInstance(conf); + + job.addFileToClassPath(appJar); + job.setJarByClass(caller); + + MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller + .getName(), noOfNMs); + miniMRYarnCluster.init(job.getConfiguration()); + miniMRYarnCluster.start(); + + return new MiniMRYarnClusterAdapter(miniMRYarnCluster); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java new file mode 100644 index 0000000000..a73b959ec3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * This class is an MR2 replacement for older MR1 MiniMRCluster, that was used + * by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster + * in MR2 but provides the same old MR1 interface, so tests can be migrated from + * MR1 to MR2 with minimal changes. + * + * Due to major differences between MR1 and MR2, a number of methods are either + * unimplemented/unsupported or were re-implemented to provide wrappers around + * MR2 functionality. + */ +public class MiniMRCluster { + private static final Log LOG = LogFactory.getLog(MiniMRCluster.class); + + private MiniMRClientCluster mrClientCluster; + + public String getTaskTrackerLocalDir(int taskTracker) { + throw new UnsupportedOperationException(); + } + + public String[] getTaskTrackerLocalDirs(int taskTracker) { + throw new UnsupportedOperationException(); + } + + class JobTrackerRunner { + // Mock class + } + + class TaskTrackerRunner { + // Mock class + } + + public JobTrackerRunner getJobTrackerRunner() { + throw new UnsupportedOperationException(); + } + + TaskTrackerRunner getTaskTrackerRunner(int id) { + throw new UnsupportedOperationException(); + } + + public int getNumTaskTrackers() { + throw new UnsupportedOperationException(); + } + + public void setInlineCleanupThreads() { + throw new UnsupportedOperationException(); + } + + public void waitUntilIdle() { + throw new UnsupportedOperationException(); + } + + private void waitTaskTrackers() { + throw new UnsupportedOperationException(); + } + + public int getJobTrackerPort() { + throw new UnsupportedOperationException(); + } + + public JobConf createJobConf() { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + public JobConf createJobConf(JobConf conf) { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + static JobConf configureJobConf(JobConf conf, String namenode, + int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) { + throw new UnsupportedOperationException(); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, + String[] racks, String[] hosts) throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, + String[] racks, String[] hosts, JobConf conf) throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) + throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks) + throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf) + throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, conf, 0); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf, + int numTrackerToExclude) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, conf, numTrackerToExclude, new Clock()); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf, + int numTrackerToExclude, Clock clock) throws IOException { + if (conf == null) conf = new JobConf(); + FileSystem.setDefaultUri(conf, namenode); + mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(), + numTaskTrackers, conf); + } + + public UserGroupInformation getUgi() { + throw new UnsupportedOperationException(); + } + + public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, + int max) throws IOException { + throw new UnsupportedOperationException(); + } + + public void setJobPriority(JobID jobId, JobPriority priority) + throws AccessControlException, IOException { + throw new UnsupportedOperationException(); + } + + public JobPriority getJobPriority(JobID jobId) { + throw new UnsupportedOperationException(); + } + + public long getJobFinishTime(JobID jobId) { + throw new UnsupportedOperationException(); + } + + public void initializeJob(JobID jobId) throws IOException { + throw new UnsupportedOperationException(); + } + + public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates( + int index, JobID jobId, int max) throws IOException { + throw new UnsupportedOperationException(); + } + + public JobConf getJobTrackerConf() { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + public int getFaultCount(String hostName) { + throw new UnsupportedOperationException(); + } + + public void startJobTracker() { + // Do nothing + } + + public void startJobTracker(boolean wait) { + // Do nothing + } + + public void stopJobTracker() { + // Do nothing + } + + public void stopTaskTracker(int id) { + // Do nothing + } + + public void startTaskTracker(String host, String rack, int idx, int numDir) + throws IOException { + // Do nothing + } + + void addTaskTracker(TaskTrackerRunner taskTracker) { + throw new UnsupportedOperationException(); + } + + int getTaskTrackerID(String trackerName) { + throw new UnsupportedOperationException(); + } + + public void shutdown() { + try { + mrClientCluster.stop(); + } catch (IOException e) { + LOG.error(e); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java new file mode 100644 index 0000000000..81329a97c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; + +/** + * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface. + * This interface could be used by tests across both MR1 and MR2. + */ +public class MiniMRYarnClusterAdapter implements MiniMRClientCluster { + + private MiniMRYarnCluster miniMRYarnCluster; + + public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) { + this.miniMRYarnCluster = miniMRYarnCluster; + } + + @Override + public Configuration getConfig() { + return miniMRYarnCluster.getConfig(); + } + + @Override + public void start() { + miniMRYarnCluster.start(); + } + + @Override + public void stop() { + miniMRYarnCluster.stop(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java new file mode 100644 index 0000000000..ddadac9900 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Basic testing for the MiniMRClientCluster. This test shows an example class + * that can be used in MR1 or MR2, without any change to the test. The test will + * use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1. + */ +public class TestMiniMRClientCluster { + + private static Path inDir = null; + private static Path outDir = null; + private static Path testdir = null; + private static Path[] inFiles = new Path[5]; + private static MiniMRClientCluster mrCluster; + + @BeforeClass + public static void setup() throws IOException { + final Configuration conf = new Configuration(); + final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", + "/tmp")); + testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster"); + inDir = new Path(testdir, "in"); + outDir = new Path(testdir, "out"); + + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(testdir) && !fs.delete(testdir, true)) { + throw new IOException("Could not delete " + testdir); + } + if (!fs.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir); + } + + for (int i = 0; i < inFiles.length; i++) { + inFiles[i] = new Path(inDir, "part_" + i); + createFile(inFiles[i], conf); + } + + // create the mini cluster to be used for the tests + mrCluster = MiniMRClientClusterFactory.create( + TestMiniMRClientCluster.class, 1, new Configuration()); + } + + @AfterClass + public static void cleanup() throws IOException { + // clean up the input and output files + final Configuration conf = new Configuration(); + final FileSystem fs = testdir.getFileSystem(conf); + if (fs.exists(testdir)) { + fs.delete(testdir, true); + } + // stopping the mini cluster + mrCluster.stop(); + } + + @Test + public void testJob() throws Exception { + final Job job = createJob(); + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, + inDir); + org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, + new Path(outDir, "testJob")); + assertTrue(job.waitForCompletion(true)); + validateCounters(job.getCounters(), 5, 25, 5, 5); + } + + private void validateCounters(Counters counters, long mapInputRecords, + long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) { + assertEquals("MapInputRecords", mapInputRecords, counters.findCounter( + "MyCounterGroup", "MAP_INPUT_RECORDS").getValue()); + assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter( + "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue()); + assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter( + "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue()); + assertEquals("ReduceOutputRecords", reduceOutputRecords, counters + .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue()); + } + + private static void createFile(Path inFile, Configuration conf) + throws IOException { + final FileSystem fs = inFile.getFileSystem(conf); + if (fs.exists(inFile)) { + return; + } + FSDataOutputStream out = fs.create(inFile); + out.writeBytes("This is a test file"); + out.close(); + } + + public static Job createJob() throws IOException { + final Job baseJob = new Job(mrCluster.getConfig()); + baseJob.setOutputKeyClass(Text.class); + baseJob.setOutputValueClass(IntWritable.class); + baseJob.setMapperClass(MyMapper.class); + baseJob.setReducerClass(MyReducer.class); + baseJob.setNumReduceTasks(1); + return baseJob; + } + + public static class MyMapper extends + org.apache.hadoop.mapreduce.Mapper { + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1); + StringTokenizer iter = new StringTokenizer(value.toString()); + while (iter.hasMoreTokens()) { + word.set(iter.nextToken()); + context.write(word, one); + context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1); + } + } + } + + public static class MyReducer extends + org.apache.hadoop.mapreduce.Reducer { + private IntWritable result = new IntWritable(); + + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1); + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS") + .increment(1); + } + } + +}