MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces introduced in MAPREDUCE-3169. Contributed by Ahmed Radwan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1209281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2011-12-01 22:29:42 +00:00
parent 6b555008f3
commit 714ae6e62f
31 changed files with 2126 additions and 19 deletions

View File

@ -132,6 +132,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via
sseth)
MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces
introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite)
OPTIMIZATIONS
BUG FIXES

View File

@ -59,19 +59,6 @@ public void testNoDefaults() throws Exception {
JobConf conf = new JobConf(false);
//seeding JT and NN info into non-defaults (empty jobconf)
String jobTrackerAddress = createJobConf().get(JTConfig.JT_IPC_ADDRESS);
if (jobTrackerAddress == null) {
jobTrackerAddress = "local";
}
conf.set(JTConfig.JT_IPC_ADDRESS, jobTrackerAddress);
if (jobTrackerAddress == "local") {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
}
else {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
}
conf.set("fs.default.name", createJobConf().get("fs.default.name"));
conf.setJobName("mr");

View File

@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import java.io.File;
import java.io.IOException;
/**
* Abstract Test case class to run MR in local or cluster mode and in local FS
* or DFS.
*
* The Hadoop instance is started and stopped on each test method.
*
* If using DFS the filesystem is reformated at each start (test method).
*
* Job Configurations should be created using a configuration returned by the
* 'createJobConf()' method.
*/
public abstract class HadoopTestCase extends TestCase {
public static final int LOCAL_MR = 1;
public static final int CLUSTER_MR = 2;
public static final int LOCAL_FS = 4;
public static final int DFS_FS = 8;
private boolean localMR;
private boolean localFS;
private int taskTrackers;
private int dataNodes;
/**
* Creates a testcase for local or cluster MR using DFS.
*
* The DFS will be formatted regardless if there was one or not before in the
* given location.
*
* @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster
* (CLUSTER_MR)
* @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS)
*
* local FS when using relative PATHs)
*
* @param taskTrackers number of task trackers to start when using cluster
*
* @param dataNodes number of data nodes to start when using DFS
*
* @throws IOException thrown if the base directory cannot be set.
*/
public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes)
throws IOException {
if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) {
throw new IllegalArgumentException(
"Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR");
}
if (fsMode != LOCAL_FS && fsMode != DFS_FS) {
throw new IllegalArgumentException(
"Invalid FileSystem mode, must be LOCAL_FS or DFS_FS");
}
if (taskTrackers < 1) {
throw new IllegalArgumentException(
"Invalid taskTrackers value, must be greater than 0");
}
if (dataNodes < 1) {
throw new IllegalArgumentException(
"Invalid dataNodes value, must be greater than 0");
}
localMR = (mrMode == LOCAL_MR);
localFS = (fsMode == LOCAL_FS);
/*
JobConf conf = new JobConf();
fsRoot = conf.get("hadoop.tmp.dir");
if (fsRoot == null) {
throw new IllegalArgumentException(
"hadoop.tmp.dir is not defined");
}
fsRoot = fsRoot.replace(' ', '+') + "/fs";
File file = new File(fsRoot);
if (!file.exists()) {
if (!file.mkdirs()) {
throw new RuntimeException("Could not create FS base path: " + file);
}
}
*/
this.taskTrackers = taskTrackers;
this.dataNodes = dataNodes;
}
/**
* Indicates if the MR is running in local or cluster mode.
*
* @return returns TRUE if the MR is running locally, FALSE if running in
* cluster mode.
*/
public boolean isLocalMR() {
return localMR;
}
/**
* Indicates if the filesystem is local or DFS.
*
* @return returns TRUE if the filesystem is local, FALSE if it is DFS.
*/
public boolean isLocalFS() {
return localFS;
}
private MiniDFSCluster dfsCluster = null;
private MiniMRCluster mrCluster = null;
private FileSystem fileSystem = null;
/**
* Creates Hadoop instance based on constructor configuration before
* a test case is run.
*
* @throws Exception
*/
protected void setUp() throws Exception {
super.setUp();
if (localFS) {
fileSystem = FileSystem.getLocal(new JobConf());
}
else {
dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
fileSystem = dfsCluster.getFileSystem();
}
if (localMR) {
}
else {
//noinspection deprecation
mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getUri().toString(), 1);
}
}
/**
* Destroys Hadoop instance based on constructor configuration after
* a test case is run.
*
* @throws Exception
*/
protected void tearDown() throws Exception {
try {
if (mrCluster != null) {
mrCluster.shutdown();
}
}
catch (Exception ex) {
System.out.println(ex);
}
try {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
catch (Exception ex) {
System.out.println(ex);
}
super.tearDown();
}
/**
* Returns the Filesystem in use.
*
* TestCases should use this Filesystem as it
* is properly configured with the workingDir for relative PATHs.
*
* @return the filesystem used by Hadoop.
*/
protected FileSystem getFileSystem() {
return fileSystem;
}
/**
* Returns a job configuration preconfigured to run against the Hadoop
* managed by the testcase.
* @return configuration that works on the testcase Hadoop instance
*/
protected JobConf createJobConf() {
if (localMR) {
JobConf conf = new JobConf();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
return conf;
}
else {
return mrCluster.createJobConf();
}
}
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.ServletException;
import java.io.IOException;
import java.io.DataOutputStream;
/**
* Base class to test Job end notification in local and cluster mode.
*
* Starts up hadoop on Local or Cluster mode (by extending of the
* HadoopTestCase class) and it starts a servlet engine that hosts
* a servlet that will receive the notification of job finalization.
*
* The notification servlet returns a HTTP 400 the first time is called
* and a HTTP 200 the second time, thus testing retry.
*
* In both cases local file system is used (this is irrelevant for
* the tested functionality)
*
*
*/
public abstract class NotificationTestCase extends HadoopTestCase {
protected NotificationTestCase(int mode) throws IOException {
super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
}
private int port;
private String contextPath = "/notification";
private String servletPath = "/mapred";
private Server webServer;
private void startHttpServer() throws Exception {
// Create the webServer
if (webServer != null) {
webServer.stop();
webServer = null;
}
webServer = new Server(0);
Context context = new Context(webServer, contextPath);
// create servlet handler
context.addServlet(new ServletHolder(new NotificationServlet()),
servletPath);
// Start webServer
webServer.start();
port = webServer.getConnectors()[0].getLocalPort();
}
private void stopHttpServer() throws Exception {
if (webServer != null) {
webServer.stop();
webServer.destroy();
webServer = null;
}
}
public static class NotificationServlet extends HttpServlet {
public static int counter = 0;
private static final long serialVersionUID = 1L;
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
switch (counter) {
case 0:
{
assertTrue(req.getQueryString().contains("SUCCEEDED"));
}
break;
case 2:
{
assertTrue(req.getQueryString().contains("KILLED"));
}
break;
case 4:
{
assertTrue(req.getQueryString().contains("FAILED"));
}
break;
}
if (counter % 2 == 0) {
res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
}
else {
res.setStatus(HttpServletResponse.SC_OK);
}
counter++;
}
}
private String getNotificationUrlTemplate() {
return "http://localhost:" + port + contextPath + servletPath +
"?jobId=$jobId&amp;jobStatus=$jobStatus";
}
protected JobConf createJobConf() {
JobConf conf = super.createJobConf();
conf.setJobEndNotificationURI(getNotificationUrlTemplate());
conf.setInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 3);
conf.setInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 200);
return conf;
}
protected void setUp() throws Exception {
super.setUp();
startHttpServer();
}
protected void tearDown() throws Exception {
stopHttpServer();
super.tearDown();
}
public void testMR() throws Exception {
System.out.println(launchWordCount(this.createJobConf(),
"a b c d e f g h", 1, 1));
Thread.sleep(2000);
assertEquals(2, NotificationServlet.counter);
Path inDir = new Path("notificationjob/input");
Path outDir = new Path("notificationjob/output");
// Hack for local FS that does not have the concept of a 'mounting point'
if (isLocalFS()) {
String localPathRoot = System.getProperty("test.build.data","/tmp")
.toString().replace(' ', '+');;
inDir = new Path(localPathRoot, inDir);
outDir = new Path(localPathRoot, outDir);
}
// run a job with KILLED status
System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
outDir).getID());
Thread.sleep(2000);
assertEquals(4, NotificationServlet.counter);
// run a job with FAILED status
System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
outDir).getID());
Thread.sleep(2000);
assertEquals(6, NotificationServlet.counter);
}
private String launchWordCount(JobConf conf,
String input,
int numMaps,
int numReduces) throws IOException {
Path inDir = new Path("testing/wc/input");
Path outDir = new Path("testing/wc/output");
// Hack for local FS that does not have the concept of a 'mounting point'
if (isLocalFS()) {
String localPathRoot = System.getProperty("test.build.data","/tmp")
.toString().replace(' ', '+');;
inDir = new Path(localPathRoot, inDir);
outDir = new Path(localPathRoot, outDir);
}
FileSystem fs = FileSystem.get(conf);
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
{
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
}
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WordCount.MapClass.class);
conf.setCombinerClass(WordCount.Reduce.class);
conf.setReducerClass(WordCount.Reduce.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
JobClient.runJob(conf);
return MapReduceTestUtil.readOutput(outDir, conf);
}
}

