MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)

This commit is contained in:
Eric E Payne 2020-01-29 16:54:45 +00:00
parent 7f3e1e0c07
commit b897f6834b
2 changed files with 184 additions and 120 deletions

View File

@@ -143,29 +143,32 @@ protected void serviceStart() throws Exception {
protected int getInitDelaySecs() { protected int getInitDelaySecs() {
return 30; return 30;
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
LOG.info("Stopping JobHistory"); LOG.info("Stopping JobHistory");
if (scheduledExecutor != null) { if (scheduledExecutor != null) {
LOG.info("Stopping History Cleaner/Move To Done"); LOG.info("Stopping History Cleaner/Move To Done");
scheduledExecutor.shutdown(); scheduledExecutor.shutdown();
boolean interrupted = false; int retryCnt = 50;
long currentTime = System.currentTimeMillis(); try {
while (!scheduledExecutor.isShutdown() while (!scheduledExecutor.awaitTermination(20,
&& System.currentTimeMillis() > currentTime + 1000l && !interrupted) { TimeUnit.MILLISECONDS)) {
try { if (--retryCnt == 0) {
Thread.sleep(20); scheduledExecutor.shutdownNow();
} catch (InterruptedException e) { break;
interrupted = true; }
}
} catch (InterruptedException iex) {
LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
"succeeded, Forcing a shutdown", iex);
if (!scheduledExecutor.isShutdown()) {
scheduledExecutor.shutdownNow();
} }
} }
if (!scheduledExecutor.isShutdown()) { scheduledExecutor = null;
LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
"succeeded, Forcing a shutdown");
scheduledExecutor.shutdownNow();
}
} }
// Stop the other services.
if (storage != null && storage instanceof Service) { if (storage != null && storage instanceof Service) {
((Service) storage).stop(); ((Service) storage).stop();
} }

View File

@@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -26,12 +28,20 @@
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@@ -44,85 +54,126 @@
* framework's merge on the reduce side will merge the partitions created to * framework's merge on the reduce side will merge the partitions created to
* generate the final output which is sorted on the key. * generate the final output which is sorted on the key.
*/ */
@RunWith(Parameterized.class)
public class TestMRIntermediateDataEncryption { public class TestMRIntermediateDataEncryption {
private static final Logger LOG =
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
/**
* Use urandom to avoid the YarnChild process from hanging on low entropy
* systems.
*/
private static final String JVM_SECURITY_EGD_OPT =
"-Djava.security.egd=file:/dev/./urandom";
// Where MR job's input will reside. // Where MR job's input will reside.
private static final Path INPUT_DIR = new Path("/test/input"); private static final Path INPUT_DIR = new Path("/test/input");
// Where output goes. // Where output goes.
private static final Path OUTPUT = new Path("/test/output"); private static final Path OUTPUT = new Path("/test/output");
private static final int NUM_LINES = 1000;
private static MiniMRClientCluster mrCluster = null;
private static MiniDFSCluster dfsCluster = null;
private static FileSystem fs = null;
private static final int NUM_NODES = 2;
@Test private final String testTitle;
public void testSingleReducer() throws Exception { private final int numMappers;
doEncryptionTest(3, 1, 2, false); private final int numReducers;
private final boolean isUber;
/**
* List of arguments to run the JunitTest.
* @return
*/
@Parameterized.Parameters(
name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
+ "mappers:{1}, reducers:{2}, isUber:{3})")
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{
{"testSingleReducer", 3, 1, false},
{"testUberMode", 3, 1, true},
{"testMultipleMapsPerNode", 8, 1, false},
{"testMultipleReducers", 2, 4, false}
});
} }
@Test /**
public void testUberMode() throws Exception { * Initialized the parametrized JUnit test.
doEncryptionTest(3, 1, 2, true); * @param testName the name of the unit test to be executed.
* @param mappers number of mappers in the tests.
* @param reducers number of the reducers.
* @param uberEnabled boolean flag for isUber
*/
public TestMRIntermediateDataEncryption(String testName, int mappers,
int reducers, boolean uberEnabled) {
this.testTitle = testName;
this.numMappers = mappers;
this.numReducers = reducers;
this.isUber = uberEnabled;
} }
@Test @BeforeClass
public void testMultipleMapsPerNode() throws Exception { public static void setupClass() throws Exception {
doEncryptionTest(8, 1, 2, false); Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
// Set the jvm arguments.
conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
JVM_SECURITY_EGD_OPT);
final String childJVMOpts = JVM_SECURITY_EGD_OPT
+ " " + conf.get("mapred.child.java.opts", " ");
conf.set("mapred.child.java.opts", childJVMOpts);
// Start the mini-MR and mini-DFS clusters.
dfsCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_NODES).build();
mrCluster =
MiniMRClientClusterFactory.create(
TestMRIntermediateDataEncryption.class, NUM_NODES, conf);
mrCluster.start();
} }
@Test @AfterClass
public void testMultipleReducers() throws Exception { public static void tearDown() throws IOException {
doEncryptionTest(2, 4, 2, false); if (fs != null) {
fs.close();
}
if (mrCluster != null) {
mrCluster.stop();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
} }
public void doEncryptionTest(int numMappers, int numReducers, int numNodes, @Before
boolean isUber) throws Exception { public void setup() throws Exception {
doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber); LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle);
fs = dfsCluster.getFileSystem();
if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
throw new IOException("Could not delete " + INPUT_DIR);
}
if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
throw new IOException("Could not delete " + OUTPUT);
}
// Generate input.
createInput(fs, numMappers, NUM_LINES);
} }
public void doEncryptionTest(int numMappers, int numReducers, int numNodes, @After
int numLines, boolean isUber) throws Exception { public void cleanup() throws IOException {
MiniDFSCluster dfsCluster = null; if (fs != null) {
MiniMRClientCluster mrCluster = null; if (fs.exists(OUTPUT)) {
FileSystem fileSystem = null; fs.delete(OUTPUT, true);
try {
Configuration conf = new Configuration();
// Start the mini-MR and mini-DFS clusters
dfsCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numNodes).build();
fileSystem = dfsCluster.getFileSystem();
mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
numNodes, conf);
// Generate input.
createInput(fileSystem, numMappers, numLines);
// Run the test.
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
numMappers, numReducers, numLines, isUber);
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
} }
if (mrCluster != null) { if (fs.exists(INPUT_DIR)) {
mrCluster.stop(); fs.delete(INPUT_DIR, true);
} }
} }
} }
private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception { @Test(timeout=600000)
fs.delete(INPUT_DIR, true); public void testMerge() throws Exception {
for (int i = 0; i < numMappers; i++) { JobConf job = new JobConf(mrCluster.getConfig());
OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
Writer writer = new OutputStreamWriter(os);
for (int j = 0; j < numLines; j++) {
// Create sorted key, value pairs.
int k = j + 1;
String formattedNumber = String.format("%09d", k);
writer.write(formattedNumber + " " + formattedNumber + "\n");
}
writer.close();
}
}
private void runMergeTest(JobConf job, FileSystem fileSystem, int
numMappers, int numReducers, int numLines, boolean isUber)
throws Exception {
fileSystem.delete(OUTPUT, true);
job.setJobName("Test"); job.setJobName("Test");
JobClient client = new JobClient(job); JobClient client = new JobClient(job);
RunningJob submittedJob = null; RunningJob submittedJob = null;
@@ -134,43 +185,53 @@ private void runMergeTest(JobConf job, FileSystem fileSystem, int
job.setMapOutputValueClass(Text.class); job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class); job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class);
job.setPartitionerClass(MyPartitioner.class); job.setPartitionerClass(
TestMRIntermediateDataEncryption.MyPartitioner.class);
job.setOutputFormat(TextOutputFormat.class); job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(numReducers); job.setNumReduceTasks(numReducers);
job.setInt("mapreduce.map.maxattempts", 1); job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1); job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapred.test.num_lines", numLines); job.setInt("mapred.test.num_lines", NUM_LINES);
if (isUber) { job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
job.setBoolean("mapreduce.job.ubertask.enable", true);
}
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
try { submittedJob = client.submitJob(job);
submittedJob = client.submitJob(job); submittedJob.waitForCompletion();
try { assertTrue("The submitted job is completed", submittedJob.isComplete());
if (! client.monitorAndPrintJob(job, submittedJob)) { assertTrue("The submitted job is successful", submittedJob.isSuccessful());
throw new IOException("Job failed!"); verifyOutput(fs, numMappers, NUM_LINES);
} client.close();
} catch(InterruptedException ie) { // wait for short period to cool down.
Thread.currentThread().interrupt(); Thread.sleep(1000);
}
private void createInput(FileSystem filesystem, int mappers, int numLines)
throws Exception {
for (int i = 0; i < mappers; i++) {
OutputStream os =
filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
Writer writer = new OutputStreamWriter(os);
for (int j = 0; j < numLines; j++) {
// Create sorted key, value pairs.
int k = j + 1;
String formattedNumber = String.format("%09d", k);
writer.write(formattedNumber + " " + formattedNumber + "\n");
} }
} catch(IOException ioe) { writer.close();
System.err.println("Job failed with: " + ioe); os.close();
} finally {
verifyOutput(submittedJob, fileSystem, numMappers, numLines);
} }
} }
private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines) private void verifyOutput(FileSystem fileSystem,
throws Exception { int mappers, int numLines)
throws Exception {
FSDataInputStream dis = null; FSDataInputStream dis = null;
long numValidRecords = 0; long numValidRecords = 0;
long numInvalidRecords = 0; long numInvalidRecords = 0;
String prevKeyValue = "000000000"; String prevKeyValue = "000000000";
Path[] fileList = Path[] fileList =
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
new Utils.OutputFileUtils.OutputFilesFilter())); new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outFile : fileList) { for (Path outFile : fileList) {
try { try {
dis = fileSystem.open(outFile); dis = fileSystem.open(outFile);
@@ -197,7 +258,7 @@ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int nu
} }
} }
// Make sure we got all input records in the output in sorted order. // Make sure we got all input records in the output in sorted order.
assertEquals((long)(numMappers * numLines), numValidRecords); assertEquals((long)(mappers * numLines), numValidRecords);
// Make sure there is no extraneous invalid record. // Make sure there is no extraneous invalid record.
assertEquals(0, numInvalidRecords); assertEquals(0, numInvalidRecords);
} }
@@ -207,30 +268,30 @@ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int nu
* in displayable form. * in displayable form.
*/ */
public static class MyMapper extends MapReduceBase public static class MyMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> { implements Mapper<LongWritable, Text, Text, Text> {
private Text keyText; private Text keyText;
private Text valueText; private Text valueText;
public MyMapper() { public MyMapper() {
keyText = new Text(); keyText = new Text();
valueText = new Text(); valueText = new Text();
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String record = value.toString();
int blankPos = record.indexOf(" ");
keyText.set(record.substring(0, blankPos));
valueText.set(record.substring(blankPos+1));
output.collect(keyText, valueText);
}
public void close() throws IOException {
}
} }
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String record = value.toString();
int blankPos = record.indexOf(" ");
keyText.set(record.substring(0, blankPos));
valueText.set(record.substring(blankPos + 1));
output.collect(keyText, valueText);
}
public void close() throws IOException {
}
}
/** /**
* Partitioner implementation to make sure that output is in total sorted * Partitioner implementation to make sure that output is in total sorted
* order. We basically route key ranges to different reducers such that * order. We basically route key ranges to different reducers such that
@@ -255,12 +316,12 @@ public int getPartition(Text key, Text value, int numPartitions) {
int keyValue = 0; int keyValue = 0;
try { try {
keyValue = Integer.parseInt(key.toString()); keyValue = Integer.parseInt(key.toString());
} catch(NumberFormatException nfe) { } catch (NumberFormatException nfe) {
keyValue = 0; keyValue = 0;
} }
int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000); int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job
.getInt("mapred.test.num_lines", 10000);
return partitionNumber; return partitionNumber;
} }
} }
} }