MAPREDUCE-4949. Enable multiple pi jobs to run in parallel. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1437029 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-01-22 16:26:29 +00:00
parent 73fd247c76
commit bb81a17e0b
2 changed files with 16 additions and 11 deletions

View File

@ -209,6 +209,8 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus
calls. (sandyr via tucu) calls. (sandyr via tucu)
MAPREDUCE-4949. Enable multiple pi jobs to run in parallel. (sandyr via tucu)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
@ -77,8 +78,7 @@ public class QuasiMonteCarlo extends Configured implements Tool {
static final String DESCRIPTION static final String DESCRIPTION
= "A map/reduce program that estimates Pi using a quasi-Monte Carlo method."; = "A map/reduce program that estimates Pi using a quasi-Monte Carlo method.";
/** tmp directory for input/output */ /** tmp directory for input/output */
static private final Path TMP_DIR = new Path( static private final String TMP_DIR_PREFIX = QuasiMonteCarlo.class.getSimpleName();
QuasiMonteCarlo.class.getSimpleName() + "_TMP_3_141592654");
/** 2-dimensional Halton sequence {H(i)}, /** 2-dimensional Halton sequence {H(i)},
* where H(i) is a 2-dimensional point and i >= 1 is the index. * where H(i) is a 2-dimensional point and i >= 1 is the index.
@ -228,9 +228,9 @@ public void reduce(BooleanWritable isInside,
@Override @Override
public void cleanup(Context context) throws IOException { public void cleanup(Context context) throws IOException {
//write output to a file //write output to a file
Path outDir = new Path(TMP_DIR, "out");
Path outFile = new Path(outDir, "reduce-out");
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
Path outDir = new Path(conf.get(FileOutputFormat.OUTDIR));
Path outFile = new Path(outDir, "reduce-out");
FileSystem fileSys = FileSystem.get(conf); FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, LongWritable.class, LongWritable.class, outFile, LongWritable.class, LongWritable.class,
@ -246,7 +246,7 @@ public void cleanup(Context context) throws IOException {
* @return the estimated value of Pi * @return the estimated value of Pi
*/ */
public static BigDecimal estimatePi(int numMaps, long numPoints, public static BigDecimal estimatePi(int numMaps, long numPoints,
Configuration conf Path tmpDir, Configuration conf
) throws IOException, ClassNotFoundException, InterruptedException { ) throws IOException, ClassNotFoundException, InterruptedException {
Job job = new Job(conf); Job job = new Job(conf);
//setup job conf //setup job conf
@ -269,14 +269,14 @@ public static BigDecimal estimatePi(int numMaps, long numPoints,
job.setSpeculativeExecution(false); job.setSpeculativeExecution(false);
//setup input/output directories //setup input/output directories
final Path inDir = new Path(TMP_DIR, "in"); final Path inDir = new Path(tmpDir, "in");
final Path outDir = new Path(TMP_DIR, "out"); final Path outDir = new Path(tmpDir, "out");
FileInputFormat.setInputPaths(job, inDir); FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(conf); final FileSystem fs = FileSystem.get(conf);
if (fs.exists(TMP_DIR)) { if (fs.exists(tmpDir)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR) throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
+ " already exists. Please remove it first."); + " already exists. Please remove it first.");
} }
if (!fs.mkdirs(inDir)) { if (!fs.mkdirs(inDir)) {
@ -325,7 +325,7 @@ public static BigDecimal estimatePi(int numMaps, long numPoints,
.multiply(BigDecimal.valueOf(numInside.get())) .multiply(BigDecimal.valueOf(numInside.get()))
.divide(numTotal, RoundingMode.HALF_UP); .divide(numTotal, RoundingMode.HALF_UP);
} finally { } finally {
fs.delete(TMP_DIR, true); fs.delete(tmpDir, true);
} }
} }
@ -344,12 +344,15 @@ public int run(String[] args) throws Exception {
final int nMaps = Integer.parseInt(args[0]); final int nMaps = Integer.parseInt(args[0]);
final long nSamples = Long.parseLong(args[1]); final long nSamples = Long.parseLong(args[1]);
long now = System.currentTimeMillis();
int rand = new Random().nextInt(Integer.MAX_VALUE);
final Path tmpDir = new Path(TMP_DIR_PREFIX + "_" + now + "_" + rand);
System.out.println("Number of Maps = " + nMaps); System.out.println("Number of Maps = " + nMaps);
System.out.println("Samples per Map = " + nSamples); System.out.println("Samples per Map = " + nSamples);
System.out.println("Estimated value of Pi is " System.out.println("Estimated value of Pi is "
+ estimatePi(nMaps, nSamples, getConf())); + estimatePi(nMaps, nSamples, tmpDir, getConf()));
return 0; return 0;
} }