View File

@ -0,0 +1,584 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.*;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
/**
* A set of utilities to validate the <b>sort</b> of the map-reduce framework.
* This utility program has 2 main parts:
* 1. Checking the records' statistics
* a) Validates the no. of bytes and records in sort's input & output.
* b) Validates the xor of the md5's of each key/value pair.
* c) Ensures same key/value is present in both input and output.
* 2. Check individual records to ensure each record is present in both
* the input and the output of the sort (expensive on large data-sets).
*
* To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
* [-m <i>maps</i>] [-r <i>reduces</i>] [-deep]
* -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i>
*/
public class SortValidator extends Configured implements Tool {
static private final IntWritable sortInput = new IntWritable(1);
static private final IntWritable sortOutput = new IntWritable(2);
static public String SORT_REDUCES =
"mapreduce.sortvalidator.sort.reduce.tasks";
static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost";
static public String REDUCES_PER_HOST =
"mapreduce.sortvalidator.reducesperhost";
static void printUsage() {
System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
"-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
System.exit(1);
}
static private IntWritable deduceInputFile(JobConf job) {
Path[] inputPaths = FileInputFormat.getInputPaths(job);
Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
// value == one for sort-input; value == two for sort-output
return (inputFile.getParent().equals(inputPaths[0])) ?
sortInput : sortOutput;
}
static private byte[] pair(BytesWritable a, BytesWritable b) {
byte[] pairData = new byte[a.getLength()+ b.getLength()];
System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
return pairData;
}
private static final PathFilter sortPathsFilter = new PathFilter() {
public boolean accept(Path path) {
return (path.getName().startsWith("part-"));
}
};
/**
* A simple map-reduce job which checks consistency of the
* MapReduce framework's sort by checking:
* a) Records are sorted correctly
* b) Keys are partitioned correctly
* c) The input and output have same no. of bytes and records.
* d) The input and output have the correct 'checksum' by xor'ing
* the md5 of each record.
*
*/
public static class RecordStatsChecker {
/**
* Generic way to get <b>raw</b> data from a {@link Writable}.
*/
static class Raw {
/**
* Get raw data bytes from a {@link Writable}
* @param writable {@link Writable} object from whom to get the raw data
* @return raw data of the writable
*/
public byte[] getRawBytes(Writable writable) {
return writable.toString().getBytes();
}
/**
* Get number of raw data bytes of the {@link Writable}
* @param writable {@link Writable} object from whom to get the raw data
* length
* @return number of raw data bytes
*/
public int getRawBytesLength(Writable writable) {
return writable.toString().getBytes().length;
}
}
/**
* Specialization of {@link Raw} for {@link BytesWritable}.
*/
static class RawBytesWritable extends Raw {
public byte[] getRawBytes(Writable bw) {
return ((BytesWritable)bw).getBytes();
}
public int getRawBytesLength(Writable bw) {
return ((BytesWritable)bw).getLength();
}
}
/**
* Specialization of {@link Raw} for {@link Text}.
*/
static class RawText extends Raw {
public byte[] getRawBytes(Writable text) {
return ((Text)text).getBytes();
}
public int getRawBytesLength(Writable text) {
return ((Text)text).getLength();
}
}
private static Raw createRaw(Class rawClass) {
if (rawClass == Text.class) {
return new RawText();
} else if (rawClass == BytesWritable.class) {
System.err.println("Returning " + RawBytesWritable.class);
return new RawBytesWritable();
}
return new Raw();
}
public static class RecordStatsWritable implements Writable {
private long bytes = 0;
private long records = 0;
private int checksum = 0;
public RecordStatsWritable() {}
public RecordStatsWritable(long bytes, long records, int checksum) {
this.bytes = bytes;
this.records = records;
this.checksum = checksum;
}
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, bytes);
WritableUtils.writeVLong(out, records);
WritableUtils.writeVInt(out, checksum);
}
public void readFields(DataInput in) throws IOException {
bytes = WritableUtils.readVLong(in);
records = WritableUtils.readVLong(in);
checksum = WritableUtils.readVInt(in);
}
public long getBytes() { return bytes; }
public long getRecords() { return records; }
public int getChecksum() { return checksum; }
}
public static class Map extends MapReduceBase
implements Mapper<WritableComparable, Writable,
IntWritable, RecordStatsWritable> {
private IntWritable key = null;
private WritableComparable prevKey = null;
private Class<? extends WritableComparable> keyClass;
private Partitioner<WritableComparable, Writable> partitioner = null;
private int partition = -1;
private int noSortReducers = -1;
private long recordId = -1;
private Raw rawKey;
private Raw rawValue;
public void configure(JobConf job) {
// 'key' == sortInput for sort-input; key == sortOutput for sort-output
key = deduceInputFile(job);
if (key == sortOutput) {
partitioner = new HashPartitioner<WritableComparable, Writable>();
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
String inputFile = inputURI.getPath();
// part file is of the form part-r-xxxxx
partition = Integer.valueOf(inputFile.substring(
inputFile.lastIndexOf("part") + 7)).intValue();
noSortReducers = job.getInt(SORT_REDUCES, -1);
} catch (Exception e) {
System.err.println("Caught: " + e);
System.exit(-1);
}
}
}
@SuppressWarnings("unchecked")
public void map(WritableComparable key, Writable value,
OutputCollector<IntWritable, RecordStatsWritable> output,
Reporter reporter) throws IOException {
// Set up rawKey and rawValue on the first call to 'map'
if (recordId == -1) {
rawKey = createRaw(key.getClass());
rawValue = createRaw(value.getClass());
}
++recordId;
if (this.key == sortOutput) {
// Check if keys are 'sorted' if this
// record is from sort's output
if (prevKey == null) {
prevKey = key;
keyClass = prevKey.getClass();
} else {
// Sanity check
if (keyClass != key.getClass()) {
throw new IOException("Type mismatch in key: expected " +
keyClass.getName() + ", received " +
key.getClass().getName());
}
// Check if they were sorted correctly
if (prevKey.compareTo(key) > 0) {
throw new IOException("The 'map-reduce' framework wrongly" +
" classifed (" + prevKey + ") > (" +
key + ") "+ "for record# " + recordId);
}
prevKey = key;
}
// Check if the sorted output is 'partitioned' right
int keyPartition =
partitioner.getPartition(key, value, noSortReducers);
if (partition != keyPartition) {
throw new IOException("Partitions do not match for record# " +
recordId + " ! - '" + partition + "' v/s '" +
keyPartition + "'");
}
}
// Construct the record-stats and output (this.key, record-stats)
byte[] keyBytes = rawKey.getRawBytes(key);
int keyBytesLen = rawKey.getRawBytesLength(key);
byte[] valueBytes = rawValue.getRawBytes(value);
int valueBytesLen = rawValue.getRawBytesLength(value);
int keyValueChecksum =
(WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
WritableComparator.hashBytes(valueBytes, valueBytesLen));
output.collect(this.key,
new RecordStatsWritable((keyBytesLen+valueBytesLen),
1, keyValueChecksum)
);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<IntWritable, RecordStatsWritable,
IntWritable, RecordStatsWritable> {
public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
OutputCollector<IntWritable,
RecordStatsWritable> output,
Reporter reporter) throws IOException {
long bytes = 0;
long records = 0;
int xor = 0;
while (values.hasNext()) {
RecordStatsWritable stats = values.next();
bytes += stats.getBytes();
records += stats.getRecords();
xor ^= stats.getChecksum();
}
output.collect(key, new RecordStatsWritable(bytes, records, xor));
}
}
public static class NonSplitableSequenceFileInputFormat
extends SequenceFileInputFormat {
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}
}
static void checkRecords(Configuration defaults,
Path sortInput, Path sortOutput) throws IOException {
FileSystem inputfs = sortInput.getFileSystem(defaults);
FileSystem outputfs = sortOutput.getFileSystem(defaults);
FileSystem defaultfs = FileSystem.get(defaults);
JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
jobConf.setJobName("sortvalidate-recordstats-checker");
int noSortReduceTasks =
outputfs.listStatus(sortOutput, sortPathsFilter).length;
jobConf.setInt(SORT_REDUCES, noSortReduceTasks);
int noSortInputpaths = inputfs.listStatus(sortInput).length;
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(IntWritable.class);
jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
jobConf.setMapperClass(Map.class);
jobConf.setCombinerClass(Reduce.class);
jobConf.setReducerClass(Reduce.class);
jobConf.setNumMapTasks(noSortReduceTasks);
jobConf.setNumReduceTasks(1);
FileInputFormat.setInputPaths(jobConf, sortInput);
FileInputFormat.addInputPath(jobConf, sortOutput);
Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
if (defaultfs.exists(outputPath)) {
defaultfs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(jobConf, outputPath);
// Uncomment to run locally in a single process
//job_conf.set(JTConfig.JT, "local");
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
"from " + inputPaths[0] + " (" +
noSortInputpaths + " files), " +
inputPaths[1] + " (" +
noSortReduceTasks +
" files) into " +
FileOutputFormat.getOutputPath(jobConf) +
" with 1 reducer.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
// Check to ensure that the statistics of the
// framework's sort-input and sort-output match
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
new Path(outputPath, "part-00000"), defaults);
IntWritable k1 = new IntWritable();
IntWritable k2 = new IntWritable();
RecordStatsWritable v1 = new RecordStatsWritable();
RecordStatsWritable v2 = new RecordStatsWritable();
if (!stats.next(k1, v1)) {
throw new IOException("Failed to read record #1 from reduce's output");
}
if (!stats.next(k2, v2)) {
throw new IOException("Failed to read record #2 from reduce's output");
}
if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
v1.getChecksum() != v2.getChecksum()) {
throw new IOException("(" +
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
}
}
}
/**
* A simple map-reduce task to check if the input and the output
* of the framework's sort is consistent by ensuring each record
* is present in both the input and the output.
*
*/
public static class RecordChecker {
public static class Map extends MapReduceBase
implements Mapper<BytesWritable, BytesWritable,
BytesWritable, IntWritable> {
private IntWritable value = null;
public void configure(JobConf job) {
// value == one for sort-input; value == two for sort-output
value = deduceInputFile(job);
}
public void map(BytesWritable key,
BytesWritable value,
OutputCollector<BytesWritable, IntWritable> output,
Reporter reporter) throws IOException {
// newKey = (key, value)
BytesWritable keyValue = new BytesWritable(pair(key, value));
// output (newKey, value)
output.collect(keyValue, this.value);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<BytesWritable, IntWritable,
BytesWritable, IntWritable> {
public void reduce(BytesWritable key, Iterator<IntWritable> values,
OutputCollector<BytesWritable, IntWritable> output,
Reporter reporter) throws IOException {
int ones = 0;
int twos = 0;
while (values.hasNext()) {
IntWritable count = values.next();
if (count.equals(sortInput)) {
++ones;
} else if (count.equals(sortOutput)) {
++twos;
} else {
throw new IOException("Invalid 'value' of " + count.get() +
" for (key,value): " + key.toString());
}
}
// Check to ensure there are equal no. of ones and twos
if (ones != twos) {
throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
") for (key, value): " + key.toString());
}
}
}
static void checkRecords(Configuration defaults, int noMaps, int noReduces,
Path sortInput, Path sortOutput) throws IOException {
JobConf jobConf = new JobConf(defaults, RecordChecker.class);
jobConf.setJobName("sortvalidate-record-checker");
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(Map.class);
jobConf.setReducerClass(Reduce.class);
JobClient client = new JobClient(jobConf);
ClusterStatus cluster = client.getClusterStatus();
if (noMaps == -1) {
noMaps = cluster.getTaskTrackers() *
jobConf.getInt(MAPS_PER_HOST, 10);
}
if (noReduces == -1) {
noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String sortReduces = jobConf.get(REDUCES_PER_HOST);
if (sortReduces != null) {
noReduces = cluster.getTaskTrackers() *
Integer.parseInt(sortReduces);
}
}
jobConf.setNumMapTasks(noMaps);
jobConf.setNumReduceTasks(noReduces);
FileInputFormat.setInputPaths(jobConf, sortInput);
FileInputFormat.addInputPath(jobConf, sortOutput);
Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
FileSystem fs = FileSystem.get(defaults);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(jobConf, outputPath);
// Uncomment to run locally in a single process
//job_conf.set(JTConfig.JT, "local");
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
System.out.println("\nSortValidator.RecordChecker: Running on " +
cluster.getTaskTrackers() +
" nodes to validate sort from " +
inputPaths[0] + ", " +
inputPaths[1] + " into " +
FileOutputFormat.getOutputPath(jobConf) +
" with " + noReduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
}
}
/**
* The main driver for sort-validator program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the
* job tracker.
*/
public int run(String[] args) throws Exception {
Configuration defaults = getConf();
int noMaps = -1, noReduces = -1;
Path sortInput = null, sortOutput = null;
boolean deepTest = false;
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
noMaps = Integer.parseInt(args[++i]);
} else if ("-r".equals(args[i])) {
noReduces = Integer.parseInt(args[++i]);
} else if ("-sortInput".equals(args[i])){
sortInput = new Path(args[++i]);
} else if ("-sortOutput".equals(args[i])){
sortOutput = new Path(args[++i]);
} else if ("-deep".equals(args[i])) {
deepTest = true;
} else {
printUsage();
return -1;
}
} catch (NumberFormatException except) {
System.err.println("ERROR: Integer expected instead of " + args[i]);
printUsage();
return -1;
} catch (ArrayIndexOutOfBoundsException except) {
System.err.println("ERROR: Required parameter missing from " +
args[i-1]);
printUsage();
return -1;
}
}
// Sanity check
if (sortInput == null || sortOutput == null) {
printUsage();
return -2;
}
// Check if the records are consistent and sorted correctly
RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
// Check if the same records are present in sort's inputs & outputs
if (deepTest) {
RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput,
sortOutput);
}
System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
" successfully.");
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
System.exit(res);
}
}

