diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2389fda741..403ba142b8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -132,6 +132,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via
sseth)
+ MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces
+ introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
similarity index 87%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
index 4daf90ddce..d91754d71d 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
@@ -59,19 +59,6 @@ public void testNoDefaults() throws Exception {
JobConf conf = new JobConf(false);
- //seeding JT and NN info into non-defaults (empty jobconf)
- String jobTrackerAddress = createJobConf().get(JTConfig.JT_IPC_ADDRESS);
- if (jobTrackerAddress == null) {
- jobTrackerAddress = "local";
- }
- conf.set(JTConfig.JT_IPC_ADDRESS, jobTrackerAddress);
- if (jobTrackerAddress == "local") {
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
- }
- else {
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
- }
-
conf.set("fs.default.name", createJobConf().get("fs.default.name"));
conf.setJobName("mr");
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java
new file mode 100644
index 0000000000..c102e8f862
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Abstract test case class to run MR in local or cluster mode and against the
+ * local FS or DFS.
+ *
+ * The Hadoop instance is started and stopped for each test method.
+ *
+ * If using DFS, the filesystem is reformatted before each test method.
+ *
+ * Job Configurations should be created using a configuration returned by the
+ * 'createJobConf()' method.
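+ *
+ * Illustrative sketch only (the subclass name and test body are hypothetical,
+ * not part of this patch): a test picks the MR/FS mode in its constructor and
+ * obtains its configuration from createJobConf(), e.g.
+ * <pre>
+ *   public class MyLocalTest extends HadoopTestCase {
+ *     public MyLocalTest() throws IOException {
+ *       super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ *     }
+ *     public void testSomething() throws Exception {
+ *       JobConf conf = createJobConf(); // preconfigured for this instance
+ *       // ... configure and run the job against getFileSystem() ...
+ *     }
+ *   }
+ * </pre>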
+ */
+public abstract class HadoopTestCase extends TestCase {
+ public static final int LOCAL_MR = 1;
+ public static final int CLUSTER_MR = 2;
+ public static final int LOCAL_FS = 4;
+ public static final int DFS_FS = 8;
+
+ private boolean localMR;
+ private boolean localFS;
+
+ private int taskTrackers;
+ private int dataNodes;
+
+ /**
+ * Creates a testcase for local or cluster MR using DFS.
+ *
+ * The DFS will be formatted regardless of whether one already exists in the
+ * given location.
+ *
+ * @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster
+ * (CLUSTER_MR)
+ * @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS)
+ *
+ * @param taskTrackers number of task trackers to start when using cluster
+ *
+ * @param dataNodes number of data nodes to start when using DFS
+ *
+ * @throws IOException thrown if the base directory cannot be set.
+ */
+ public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes)
+ throws IOException {
+ if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) {
+ throw new IllegalArgumentException(
+ "Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR");
+ }
+ if (fsMode != LOCAL_FS && fsMode != DFS_FS) {
+ throw new IllegalArgumentException(
+ "Invalid FileSystem mode, must be LOCAL_FS or DFS_FS");
+ }
+ if (taskTrackers < 1) {
+ throw new IllegalArgumentException(
+ "Invalid taskTrackers value, must be greater than 0");
+ }
+ if (dataNodes < 1) {
+ throw new IllegalArgumentException(
+ "Invalid dataNodes value, must be greater than 0");
+ }
+ localMR = (mrMode == LOCAL_MR);
+ localFS = (fsMode == LOCAL_FS);
+ /*
+ JobConf conf = new JobConf();
+ fsRoot = conf.get("hadoop.tmp.dir");
+
+ if (fsRoot == null) {
+ throw new IllegalArgumentException(
+ "hadoop.tmp.dir is not defined");
+ }
+
+ fsRoot = fsRoot.replace(' ', '+') + "/fs";
+
+ File file = new File(fsRoot);
+ if (!file.exists()) {
+ if (!file.mkdirs()) {
+ throw new RuntimeException("Could not create FS base path: " + file);
+ }
+ }
+ */
+ this.taskTrackers = taskTrackers;
+ this.dataNodes = dataNodes;
+ }
+
+ /**
+ * Indicates if the MR is running in local or cluster mode.
+ *
+ * @return returns TRUE if the MR is running locally, FALSE if running in
+ * cluster mode.
+ */
+ public boolean isLocalMR() {
+ return localMR;
+ }
+
+ /**
+ * Indicates if the filesystem is local or DFS.
+ *
+ * @return returns TRUE if the filesystem is local, FALSE if it is DFS.
+ */
+ public boolean isLocalFS() {
+ return localFS;
+ }
+
+
+ private MiniDFSCluster dfsCluster = null;
+ private MiniMRCluster mrCluster = null;
+ private FileSystem fileSystem = null;
+
+ /**
+ * Creates Hadoop instance based on constructor configuration before
+ * a test case is run.
+ *
+ * @throws Exception
+ */
+ protected void setUp() throws Exception {
+ super.setUp();
+ if (localFS) {
+ fileSystem = FileSystem.getLocal(new JobConf());
+ }
+ else {
+ dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
+ fileSystem = dfsCluster.getFileSystem();
+ }
+ if (!localMR) {
+ //noinspection deprecation
+ mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getUri().toString(), 1);
+ }
+ }
+
+ /**
+ * Destroys Hadoop instance based on constructor configuration after
+ * a test case is run.
+ *
+ * @throws Exception
+ */
+ protected void tearDown() throws Exception {
+ try {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
+ }
+ try {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+ catch (Exception ex) {
+ System.out.println(ex);
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Returns the Filesystem in use.
+ *
+ * TestCases should use this Filesystem as it
+ * is properly configured with the workingDir for relative PATHs.
+ *
+ * @return the filesystem used by Hadoop.
+ */
+ protected FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ /**
+ * Returns a job configuration preconfigured to run against the Hadoop
+ * managed by the testcase.
+ * @return configuration that works on the testcase Hadoop instance
+ */
+ protected JobConf createJobConf() {
+ if (localMR) {
+ JobConf conf = new JobConf();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+ return conf;
+ }
+ else {
+ return mrCluster.createJobConf();
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java
new file mode 100644
index 0000000000..026edfbddb
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.ServletException;
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+/**
+ * Base class to test Job end notification in local and cluster mode.
+ *
+ * Starts up Hadoop in local or cluster mode (by extending the
+ * HadoopTestCase class) and starts a servlet engine that hosts
+ * a servlet which receives the job-finalization notification.
+ *
+ * The notification servlet returns an HTTP 400 the first time it is called
+ * and an HTTP 200 the second time, thus exercising the retry logic.
+ *
+ * In both cases the local file system is used (this is irrelevant to
+ * the tested functionality).
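+ *
+ * Illustrative sketch only (the subclass name is hypothetical): concrete
+ * tests pick the MR mode in their constructor, e.g.
+ * <pre>
+ *   public class MyLocalNotificationTest extends NotificationTestCase {
+ *     public MyLocalNotificationTest() throws IOException {
+ *       super(HadoopTestCase.LOCAL_MR); // local MR, local FS
+ *     }
+ *   }
+ * </pre>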
+ */
+public abstract class NotificationTestCase extends HadoopTestCase {
+
+ protected NotificationTestCase(int mode) throws IOException {
+ super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+ private int port;
+ private String contextPath = "/notification";
+ private String servletPath = "/mapred";
+ private Server webServer;
+
+ private void startHttpServer() throws Exception {
+
+ // Create the webServer
+ if (webServer != null) {
+ webServer.stop();
+ webServer = null;
+ }
+ webServer = new Server(0);
+
+ Context context = new Context(webServer, contextPath);
+
+ // create servlet handler
+ context.addServlet(new ServletHolder(new NotificationServlet()),
+ servletPath);
+
+ // Start webServer
+ webServer.start();
+ port = webServer.getConnectors()[0].getLocalPort();
+
+ }
+
+ private void stopHttpServer() throws Exception {
+ if (webServer != null) {
+ webServer.stop();
+ webServer.destroy();
+ webServer = null;
+ }
+ }
+
+ public static class NotificationServlet extends HttpServlet {
+ public static int counter = 0;
+ private static final long serialVersionUID = 1L;
+
+ protected void doGet(HttpServletRequest req, HttpServletResponse res)
+ throws ServletException, IOException {
+ switch (counter) {
+ case 0:
+ {
+ assertTrue(req.getQueryString().contains("SUCCEEDED"));
+ }
+ break;
+ case 2:
+ {
+ assertTrue(req.getQueryString().contains("KILLED"));
+ }
+ break;
+ case 4:
+ {
+ assertTrue(req.getQueryString().contains("FAILED"));
+ }
+ break;
+ }
+ if (counter % 2 == 0) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
+ }
+ else {
+ res.setStatus(HttpServletResponse.SC_OK);
+ }
+ counter++;
+ }
+ }
+
+ private String getNotificationUrlTemplate() {
+ return "http://localhost:" + port + contextPath + servletPath +
+ "?jobId=$jobId&jobStatus=$jobStatus";
+ }
+
+ protected JobConf createJobConf() {
+ JobConf conf = super.createJobConf();
+ conf.setJobEndNotificationURI(getNotificationUrlTemplate());
+ conf.setInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 3);
+ conf.setInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 200);
+ return conf;
+ }
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ startHttpServer();
+ }
+
+ protected void tearDown() throws Exception {
+ stopHttpServer();
+ super.tearDown();
+ }
+
+ public void testMR() throws Exception {
+ System.out.println(launchWordCount(this.createJobConf(),
+ "a b c d e f g h", 1, 1));
+ Thread.sleep(2000);
+ assertEquals(2, NotificationServlet.counter);
+
+ Path inDir = new Path("notificationjob/input");
+ Path outDir = new Path("notificationjob/output");
+
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data","/tmp")
+ .toString().replace(' ', '+');
+ inDir = new Path(localPathRoot, inDir);
+ outDir = new Path(localPathRoot, outDir);
+ }
+
+ // run a job with KILLED status
+ System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
+ outDir).getID());
+ Thread.sleep(2000);
+ assertEquals(4, NotificationServlet.counter);
+
+ // run a job with FAILED status
+ System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
+ outDir).getID());
+ Thread.sleep(2000);
+ assertEquals(6, NotificationServlet.counter);
+ }
+
+ private String launchWordCount(JobConf conf,
+ String input,
+ int numMaps,
+ int numReduces) throws IOException {
+ Path inDir = new Path("testing/wc/input");
+ Path outDir = new Path("testing/wc/output");
+
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data","/tmp")
+ .toString().replace(' ', '+');
+ inDir = new Path(localPathRoot, inDir);
+ outDir = new Path(localPathRoot, outDir);
+ }
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(outDir, true);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ {
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+ }
+ conf.setJobName("wordcount");
+ conf.setInputFormat(TextInputFormat.class);
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(WordCount.MapClass.class);
+ conf.setCombinerClass(WordCount.Reduce.class);
+ conf.setReducerClass(WordCount.Reduce.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReduces);
+ JobClient.runJob(conf);
+ return MapReduceTestUtil.readOutput(outDir, conf);
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java
new file mode 100644
index 0000000000..381d42e238
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.*;
+
+/**
+ * A set of utilities to validate the sort of the map-reduce framework.
+ * This utility program has 2 main parts:
+ * 1. Checking the records' statistics
+ * a) Validates the no. of bytes and records in sort's input & output.
+ * b) Validates the xor of the md5's of each key/value pair.
+ * c) Ensures same key/value is present in both input and output.
+ * 2. Check individual records to ensure each record is present in both
+ * the input and the output of the sort (expensive on large data-sets).
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
+ * [-m maps] [-r reduces] [-deep]
+ * -sortInput sort-in-dir -sortOutput sort-out-dir
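+ *
+ * Illustrative programmatic use only (directory names are hypothetical):
+ * <pre>
+ *   int res = ToolRunner.run(new Configuration(), new SortValidator(),
+ *       new String[] {"-sortInput", "/sort/in", "-sortOutput", "/sort/out"});
+ * </pre>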
+ */
+public class SortValidator extends Configured implements Tool {
+
+ static private final IntWritable sortInput = new IntWritable(1);
+ static private final IntWritable sortOutput = new IntWritable(2);
+ static public String SORT_REDUCES =
+ "mapreduce.sortvalidator.sort.reduce.tasks";
+ static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost";
+ static public String REDUCES_PER_HOST =
+ "mapreduce.sortvalidator.reducesperhost";
+ static void printUsage() {
+ System.err.println("sortvalidate [-m ] [-r ] [-deep] " +
+ "-sortInput -sortOutput ");
+ System.exit(1);
+ }
+
+ static private IntWritable deduceInputFile(JobConf job) {
+ Path[] inputPaths = FileInputFormat.getInputPaths(job);
+ Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
+
+ // value == one for sort-input; value == two for sort-output
+ return (inputFile.getParent().equals(inputPaths[0])) ?
+ sortInput : sortOutput;
+ }
+
+ static private byte[] pair(BytesWritable a, BytesWritable b) {
+ byte[] pairData = new byte[a.getLength()+ b.getLength()];
+ System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
+ System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
+ return pairData;
+ }
+
+ private static final PathFilter sortPathsFilter = new PathFilter() {
+ public boolean accept(Path path) {
+ return (path.getName().startsWith("part-"));
+ }
+ };
+
+ /**
+ * A simple map-reduce job which checks consistency of the
+ * MapReduce framework's sort by checking:
+ * a) Records are sorted correctly
+ * b) Keys are partitioned correctly
+ * c) The input and output have same no. of bytes and records.
+ * d) The input and output have the correct 'checksum' by xor'ing
+ * the md5 of each record (xor is commutative, so the aggregate
+ * checksum is independent of record order and must match between
+ * the unsorted input and the sorted output).
+ *
+ */
+ public static class RecordStatsChecker {
+
+ /**
+ * Generic way to get raw data from a {@link Writable}.
+ */
+ static class Raw {
+ /**
+ * Get raw data bytes from a {@link Writable}
+ * @param writable {@link Writable} object from whom to get the raw data
+ * @return raw data of the writable
+ */
+ public byte[] getRawBytes(Writable writable) {
+ return writable.toString().getBytes();
+ }
+
+ /**
+ * Get number of raw data bytes of the {@link Writable}
+ * @param writable {@link Writable} object from whom to get the raw data
+ * length
+ * @return number of raw data bytes
+ */
+ public int getRawBytesLength(Writable writable) {
+ return writable.toString().getBytes().length;
+ }
+ }
+
+ /**
+ * Specialization of {@link Raw} for {@link BytesWritable}.
+ */
+ static class RawBytesWritable extends Raw {
+ public byte[] getRawBytes(Writable bw) {
+ return ((BytesWritable)bw).getBytes();
+ }
+ public int getRawBytesLength(Writable bw) {
+ return ((BytesWritable)bw).getLength();
+ }
+ }
+
+ /**
+ * Specialization of {@link Raw} for {@link Text}.
+ */
+ static class RawText extends Raw {
+ public byte[] getRawBytes(Writable text) {
+ return ((Text)text).getBytes();
+ }
+ public int getRawBytesLength(Writable text) {
+ return ((Text)text).getLength();
+ }
+ }
+
+ private static Raw createRaw(Class rawClass) {
+ if (rawClass == Text.class) {
+ return new RawText();
+ } else if (rawClass == BytesWritable.class) {
+ System.err.println("Returning " + RawBytesWritable.class);
+ return new RawBytesWritable();
+ }
+ return new Raw();
+ }
+
+ public static class RecordStatsWritable implements Writable {
+ private long bytes = 0;
+ private long records = 0;
+ private int checksum = 0;
+
+ public RecordStatsWritable() {}
+
+ public RecordStatsWritable(long bytes, long records, int checksum) {
+ this.bytes = bytes;
+ this.records = records;
+ this.checksum = checksum;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, bytes);
+ WritableUtils.writeVLong(out, records);
+ WritableUtils.writeVInt(out, checksum);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ bytes = WritableUtils.readVLong(in);
+ records = WritableUtils.readVLong(in);
+ checksum = WritableUtils.readVInt(in);
+ }
+
+ public long getBytes() { return bytes; }
+ public long getRecords() { return records; }
+ public int getChecksum() { return checksum; }
+ }
+
+ public static class Map extends MapReduceBase
+ implements Mapper<WritableComparable, Writable,
+ IntWritable, RecordStatsChecker.RecordStatsWritable> {
+
+ private IntWritable key = null;
+ private WritableComparable prevKey = null;
+ private Class<? extends WritableComparable> keyClass;
+ private Partitioner<WritableComparable, Writable> partitioner = null;
+ private int partition = -1;
+ private int noSortReducers = -1;
+ private long recordId = -1;
+
+ private Raw rawKey;
+ private Raw rawValue;
+
+ public void configure(JobConf job) {
+ // 'key' == sortInput for sort-input; key == sortOutput for sort-output
+ key = deduceInputFile(job);
+
+ if (key == sortOutput) {
+ partitioner = new HashPartitioner<WritableComparable, Writable>();
+
+ // Figure the 'current' partition and no. of reduces of the 'sort'
+ try {
+ URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
+ String inputFile = inputURI.getPath();
+ // part file is of the form part-r-xxxxx
+ partition = Integer.valueOf(inputFile.substring(
+ inputFile.lastIndexOf("part") + 7)).intValue();
+ noSortReducers = job.getInt(SORT_REDUCES, -1);
+ } catch (Exception e) {
+ System.err.println("Caught: " + e);
+ System.exit(-1);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<IntWritable, RecordStatsChecker.RecordStatsWritable> output,
+ Reporter reporter) throws IOException {
+ // Set up rawKey and rawValue on the first call to 'map'
+ if (recordId == -1) {
+ rawKey = createRaw(key.getClass());
+ rawValue = createRaw(value.getClass());
+ }
+ ++recordId;
+
+ if (this.key == sortOutput) {
+ // Check if keys are 'sorted' if this
+ // record is from sort's output
+ if (prevKey == null) {
+ prevKey = key;
+ keyClass = prevKey.getClass();
+ } else {
+ // Sanity check
+ if (keyClass != key.getClass()) {
+ throw new IOException("Type mismatch in key: expected " +
+ keyClass.getName() + ", received " +
+ key.getClass().getName());
+ }
+
+ // Check if they were sorted correctly
+ if (prevKey.compareTo(key) > 0) {
+ throw new IOException("The 'map-reduce' framework wrongly" +
+ " classifed (" + prevKey + ") > (" +
+ key + ") "+ "for record# " + recordId);
+ }
+ prevKey = key;
+ }
+
+ // Check if the sorted output is 'partitioned' right
+ int keyPartition =
+ partitioner.getPartition(key, value, noSortReducers);
+ if (partition != keyPartition) {
+ throw new IOException("Partitions do not match for record# " +
+ recordId + " ! - '" + partition + "' v/s '" +
+ keyPartition + "'");
+ }
+ }
+
+ // Construct the record-stats and output (this.key, record-stats)
+ byte[] keyBytes = rawKey.getRawBytes(key);
+ int keyBytesLen = rawKey.getRawBytesLength(key);
+ byte[] valueBytes = rawValue.getRawBytes(value);
+ int valueBytesLen = rawValue.getRawBytesLength(value);
+
+ int keyValueChecksum =
+ (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
+ WritableComparator.hashBytes(valueBytes, valueBytesLen));
+
+ output.collect(this.key,
+ new RecordStatsWritable((keyBytesLen+valueBytesLen),
+ 1, keyValueChecksum)
+ );
+ }
+
+ }
+
+ public static class Reduce extends MapReduceBase
+ implements Reducer<IntWritable, RecordStatsChecker.RecordStatsWritable,
+ IntWritable, RecordStatsChecker.RecordStatsWritable> {
+
+ public void reduce(IntWritable key,
+ Iterator<RecordStatsChecker.RecordStatsWritable> values,
+ OutputCollector<IntWritable, RecordStatsChecker.RecordStatsWritable> output,
+ Reporter reporter) throws IOException {
+ long bytes = 0;
+ long records = 0;
+ int xor = 0;
+ while (values.hasNext()) {
+ RecordStatsWritable stats = values.next();
+ bytes += stats.getBytes();
+ records += stats.getRecords();
+ xor ^= stats.getChecksum();
+ }
+
+ output.collect(key, new RecordStatsWritable(bytes, records, xor));
+ }
+ }
+
+ public static class NonSplitableSequenceFileInputFormat
+ extends SequenceFileInputFormat {
+ protected boolean isSplitable(FileSystem fs, Path filename) {
+ return false;
+ }
+ }
+
+ static void checkRecords(Configuration defaults,
+ Path sortInput, Path sortOutput) throws IOException {
+ FileSystem inputfs = sortInput.getFileSystem(defaults);
+ FileSystem outputfs = sortOutput.getFileSystem(defaults);
+ FileSystem defaultfs = FileSystem.get(defaults);
+ JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
+ jobConf.setJobName("sortvalidate-recordstats-checker");
+
+ int noSortReduceTasks =
+ outputfs.listStatus(sortOutput, sortPathsFilter).length;
+ jobConf.setInt(SORT_REDUCES, noSortReduceTasks);
+ int noSortInputpaths = inputfs.listStatus(sortInput).length;
+
+ jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ jobConf.setOutputKeyClass(IntWritable.class);
+ jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
+
+ jobConf.setMapperClass(Map.class);
+ jobConf.setCombinerClass(Reduce.class);
+ jobConf.setReducerClass(Reduce.class);
+
+ jobConf.setNumMapTasks(noSortReduceTasks);
+ jobConf.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(jobConf, sortInput);
+ FileInputFormat.addInputPath(jobConf, sortOutput);
+ Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
+ if (defaultfs.exists(outputPath)) {
+ defaultfs.delete(outputPath, true);
+ }
+ FileOutputFormat.setOutputPath(jobConf, outputPath);
+
+ // Uncomment to run locally in a single process
+ //job_conf.set(JTConfig.JT, "local");
+ Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
+ System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
+ "from " + inputPaths[0] + " (" +
+ noSortInputpaths + " files), " +
+ inputPaths[1] + " (" +
+ noSortReduceTasks +
+ " files) into " +
+ FileOutputFormat.getOutputPath(jobConf) +
+ " with 1 reducer.");
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ JobClient.runJob(jobConf);
+ Date end_time = new Date();
+ System.out.println("Job ended: " + end_time);
+ System.out.println("The job took " +
+ (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+
+ // Check to ensure that the statistics of the
+ // framework's sort-input and sort-output match
+ SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
+ new Path(outputPath, "part-00000"), defaults);
+ IntWritable k1 = new IntWritable();
+ IntWritable k2 = new IntWritable();
+ RecordStatsWritable v1 = new RecordStatsWritable();
+ RecordStatsWritable v2 = new RecordStatsWritable();
+ if (!stats.next(k1, v1)) {
+ throw new IOException("Failed to read record #1 from reduce's output");
+ }
+ if (!stats.next(k2, v2)) {
+ throw new IOException("Failed to read record #2 from reduce's output");
+ }
+
+ if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
+ v1.getChecksum() != v2.getChecksum()) {
+ throw new IOException("(" +
+ v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
+ v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+ }
+ }
+
+ }
+
+ /**
+ * A simple map-reduce task to check if the input and the output
+ * of the framework's sort is consistent by ensuring each record
+ * is present in both the input and the output.
+ *
+ */
+ public static class RecordChecker {
+
+ public static class Map extends MapReduceBase
+ implements Mapper<BytesWritable, BytesWritable, BytesWritable, IntWritable> {
+
+ private IntWritable value = null;
+
+ public void configure(JobConf job) {
+ // value == one for sort-input; value == two for sort-output
+ value = deduceInputFile(job);
+ }
+
+ public void map(BytesWritable key,
+ BytesWritable value,
+ OutputCollector<BytesWritable, IntWritable> output,
+ Reporter reporter) throws IOException {
+ // newKey = (key, value)
+ BytesWritable keyValue = new BytesWritable(pair(key, value));
+
+ // output (newKey, value)
+ output.collect(keyValue, this.value);
+ }
+ }
+
+ public static class Reduce extends MapReduceBase
+ implements Reducer<BytesWritable, IntWritable, BytesWritable, IntWritable> {
+
+ public void reduce(BytesWritable key, Iterator<IntWritable> values,
+ OutputCollector<BytesWritable, IntWritable> output,
+ Reporter reporter) throws IOException {
+ int ones = 0;
+ int twos = 0;
+ while (values.hasNext()) {
+ IntWritable count = values.next();
+ if (count.equals(sortInput)) {
+ ++ones;
+ } else if (count.equals(sortOutput)) {
+ ++twos;
+ } else {
+ throw new IOException("Invalid 'value' of " + count.get() +
+ " for (key,value): " + key.toString());
+ }
+ }
+
+ // Check to ensure there are equal no. of ones and twos
+ if (ones != twos) {
+ throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
+ ") for (key, value): " + key.toString());
+ }
+ }
+ }
+
+ static void checkRecords(Configuration defaults, int noMaps, int noReduces,
+ Path sortInput, Path sortOutput) throws IOException {
+ JobConf jobConf = new JobConf(defaults, RecordChecker.class);
+ jobConf.setJobName("sortvalidate-record-checker");
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(IntWritable.class);
+
+ jobConf.setMapperClass(Map.class);
+ jobConf.setReducerClass(Reduce.class);
+
+ JobClient client = new JobClient(jobConf);
+ ClusterStatus cluster = client.getClusterStatus();
+ if (noMaps == -1) {
+ noMaps = cluster.getTaskTrackers() *
+ jobConf.getInt(MAPS_PER_HOST, 10);
+ }
+ if (noReduces == -1) {
+ noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
+ String sortReduces = jobConf.get(REDUCES_PER_HOST);
+ if (sortReduces != null) {
+ noReduces = cluster.getTaskTrackers() *
+ Integer.parseInt(sortReduces);
+ }
+ }
+ jobConf.setNumMapTasks(noMaps);
+ jobConf.setNumReduceTasks(noReduces);
+
+ FileInputFormat.setInputPaths(jobConf, sortInput);
+ FileInputFormat.addInputPath(jobConf, sortOutput);
+ Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
+ FileSystem fs = FileSystem.get(defaults);
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+ FileOutputFormat.setOutputPath(jobConf, outputPath);
+
+ // Uncomment to run locally in a single process
+ //job_conf.set(JTConfig.JT, "local");
+ Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
+ System.out.println("\nSortValidator.RecordChecker: Running on " +
+ cluster.getTaskTrackers() +
+ " nodes to validate sort from " +
+ inputPaths[0] + ", " +
+ inputPaths[1] + " into " +
+ FileOutputFormat.getOutputPath(jobConf) +
+ " with " + noReduces + " reduces.");
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ JobClient.runJob(jobConf);
+ Date end_time = new Date();
+ System.out.println("Job ended: " + end_time);
+ System.out.println("The job took " +
+ (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+ }
+ }
+
+
+ /**
+ * The main driver for sort-validator program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there is communication problems with the
+ * job tracker.
+ */
+ public int run(String[] args) throws Exception {
+ Configuration defaults = getConf();
+
+ int noMaps = -1, noReduces = -1;
+ Path sortInput = null, sortOutput = null;
+ boolean deepTest = false;
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+ noMaps = Integer.parseInt(args[++i]);
+ } else if ("-r".equals(args[i])) {
+ noReduces = Integer.parseInt(args[++i]);
+ } else if ("-sortInput".equals(args[i])){
+ sortInput = new Path(args[++i]);
+ } else if ("-sortOutput".equals(args[i])){
+ sortOutput = new Path(args[++i]);
+ } else if ("-deep".equals(args[i])) {
+ deepTest = true;
+ } else {
+ printUsage();
+ return -1;
+ }
+ } catch (NumberFormatException except) {
+ System.err.println("ERROR: Integer expected instead of " + args[i]);
+ printUsage();
+ return -1;
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.err.println("ERROR: Required parameter missing from " +
+ args[i-1]);
+ printUsage();
+ return -1;
+ }
+ }
+
+ // Sanity check
+ if (sortInput == null || sortOutput == null) {
+ printUsage();
+ return -2;
+ }
+
+ // Check if the records are consistent and sorted correctly
+ RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
+
+ // Check if the same records are present in sort's inputs & outputs
+ if (deepTest) {
+ RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput,
+ sortOutput);
+ }
+
+ System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
+ " successfully.");
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
+ System.exit(res);
+ }
+}
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputFormat.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputFormat.java
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLazyOutput.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLazyOutput.java
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalMRNotification.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalMRNotification.java
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
similarity index 93%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
index 5e510094ce..1b93377fd1 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
@@ -48,7 +48,6 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
private static final String OUTPUT_FILENAME = "result[0]";
public static boolean launchJob(URI fileSys,
- String jobTracker,
JobConf conf,
int numMaps,
int numReduces) throws IOException {
@@ -68,8 +67,6 @@ public static boolean launchJob(URI fileSys,
// use WordCount example
FileSystem.setDefaultUri(conf, fileSys);
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
- conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
conf.setJobName("foo");
conf.setInputFormat(TextInputFormat.class);
@@ -113,11 +110,9 @@ public void testJobWithDFS() throws IOException {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 2);
- final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = new JobConf();
boolean result;
- result = launchJob(fileSys.getUri(), jobTrackerName, jobConf,
- 3, 1);
+ result = launchJob(fileSys.getUri(), jobConf, 3, 1);
assertTrue(result);
} finally {
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
similarity index 100%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
new file mode 100644
index 0000000000..17995603d0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Utilities used in unit tests.
+ *
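+ * Illustrative sketch only (paths are hypothetical): the job helpers in this
+ * class are meant to be driven from a test, e.g.
+ * <pre>
+ *   JobConf conf = new JobConf();
+ *   RunningJob job = UtilsForTests.runJobSucceed(conf,
+ *       new Path("in-dir"), new Path("out-dir"));
+ *   assertTrue(job.isSuccessful());
+ * </pre>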
+ */
+public class UtilsForTests {
+
+ static final Log LOG = LogFactory.getLog(UtilsForTests.class);
+
+ final static long KB = 1024L * 1;
+ final static long MB = 1024L * KB;
+ final static long GB = 1024L * MB;
+ final static long TB = 1024L * GB;
+ final static long PB = 1024L * TB;
+ final static Object waitLock = new Object();
+
+ static DecimalFormat dfm = new DecimalFormat("####.000");
+ static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
+
+ public static String dfmt(double d) {
+ return dfm.format(d);
+ }
+
+ public static String ifmt(double d) {
+ return ifm.format(d);
+ }
+
+ public static String formatBytes(long numBytes) {
+ StringBuffer buf = new StringBuffer();
+ boolean bDetails = true;
+ double num = numBytes;
+
+ if (numBytes < KB) {
+ buf.append(numBytes + " B");
+ bDetails = false;
+ } else if (numBytes < MB) {
+ buf.append(dfmt(num / KB) + " KB");
+ } else if (numBytes < GB) {
+ buf.append(dfmt(num / MB) + " MB");
+ } else if (numBytes < TB) {
+ buf.append(dfmt(num / GB) + " GB");
+ } else if (numBytes < PB) {
+ buf.append(dfmt(num / TB) + " TB");
+ } else {
+ buf.append(dfmt(num / PB) + " PB");
+ }
+ if (bDetails) {
+ buf.append(" (" + ifmt(numBytes) + " bytes)");
+ }
+ return buf.toString();
+ }
+
+ public static String formatBytes2(long numBytes) {
+ StringBuffer buf = new StringBuffer();
+ long u = 0;
+ if (numBytes >= TB) {
+ u = numBytes / TB;
+ numBytes -= u * TB;
+ buf.append(u + " TB ");
+ }
+ if (numBytes >= GB) {
+ u = numBytes / GB;
+ numBytes -= u * GB;
+ buf.append(u + " GB ");
+ }
+ if (numBytes >= MB) {
+ u = numBytes / MB;
+ numBytes -= u * MB;
+ buf.append(u + " MB ");
+ }
+ if (numBytes >= KB) {
+ u = numBytes / KB;
+ numBytes -= u * KB;
+ buf.append(u + " KB ");
+ }
+ buf.append(u + " B"); //even if zero
+ return buf.toString();
+ }
+
+ static final String regexpSpecials = "[]()?*+|.!^-\\~@";
+
+ public static String regexpEscape(String plain) {
+ StringBuffer buf = new StringBuffer();
+ char[] ch = plain.toCharArray();
+ int csup = ch.length;
+ for (int c = 0; c < csup; c++) {
+ if (regexpSpecials.indexOf(ch[c]) != -1) {
+ buf.append("\\");
+ }
+ buf.append(ch[c]);
+ }
+ return buf.toString();
+ }
+
+ public static String safeGetCanonicalPath(File f) {
+ try {
+ String s = f.getCanonicalPath();
+ return (s == null) ? f.toString() : s;
+ } catch (IOException io) {
+ return f.toString();
+ }
+ }
+
+ public static String slurp(File f) throws IOException {
+ int len = (int) f.length();
+ byte[] buf = new byte[len];
+ FileInputStream in = new FileInputStream(f);
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
+ }
+
+ public static String slurpHadoop(Path p, FileSystem fs) throws IOException {
+ int len = (int) fs.getFileStatus(p).getLen();
+ byte[] buf = new byte[len];
+ InputStream in = fs.open(p);
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
+ }
+
+ public static String rjustify(String s, int width) {
+ if (s == null) s = "null";
+ if (width > s.length()) {
+ s = getSpace(width - s.length()) + s;
+ }
+ return s;
+ }
+
+ public static String ljustify(String s, int width) {
+ if (s == null) s = "null";
+ if (width > s.length()) {
+ s = s + getSpace(width - s.length());
+ }
+ return s;
+ }
+
+ static char[] space;
+ static {
+ space = new char[300];
+ Arrays.fill(space, '\u0020');
+ }
+
+ public static String getSpace(int len) {
+ if (len > space.length) {
+ space = new char[Math.max(len, 2 * space.length)];
+ Arrays.fill(space, '\u0020');
+ }
+ return new String(space, 0, len);
+ }
+
+ /**
+ * Gets job status from the jobtracker given the jobclient and the job id
+ */
+ static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
+ JobStatus[] statuses = jc.getAllJobs();
+ for (JobStatus jobStatus : statuses) {
+ if (jobStatus.getJobID().equals(id)) {
+ return jobStatus;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * A utility that waits for specified amount of time
+ */
+ public static void waitFor(long duration) {
+ try {
+ synchronized (waitLock) {
+ waitLock.wait(duration);
+ }
+ } catch (InterruptedException ie) {}
+ }
+
+ /**
+ * Wait for the jobtracker to be RUNNING.
+ */
+ static void waitForJobTracker(JobClient jobClient) {
+ while (true) {
+ try {
+ ClusterStatus status = jobClient.getClusterStatus();
+ while (status.getJobTrackerStatus() != JobTrackerStatus.RUNNING) {
+ waitFor(100);
+ status = jobClient.getClusterStatus();
+ }
+ break; // means that the jt is ready
+ } catch (IOException ioe) {}
+ }
+ }
+
+ /**
+ * Waits until all the jobs at the jobtracker complete.
+ */
+ static void waitTillDone(JobClient jobClient) throws IOException {
+ // Wait for the last job to complete
+ while (true) {
+ boolean shouldWait = false;
+ for (JobStatus jobStatuses : jobClient.getAllJobs()) {
+ if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
+ && jobStatuses.getRunState() != JobStatus.FAILED
+ && jobStatuses.getRunState() != JobStatus.KILLED) {
+ shouldWait = true;
+ break;
+ }
+ }
+ if (shouldWait) {
+ waitFor(100);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Configure a waiting job
+ */
+ static void configureWaitingJobConf(JobConf jobConf, Path inDir,
+ Path outputPath, int numMaps, int numRed,
+ String jobName, String mapSignalFilename,
+ String redSignalFilename)
+ throws IOException {
+ jobConf.setJobName(jobName);
+ jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ FileOutputFormat.setOutputPath(jobConf, outputPath);
+ jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+ jobConf.setOutputKeyClass(BytesWritable.class);
+ jobConf.setOutputValueClass(BytesWritable.class);
+ jobConf.setInputFormat(RandomInputFormat.class);
+ jobConf.setNumMapTasks(numMaps);
+ jobConf.setNumReduceTasks(numRed);
+ jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+ jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
+ jobConf.set(getTaskSignalParameter(false), redSignalFilename);
+ }
+
+ /**
+ * Commonly used map and reduce classes
+ */
+
+ /**
+ * Map is a Mapper that just waits for a file to be created on the dfs. The
+ * file creation is a signal to the mappers and hence acts as a waiting job.
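+ *
+ * Illustrative use only (the dfs/fileSys handles and the in/out/signal
+ * directories are hypothetical): a test typically submits such a job and
+ * later releases it by creating the signal files, e.g.
+ * <pre>
+ *   Path signalDir = new Path("/signal-dir");
+ *   UtilsForTests.configureWaitingJobConf(jobConf, inDir, outDir, 2, 1,
+ *       "waiting-job", UtilsForTests.getMapSignalFile(signalDir),
+ *       UtilsForTests.getReduceSignalFile(signalDir));
+ *   // submit jobConf, then unblock the waiting map tasks:
+ *   UtilsForTests.signalTasks(dfs, fileSys, true,
+ *       UtilsForTests.getMapSignalFile(signalDir),
+ *       UtilsForTests.getReduceSignalFile(signalDir));
+ * </pre>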
+ */
+
+ static class WaitingMapper
+ extends MapReduceBase
+ implements Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ FileSystem fs = null;
+ Path signal;
+ int id = 0;
+ int totalMaps = 0;
+
+ /**
+ * Checks if the map task needs to wait. By default all the maps will wait.
+ * This method needs to be overridden to make a custom waiting mapper.
+ */
+ public boolean shouldWait(int id) {
+ return true;
+ }
+
+ /**
+ * Returns a signal file on which the map task should wait. By default all
+ * the maps wait on a single file passed as test.mapred.map.waiting.target.
+ * This method needs to be overridden to make a custom waiting mapper
+ */
+ public Path getSignalFile(int id) {
+ return signal;
+ }
+
+ /** The waiting function. The map exits once it gets a signal. Here the
+ * signal is the file existence.
+ */
+ public void map(WritableComparable key, Writable val,
+ OutputCollector<WritableComparable, Writable> output,
+ Reporter reporter)
+ throws IOException {
+ if (shouldWait(id)) {
+ if (fs != null) {
+ while (!fs.exists(getSignalFile(id))) {
+ try {
+ reporter.progress();
+ synchronized (this) {
+ this.wait(1000); // wait for 1 sec
+ }
+ } catch (InterruptedException ie) {
+ System.out.println("Interrupted while the map was waiting for "
+ + " the signal.");
+ break;
+ }
+ }
+ } else {
+ throw new IOException("Could not get the DFS!!");
+ }
+ }
+ }
+
+ public void configure(JobConf conf) {
+ try {
+ String taskId = conf.get(JobContext.TASK_ATTEMPT_ID);
+ id = Integer.parseInt(taskId.split("_")[4]);
+ totalMaps = Integer.parseInt(conf.get(JobContext.NUM_MAPS));
+ fs = FileSystem.get(conf);
+ signal = new Path(conf.get(getTaskSignalParameter(true)));
+ } catch (IOException ioe) {
+ System.out.println("Got an exception while obtaining the filesystem");
+ }
+ }
+ }
+
+ /** Only the latter half of the maps wait for the signal while the rest
+ * complete immediately.
+ */
+ static class HalfWaitingMapper extends WaitingMapper {
+ @Override
+ public boolean shouldWait(int id) {
+ return id >= (totalMaps / 2);
+ }
+ }
+
+ /**
+ * Reduce that just waits for a file to be created on the dfs. The
+ * file creation is a signal to the reduce.
+ */
+
+ static class WaitingReducer extends MapReduceBase
+ implements Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+
+ FileSystem fs = null;
+ Path signal;
+
+ /** The waiting function. The reduce exits once it gets a signal. Here the
+ * signal is the file existence.
+ */
+ public void reduce(WritableComparable key, Iterator<Writable> val,
+ OutputCollector<WritableComparable, Writable> output,
+ Reporter reporter)
+ throws IOException {
+ if (fs != null) {
+ while (!fs.exists(signal)) {
+ try {
+ reporter.progress();
+ synchronized (this) {
+ this.wait(1000); // wait for 1 sec
+ }
+ } catch (InterruptedException ie) {
+ System.out.println("Interrupted while the map was waiting for the"
+ + " signal.");
+ break;
+ }
+ }
+ } else {
+ throw new IOException("Could not get the DFS!!");
+ }
+ }
+
+ public void configure(JobConf conf) {
+ try {
+ fs = FileSystem.get(conf);
+ signal = new Path(conf.get(getTaskSignalParameter(false)));
+ } catch (IOException ioe) {
+ System.out.println("Got an exception while obtaining the filesystem");
+ }
+ }
+ }
+
+ static String getTaskSignalParameter(boolean isMap) {
+ return isMap
+ ? "test.mapred.map.waiting.target"
+ : "test.mapred.reduce.waiting.target";
+ }
+
+ /**
+ * Signal the maps/reduces to start.
+ */
+ static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
+ String mapSignalFile,
+ String reduceSignalFile, int replication)
+ throws IOException {
+ writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile),
+ (short)replication);
+ writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile),
+ (short)replication);
+ }
+
+ /**
+ * Signal the maps/reduces to start.
+ */
+ static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
+ boolean isMap, String mapSignalFile,
+ String reduceSignalFile)
+ throws IOException {
+ // signal the maps to complete
+ writeFile(dfs.getNameNode(), fileSys.getConf(),
+ isMap
+ ? new Path(mapSignalFile)
+ : new Path(reduceSignalFile), (short)1);
+ }
+
+ static String getSignalFile(Path dir) {
+ return (new Path(dir, "signal")).toString();
+ }
+
+ static String getMapSignalFile(Path dir) {
+ return (new Path(dir, "map-signal")).toString();
+ }
+
+ static String getReduceSignalFile(Path dir) {
+ return (new Path(dir, "reduce-signal")).toString();
+ }
+
+ static void writeFile(NameNode namenode, Configuration conf, Path name,
+ short replication) throws IOException {
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fileSys, conf, name,
+ BytesWritable.class, BytesWritable.class,
+ CompressionType.NONE);
+ writer.append(new BytesWritable(), new BytesWritable());
+ writer.close();
+ fileSys.setReplication(name, replication);
+ DFSTestUtil.waitReplication(fileSys, name, replication);
+ }
+
+ // Input formats
+ /**
+ * A custom input format that creates virtual inputs of a single string
+ * for each map.
+ */
+ public static class RandomInputFormat implements InputFormat<Text, Text> {
+
+ public InputSplit[] getSplits(JobConf job,
+ int numSplits) throws IOException {
+ InputSplit[] result = new InputSplit[numSplits];
+ Path outDir = FileOutputFormat.getOutputPath(job);
+ for(int i=0; i < result.length; ++i) {
+ result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
+ 0, 1, (String[])null);
+ }
+ return result;
+ }
+
+ static class RandomRecordReader implements RecordReader<Text, Text> {
+ Path name;
+ public RandomRecordReader(Path p) {
+ name = p;
+ }
+ public boolean next(Text key, Text value) {
+ if (name != null) {
+ key.set(name.getName());
+ name = null;
+ return true;
+ }
+ return false;
+ }
+ public Text createKey() {
+ return new Text();
+ }
+ public Text createValue() {
+ return new Text();
+ }
+ public long getPos() {
+ return 0;
+ }
+ public void close() {}
+ public float getProgress() {
+ return 0.0f;
+ }
+ }
+
+ public RecordReader<Text, Text> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException {
+ return new RandomRecordReader(((FileSplit) split).getPath());
+ }
+ }
+
+ // Start a job and return its RunningJob object
+ static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
+ throws IOException {
+ return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
+ }
+
+ // Start a job and return its RunningJob object
+ static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+ int numReds) throws IOException {
+
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+
+ // submit the job and wait for it to complete
+ return runJob(conf, inDir, outDir, numMaps, numReds, input);
+ }
+
+ // Start a job with the specified input and return its RunningJob object
+ static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+ int numReds, String input) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+
+ for (int i = 0; i < numMaps; ++i) {
+ DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+ file.writeBytes(input);
+ file.close();
+ }
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReds);
+
+ JobClient jobClient = new JobClient(conf);
+ RunningJob job = jobClient.submitJob(conf);
+
+ return job;
+ }
+
+ // Run a job that will succeed and wait until it completes
+ public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
+ throws IOException {
+ conf.setJobName("test-job-succeed");
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+ while (!job.isComplete()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ return job;
+ }
+
+ // Run a job that will fail and wait until it completes
+ public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
+ throws IOException {
+ conf.setJobName("test-job-fail");
+ conf.setMapperClass(FailMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ conf.setMaxMapAttempts(1);
+
+ RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+ while (!job.isComplete()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ return job;
+ }
+
+ // Run a job, kill it once it is running, and wait for its cleanup to start
+ public static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
+ throws IOException {
+
+ conf.setJobName("test-job-kill");
+ conf.setMapperClass(KillMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+ while (job.getJobState() != JobStatus.RUNNING) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ job.killJob();
+ while (job.cleanupProgress() == 0.0f) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+
+ return job;
+ }
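+
+ // Illustrative usage (not part of the original test code): from a test that
+ // extends HadoopTestCase, and assuming 'inDir'/'outDir' are paths owned by
+ // the test, these helpers are typically driven like this:
+ //
+ //   JobConf conf = createJobConf();
+ //   RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+ //   assertTrue(job.isSuccessful());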
+
+ /**
+ * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
+ * asynchronously.
+ */
+ public static class InlineCleanupQueue extends CleanupQueue {
+ List<Path> stalePaths = new ArrayList<Path>();
+
+ public InlineCleanupQueue() {
+ // do nothing
+ }
+
+ @Override
+ public void addToQueue(PathDeletionContext... contexts) {
+ // delete paths in-line
+ for (PathDeletionContext context : contexts) {
+ try {
+ if (!deletePath(context)) {
+ LOG.warn("Stale path " + context.fullPath);
+ stalePaths.add(context.fullPath);
+ }
+ } catch (IOException e) {
+ LOG.warn("Caught exception while deleting path "
+ + context.fullPath);
+ LOG.info(StringUtils.stringifyException(e));
+ stalePaths.add(context.fullPath);
+ }
+ }
+ }
+ }
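+
+ // Illustrative usage (not part of the original test code): a test substitutes
+ // this queue for the default asynchronous one through whatever test-only hook
+ // the component under test exposes, then asserts that nothing was left behind:
+ //
+ //   UtilsForTests.InlineCleanupQueue cleanupQueue =
+ //       new UtilsForTests.InlineCleanupQueue();
+ //   // ... inject 'cleanupQueue' into the component under test ...
+ //   assertTrue("some paths were not deleted", cleanupQueue.stalePaths.isEmpty());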
+
+ static class FakeClock extends Clock {
+ long time = 0;
+
+ public void advance(long millis) {
+ time += millis;
+ }
+
+ @Override
+ long getTime() {
+ return time;
+ }
+ }
+ // Mapper that fails
+ static class FailMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+ //NOTE- the next line is required for the TestDebugScript test to succeed
+ System.err.println("failing map");
+ throw new RuntimeException("failing map");
+ }
+ }
+
+ // Mapper that sleeps for a long time.
+ // Used for running a job that will be killed
+ static class KillMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+
+ try {
+ Thread.sleep(1000000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+ }
+
+ static void setUpConfigFile(Properties confProps, File configFile)
+ throws IOException {
+ Configuration config = new Configuration(false);
+ FileOutputStream fos = new FileOutputStream(configFile);
+
+ for (Enumeration<?> e = confProps.propertyNames(); e.hasMoreElements();) {
+ String key = (String) e.nextElement();
+ config.set(key, confProps.getProperty(key));
+ }
+
+ config.writeXml(fos);
+ fos.close();
+ }
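+
+ // Illustrative usage (not part of the original test code), with a made-up
+ // property value and assuming a test-owned directory 'testDir':
+ //
+ //   Properties props = new Properties();
+ //   props.setProperty("mapreduce.task.timeout", "60000");
+ //   UtilsForTests.setUpConfigFile(props, new File(testDir, "job-config.xml"));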
+
+ /**
+ * Creates a file in the given file system and writes the supplied input to it.
+ * @param dfs FileSystem the file system in which the file is created
+ * @param URIPATH Path the path at which the file is created
+ * @param permission FsPermission the permission to set on the file
+ * @param input String the contents written to the file
+ * @return the DataOutputStream used to write the file (already closed)
+ */
+ public static DataOutputStream
+ createTmpFileDFS(FileSystem dfs, Path URIPATH,
+ FsPermission permission, String input) throws Exception {
+ //Creating the path with the file
+ DataOutputStream file =
+ FileSystem.create(dfs, URIPATH, permission);
+ file.writeBytes(input);
+ file.close();
+ return file;
+ }
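+
+ // Illustrative usage (not part of the original test code), with made-up path
+ // and contents:
+ //
+ //   UtilsForTests.createTmpFileDFS(fs, new Path("/tmp/wc-input.txt"),
+ //       FsPermission.valueOf("-rw-r--r--"), "hello world\n");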
+
+ /**
+ * Extracts the FQDN from the long-form tasktracker name.
+ * @param taskTrackerLong String The long format of the tasktracker string
+ * @return String The FQDN of the tasktracker
+ * @throws Exception
+ */
+ public static String getFQDNofTT (String taskTrackerLong) throws Exception {
+ //Getting the exact FQDN of the tasktracker from the tasktracker string.
+ String[] firstSplit = taskTrackerLong.split("_");
+ String tmpOutput = firstSplit[1];
+ String[] secondSplit = tmpOutput.split(":");
+ String tmpTaskTracker = secondSplit[0];
+ return tmpTaskTracker;
+ }
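+
+ // Worked example (the tracker name is illustrative): a long name of the form
+ // "tracker_host1.foo.com:localhost/127.0.0.1:55227" is split on '_' to drop
+ // the "tracker" prefix and then on ':' to keep just the FQDN:
+ //
+ //   String fqdn = UtilsForTests.getFQDNofTT(
+ //       "tracker_host1.foo.com:localhost/127.0.0.1:55227"); // -> "host1.foo.com"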
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java
new file mode 100644
index 0000000000..60d2900192
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is an example Hadoop Map/Reduce application.
+ * It reads the text input files, breaks each line into words
+ * and counts them. The output is a locally sorted list of words and the
+ * count of how often they occurred.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
+ * [-m maps] [-r reduces] in-dir out-dir
+ */
+public class WordCount extends Configured implements Tool {
+
+ /**
+ * Counts the words in each line.
+ * For each line of input, break the line into words and emit them as
+ * (word, 1).
+ */
+ public static class MapClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ String line = value.toString();
+ StringTokenizer itr = new StringTokenizer(line);
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ output.collect(word, one);
+ }
+ }
+ }
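+
+ // Worked example (illustrative input): for the line "the quick brown fox the"
+ // the mapper emits (the,1), (quick,1), (brown,1), (fox,1), (the,1); summing
+ // duplicates is left to the reducer.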
+
+ /**
+ * A reducer class that just emits the sum of the input values.
+ */
+ public static class Reduce extends MapReduceBase
+ implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(key, new IntWritable(sum));
+ }
+ }
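+
+ // Worked example (illustrative input): for key "the" with values [1, 1] the
+ // reducer emits ("the", 2).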
+
+ static int printUsage() {
+ System.out.println("wordcount [-m ] [-r ]