From 714ae6e62f376eec719007f8338609748986264c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 1 Dec 2011 22:29:42 +0000 Subject: [PATCH] MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces introduced in MAPREDUCE-3169. Contributed by Ahmed Radwan. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1209281 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/conf/TestNoDefaultsJobConf.java | 13 - .../apache/hadoop/mapred/HadoopTestCase.java | 214 +++++ .../hadoop/mapred/NotificationTestCase.java | 224 +++++ .../apache/hadoop/mapred/SortValidator.java | 584 +++++++++++++ .../hadoop/mapred/TestFileOutputFormat.java | 0 .../apache/hadoop/mapred/TestJobCounters.java | 0 .../apache/hadoop/mapred/TestLazyOutput.java | 0 .../mapred/TestLocalMRNotification.java | 0 .../TestSpecialCharactersInOutputPath.java | 7 +- .../apache/hadoop/mapred/TestTaskCommit.java | 0 .../apache/hadoop/mapred/UtilsForTests.java | 787 ++++++++++++++++++ .../org/apache/hadoop/mapred/WordCount.java | 159 ++++ .../jobcontrol/JobControlTestUtils.java | 154 ++++ .../mapred/jobcontrol/TestJobControl.java | 0 .../jobcontrol/TestLocalJobControl.java | 0 .../hadoop/mapred/lib/TestChainMapReduce.java | 0 .../lib/TestKeyFieldBasedComparator.java | 0 .../mapred/lib/TestMultipleOutputs.java | 0 .../lib/TestMultithreadedMapRunner.java | 0 .../apache/hadoop/mapreduce/TestChild.java | 0 .../mapreduce/TestMapReduceLazyOutput.java | 0 .../mapreduce/lib/chain/TestChainErrors.java | 0 .../lib/chain/TestMapReduceChain.java | 0 .../lib/chain/TestSingleElementChain.java | 0 .../lib/input/TestMultipleInputs.java | 0 .../jobcontrol/TestMapReduceJobControl.java | 0 .../lib/map/TestMultithreadedMapper.java | 0 .../lib/output/TestJobOutputCommitter.java | 0 .../lib/output/TestMRMultipleOutputs.java | 0 .../TestMRKeyFieldBasedComparator.java | 0 31 files changed, 2126 insertions(+), 19 deletions(-) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (87%) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestFileOutputFormat.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestJobCounters.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestLazyOutput.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestLocalMRNotification.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (93%) rename 
hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestTaskCommit.java (100%) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/lib/TestChainMapReduce.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/TestChild.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java (100%) rename 
hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java (100%) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (100%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2389fda741..403ba142b8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via sseth) + MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces + introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java similarity index 87% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java index 4daf90ddce..d91754d71d 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java @@ -59,19 +59,6 @@ public void testNoDefaults() throws Exception { JobConf conf = new JobConf(false); - //seeding JT and NN info into non-defaults (empty jobconf) - String jobTrackerAddress = createJobConf().get(JTConfig.JT_IPC_ADDRESS); - if (jobTrackerAddress == null) { - jobTrackerAddress = "local"; - } - conf.set(JTConfig.JT_IPC_ADDRESS, jobTrackerAddress); - if (jobTrackerAddress == "local") { - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); - } - else { - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); - } - conf.set("fs.default.name", createJobConf().get("fs.default.name")); conf.setJobName("mr"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java new file mode 100644 index 0000000000..c102e8f862 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import junit.framework.TestCase; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRConfig; + +import java.io.File; +import java.io.IOException; + +/** + * Abstract Test case class to run MR in local or cluster mode and in local FS + * or DFS. + * + * The Hadoop instance is started and stopped on each test method. + * + * If using DFS the filesystem is reformated at each start (test method). + * + * Job Configurations should be created using a configuration returned by the + * 'createJobConf()' method. + */ +public abstract class HadoopTestCase extends TestCase { + public static final int LOCAL_MR = 1; + public static final int CLUSTER_MR = 2; + public static final int LOCAL_FS = 4; + public static final int DFS_FS = 8; + + private boolean localMR; + private boolean localFS; + + private int taskTrackers; + private int dataNodes; + + /** + * Creates a testcase for local or cluster MR using DFS. + * + * The DFS will be formatted regardless if there was one or not before in the + * given location. + * + * @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster + * (CLUSTER_MR) + * @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS) + * + * local FS when using relative PATHs) + * + * @param taskTrackers number of task trackers to start when using cluster + * + * @param dataNodes number of data nodes to start when using DFS + * + * @throws IOException thrown if the base directory cannot be set. + */ + public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes) + throws IOException { + if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) { + throw new IllegalArgumentException( + "Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR"); + } + if (fsMode != LOCAL_FS && fsMode != DFS_FS) { + throw new IllegalArgumentException( + "Invalid FileSystem mode, must be LOCAL_FS or DFS_FS"); + } + if (taskTrackers < 1) { + throw new IllegalArgumentException( + "Invalid taskTrackers value, must be greater than 0"); + } + if (dataNodes < 1) { + throw new IllegalArgumentException( + "Invalid dataNodes value, must be greater than 0"); + } + localMR = (mrMode == LOCAL_MR); + localFS = (fsMode == LOCAL_FS); + /* + JobConf conf = new JobConf(); + fsRoot = conf.get("hadoop.tmp.dir"); + + if (fsRoot == null) { + throw new IllegalArgumentException( + "hadoop.tmp.dir is not defined"); + } + + fsRoot = fsRoot.replace(' ', '+') + "/fs"; + + File file = new File(fsRoot); + if (!file.exists()) { + if (!file.mkdirs()) { + throw new RuntimeException("Could not create FS base path: " + file); + } + } + */ + this.taskTrackers = taskTrackers; + this.dataNodes = dataNodes; + } + + /** + * Indicates if the MR is running in local or cluster mode. + * + * @return returns TRUE if the MR is running locally, FALSE if running in + * cluster mode. + */ + public boolean isLocalMR() { + return localMR; + } + + /** + * Indicates if the filesystem is local or DFS. 
+ * + * @return returns TRUE if the filesystem is local, FALSE if it is DFS. + */ + public boolean isLocalFS() { + return localFS; + } + + + private MiniDFSCluster dfsCluster = null; + private MiniMRCluster mrCluster = null; + private FileSystem fileSystem = null; + + /** + * Creates Hadoop instance based on constructor configuration before + * a test case is run. + * + * @throws Exception + */ + protected void setUp() throws Exception { + super.setUp(); + if (localFS) { + fileSystem = FileSystem.getLocal(new JobConf()); + } + else { + dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null); + fileSystem = dfsCluster.getFileSystem(); + } + if (localMR) { + } + else { + //noinspection deprecation + mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getUri().toString(), 1); + } + } + + /** + * Destroys Hadoop instance based on constructor configuration after + * a test case is run. + * + * @throws Exception + */ + protected void tearDown() throws Exception { + try { + if (mrCluster != null) { + mrCluster.shutdown(); + } + } + catch (Exception ex) { + System.out.println(ex); + } + try { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + catch (Exception ex) { + System.out.println(ex); + } + super.tearDown(); + } + + /** + * Returns the Filesystem in use. + * + * TestCases should use this Filesystem as it + * is properly configured with the workingDir for relative PATHs. + * + * @return the filesystem used by Hadoop. + */ + protected FileSystem getFileSystem() { + return fileSystem; + } + + /** + * Returns a job configuration preconfigured to run against the Hadoop + * managed by the testcase. + * @return configuration that works on the testcase Hadoop instance + */ + protected JobConf createJobConf() { + if (localMR) { + JobConf conf = new JobConf(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); + return conf; + } + else { + return mrCluster.createJobConf(); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java new file mode 100644 index 0000000000..026edfbddb --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import org.mortbay.jetty.Server; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpServlet; +import javax.servlet.ServletException; +import java.io.IOException; +import java.io.DataOutputStream; + +/** + * Base class to test Job end notification in local and cluster mode. + * + * Starts up hadoop on Local or Cluster mode (by extending of the + * HadoopTestCase class) and it starts a servlet engine that hosts + * a servlet that will receive the notification of job finalization. + * + * The notification servlet returns a HTTP 400 the first time is called + * and a HTTP 200 the second time, thus testing retry. + * + * In both cases local file system is used (this is irrelevant for + * the tested functionality) + * + * + */ +public abstract class NotificationTestCase extends HadoopTestCase { + + protected NotificationTestCase(int mode) throws IOException { + super(mode, HadoopTestCase.LOCAL_FS, 1, 1); + } + + private int port; + private String contextPath = "/notification"; + private String servletPath = "/mapred"; + private Server webServer; + + private void startHttpServer() throws Exception { + + // Create the webServer + if (webServer != null) { + webServer.stop(); + webServer = null; + } + webServer = new Server(0); + + Context context = new Context(webServer, contextPath); + + // create servlet handler + context.addServlet(new ServletHolder(new NotificationServlet()), + servletPath); + + // Start webServer + webServer.start(); + port = webServer.getConnectors()[0].getLocalPort(); + + } + + private void stopHttpServer() throws Exception { + if (webServer != null) { + webServer.stop(); + webServer.destroy(); + webServer = null; + } + } + + public static class NotificationServlet extends HttpServlet { + public static int counter = 0; + private static final long serialVersionUID = 1L; + + protected void doGet(HttpServletRequest req, HttpServletResponse res) + throws ServletException, IOException { + switch (counter) { + case 0: + { + assertTrue(req.getQueryString().contains("SUCCEEDED")); + } + break; + case 2: + { + assertTrue(req.getQueryString().contains("KILLED")); + } + break; + case 4: + { + assertTrue(req.getQueryString().contains("FAILED")); + } + break; + } + if (counter % 2 == 0) { + res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error"); + } + else { + res.setStatus(HttpServletResponse.SC_OK); + } + counter++; + } + } + + private String getNotificationUrlTemplate() { + return "http://localhost:" + port + contextPath + servletPath + + "?jobId=$jobId&jobStatus=$jobStatus"; + } + + protected JobConf createJobConf() { + JobConf conf = super.createJobConf(); + conf.setJobEndNotificationURI(getNotificationUrlTemplate()); + conf.setInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 3); + conf.setInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 200); + return conf; + } + + + protected void setUp() throws Exception { + super.setUp(); + startHttpServer(); + } + + protected void tearDown() throws Exception { + stopHttpServer(); + super.tearDown(); + } + + public void testMR() throws Exception { + System.out.println(launchWordCount(this.createJobConf(), + "a b c d e f g h", 1, 1)); 
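+ // The servlet rejects the first notification with HTTP 400 and accepts the retry with HTTP 200, so after the word-count job completes the counter should read 2.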
+ Thread.sleep(2000); + assertEquals(2, NotificationServlet.counter); + + Path inDir = new Path("notificationjob/input"); + Path outDir = new Path("notificationjob/output"); + + // Hack for local FS that does not have the concept of a 'mounting point' + if (isLocalFS()) { + String localPathRoot = System.getProperty("test.build.data","/tmp") + .toString().replace(' ', '+');; + inDir = new Path(localPathRoot, inDir); + outDir = new Path(localPathRoot, outDir); + } + + // run a job with KILLED status + System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir, + outDir).getID()); + Thread.sleep(2000); + assertEquals(4, NotificationServlet.counter); + + // run a job with FAILED status + System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir, + outDir).getID()); + Thread.sleep(2000); + assertEquals(6, NotificationServlet.counter); + } + + private String launchWordCount(JobConf conf, + String input, + int numMaps, + int numReduces) throws IOException { + Path inDir = new Path("testing/wc/input"); + Path outDir = new Path("testing/wc/output"); + + // Hack for local FS that does not have the concept of a 'mounting point' + if (isLocalFS()) { + String localPathRoot = System.getProperty("test.build.data","/tmp") + .toString().replace(' ', '+');; + inDir = new Path(localPathRoot, inDir); + outDir = new Path(localPathRoot, outDir); + } + + FileSystem fs = FileSystem.get(conf); + fs.delete(outDir, true); + if (!fs.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + { + DataOutputStream file = fs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + } + conf.setJobName("wordcount"); + conf.setInputFormat(TextInputFormat.class); + + // the keys are words (strings) + conf.setOutputKeyClass(Text.class); + // the values are counts (ints) + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(WordCount.MapClass.class); + conf.setCombinerClass(WordCount.Reduce.class); + conf.setReducerClass(WordCount.Reduce.class); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + conf.setNumMapTasks(numMaps); + conf.setNumReduceTasks(numReduces); + JobClient.runJob(conf); + return MapReduceTestUtil.readOutput(outDir, conf); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java new file mode 100644 index 0000000000..381d42e238 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import java.net.URI; +import java.util.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.lib.HashPartitioner; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.fs.*; + +/** + * A set of utilities to validate the sort of the map-reduce framework. + * This utility program has 2 main parts: + * 1. Checking the records' statistics + * a) Validates the no. of bytes and records in sort's input & output. + * b) Validates the xor of the md5's of each key/value pair. + * c) Ensures same key/value is present in both input and output. + * 2. Check individual records to ensure each record is present in both + * the input and the output of the sort (expensive on large data-sets). + * + * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate + * [-m maps] [-r reduces] [-deep] + * -sortInput sort-in-dir -sortOutput sort-out-dir + */ +public class SortValidator extends Configured implements Tool { + + static private final IntWritable sortInput = new IntWritable(1); + static private final IntWritable sortOutput = new IntWritable(2); + static public String SORT_REDUCES = + "mapreduce.sortvalidator.sort.reduce.tasks"; + static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost"; + static public String REDUCES_PER_HOST = + "mapreduce.sortvalidator.reducesperhost"; + static void printUsage() { + System.err.println("sortvalidate [-m ] [-r ] [-deep] " + + "-sortInput -sortOutput "); + System.exit(1); + } + + static private IntWritable deduceInputFile(JobConf job) { + Path[] inputPaths = FileInputFormat.getInputPaths(job); + Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE)); + + // value == one for sort-input; value == two for sort-output + return (inputFile.getParent().equals(inputPaths[0])) ? + sortInput : sortOutput; + } + + static private byte[] pair(BytesWritable a, BytesWritable b) { + byte[] pairData = new byte[a.getLength()+ b.getLength()]; + System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength()); + System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength()); + return pairData; + } + + private static final PathFilter sortPathsFilter = new PathFilter() { + public boolean accept(Path path) { + return (path.getName().startsWith("part-")); + } + }; + + /** + * A simple map-reduce job which checks consistency of the + * MapReduce framework's sort by checking: + * a) Records are sorted correctly + * b) Keys are partitioned correctly + * c) The input and output have same no. of bytes and records. + * d) The input and output have the correct 'checksum' by xor'ing + * the md5 of each record. + * + */ + public static class RecordStatsChecker { + + /** + * Generic way to get raw data from a {@link Writable}. 
+ */ + static class Raw { + /** + * Get raw data bytes from a {@link Writable} + * @param writable {@link Writable} object from whom to get the raw data + * @return raw data of the writable + */ + public byte[] getRawBytes(Writable writable) { + return writable.toString().getBytes(); + } + + /** + * Get number of raw data bytes of the {@link Writable} + * @param writable {@link Writable} object from whom to get the raw data + * length + * @return number of raw data bytes + */ + public int getRawBytesLength(Writable writable) { + return writable.toString().getBytes().length; + } + } + + /** + * Specialization of {@link Raw} for {@link BytesWritable}. + */ + static class RawBytesWritable extends Raw { + public byte[] getRawBytes(Writable bw) { + return ((BytesWritable)bw).getBytes(); + } + public int getRawBytesLength(Writable bw) { + return ((BytesWritable)bw).getLength(); + } + } + + /** + * Specialization of {@link Raw} for {@link Text}. + */ + static class RawText extends Raw { + public byte[] getRawBytes(Writable text) { + return ((Text)text).getBytes(); + } + public int getRawBytesLength(Writable text) { + return ((Text)text).getLength(); + } + } + + private static Raw createRaw(Class rawClass) { + if (rawClass == Text.class) { + return new RawText(); + } else if (rawClass == BytesWritable.class) { + System.err.println("Returning " + RawBytesWritable.class); + return new RawBytesWritable(); + } + return new Raw(); + } + + public static class RecordStatsWritable implements Writable { + private long bytes = 0; + private long records = 0; + private int checksum = 0; + + public RecordStatsWritable() {} + + public RecordStatsWritable(long bytes, long records, int checksum) { + this.bytes = bytes; + this.records = records; + this.checksum = checksum; + } + + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, bytes); + WritableUtils.writeVLong(out, records); + WritableUtils.writeVInt(out, checksum); + } + + public void readFields(DataInput in) throws IOException { + bytes = WritableUtils.readVLong(in); + records = WritableUtils.readVLong(in); + checksum = WritableUtils.readVInt(in); + } + + public long getBytes() { return bytes; } + public long getRecords() { return records; } + public int getChecksum() { return checksum; } + } + + public static class Map extends MapReduceBase + implements Mapper { + + private IntWritable key = null; + private WritableComparable prevKey = null; + private Class keyClass; + private Partitioner partitioner = null; + private int partition = -1; + private int noSortReducers = -1; + private long recordId = -1; + + private Raw rawKey; + private Raw rawValue; + + public void configure(JobConf job) { + // 'key' == sortInput for sort-input; key == sortOutput for sort-output + key = deduceInputFile(job); + + if (key == sortOutput) { + partitioner = new HashPartitioner(); + + // Figure the 'current' partition and no. 
of reduces of the 'sort' + try { + URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE)); + String inputFile = inputURI.getPath(); + // part file is of the form part-r-xxxxx + partition = Integer.valueOf(inputFile.substring( + inputFile.lastIndexOf("part") + 7)).intValue(); + noSortReducers = job.getInt(SORT_REDUCES, -1); + } catch (Exception e) { + System.err.println("Caught: " + e); + System.exit(-1); + } + } + } + + @SuppressWarnings("unchecked") + public void map(WritableComparable key, Writable value, + OutputCollector output, + Reporter reporter) throws IOException { + // Set up rawKey and rawValue on the first call to 'map' + if (recordId == -1) { + rawKey = createRaw(key.getClass()); + rawValue = createRaw(value.getClass()); + } + ++recordId; + + if (this.key == sortOutput) { + // Check if keys are 'sorted' if this + // record is from sort's output + if (prevKey == null) { + prevKey = key; + keyClass = prevKey.getClass(); + } else { + // Sanity check + if (keyClass != key.getClass()) { + throw new IOException("Type mismatch in key: expected " + + keyClass.getName() + ", received " + + key.getClass().getName()); + } + + // Check if they were sorted correctly + if (prevKey.compareTo(key) > 0) { + throw new IOException("The 'map-reduce' framework wrongly" + + " classifed (" + prevKey + ") > (" + + key + ") "+ "for record# " + recordId); + } + prevKey = key; + } + + // Check if the sorted output is 'partitioned' right + int keyPartition = + partitioner.getPartition(key, value, noSortReducers); + if (partition != keyPartition) { + throw new IOException("Partitions do not match for record# " + + recordId + " ! - '" + partition + "' v/s '" + + keyPartition + "'"); + } + } + + // Construct the record-stats and output (this.key, record-stats) + byte[] keyBytes = rawKey.getRawBytes(key); + int keyBytesLen = rawKey.getRawBytesLength(key); + byte[] valueBytes = rawValue.getRawBytes(value); + int valueBytesLen = rawValue.getRawBytesLength(value); + + int keyValueChecksum = + (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^ + WritableComparator.hashBytes(valueBytes, valueBytesLen)); + + output.collect(this.key, + new RecordStatsWritable((keyBytesLen+valueBytesLen), + 1, keyValueChecksum) + ); + } + + } + + public static class Reduce extends MapReduceBase + implements Reducer { + + public void reduce(IntWritable key, Iterator values, + OutputCollector output, + Reporter reporter) throws IOException { + long bytes = 0; + long records = 0; + int xor = 0; + while (values.hasNext()) { + RecordStatsWritable stats = values.next(); + bytes += stats.getBytes(); + records += stats.getRecords(); + xor ^= stats.getChecksum(); + } + + output.collect(key, new RecordStatsWritable(bytes, records, xor)); + } + } + + public static class NonSplitableSequenceFileInputFormat + extends SequenceFileInputFormat { + protected boolean isSplitable(FileSystem fs, Path filename) { + return false; + } + } + + static void checkRecords(Configuration defaults, + Path sortInput, Path sortOutput) throws IOException { + FileSystem inputfs = sortInput.getFileSystem(defaults); + FileSystem outputfs = sortOutput.getFileSystem(defaults); + FileSystem defaultfs = FileSystem.get(defaults); + JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class); + jobConf.setJobName("sortvalidate-recordstats-checker"); + + int noSortReduceTasks = + outputfs.listStatus(sortOutput, sortPathsFilter).length; + jobConf.setInt(SORT_REDUCES, noSortReduceTasks); + int noSortInputpaths = inputfs.listStatus(sortInput).length; + + 
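+ // The checker job reads both the sort input and the sort output; each map emits per-record byte/record/checksum stats keyed by source (input vs. output), and the single reducer aggregates them so the two totals can be compared after the job finishes.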
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + + jobConf.setOutputKeyClass(IntWritable.class); + jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class); + + jobConf.setMapperClass(Map.class); + jobConf.setCombinerClass(Reduce.class); + jobConf.setReducerClass(Reduce.class); + + jobConf.setNumMapTasks(noSortReduceTasks); + jobConf.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(jobConf, sortInput); + FileInputFormat.addInputPath(jobConf, sortOutput); + Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker"); + if (defaultfs.exists(outputPath)) { + defaultfs.delete(outputPath, true); + } + FileOutputFormat.setOutputPath(jobConf, outputPath); + + // Uncomment to run locally in a single process + //job_conf.set(JTConfig.JT, "local"); + Path[] inputPaths = FileInputFormat.getInputPaths(jobConf); + System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " + + "from " + inputPaths[0] + " (" + + noSortInputpaths + " files), " + + inputPaths[1] + " (" + + noSortReduceTasks + + " files) into " + + FileOutputFormat.getOutputPath(jobConf) + + " with 1 reducer."); + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + JobClient.runJob(jobConf); + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); + + // Check to ensure that the statistics of the + // framework's sort-input and sort-output match + SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs, + new Path(outputPath, "part-00000"), defaults); + IntWritable k1 = new IntWritable(); + IntWritable k2 = new IntWritable(); + RecordStatsWritable v1 = new RecordStatsWritable(); + RecordStatsWritable v2 = new RecordStatsWritable(); + if (!stats.next(k1, v1)) { + throw new IOException("Failed to read record #1 from reduce's output"); + } + if (!stats.next(k2, v2)) { + throw new IOException("Failed to read record #2 from reduce's output"); + } + + if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || + v1.getChecksum() != v2.getChecksum()) { + throw new IOException("(" + + v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" + + v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")"); + } + } + + } + + /** + * A simple map-reduce task to check if the input and the output + * of the framework's sort is consistent by ensuring each record + * is present in both the input and the output. 
+ * + */ + public static class RecordChecker { + + public static class Map extends MapReduceBase + implements Mapper { + + private IntWritable value = null; + + public void configure(JobConf job) { + // value == one for sort-input; value == two for sort-output + value = deduceInputFile(job); + } + + public void map(BytesWritable key, + BytesWritable value, + OutputCollector output, + Reporter reporter) throws IOException { + // newKey = (key, value) + BytesWritable keyValue = new BytesWritable(pair(key, value)); + + // output (newKey, value) + output.collect(keyValue, this.value); + } + } + + public static class Reduce extends MapReduceBase + implements Reducer { + + public void reduce(BytesWritable key, Iterator values, + OutputCollector output, + Reporter reporter) throws IOException { + int ones = 0; + int twos = 0; + while (values.hasNext()) { + IntWritable count = values.next(); + if (count.equals(sortInput)) { + ++ones; + } else if (count.equals(sortOutput)) { + ++twos; + } else { + throw new IOException("Invalid 'value' of " + count.get() + + " for (key,value): " + key.toString()); + } + } + + // Check to ensure there are equal no. of ones and twos + if (ones != twos) { + throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos + + ") for (key, value): " + key.toString()); + } + } + } + + static void checkRecords(Configuration defaults, int noMaps, int noReduces, + Path sortInput, Path sortOutput) throws IOException { + JobConf jobConf = new JobConf(defaults, RecordChecker.class); + jobConf.setJobName("sortvalidate-record-checker"); + + jobConf.setInputFormat(SequenceFileInputFormat.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + + jobConf.setOutputKeyClass(BytesWritable.class); + jobConf.setOutputValueClass(IntWritable.class); + + jobConf.setMapperClass(Map.class); + jobConf.setReducerClass(Reduce.class); + + JobClient client = new JobClient(jobConf); + ClusterStatus cluster = client.getClusterStatus(); + if (noMaps == -1) { + noMaps = cluster.getTaskTrackers() * + jobConf.getInt(MAPS_PER_HOST, 10); + } + if (noReduces == -1) { + noReduces = (int) (cluster.getMaxReduceTasks() * 0.9); + String sortReduces = jobConf.get(REDUCES_PER_HOST); + if (sortReduces != null) { + noReduces = cluster.getTaskTrackers() * + Integer.parseInt(sortReduces); + } + } + jobConf.setNumMapTasks(noMaps); + jobConf.setNumReduceTasks(noReduces); + + FileInputFormat.setInputPaths(jobConf, sortInput); + FileInputFormat.addInputPath(jobConf, sortOutput); + Path outputPath = new Path("/tmp/sortvalidate/recordchecker"); + FileSystem fs = FileSystem.get(defaults); + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + FileOutputFormat.setOutputPath(jobConf, outputPath); + + // Uncomment to run locally in a single process + //job_conf.set(JTConfig.JT, "local"); + Path[] inputPaths = FileInputFormat.getInputPaths(jobConf); + System.out.println("\nSortValidator.RecordChecker: Running on " + + cluster.getTaskTrackers() + + " nodes to validate sort from " + + inputPaths[0] + ", " + + inputPaths[1] + " into " + + FileOutputFormat.getOutputPath(jobConf) + + " with " + noReduces + " reduces."); + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + JobClient.runJob(jobConf); + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); + } + } + + + /** + * The main driver for sort-validator program. 
+ * Invoke this method to submit the map/reduce job. + * @throws IOException When there is communication problems with the + * job tracker. + */ + public int run(String[] args) throws Exception { + Configuration defaults = getConf(); + + int noMaps = -1, noReduces = -1; + Path sortInput = null, sortOutput = null; + boolean deepTest = false; + for(int i=0; i < args.length; ++i) { + try { + if ("-m".equals(args[i])) { + noMaps = Integer.parseInt(args[++i]); + } else if ("-r".equals(args[i])) { + noReduces = Integer.parseInt(args[++i]); + } else if ("-sortInput".equals(args[i])){ + sortInput = new Path(args[++i]); + } else if ("-sortOutput".equals(args[i])){ + sortOutput = new Path(args[++i]); + } else if ("-deep".equals(args[i])) { + deepTest = true; + } else { + printUsage(); + return -1; + } + } catch (NumberFormatException except) { + System.err.println("ERROR: Integer expected instead of " + args[i]); + printUsage(); + return -1; + } catch (ArrayIndexOutOfBoundsException except) { + System.err.println("ERROR: Required parameter missing from " + + args[i-1]); + printUsage(); + return -1; + } + } + + // Sanity check + if (sortInput == null || sortOutput == null) { + printUsage(); + return -2; + } + + // Check if the records are consistent and sorted correctly + RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput); + + // Check if the same records are present in sort's inputs & outputs + if (deepTest) { + RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, + sortOutput); + } + + System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" + + " successfully."); + + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new SortValidator(), args); + System.exit(res); + } +} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputFormat.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputFormat.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLazyOutput.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLazyOutput.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalMRNotification.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalMRNotification.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java similarity index 93% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java index 5e510094ce..1b93377fd1 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java @@ -48,7 +48,6 @@ public class TestSpecialCharactersInOutputPath extends TestCase { private static final String OUTPUT_FILENAME = "result[0]"; public static boolean launchJob(URI fileSys, - String jobTracker, JobConf conf, int numMaps, int numReduces) throws IOException { @@ -68,8 +67,6 @@ public static boolean launchJob(URI fileSys, // use WordCount example FileSystem.setDefaultUri(conf, fileSys); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker); conf.setJobName("foo"); conf.setInputFormat(TextInputFormat.class); @@ -113,11 +110,9 @@ public void testJobWithDFS() throws IOException { fileSys = dfs.getFileSystem(); namenode = fileSys.getUri().toString(); mr = new MiniMRCluster(taskTrackers, namenode, 2); - final String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); JobConf jobConf = new JobConf(); boolean result; - result = launchJob(fileSys.getUri(), jobTrackerName, jobConf, - 3, 1); + result = launchJob(fileSys.getUri(), jobConf, 3, 1); assertTrue(result); } finally { diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java new file mode 100644 index 0000000000..17995603d0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java @@ -0,0 +1,787 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.File; +import java.io.FileInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.util.StringUtils; + +import org.apache.commons.logging.Log; + +/** + * Utilities used in unit test. 
+ * + */ +public class UtilsForTests { + + static final Log LOG = LogFactory.getLog(UtilsForTests.class); + + final static long KB = 1024L * 1; + final static long MB = 1024L * KB; + final static long GB = 1024L * MB; + final static long TB = 1024L * GB; + final static long PB = 1024L * TB; + final static Object waitLock = new Object(); + + static DecimalFormat dfm = new DecimalFormat("####.000"); + static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###"); + + public static String dfmt(double d) { + return dfm.format(d); + } + + public static String ifmt(double d) { + return ifm.format(d); + } + + public static String formatBytes(long numBytes) { + StringBuffer buf = new StringBuffer(); + boolean bDetails = true; + double num = numBytes; + + if (numBytes < KB) { + buf.append(numBytes + " B"); + bDetails = false; + } else if (numBytes < MB) { + buf.append(dfmt(num / KB) + " KB"); + } else if (numBytes < GB) { + buf.append(dfmt(num / MB) + " MB"); + } else if (numBytes < TB) { + buf.append(dfmt(num / GB) + " GB"); + } else if (numBytes < PB) { + buf.append(dfmt(num / TB) + " TB"); + } else { + buf.append(dfmt(num / PB) + " PB"); + } + if (bDetails) { + buf.append(" (" + ifmt(numBytes) + " bytes)"); + } + return buf.toString(); + } + + public static String formatBytes2(long numBytes) { + StringBuffer buf = new StringBuffer(); + long u = 0; + if (numBytes >= TB) { + u = numBytes / TB; + numBytes -= u * TB; + buf.append(u + " TB "); + } + if (numBytes >= GB) { + u = numBytes / GB; + numBytes -= u * GB; + buf.append(u + " GB "); + } + if (numBytes >= MB) { + u = numBytes / MB; + numBytes -= u * MB; + buf.append(u + " MB "); + } + if (numBytes >= KB) { + u = numBytes / KB; + numBytes -= u * KB; + buf.append(u + " KB "); + } + buf.append(u + " B"); //even if zero + return buf.toString(); + } + + static final String regexpSpecials = "[]()?*+|.!^-\\~@"; + + public static String regexpEscape(String plain) { + StringBuffer buf = new StringBuffer(); + char[] ch = plain.toCharArray(); + int csup = ch.length; + for (int c = 0; c < csup; c++) { + if (regexpSpecials.indexOf(ch[c]) != -1) { + buf.append("\\"); + } + buf.append(ch[c]); + } + return buf.toString(); + } + + public static String safeGetCanonicalPath(File f) { + try { + String s = f.getCanonicalPath(); + return (s == null) ? 
f.toString() : s; + } catch (IOException io) { + return f.toString(); + } + } + + public static String slurp(File f) throws IOException { + int len = (int) f.length(); + byte[] buf = new byte[len]; + FileInputStream in = new FileInputStream(f); + String contents = null; + try { + in.read(buf, 0, len); + contents = new String(buf, "UTF-8"); + } finally { + in.close(); + } + return contents; + } + + public static String slurpHadoop(Path p, FileSystem fs) throws IOException { + int len = (int) fs.getFileStatus(p).getLen(); + byte[] buf = new byte[len]; + InputStream in = fs.open(p); + String contents = null; + try { + in.read(buf, 0, len); + contents = new String(buf, "UTF-8"); + } finally { + in.close(); + } + return contents; + } + + public static String rjustify(String s, int width) { + if (s == null) s = "null"; + if (width > s.length()) { + s = getSpace(width - s.length()) + s; + } + return s; + } + + public static String ljustify(String s, int width) { + if (s == null) s = "null"; + if (width > s.length()) { + s = s + getSpace(width - s.length()); + } + return s; + } + + static char[] space; + static { + space = new char[300]; + Arrays.fill(space, '\u0020'); + } + + public static String getSpace(int len) { + if (len > space.length) { + space = new char[Math.max(len, 2 * space.length)]; + Arrays.fill(space, '\u0020'); + } + return new String(space, 0, len); + } + + /** + * Gets job status from the jobtracker given the jobclient and the job id + */ + static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException { + JobStatus[] statuses = jc.getAllJobs(); + for (JobStatus jobStatus : statuses) { + if (jobStatus.getJobID().equals(id)) { + return jobStatus; + } + } + return null; + } + + /** + * A utility that waits for specified amount of time + */ + public static void waitFor(long duration) { + try { + synchronized (waitLock) { + waitLock.wait(duration); + } + } catch (InterruptedException ie) {} + } + + /** + * Wait for the jobtracker to be RUNNING. + */ + static void waitForJobTracker(JobClient jobClient) { + while (true) { + try { + ClusterStatus status = jobClient.getClusterStatus(); + while (status.getJobTrackerStatus() != JobTrackerStatus.RUNNING) { + waitFor(100); + status = jobClient.getClusterStatus(); + } + break; // means that the jt is ready + } catch (IOException ioe) {} + } + } + + /** + * Waits until all the jobs at the jobtracker complete. 
+ */ + static void waitTillDone(JobClient jobClient) throws IOException { + // Wait for the last job to complete + while (true) { + boolean shouldWait = false; + for (JobStatus jobStatuses : jobClient.getAllJobs()) { + if (jobStatuses.getRunState() != JobStatus.SUCCEEDED + && jobStatuses.getRunState() != JobStatus.FAILED + && jobStatuses.getRunState() != JobStatus.KILLED) { + shouldWait = true; + break; + } + } + if (shouldWait) { + waitFor(100); + } else { + break; + } + } + } + + /** + * Configure a waiting job + */ + static void configureWaitingJobConf(JobConf jobConf, Path inDir, + Path outputPath, int numMaps, int numRed, + String jobName, String mapSignalFilename, + String redSignalFilename) + throws IOException { + jobConf.setJobName(jobName); + jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(jobConf, inDir); + FileOutputFormat.setOutputPath(jobConf, outputPath); + jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class); + jobConf.setReducerClass(IdentityReducer.class); + jobConf.setOutputKeyClass(BytesWritable.class); + jobConf.setOutputValueClass(BytesWritable.class); + jobConf.setInputFormat(RandomInputFormat.class); + jobConf.setNumMapTasks(numMaps); + jobConf.setNumReduceTasks(numRed); + jobConf.setJar("build/test/mapred/testjar/testjob.jar"); + jobConf.set(getTaskSignalParameter(true), mapSignalFilename); + jobConf.set(getTaskSignalParameter(false), redSignalFilename); + } + + /** + * Commonly used map and reduce classes + */ + + /** + * Map is a Mapper that just waits for a file to be created on the dfs. The + * file creation is a signal to the mappers and hence acts as a waiting job. + */ + + static class WaitingMapper + extends MapReduceBase + implements Mapper { + + FileSystem fs = null; + Path signal; + int id = 0; + int totalMaps = 0; + + /** + * Checks if the map task needs to wait. By default all the maps will wait. + * This method needs to be overridden to make a custom waiting mapper. + */ + public boolean shouldWait(int id) { + return true; + } + + /** + * Returns a signal file on which the map task should wait. By default all + * the maps wait on a single file passed as test.mapred.map.waiting.target. + * This method needs to be overridden to make a custom waiting mapper + */ + public Path getSignalFile(int id) { + return signal; + } + + /** The waiting function. The map exits once it gets a signal. Here the + * signal is the file existence. 
+ */ + public void map(WritableComparable key, Writable val, + OutputCollector output, + Reporter reporter) + throws IOException { + if (shouldWait(id)) { + if (fs != null) { + while (!fs.exists(getSignalFile(id))) { + try { + reporter.progress(); + synchronized (this) { + this.wait(1000); // wait for 1 sec + } + } catch (InterruptedException ie) { + System.out.println("Interrupted while the map was waiting for " + + " the signal."); + break; + } + } + } else { + throw new IOException("Could not get the DFS!!"); + } + } + } + + public void configure(JobConf conf) { + try { + String taskId = conf.get(JobContext.TASK_ATTEMPT_ID); + id = Integer.parseInt(taskId.split("_")[4]); + totalMaps = Integer.parseInt(conf.get(JobContext.NUM_MAPS)); + fs = FileSystem.get(conf); + signal = new Path(conf.get(getTaskSignalParameter(true))); + } catch (IOException ioe) { + System.out.println("Got an exception while obtaining the filesystem"); + } + } + } + + /** Only the later half of the maps wait for the signal while the rest + * complete immediately. + */ + static class HalfWaitingMapper extends WaitingMapper { + @Override + public boolean shouldWait(int id) { + return id >= (totalMaps / 2); + } + } + + /** + * Reduce that just waits for a file to be created on the dfs. The + * file creation is a signal to the reduce. + */ + + static class WaitingReducer extends MapReduceBase + implements Reducer { + + FileSystem fs = null; + Path signal; + + /** The waiting function. The reduce exits once it gets a signal. Here the + * signal is the file existence. + */ + public void reduce(WritableComparable key, Iterator val, + OutputCollector output, + Reporter reporter) + throws IOException { + if (fs != null) { + while (!fs.exists(signal)) { + try { + reporter.progress(); + synchronized (this) { + this.wait(1000); // wait for 1 sec + } + } catch (InterruptedException ie) { + System.out.println("Interrupted while the map was waiting for the" + + " signal."); + break; + } + } + } else { + throw new IOException("Could not get the DFS!!"); + } + } + + public void configure(JobConf conf) { + try { + fs = FileSystem.get(conf); + signal = new Path(conf.get(getTaskSignalParameter(false))); + } catch (IOException ioe) { + System.out.println("Got an exception while obtaining the filesystem"); + } + } + } + + static String getTaskSignalParameter(boolean isMap) { + return isMap + ? "test.mapred.map.waiting.target" + : "test.mapred.reduce.waiting.target"; + } + + /** + * Signal the maps/reduces to start. + */ + static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, + String mapSignalFile, + String reduceSignalFile, int replication) + throws IOException { + writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), + (short)replication); + writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), + (short)replication); + } + + /** + * Signal the maps/reduces to start. + */ + static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, + boolean isMap, String mapSignalFile, + String reduceSignalFile) + throws IOException { + // signal the maps to complete + writeFile(dfs.getNameNode(), fileSys.getConf(), + isMap + ? 
new Path(mapSignalFile) + : new Path(reduceSignalFile), (short)1); + } + + static String getSignalFile(Path dir) { + return (new Path(dir, "signal")).toString(); + } + + static String getMapSignalFile(Path dir) { + return (new Path(dir, "map-signal")).toString(); + } + + static String getReduceSignalFile(Path dir) { + return (new Path(dir, "reduce-signal")).toString(); + } + + static void writeFile(NameNode namenode, Configuration conf, Path name, + short replication) throws IOException { + FileSystem fileSys = FileSystem.get(conf); + SequenceFile.Writer writer = + SequenceFile.createWriter(fileSys, conf, name, + BytesWritable.class, BytesWritable.class, + CompressionType.NONE); + writer.append(new BytesWritable(), new BytesWritable()); + writer.close(); + fileSys.setReplication(name, replication); + DFSTestUtil.waitReplication(fileSys, name, replication); + } + + // Input formats + /** + * A custom input format that creates virtual inputs of a single string + * for each map. + */ + public static class RandomInputFormat implements InputFormat { + + public InputSplit[] getSplits(JobConf job, + int numSplits) throws IOException { + InputSplit[] result = new InputSplit[numSplits]; + Path outDir = FileOutputFormat.getOutputPath(job); + for(int i=0; i < result.length; ++i) { + result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), + 0, 1, (String[])null); + } + return result; + } + + static class RandomRecordReader implements RecordReader { + Path name; + public RandomRecordReader(Path p) { + name = p; + } + public boolean next(Text key, Text value) { + if (name != null) { + key.set(name.getName()); + name = null; + return true; + } + return false; + } + public Text createKey() { + return new Text(); + } + public Text createValue() { + return new Text(); + } + public long getPos() { + return 0; + } + public void close() {} + public float getProgress() { + return 0.0f; + } + } + + public RecordReader getRecordReader(InputSplit split, + JobConf job, + Reporter reporter) + throws IOException { + return new RandomRecordReader(((FileSplit) split).getPath()); + } + } + + // Start a job and return its RunningJob object + static RunningJob runJob(JobConf conf, Path inDir, Path outDir) + throws IOException { + return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks()); + } + + // Start a job and return its RunningJob object + static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, + int numReds) throws IOException { + + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + + // submit the job and wait for it to complete + return runJob(conf, inDir, outDir, numMaps, numReds, input); + } + + // Start a job with the specified input and return its RunningJob object + static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, + int numReds, String input) throws IOException { + FileSystem fs = FileSystem.get(conf); + if (fs.exists(outDir)) { + fs.delete(outDir, true); + } + if (!fs.exists(inDir)) { + fs.mkdirs(inDir); + } + + for (int i = 0; i < numMaps; ++i) { + DataOutputStream file = fs.create(new Path(inDir, "part-" + i)); + file.writeBytes(input); + file.close(); + } + + conf.setInputFormat(TextInputFormat.class); + conf.setOutputKeyClass(LongWritable.class); + conf.setOutputValueClass(Text.class); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + conf.setNumMapTasks(numMaps); + conf.setNumReduceTasks(numReds); + + JobClient jobClient = new JobClient(conf); 
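+ // submitJob() returns immediately; callers poll the returned RunningJob to track completion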
+ RunningJob job = jobClient.submitJob(conf); + + return job; + } + + // Run a job that will be succeeded and wait until it completes + public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir) + throws IOException { + conf.setJobName("test-job-succeed"); + conf.setMapperClass(IdentityMapper.class); + conf.setReducerClass(IdentityReducer.class); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (!job.isComplete()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + + return job; + } + + // Run a job that will be failed and wait until it completes + public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir) + throws IOException { + conf.setJobName("test-job-fail"); + conf.setMapperClass(FailMapper.class); + conf.setReducerClass(IdentityReducer.class); + conf.setMaxMapAttempts(1); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (!job.isComplete()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + + return job; + } + + // Run a job that will be killed and wait until it completes + public static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir) + throws IOException { + + conf.setJobName("test-job-kill"); + conf.setMapperClass(KillMapper.class); + conf.setReducerClass(IdentityReducer.class); + + RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); + while (job.getJobState() != JobStatus.RUNNING) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + job.killJob(); + while (job.cleanupProgress() == 0.0f) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + break; + } + } + + return job; + } + + /** + * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread + * asynchronously. + */ + public static class InlineCleanupQueue extends CleanupQueue { + List stalePaths = new ArrayList(); + + public InlineCleanupQueue() { + // do nothing + } + + @Override + public void addToQueue(PathDeletionContext... contexts) { + // delete paths in-line + for (PathDeletionContext context : contexts) { + try { + if (!deletePath(context)) { + LOG.warn("Stale path " + context.fullPath); + stalePaths.add(context.fullPath); + } + } catch (IOException e) { + LOG.warn("Caught exception while deleting path " + + context.fullPath); + LOG.info(StringUtils.stringifyException(e)); + stalePaths.add(context.fullPath); + } + } + } + } + + static class FakeClock extends Clock { + long time = 0; + + public void advance(long millis) { + time += millis; + } + + @Override + long getTime() { + return time; + } + } + // Mapper that fails + static class FailMapper extends MapReduceBase implements + Mapper { + + public void map(WritableComparable key, Writable value, + OutputCollector out, Reporter reporter) + throws IOException { + //NOTE- the next line is required for the TestDebugScript test to succeed + System.err.println("failing map"); + throw new RuntimeException("failing map"); + } + } + + // Mapper that sleeps for a long time. 
+ // Used for running a job that will be killed + static class KillMapper extends MapReduceBase implements + Mapper { + + public void map(WritableComparable key, Writable value, + OutputCollector out, Reporter reporter) + throws IOException { + + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + // Do nothing + } + } + } + + static void setUpConfigFile(Properties confProps, File configFile) + throws IOException { + Configuration config = new Configuration(false); + FileOutputStream fos = new FileOutputStream(configFile); + + for (Enumeration e = confProps.propertyNames(); e.hasMoreElements();) { + String key = (String) e.nextElement(); + config.set(key, confProps.getProperty(key)); + } + + config.writeXml(fos); + fos.close(); + } + + /** + * This creates a file in the dfs + * @param dfs FileSystem Local File System where file needs to be picked + * @param URIPATH Path dfs path where file needs to be copied + * @param permission FsPermission File permission + * @return returns the DataOutputStream + */ + public static DataOutputStream + createTmpFileDFS(FileSystem dfs, Path URIPATH, + FsPermission permission, String input) throws Exception { + //Creating the path with the file + DataOutputStream file = + FileSystem.create(dfs, URIPATH, permission); + file.writeBytes(input); + file.close(); + return file; + } + + /** + * This formats the long tasktracker name to just the FQDN + * @param taskTrackerLong String The long format of the tasktracker string + * @return String The FQDN of the tasktracker + * @throws Exception + */ + public static String getFQDNofTT (String taskTrackerLong) throws Exception { + //Getting the exact FQDN of the tasktracker from the tasktracker string. + String[] firstSplit = taskTrackerLong.split("_"); + String tmpOutput = firstSplit[1]; + String[] secondSplit = tmpOutput.split(":"); + String tmpTaskTracker = secondSplit[0]; + return tmpTaskTracker; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java new file mode 100644 index 0000000000..60d2900192 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * This is an example Hadoop Map/Reduce application. + * It reads the text input files, breaks each line into words + * and counts them. The output is a locally sorted list of words and the + * count of how often they occurred. + * + * To run: bin/hadoop jar build/hadoop-examples.jar wordcount + * [-m maps] [-r reduces] in-dir out-dir + */ +public class WordCount extends Configured implements Tool { + + /** + * Counts the words in each line. + * For each line of input, break the line into words and emit them as + * (word, 1). + */ + public static class MapClass extends MapReduceBase + implements Mapper { + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(LongWritable key, Text value, + OutputCollector output, + Reporter reporter) throws IOException { + String line = value.toString(); + StringTokenizer itr = new StringTokenizer(line); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + output.collect(word, one); + } + } + } + + /** + * A reducer class that just emits the sum of the input values. + */ + public static class Reduce extends MapReduceBase + implements Reducer { + + public void reduce(Text key, Iterator values, + OutputCollector output, + Reporter reporter) throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get(); + } + output.collect(key, new IntWritable(sum)); + } + } + + static int printUsage() { + System.out.println("wordcount [-m ] [-r ] "); + ToolRunner.printGenericCommandUsage(System.out); + return -1; + } + + /** + * The main driver for word count map/reduce program. + * Invoke this method to submit the map/reduce job. + * @throws IOException When there is communication problems with the + * job tracker. 
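+ * @return 0 on success, -1 if the command-line arguments are invalid.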
+ */ + public int run(String[] args) throws Exception { + JobConf conf = new JobConf(getConf(), WordCount.class); + conf.setJobName("wordcount"); + + // the keys are words (strings) + conf.setOutputKeyClass(Text.class); + // the values are counts (ints) + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(MapClass.class); + conf.setCombinerClass(Reduce.class); + conf.setReducerClass(Reduce.class); + + List other_args = new ArrayList(); + for(int i=0; i < args.length; ++i) { + try { + if ("-m".equals(args[i])) { + conf.setNumMapTasks(Integer.parseInt(args[++i])); + } else if ("-r".equals(args[i])) { + conf.setNumReduceTasks(Integer.parseInt(args[++i])); + } else { + other_args.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i-1]); + return printUsage(); + } + } + // Make sure there are exactly 2 parameters left. + if (other_args.size() != 2) { + System.out.println("ERROR: Wrong number of parameters: " + + other_args.size() + " instead of 2."); + return printUsage(); + } + FileInputFormat.setInputPaths(conf, other_args.get(0)); + FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); + + JobClient.runJob(conf); + return 0; + } + + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new WordCount(), args); + System.exit(res); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java new file mode 100644 index 0000000000..d160de5db6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.jobcontrol; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * Utility methods used in various Job Control unit tests. + */ +public class JobControlTestUtils { + + static private Random rand = new Random(); + + private static NumberFormat idFormat = NumberFormat.getInstance(); + + static { + idFormat.setMinimumIntegerDigits(4); + idFormat.setGroupingUsed(false); + } + + /** + * Cleans the data from the passed Path in the passed FileSystem. + * + * @param fs FileSystem to delete data from. + * @param dirPath Path to be deleted. + * @throws IOException If an error occurs cleaning the data. + */ + static void cleanData(FileSystem fs, Path dirPath) throws IOException { + fs.delete(dirPath, true); + } + + /** + * Generates a string of random digits. + * + * @return A random string. + */ + private static String generateRandomWord() { + return idFormat.format(rand.nextLong()); + } + + /** + * Generates a line of random text. + * + * @return A line of random text. + */ + private static String generateRandomLine() { + long r = rand.nextLong() % 7; + long n = r + 20; + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < n; i++) { + sb.append(generateRandomWord()).append(" "); + } + sb.append("\n"); + return sb.toString(); + } + + /** + * Generates data that can be used for Job Control tests. + * + * @param fs FileSystem to create data in. + * @param dirPath Path to create the data in. + * @throws IOException If an error occurs creating the data. + */ + static void generateData(FileSystem fs, Path dirPath) throws IOException { + FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt")); + for (int i = 0; i < 10000; i++) { + String line = generateRandomLine(); + out.write(line.getBytes("UTF-8")); + } + out.close(); + } + + /** + * Creates a simple copy job. + * + * @param indirs List of input directories. + * @param outdir Output directory. + * @return JobConf initialised for a simple copy job. + * @throws Exception If an error occurs creating job configuration. + */ + static JobConf createCopyJob(List indirs, Path outdir) throws Exception { + + Configuration defaults = new Configuration(); + JobConf theJob = new JobConf(defaults, TestJobControl.class); + theJob.setJobName("DataMoveJob"); + + FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0])); + theJob.setMapperClass(DataCopy.class); + FileOutputFormat.setOutputPath(theJob, outdir); + theJob.setOutputKeyClass(Text.class); + theJob.setOutputValueClass(Text.class); + theJob.setReducerClass(DataCopy.class); + theJob.setNumMapTasks(12); + theJob.setNumReduceTasks(4); + return theJob; + } + + /** + * Simple Mapper and Reducer implementation which copies data it reads in. 
+ */ + public static class DataCopy extends MapReduceBase implements + Mapper, Reducer { + public void map(LongWritable key, Text value, OutputCollector output, + Reporter reporter) throws IOException { + output.collect(new Text(key.toString()), value); + } + + public void reduce(Text key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + Text dumbKey = new Text(""); + while (values.hasNext()) { + Text data = values.next(); + output.collect(dumbKey, data); + } + } + } + +} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestChainMapReduce.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestChainMapReduce.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestChainMapReduce.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestChainMapReduce.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestChild.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestChild.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestChild.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java similarity index 100% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
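For reference, the WordCount class added by this patch is a standard Tool and can be driven programmatically as well as from the command line. A minimal sketch of such an invocation follows; the driver class name and the input and output paths are illustrative placeholders, not part of the patch.

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Run with one map and one reduce over hypothetical local paths;
    // WordCount.run() expects exactly two positional arguments after the options.
    int rc = ToolRunner.run(conf, new WordCount(),
        new String[] { "-m", "1", "-r", "1", "/tmp/wc-in", "/tmp/wc-out" });
    System.exit(rc);
  }
}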