View File

@ -48,7 +48,6 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
private static final String OUTPUT_FILENAME = "result[0]";
public static boolean launchJob(URI fileSys,
String jobTracker,
JobConf conf,
int numMaps,
int numReduces) throws IOException {
@ -68,8 +67,6 @@ public static boolean launchJob(URI fileSys,
// use WordCount example
FileSystem.setDefaultUri(conf, fileSys);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
conf.setJobName("foo");
conf.setInputFormat(TextInputFormat.class);
@ -113,11 +110,9 @@ public void testJobWithDFS() throws IOException {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 2);
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = new JobConf();
boolean result;
result = launchJob(fileSys.getUri(), jobTrackerName, jobConf,
3, 1);
result = launchJob(fileSys.getUri(), jobConf, 3, 1);
assertTrue(result);
} finally {

View File

@ -0,0 +1,787 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.Log;
/**
* Utilities used in unit test.
*
*/
public class UtilsForTests {
static final Log LOG = LogFactory.getLog(UtilsForTests.class);
final static long KB = 1024L * 1;
final static long MB = 1024L * KB;
final static long GB = 1024L * MB;
final static long TB = 1024L * GB;
final static long PB = 1024L * TB;
final static Object waitLock = new Object();
static DecimalFormat dfm = new DecimalFormat("####.000");
static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
public static String dfmt(double d) {
return dfm.format(d);
}
public static String ifmt(double d) {
return ifm.format(d);
}
public static String formatBytes(long numBytes) {
StringBuffer buf = new StringBuffer();
boolean bDetails = true;
double num = numBytes;
if (numBytes < KB) {
buf.append(numBytes + " B");
bDetails = false;
} else if (numBytes < MB) {
buf.append(dfmt(num / KB) + " KB");
} else if (numBytes < GB) {
buf.append(dfmt(num / MB) + " MB");
} else if (numBytes < TB) {
buf.append(dfmt(num / GB) + " GB");
} else if (numBytes < PB) {
buf.append(dfmt(num / TB) + " TB");
} else {
buf.append(dfmt(num / PB) + " PB");
}
if (bDetails) {
buf.append(" (" + ifmt(numBytes) + " bytes)");
}
return buf.toString();
}
public static String formatBytes2(long numBytes) {
StringBuffer buf = new StringBuffer();
long u = 0;
if (numBytes >= TB) {
u = numBytes / TB;
numBytes -= u * TB;
buf.append(u + " TB ");
}
if (numBytes >= GB) {
u = numBytes / GB;
numBytes -= u * GB;
buf.append(u + " GB ");
}
if (numBytes >= MB) {
u = numBytes / MB;
numBytes -= u * MB;
buf.append(u + " MB ");
}
if (numBytes >= KB) {
u = numBytes / KB;
numBytes -= u * KB;
buf.append(u + " KB ");
}
buf.append(u + " B"); //even if zero
return buf.toString();
}
static final String regexpSpecials = "[]()?*+|.!^-\\~@";
public static String regexpEscape(String plain) {
StringBuffer buf = new StringBuffer();
char[] ch = plain.toCharArray();
int csup = ch.length;
for (int c = 0; c < csup; c++) {
if (regexpSpecials.indexOf(ch[c]) != -1) {
buf.append("\\");
}
buf.append(ch[c]);
}
return buf.toString();
}
public static String safeGetCanonicalPath(File f) {
try {
String s = f.getCanonicalPath();
return (s == null) ? f.toString() : s;
} catch (IOException io) {
return f.toString();
}
}
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];
FileInputStream in = new FileInputStream(f);
String contents = null;
try {
in.read(buf, 0, len);
contents = new String(buf, "UTF-8");
} finally {
in.close();
}
return contents;
}
public static String slurpHadoop(Path p, FileSystem fs) throws IOException {
int len = (int) fs.getFileStatus(p).getLen();
byte[] buf = new byte[len];
InputStream in = fs.open(p);
String contents = null;
try {
in.read(buf, 0, len);
contents = new String(buf, "UTF-8");
} finally {
in.close();
}
return contents;
}
public static String rjustify(String s, int width) {
if (s == null) s = "null";
if (width > s.length()) {
s = getSpace(width - s.length()) + s;
}
return s;
}
public static String ljustify(String s, int width) {
if (s == null) s = "null";
if (width > s.length()) {
s = s + getSpace(width - s.length());
}
return s;
}
static char[] space;
static {
space = new char[300];
Arrays.fill(space, '\u0020');
}
public static String getSpace(int len) {
if (len > space.length) {
space = new char[Math.max(len, 2 * space.length)];
Arrays.fill(space, '\u0020');
}
return new String(space, 0, len);
}
/**
* Gets job status from the jobtracker given the jobclient and the job id
*/
static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
JobStatus[] statuses = jc.getAllJobs();
for (JobStatus jobStatus : statuses) {
if (jobStatus.getJobID().equals(id)) {
return jobStatus;
}
}
return null;
}
/**
* A utility that waits for specified amount of time
*/
public static void waitFor(long duration) {
try {
synchronized (waitLock) {
waitLock.wait(duration);
}
} catch (InterruptedException ie) {}
}
/**
* Wait for the jobtracker to be RUNNING.
*/
static void waitForJobTracker(JobClient jobClient) {
while (true) {
try {
ClusterStatus status = jobClient.getClusterStatus();
while (status.getJobTrackerStatus() != JobTrackerStatus.RUNNING) {
waitFor(100);
status = jobClient.getClusterStatus();
}
break; // means that the jt is ready
} catch (IOException ioe) {}
}
}
/**
* Waits until all the jobs at the jobtracker complete.
*/
static void waitTillDone(JobClient jobClient) throws IOException {
// Wait for the last job to complete
while (true) {
boolean shouldWait = false;
for (JobStatus jobStatuses : jobClient.getAllJobs()) {
if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
&& jobStatuses.getRunState() != JobStatus.FAILED
&& jobStatuses.getRunState() != JobStatus.KILLED) {
shouldWait = true;
break;
}
}
if (shouldWait) {
waitFor(100);
} else {
break;
}
}
}
/**
* Configure a waiting job
*/
static void configureWaitingJobConf(JobConf jobConf, Path inDir,
Path outputPath, int numMaps, int numRed,
String jobName, String mapSignalFilename,
String redSignalFilename)
throws IOException {
jobConf.setJobName(jobName);
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);
jobConf.setInputFormat(RandomInputFormat.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setNumReduceTasks(numRed);
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
jobConf.set(getTaskSignalParameter(false), redSignalFilename);
}
/**
* Commonly used map and reduce classes
*/
/**
* Map is a Mapper that just waits for a file to be created on the dfs. The
* file creation is a signal to the mappers and hence acts as a waiting job.
*/
static class WaitingMapper
extends MapReduceBase
implements Mapper<WritableComparable, Writable,
WritableComparable, Writable> {
FileSystem fs = null;
Path signal;
int id = 0;
int totalMaps = 0;
/**
* Checks if the map task needs to wait. By default all the maps will wait.
* This method needs to be overridden to make a custom waiting mapper.
*/
public boolean shouldWait(int id) {
return true;
}
/**
* Returns a signal file on which the map task should wait. By default all
* the maps wait on a single file passed as test.mapred.map.waiting.target.
* This method needs to be overridden to make a custom waiting mapper
*/
public Path getSignalFile(int id) {
return signal;
}
/** The waiting function. The map exits once it gets a signal. Here the
* signal is the file existence.
*/
public void map(WritableComparable key, Writable val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
if (shouldWait(id)) {
if (fs != null) {
while (!fs.exists(getSignalFile(id))) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
} catch (InterruptedException ie) {
System.out.println("Interrupted while the map was waiting for "
+ " the signal.");
break;
}
}
} else {
throw new IOException("Could not get the DFS!!");
}
}
}
public void configure(JobConf conf) {
try {
String taskId = conf.get(JobContext.TASK_ATTEMPT_ID);
id = Integer.parseInt(taskId.split("_")[4]);
totalMaps = Integer.parseInt(conf.get(JobContext.NUM_MAPS));
fs = FileSystem.get(conf);
signal = new Path(conf.get(getTaskSignalParameter(true)));
} catch (IOException ioe) {
System.out.println("Got an exception while obtaining the filesystem");
}
}
}
/** Only the later half of the maps wait for the signal while the rest
* complete immediately.
*/
static class HalfWaitingMapper extends WaitingMapper {
@Override
public boolean shouldWait(int id) {
return id >= (totalMaps / 2);
}
}
/**
* Reduce that just waits for a file to be created on the dfs. The
* file creation is a signal to the reduce.
*/
static class WaitingReducer extends MapReduceBase
implements Reducer<WritableComparable, Writable,
WritableComparable, Writable> {
FileSystem fs = null;
Path signal;
/** The waiting function. The reduce exits once it gets a signal. Here the
* signal is the file existence.
*/
public void reduce(WritableComparable key, Iterator<Writable> val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
if (fs != null) {
while (!fs.exists(signal)) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
} catch (InterruptedException ie) {
System.out.println("Interrupted while the map was waiting for the"
+ " signal.");
break;
}
}
} else {
throw new IOException("Could not get the DFS!!");
}
}
public void configure(JobConf conf) {
try {
fs = FileSystem.get(conf);
signal = new Path(conf.get(getTaskSignalParameter(false)));
} catch (IOException ioe) {
System.out.println("Got an exception while obtaining the filesystem");
}
}
}
static String getTaskSignalParameter(boolean isMap) {
return isMap
? "test.mapred.map.waiting.target"
: "test.mapred.reduce.waiting.target";
}
/**
* Signal the maps/reduces to start.
*/
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
String mapSignalFile,
String reduceSignalFile, int replication)
throws IOException {
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile),
(short)replication);
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile),
(short)replication);
}
/**
* Signal the maps/reduces to start.
*/
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
boolean isMap, String mapSignalFile,
String reduceSignalFile)
throws IOException {
// signal the maps to complete
writeFile(dfs.getNameNode(), fileSys.getConf(),
isMap
? new Path(mapSignalFile)
: new Path(reduceSignalFile), (short)1);
}
static String getSignalFile(Path dir) {
return (new Path(dir, "signal")).toString();
}
static String getMapSignalFile(Path dir) {
return (new Path(dir, "map-signal")).toString();
}
static String getReduceSignalFile(Path dir) {
return (new Path(dir, "reduce-signal")).toString();
}
static void writeFile(NameNode namenode, Configuration conf, Path name,
short replication) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer =
SequenceFile.createWriter(fileSys, conf, name,
BytesWritable.class, BytesWritable.class,
CompressionType.NONE);
writer.append(new BytesWritable(), new BytesWritable());
writer.close();
fileSys.setReplication(name, replication);
DFSTestUtil.waitReplication(fileSys, name, replication);
}
// Input formats
/**
* A custom input format that creates virtual inputs of a single string
* for each map.
*/
public static class RandomInputFormat implements InputFormat<Text, Text> {
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
InputSplit[] result = new InputSplit[numSplits];
Path outDir = FileOutputFormat.getOutputPath(job);
for(int i=0; i < result.length; ++i) {
result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
0, 1, (String[])null);
}
return result;
}
static class RandomRecordReader implements RecordReader<Text, Text> {
Path name;
public RandomRecordReader(Path p) {
name = p;
}
public boolean next(Text key, Text value) {
if (name != null) {
key.set(name.getName());
name = null;
return true;
}
return false;
}
public Text createKey() {
return new Text();
}
public Text createValue() {
return new Text();
}
public long getPos() {
return 0;
}
public void close() {}
public float getProgress() {
return 0.0f;
}
}
public RecordReader<Text, Text> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter)
throws IOException {
return new RandomRecordReader(((FileSplit) split).getPath());
}
}
// Start a job and return its RunningJob object
static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
throws IOException {
return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
}
// Start a job and return its RunningJob object
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
int numReds) throws IOException {
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
// submit the job and wait for it to complete
return runJob(conf, inDir, outDir, numMaps, numReds, input);
}
// Start a job with the specified input and return its RunningJob object
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
int numReds, String input) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
for (int i = 0; i < numMaps; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
file.close();
}
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
JobClient jobClient = new JobClient(conf);
RunningJob job = jobClient.submitJob(conf);
return job;
}
// Run a job that will be succeeded and wait until it completes
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-succeed");
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
// Run a job that will be failed and wait until it completes
public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setMaxMapAttempts(1);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
// Run a job that will be killed and wait until it completes
public static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-kill");
conf.setMapperClass(KillMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (job.getJobState() != JobStatus.RUNNING) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
job.killJob();
while (job.cleanupProgress() == 0.0f) {
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
break;
}
}
return job;
}
/**
* Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
* asynchronously.
*/
public static class InlineCleanupQueue extends CleanupQueue {
List<String> stalePaths = new ArrayList<String>();
public InlineCleanupQueue() {
// do nothing
}
@Override
public void addToQueue(PathDeletionContext... contexts) {
// delete paths in-line
for (PathDeletionContext context : contexts) {
try {
if (!deletePath(context)) {
LOG.warn("Stale path " + context.fullPath);
stalePaths.add(context.fullPath);
}
} catch (IOException e) {
LOG.warn("Caught exception while deleting path "
+ context.fullPath);
LOG.info(StringUtils.stringifyException(e));
stalePaths.add(context.fullPath);
}
}
}
}
static class FakeClock extends Clock {
long time = 0;
public void advance(long millis) {
time += millis;
}
@Override
long getTime() {
return time;
}
}
// Mapper that fails
static class FailMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
//NOTE- the next line is required for the TestDebugScript test to succeed
System.err.println("failing map");
throw new RuntimeException("failing map");
}
}
// Mapper that sleeps for a long time.
// Used for running a job that will be killed
static class KillMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
// Do nothing
}
}
}
static void setUpConfigFile(Properties confProps, File configFile)
throws IOException {
Configuration config = new Configuration(false);
FileOutputStream fos = new FileOutputStream(configFile);
for (Enumeration<?> e = confProps.propertyNames(); e.hasMoreElements();) {
String key = (String) e.nextElement();
config.set(key, confProps.getProperty(key));
}
config.writeXml(fos);
fos.close();
}
/**
* This creates a file in the dfs
* @param dfs FileSystem Local File System where file needs to be picked
* @param URIPATH Path dfs path where file needs to be copied
* @param permission FsPermission File permission
* @return returns the DataOutputStream
*/
public static DataOutputStream
createTmpFileDFS(FileSystem dfs, Path URIPATH,
FsPermission permission, String input) throws Exception {
//Creating the path with the file
DataOutputStream file =
FileSystem.create(dfs, URIPATH, permission);
file.writeBytes(input);
file.close();
return file;
}
/**
* This formats the long tasktracker name to just the FQDN
* @param taskTrackerLong String The long format of the tasktracker string
* @return String The FQDN of the tasktracker
* @throws Exception
*/
public static String getFQDNofTT (String taskTrackerLong) throws Exception {
//Getting the exact FQDN of the tasktracker from the tasktracker string.
String[] firstSplit = taskTrackerLong.split("_");
String tmpOutput = firstSplit[1];
String[] secondSplit = tmpOutput.split(":");
String tmpTaskTracker = secondSplit[0];
return tmpTaskTracker;
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* This is an example Hadoop Map/Reduce application.
* It reads the text input files, breaks each line into words
* and counts them. The output is a locally sorted list of words and the
* count of how often they occurred.
*
* To run: bin/hadoop jar build/hadoop-examples.jar wordcount
* [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
*/
public class WordCount extends Configured implements Tool {
/**
* Counts the words in each line.
* For each line of input, break the line into words and emit them as
* (<b>word</b>, <b>1</b>).
*/
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
static int printUsage() {
System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
/**
* The main driver for word count map/reduce program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the
* job tracker.
*/
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName("wordcount");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.jobcontrol;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* Utility methods used in various Job Control unit tests.
*/
public class JobControlTestUtils {
static private Random rand = new Random();
private static NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setMinimumIntegerDigits(4);
idFormat.setGroupingUsed(false);
}
/**
* Cleans the data from the passed Path in the passed FileSystem.
*
* @param fs FileSystem to delete data from.
* @param dirPath Path to be deleted.
* @throws IOException If an error occurs cleaning the data.
*/
static void cleanData(FileSystem fs, Path dirPath) throws IOException {
fs.delete(dirPath, true);
}
/**
* Generates a string of random digits.
*
* @return A random string.
*/
private static String generateRandomWord() {
return idFormat.format(rand.nextLong());
}
/**
* Generates a line of random text.
*
* @return A line of random text.
*/
private static String generateRandomLine() {
long r = rand.nextLong() % 7;
long n = r + 20;
StringBuffer sb = new StringBuffer();
for (int i = 0; i < n; i++) {
sb.append(generateRandomWord()).append(" ");
}
sb.append("\n");
return sb.toString();
}
/**
* Generates data that can be used for Job Control tests.
*
* @param fs FileSystem to create data in.
* @param dirPath Path to create the data in.
* @throws IOException If an error occurs creating the data.
*/
static void generateData(FileSystem fs, Path dirPath) throws IOException {
FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
for (int i = 0; i < 10000; i++) {
String line = generateRandomLine();
out.write(line.getBytes("UTF-8"));
}
out.close();
}
/**
* Creates a simple copy job.
*
* @param indirs List of input directories.
* @param outdir Output directory.
* @return JobConf initialised for a simple copy job.
* @throws Exception If an error occurs creating job configuration.
*/
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
Configuration defaults = new Configuration();
JobConf theJob = new JobConf(defaults, TestJobControl.class);
theJob.setJobName("DataMoveJob");
FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
theJob.setMapperClass(DataCopy.class);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
theJob.setReducerClass(DataCopy.class);
theJob.setNumMapTasks(12);
theJob.setNumReduceTasks(4);
return theJob;
}
/**
* Simple Mapper and Reducer implementation which copies data it reads in.
*/
public static class DataCopy extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text>, Reducer<Text, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
output.collect(new Text(key.toString()), value);
}
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
Text dumbKey = new Text("");
while (values.hasNext()) {
Text data = values.next();
output.collect(dumbKey, data);
}
}
}
}