diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 3ae1e746fc..dc563eeab4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -153,10 +153,10 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
try {
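+ // Seek to the partition's start offset before wrapping the stream for decryption.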
+ inStream.seek(ir.startOffset);
inStream =
IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
mapOutputFileName);
- inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
decompressedLength, metrics, reporter);
} finally {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
index 79fcd4110c..fbee7ef5c0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
@@ -44,6 +44,7 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,9 +76,20 @@
* mbs-per-map specifies the amount of data (in MBs) to generate per map.
* By default, this is twice the value of mapreduce.task.io.sort.mb
* map-tasks specifies the number of map tasks to run.
+ * Steps of the unit test:
+ * 1- Generate random input text.
+ * 2- Run a job with encryption disabled. Get the checksum of the output file
+ *    checkSumReference.
+ * 3- Run the job with encryption enabled.
+ * 4- Compare checkSumReference to the checksum of the job output.
+ * 5- If the job has multiple reducers, the test launches one final job to
+ * combine the output files into a single one.
+ * 6- Verify that the map tasks spilled files.
*/
@RunWith(Parameterized.class)
public class TestMRIntermediateDataEncryption {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
/**
* The number of bytes generated by the input generator.
*/
@@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
public static final int INPUT_GEN_NUM_THREADS = 16;
public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
public static final String JOB_DIR_PATH = "jobs-data-path";
- private static final Logger LOG =
- LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
/**
* Directory of the test data.
*/
@@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
private static MiniDFSCluster dfsCluster;
private static MiniMRClientCluster mrCluster;
private static FileSystem fs;
+ private static FileChecksum checkSumReference;
private static Path jobInputDirPath;
private static long inputFileSize;
/**
@@ -136,11 +147,7 @@ public static Collection<Object[]> getTestParameters() {
{"testSingleReducer", 3, 1, false},
{"testUberMode", 3, 1, true},
{"testMultipleMapsPerNode", 8, 1, false},
- // TODO: The following configuration is commented out until
- // MAPREDUCE-7325 is fixed.
- // Setting multiple reducers breaks LocalJobRunner causing the
- // unit test to fail.
- // {"testMultipleReducers", 2, 4, false}
+ {"testMultipleReducers", 2, 4, false}
});
}
@@ -171,6 +178,8 @@ public static void setupClass() throws Exception {
// run the input generator job.
Assert.assertEquals("Generating input should succeed", 0,
generateInputTextFile());
+ // run the reference job
+ runReferenceJob();
}
@AfterClass
@@ -185,7 +194,7 @@ public static void tearDown() throws IOException {
// make sure that generated input file is deleted
final File textInputFile = new File(testRootDir, "input.txt");
if (textInputFile.exists()) {
- textInputFile.delete();
+ Assert.assertTrue(textInputFile.delete());
}
}
@@ -198,7 +207,7 @@ private static Configuration createBaseConfiguration() {
// Set the jvm arguments to enable intermediate encryption.
Configuration conf =
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
- // Set the temp directories a subdir of the test directory.
+ // Set the temp directories to a subDir of the test directory.
conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
return conf;
@@ -207,7 +216,7 @@ private static Configuration createBaseConfiguration() {
/**
* Creates a thread safe BufferedWriter to be used among the task generators.
* @return A synchronized BufferedWriter to the input file.
- * @throws IOException
+ * @throws IOException if opening a new {@link FileWriter} fails.
*/
private static synchronized BufferedWriter getTextInputWriter()
throws IOException {
@@ -223,7 +232,7 @@ private static synchronized BufferedWriter getTextInputWriter()
* It creates a total of INPUT_GEN_NUM_THREADS future tasks.
*
* @return the result of the input generation. 0 for success.
- * @throws Exception
+ * @throws Exception during the I/O of the job.
*/
private static int generateInputTextFile() throws Exception {
final File textInputFile = new File(testRootDir, "input.txt");
@@ -270,6 +279,118 @@ private static int generateInputTextFile() throws Exception {
return 0;
}
+ /**
+ * Runs a WordCount job with encryption disabled and stores the checksum of
+ * the output file.
+ * @throws Exception due to I/O errors.
+ */
+ private static void runReferenceJob() throws Exception {
+ final String jobRefLabel = "job-reference";
+ final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+ if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+ throw new IOException("Could not delete " + jobRefDirPath);
+ }
+ Assert.assertTrue(fs.mkdirs(jobRefDirPath));
+ Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
+ Configuration referenceConf = new Configuration(commonConfig);
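+ // Intermediate encryption is disabled so this run produces the reference (baseline) checksum.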
+ referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
+ Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
+ referenceConf, 4, 1);
+ Assert.assertTrue(jobReference.isSuccessful());
+ FileStatus[] fileStatusArr =
+ fs.listStatus(jobRefOutputPath,
+ new Utils.OutputFileUtils.OutputFilesFilter());
+ Assert.assertEquals(1, fileStatusArr.length);
+ checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
+ Assert.assertTrue(fs.delete(jobRefDirPath, true));
+ }
+
+ private static Job runWordCountJob(String postfixName, Path jOutputPath,
+ Configuration jConf, int mappers, int reducers) throws Exception {
+ Job job = Job.getInstance(jConf);
+ job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
+ job.setJarByClass(TestMRIntermediateDataEncryption.class);
+ job.setJobName("mr-spill-" + postfixName);
+ // Mapper configuration
+ job.setMapperClass(TokenizerMapper.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setCombinerClass(LongSumReducer.class);
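+ // Raise the minimum split size so the input is divided among roughly "mappers" map tasks.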
+ FileInputFormat.setMinInputSplitSize(job,
+ (inputFileSize + mappers) / mappers);
+ // Reducer configuration
+ job.setReducerClass(LongSumReducer.class);
+ job.setNumReduceTasks(reducers);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ // Set the IO paths for the job.
+ FileInputFormat.addInputPath(job, jobInputDirPath);
+ FileOutputFormat.setOutputPath(job, jOutputPath);
+ if (job.waitForCompletion(true)) {
+ FileStatus[] fileStatusArr =
+ fs.listStatus(jOutputPath,
+ new Utils.OutputFileUtils.OutputFilesFilter());
+ for (FileStatus fStatus : fileStatusArr) {
+ LOG.info("Job: {} .. Output file {} .. Size = {}",
+ postfixName, fStatus.getPath(), fStatus.getLen());
+ }
+ }
+ return job;
+ }
+
+ /**
+ * Compares the checksum of the output file to the
+ *    checkSumReference.
+ * If the job has multiple reducers, the output files are combined by
+ * launching another job.
+ * @return true if the checksums are equal.
+ * @throws Exception if the output is missing or the combiner job fails.
+ */
+ private boolean validateJobOutput() throws Exception {
+ Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
+ fs.exists(jobOutputPath));
+ Path outputPath = jobOutputPath;
+ if (numReducers != 1) {
+ // combine the result into one file by running a combiner job
+ final String jobRefLabel = testTitleName + "-combine";
+ final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+ if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+ throw new IOException("Could not delete " + jobRefDirPath);
+ }
+ fs.mkdirs(jobRefDirPath);
+ outputPath = new Path(jobRefDirPath, "out-dir");
+ Configuration referenceConf = new Configuration(commonConfig);
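+ // The merge job only combines the per-reducer outputs, so it runs with encryption disabled.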
+ referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
+ false);
+ Job combinerJob = Job.getInstance(referenceConf);
+ combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
+ combinerJob.setJobName("mr-spill-" + jobRefLabel);
+ combinerJob.setMapperClass(CombinerJobMapper.class);
+ FileInputFormat.addInputPath(combinerJob, jobOutputPath);
+ // Reducer configuration
+ combinerJob.setReducerClass(LongSumReducer.class);
+ combinerJob.setNumReduceTasks(1);
+ combinerJob.setOutputKeyClass(Text.class);
+ combinerJob.setOutputValueClass(LongWritable.class);
+ // Set the IO paths for the job.
+ FileOutputFormat.setOutputPath(combinerJob, outputPath);
+ if (!combinerJob.waitForCompletion(true)) {
+ return false;
+ }
+ FileStatus[] fileStatusArr =
+ fs.listStatus(outputPath,
+ new Utils.OutputFileUtils.OutputFilesFilter());
+ LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
+ jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
+ }
+ // Get the output files of the job.
+ FileStatus[] fileStatusArr =
+ fs.listStatus(outputPath,
+ new Utils.OutputFileUtils.OutputFilesFilter());
+ FileChecksum jobFileChecksum =
+ fs.getFileChecksum(fileStatusArr[0].getPath());
+ return checkSumReference.equals(jobFileChecksum);
+ }
+
@Before
public void setup() throws Exception {
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
@@ -284,16 +405,16 @@ public void setup() throws Exception {
config = new Configuration(commonConfig);
config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
- // set the configuration to make sure that we get spilled files
+ // Set the configuration to make sure that we get spilled files.
long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
- // make sure the map tasks will spill to disk.
+ // Make sure the map tasks will spill to disk.
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
config.setInt(MRJobConfig.NUM_MAPS, numMappers);
- // max attempts have to be set to 1 when intermediate encryption is enabled.
+ // Max attempts have to be set to 1 when intermediate encryption is enabled.
config.setInt("mapreduce.map.maxattempts", 1);
config.setInt("mapreduce.reduce.maxattempts", 1);
}
@@ -302,24 +423,6 @@ public void setup() throws Exception {
public void testWordCount() throws Exception {
LOG.info("........Starting main Job Driver #{} starting at {}.......",
testTitleName, Time.formatTime(System.currentTimeMillis()));
- Job job = Job.getInstance(config);
- job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
- job.setJarByClass(TestMRIntermediateDataEncryption.class);
- job.setJobName("mr-spill-" + testTitleName);
- // Mapper configuration
- job.setMapperClass(TokenizerMapper.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setCombinerClass(LongSumReducer.class);
- FileInputFormat.setMinInputSplitSize(job,
- (inputFileSize + numMappers) / numMappers);
- // Reducer configuration
- job.setReducerClass(LongSumReducer.class);
- job.setNumReduceTasks(numReducers);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // Set the IO paths for the job.
- FileInputFormat.addInputPath(job, jobInputDirPath);
- FileOutputFormat.setOutputPath(job, jobOutputPath);
SpillCallBackPathsFinder spillInjector =
(SpillCallBackPathsFinder) IntermediateEncryptedStream
.setSpillCBInjector(new SpillCallBackPathsFinder());
@@ -328,34 +431,36 @@ public void testWordCount() throws Exception {
testTitleName));
try {
long startTime = Time.monotonicNow();
- testSummary.append(String.format("%nJob %s ended at %s",
+ testSummary.append(String.format("%nJob %s started at %s",
testTitleName, Time.formatTime(System.currentTimeMillis())));
- Assert.assertTrue(job.waitForCompletion(true));
+ Job job = runWordCountJob(testTitleName, jobOutputPath, config,
+ numMappers, numReducers);
+ Assert.assertTrue(job.isSuccessful());
long endTime = Time.monotonicNow();
testSummary.append(String.format("%nJob %s ended at %s",
job.getJobName(), Time.formatTime(System.currentTimeMillis())));
testSummary.append(String.format("%n\tThe job took %.3f seconds",
(1.0 * (endTime - startTime)) / 1000));
- long spilledRecords =
- job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
- Assert.assertFalse(
- "The encrypted spilled files should not be empty.",
- spillInjector.getEncryptedSpilledFiles().isEmpty());
- Assert.assertTrue("Spill records must be greater than 0",
- spilledRecords > 0);
- Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
- fs.exists(jobOutputPath));
- Assert.assertTrue("Invalid access to spill file positions",
- spillInjector.getInvalidSpillEntries().isEmpty());
- FileStatus[] fileStatus =
+ FileStatus[] fileStatusArr =
fs.listStatus(jobOutputPath,
new Utils.OutputFileUtils.OutputFilesFilter());
- for (FileStatus fStatus : fileStatus) {
+ for (FileStatus fStatus : fileStatusArr) {
long fileSize = fStatus.getLen();
testSummary.append(
String.format("%n\tOutput file %s: %d",
fStatus.getPath(), fileSize));
}
+ // Validate the checksum of the output.
+ Assert.assertTrue(validateJobOutput());
+ // Check intermediate files and spilling.
+ long spilledRecords =
+ job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
+ Assert.assertTrue("Spill records must be greater than 0",
+ spilledRecords > 0);
+ Assert.assertFalse("The encrypted spilled files should not be empty.",
+ spillInjector.getEncryptedSpilledFiles().isEmpty());
+ Assert.assertTrue("Invalid access to spill file positions",
+ spillInjector.getInvalidSpillEntries().isEmpty());
} finally {
testSummary.append(spillInjector.getSpilledFileReport());
LOG.info(testSummary.toString());
@@ -408,4 +513,21 @@ public void map(Object key, Text value,
}
}
}
+
+ /**
+ * A Mapper that reads the output of WordCount and passes it to the reducer.
+ * It is used to combine the output files of a job that has multiple reducers.
+ */
+ public static class CombinerJobMapper
+ extends Mapper<Object, Text, Text, LongWritable> {
+ private final LongWritable sum = new LongWritable(0);
+ private final Text word = new Text();
+ public void map(Object key, Text value,
+ Context context) throws IOException, InterruptedException {
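+ // Each line of the WordCount output is "<word><whitespace><count>"; re-emit it as (word, count).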
+ String[] line = value.toString().split("\\s+");
+ sum.set(Long.parseLong(line[1]));
+ word.set(line[0]);
+ context.write(word, sum);
+ }
+ }
}