MAPREDUCE-7325. Intermediate data encryption is broken in LocalJobRunner. Contributed by Ahmed Hussein
This commit is contained in:
parent
61bfa42389
commit
ede490d131
@ -153,10 +153,10 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
|
||||
FileSystem localFs = FileSystem.getLocal(job).getRaw();
|
||||
FSDataInputStream inStream = localFs.open(mapOutputFileName);
|
||||
try {
|
||||
inStream.seek(ir.startOffset);
|
||||
inStream =
|
||||
IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
|
||||
mapOutputFileName);
|
||||
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
|
||||
mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
|
||||
decompressedLength, metrics, reporter);
|
||||
} finally {
|
||||
|
@ -44,6 +44,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -75,9 +76,20 @@
|
||||
* mbs-per-map specifies the amount of data (in MBs) to generate per map.
|
||||
* By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
|
||||
* <code>map-tasks</code> specifies the number of map tasks to run.
|
||||
* Steps of the unit test:
|
||||
* 1- Generating random input text.
|
||||
* 2- Run a job with encryption disabled. Get the checksum of the output file
|
||||
* <code>checkSumReference</code>.
|
||||
* 3- Run the job with encryption enabled.
|
||||
* 4- Compare <code>checkSumReference</code> to the checksum of the job output.
|
||||
* 5- If the job has multiple reducers, the test launches one final job to
|
||||
* combine the output files into a single one.
|
||||
* 6- Verify that the maps spilled files.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestMRIntermediateDataEncryption {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
||||
/**
|
||||
* The number of bytes generated by the input generator.
|
||||
*/
|
||||
@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
|
||||
public static final int INPUT_GEN_NUM_THREADS = 16;
|
||||
public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
|
||||
public static final String JOB_DIR_PATH = "jobs-data-path";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
||||
/**
|
||||
* Directory of the test data.
|
||||
*/
|
||||
@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static MiniMRClientCluster mrCluster;
|
||||
private static FileSystem fs;
|
||||
private static FileChecksum checkSumReference;
|
||||
private static Path jobInputDirPath;
|
||||
private static long inputFileSize;
|
||||
/**
|
||||
@ -136,11 +147,7 @@ public static Collection<Object[]> getTestParameters() {
|
||||
{"testSingleReducer", 3, 1, false},
|
||||
{"testUberMode", 3, 1, true},
|
||||
{"testMultipleMapsPerNode", 8, 1, false},
|
||||
// TODO: The following configuration is commented out until
|
||||
// MAPREDUCE-7325 is fixed.
|
||||
// Setting multiple reducers breaks LocalJobRunner causing the
|
||||
// unit test to fail.
|
||||
// {"testMultipleReducers", 2, 4, false}
|
||||
{"testMultipleReducers", 2, 4, false}
|
||||
});
|
||||
}
|
||||
|
||||
@ -171,6 +178,8 @@ public static void setupClass() throws Exception {
|
||||
// run the input generator job.
|
||||
Assert.assertEquals("Generating input should succeed", 0,
|
||||
generateInputTextFile());
|
||||
// run the reference job
|
||||
runReferenceJob();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@ -185,7 +194,7 @@ public static void tearDown() throws IOException {
|
||||
// make sure that generated input file is deleted
|
||||
final File textInputFile = new File(testRootDir, "input.txt");
|
||||
if (textInputFile.exists()) {
|
||||
textInputFile.delete();
|
||||
Assert.assertTrue(textInputFile.delete());
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,7 +207,7 @@ private static Configuration createBaseConfiguration() {
|
||||
// Set the jvm arguments to enable intermediate encryption.
|
||||
Configuration conf =
|
||||
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
|
||||
// Set the temp directories a subdir of the test directory.
|
||||
// Set the temp directories a subDir of the test directory.
|
||||
conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
|
||||
conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
|
||||
return conf;
|
||||
@ -207,7 +216,7 @@ private static Configuration createBaseConfiguration() {
|
||||
/**
|
||||
* Creates a thread safe BufferedWriter to be used among the task generators.
|
||||
* @return A synchronized <code>BufferedWriter</code> to the input file.
|
||||
* @throws IOException
|
||||
* @throws IOException opening a new {@link FileWriter}.
|
||||
*/
|
||||
private static synchronized BufferedWriter getTextInputWriter()
|
||||
throws IOException {
|
||||
@ -223,7 +232,7 @@ private static synchronized BufferedWriter getTextInputWriter()
|
||||
* It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
|
||||
*
|
||||
* @return the result of the input generation. 0 for success.
|
||||
* @throws Exception
|
||||
* @throws Exception during the I/O of job.
|
||||
*/
|
||||
private static int generateInputTextFile() throws Exception {
|
||||
final File textInputFile = new File(testRootDir, "input.txt");
|
||||
@ -270,6 +279,118 @@ private static int generateInputTextFile() throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a WordCount job with encryption disabled and stores the checksum of
|
||||
* the output file.
|
||||
* @throws Exception due to I/O errors.
|
||||
*/
|
||||
private static void runReferenceJob() throws Exception {
|
||||
final String jobRefLabel = "job-reference";
|
||||
final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
|
||||
if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
|
||||
throw new IOException("Could not delete " + jobRefDirPath);
|
||||
}
|
||||
Assert.assertTrue(fs.mkdirs(jobRefDirPath));
|
||||
Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
|
||||
Configuration referenceConf = new Configuration(commonConfig);
|
||||
referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
|
||||
Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
|
||||
referenceConf, 4, 1);
|
||||
Assert.assertTrue(jobReference.isSuccessful());
|
||||
FileStatus[] fileStatusArr =
|
||||
fs.listStatus(jobRefOutputPath,
|
||||
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||
Assert.assertEquals(1, fileStatusArr.length);
|
||||
checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
|
||||
Assert.assertTrue(fs.delete(jobRefDirPath, true));
|
||||
}
|
||||
|
||||
private static Job runWordCountJob(String postfixName, Path jOutputPath,
|
||||
Configuration jConf, int mappers, int reducers) throws Exception {
|
||||
Job job = Job.getInstance(jConf);
|
||||
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
|
||||
job.setJarByClass(TestMRIntermediateDataEncryption.class);
|
||||
job.setJobName("mr-spill-" + postfixName);
|
||||
// Mapper configuration
|
||||
job.setMapperClass(TokenizerMapper.class);
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setCombinerClass(LongSumReducer.class);
|
||||
FileInputFormat.setMinInputSplitSize(job,
|
||||
(inputFileSize + mappers) / mappers);
|
||||
// Reducer configuration
|
||||
job.setReducerClass(LongSumReducer.class);
|
||||
job.setNumReduceTasks(reducers);
|
||||
job.setOutputKeyClass(Text.class);
|
||||
job.setOutputValueClass(LongWritable.class);
|
||||
// Set the IO paths for the job.
|
||||
FileInputFormat.addInputPath(job, jobInputDirPath);
|
||||
FileOutputFormat.setOutputPath(job, jOutputPath);
|
||||
if (job.waitForCompletion(true)) {
|
||||
FileStatus[] fileStatusArr =
|
||||
fs.listStatus(jOutputPath,
|
||||
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||
for (FileStatus fStatus : fileStatusArr) {
|
||||
LOG.info("Job: {} .. Output file {} .. Size = {}",
|
||||
postfixName, fStatus.getPath(), fStatus.getLen());
|
||||
}
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares the checksum of the output file to the
|
||||
* <code>checkSumReference</code>.
|
||||
* If the job has a multiple reducers, the output files are combined by
|
||||
* launching another job.
|
||||
* @return true if the checksums are equal.
|
||||
* @throws Exception if the output is missing or the combiner job fails.
|
||||
*/
|
||||
private boolean validateJobOutput() throws Exception {
|
||||
Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
|
||||
fs.exists(jobOutputPath));
|
||||
Path outputPath = jobOutputPath;
|
||||
if (numReducers != 1) {
|
||||
// combine the result into one file by running a combiner job
|
||||
final String jobRefLabel = testTitleName + "-combine";
|
||||
final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
|
||||
if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
|
||||
throw new IOException("Could not delete " + jobRefDirPath);
|
||||
}
|
||||
fs.mkdirs(jobRefDirPath);
|
||||
outputPath = new Path(jobRefDirPath, "out-dir");
|
||||
Configuration referenceConf = new Configuration(commonConfig);
|
||||
referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
|
||||
false);
|
||||
Job combinerJob = Job.getInstance(referenceConf);
|
||||
combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
|
||||
combinerJob.setJobName("mr-spill-" + jobRefLabel);
|
||||
combinerJob.setMapperClass(CombinerJobMapper.class);
|
||||
FileInputFormat.addInputPath(combinerJob, jobOutputPath);
|
||||
// Reducer configuration
|
||||
combinerJob.setReducerClass(LongSumReducer.class);
|
||||
combinerJob.setNumReduceTasks(1);
|
||||
combinerJob.setOutputKeyClass(Text.class);
|
||||
combinerJob.setOutputValueClass(LongWritable.class);
|
||||
// Set the IO paths for the job.
|
||||
FileOutputFormat.setOutputPath(combinerJob, outputPath);
|
||||
if (!combinerJob.waitForCompletion(true)) {
|
||||
return false;
|
||||
}
|
||||
FileStatus[] fileStatusArr =
|
||||
fs.listStatus(outputPath,
|
||||
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||
LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
|
||||
jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
|
||||
}
|
||||
// Get the output files of the job.
|
||||
FileStatus[] fileStatusArr =
|
||||
fs.listStatus(outputPath,
|
||||
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||
FileChecksum jobFileChecksum =
|
||||
fs.getFileChecksum(fileStatusArr[0].getPath());
|
||||
return checkSumReference.equals(jobFileChecksum);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
|
||||
@ -284,16 +405,16 @@ public void setup() throws Exception {
|
||||
config = new Configuration(commonConfig);
|
||||
config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
|
||||
config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
|
||||
// set the configuration to make sure that we get spilled files
|
||||
// Set the configuration to make sure that we get spilled files.
|
||||
long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
|
||||
config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
|
||||
long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
|
||||
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
|
||||
// make sure the map tasks will spill to disk.
|
||||
// Make sure the map tasks will spill to disk.
|
||||
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
|
||||
config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
|
||||
config.setInt(MRJobConfig.NUM_MAPS, numMappers);
|
||||
// max attempts have to be set to 1 when intermediate encryption is enabled.
|
||||
// Max attempts have to be set to 1 when intermediate encryption is enabled.
|
||||
config.setInt("mapreduce.map.maxattempts", 1);
|
||||
config.setInt("mapreduce.reduce.maxattempts", 1);
|
||||
}
|
||||
@ -302,24 +423,6 @@ public void setup() throws Exception {
|
||||
public void testWordCount() throws Exception {
|
||||
LOG.info("........Starting main Job Driver #{} starting at {}.......",
|
||||
testTitleName, Time.formatTime(System.currentTimeMillis()));
|
||||
Job job = Job.getInstance(config);
|
||||
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
|
||||
job.setJarByClass(TestMRIntermediateDataEncryption.class);
|
||||
job.setJobName("mr-spill-" + testTitleName);
|
||||
// Mapper configuration
|
||||
job.setMapperClass(TokenizerMapper.class);
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setCombinerClass(LongSumReducer.class);
|
||||
FileInputFormat.setMinInputSplitSize(job,
|
||||
(inputFileSize + numMappers) / numMappers);
|
||||
// Reducer configuration
|
||||
job.setReducerClass(LongSumReducer.class);
|
||||
job.setNumReduceTasks(numReducers);
|
||||
job.setOutputKeyClass(Text.class);
|
||||
job.setOutputValueClass(LongWritable.class);
|
||||
// Set the IO paths for the job.
|
||||
FileInputFormat.addInputPath(job, jobInputDirPath);
|
||||
FileOutputFormat.setOutputPath(job, jobOutputPath);
|
||||
SpillCallBackPathsFinder spillInjector =
|
||||
(SpillCallBackPathsFinder) IntermediateEncryptedStream
|
||||
.setSpillCBInjector(new SpillCallBackPathsFinder());
|
||||
@ -328,34 +431,36 @@ public void testWordCount() throws Exception {
|
||||
testTitleName));
|
||||
try {
|
||||
long startTime = Time.monotonicNow();
|
||||
testSummary.append(String.format("%nJob %s ended at %s",
|
||||
testSummary.append(String.format("%nJob %s started at %s",
|
||||
testTitleName, Time.formatTime(System.currentTimeMillis())));
|
||||
Assert.assertTrue(job.waitForCompletion(true));
|
||||
Job job = runWordCountJob(testTitleName, jobOutputPath, config,
|
||||
numMappers, numReducers);
|
||||
Assert.assertTrue(job.isSuccessful());
|
||||
long endTime = Time.monotonicNow();
|
||||
testSummary.append(String.format("%nJob %s ended at %s",
|
||||
job.getJobName(), Time.formatTime(System.currentTimeMillis())));
|
||||
testSummary.append(String.format("%n\tThe job took %.3f seconds",
|
||||
(1.0 * (endTime - startTime)) / 1000));
|
||||
long spilledRecords =
|
||||
job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
|
||||
Assert.assertFalse(
|
||||
"The encrypted spilled files should not be empty.",
|
||||
spillInjector.getEncryptedSpilledFiles().isEmpty());
|
||||
Assert.assertTrue("Spill records must be greater than 0",
|
||||
spilledRecords > 0);
|
||||
Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
|
||||
fs.exists(jobOutputPath));
|
||||
Assert.assertTrue("Invalid access to spill file positions",
|
||||
spillInjector.getInvalidSpillEntries().isEmpty());
|
||||
FileStatus[] fileStatus =
|
||||
FileStatus[] fileStatusArr =
|
||||
fs.listStatus(jobOutputPath,
|
||||
new Utils.OutputFileUtils.OutputFilesFilter());
|
||||
for (FileStatus fStatus : fileStatus) {
|
||||
for (FileStatus fStatus : fileStatusArr) {
|
||||
long fileSize = fStatus.getLen();
|
||||
testSummary.append(
|
||||
String.format("%n\tOutput file %s: %d",
|
||||
fStatus.getPath(), fileSize));
|
||||
}
|
||||
// Validate the checksum of the output.
|
||||
Assert.assertTrue(validateJobOutput());
|
||||
// Check intermediate files and spilling.
|
||||
long spilledRecords =
|
||||
job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
|
||||
Assert.assertTrue("Spill records must be greater than 0",
|
||||
spilledRecords > 0);
|
||||
Assert.assertFalse("The encrypted spilled files should not be empty.",
|
||||
spillInjector.getEncryptedSpilledFiles().isEmpty());
|
||||
Assert.assertTrue("Invalid access to spill file positions",
|
||||
spillInjector.getInvalidSpillEntries().isEmpty());
|
||||
} finally {
|
||||
testSummary.append(spillInjector.getSpilledFileReport());
|
||||
LOG.info(testSummary.toString());
|
||||
@ -408,4 +513,21 @@ public void map(Object key, Text value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A Mapper that reads the output of WordCount passing it to the reducer.
|
||||
* It is used to combine the output of multiple reducer jobs.
|
||||
*/
|
||||
public static class CombinerJobMapper
|
||||
extends Mapper<Object, Text, Text, LongWritable> {
|
||||
private final LongWritable sum = new LongWritable(0);
|
||||
private final Text word = new Text();
|
||||
public void map(Object key, Text value,
|
||||
Context context) throws IOException, InterruptedException {
|
||||
String[] line = value.toString().split("\\s+");
|
||||
sum.set(Long.parseLong(line[1]));
|
||||
word.set(line[0]);
|
||||
context.write(word, sum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user