MAPREDUCE-4234. SortValidator.java is incompatible with multi-user or parallel use (due to a /tmp file with static name) (Robert Evans via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1367789 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2012-07-31 21:26:08 +00:00
parent 123c4f57d3
commit 5938c86352
3 changed files with 82 additions and 52 deletions

View File

@ -785,6 +785,10 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4457. mr job invalid transition TA_TOO_MANY_FETCH_FAILURE at
FAILED (Robert Evans via tgraves)
MAPREDUCE-4234. SortValidator.java is incompatible with multi-user or
parallel use (due to a /tmp file with static name) (Robert Evans via
jeagles)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
@ -345,7 +344,8 @@ static void checkRecords(Configuration defaults,
FileInputFormat.setInputPaths(jobConf, sortInput);
FileInputFormat.addInputPath(jobConf, sortOutput);
Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
Path outputPath = new Path(new Path(jobConf.get("hadoop.tmp.dir", "/tmp"),
"sortvalidate"), UUID.randomUUID().toString());
if (defaultfs.exists(outputPath)) {
defaultfs.delete(outputPath, true);
}
@ -365,31 +365,44 @@ static void checkRecords(Configuration defaults,
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
try {
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
// Check to ensure that the statistics of the
// framework's sort-input and sort-output match
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
new Path(outputPath, "part-00000"), defaults);
IntWritable k1 = new IntWritable();
IntWritable k2 = new IntWritable();
RecordStatsWritable v1 = new RecordStatsWritable();
RecordStatsWritable v2 = new RecordStatsWritable();
if (!stats.next(k1, v1)) {
throw new IOException("Failed to read record #1 from reduce's output");
}
if (!stats.next(k2, v2)) {
throw new IOException("Failed to read record #2 from reduce's output");
}
// Check to ensure that the statistics of the
// framework's sort-input and sort-output match
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
new Path(outputPath, "part-00000"), defaults);
try {
IntWritable k1 = new IntWritable();
IntWritable k2 = new IntWritable();
RecordStatsWritable v1 = new RecordStatsWritable();
RecordStatsWritable v2 = new RecordStatsWritable();
if (!stats.next(k1, v1)) {
throw new IOException(
"Failed to read record #1 from reduce's output");
}
if (!stats.next(k2, v2)) {
throw new IOException(
"Failed to read record #2 from reduce's output");
}
if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
v1.getChecksum() != v2.getChecksum()) {
throw new IOException("(" +
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
if ((v1.getBytes() != v2.getBytes()) ||
(v1.getRecords() != v2.getRecords()) ||
v1.getChecksum() != v2.getChecksum()) {
throw new IOException("(" +
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum()
+ ") v/s (" +
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum()
+ ")");
}
} finally {
stats.close();
}
} finally {
defaultfs.delete(outputPath, true);
}
}

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
@ -345,7 +344,8 @@ static void checkRecords(Configuration defaults,
FileInputFormat.setInputPaths(jobConf, sortInput);
FileInputFormat.addInputPath(jobConf, sortOutput);
Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
Path outputPath = new Path(new Path(jobConf.get("hadoop.tmp.dir", "/tmp"),
"sortvalidate"), UUID.randomUUID().toString());
if (defaultfs.exists(outputPath)) {
defaultfs.delete(outputPath, true);
}
@ -365,31 +365,44 @@ static void checkRecords(Configuration defaults,
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
try {
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
// Check to ensure that the statistics of the
// framework's sort-input and sort-output match
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
new Path(outputPath, "part-00000"), defaults);
IntWritable k1 = new IntWritable();
IntWritable k2 = new IntWritable();
RecordStatsWritable v1 = new RecordStatsWritable();
RecordStatsWritable v2 = new RecordStatsWritable();
if (!stats.next(k1, v1)) {
throw new IOException("Failed to read record #1 from reduce's output");
}
if (!stats.next(k2, v2)) {
throw new IOException("Failed to read record #2 from reduce's output");
}
// Check to ensure that the statistics of the
// framework's sort-input and sort-output match
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
new Path(outputPath, "part-00000"), defaults);
try {
IntWritable k1 = new IntWritable();
IntWritable k2 = new IntWritable();
RecordStatsWritable v1 = new RecordStatsWritable();
RecordStatsWritable v2 = new RecordStatsWritable();
if (!stats.next(k1, v1)) {
throw new IOException(
"Failed to read record #1 from reduce's output");
}
if (!stats.next(k2, v2)) {
throw new IOException(
"Failed to read record #2 from reduce's output");
}
if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
v1.getChecksum() != v2.getChecksum()) {
throw new IOException("(" +
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
if ((v1.getBytes() != v2.getBytes()) ||
(v1.getRecords() != v2.getRecords()) ||
v1.getChecksum() != v2.getChecksum()) {
throw new IOException("(" +
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum()
+ ") v/s (" +
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum()
+ ")");
}
} finally {
stats.close();
}
} finally {
defaultfs.delete(outputPath, true);
}
}