MAPREDUCE-6279. AM should explicity exit JVM after all services have stopped. Contributed by Eric Payne

This commit is contained in:
Jason Lowe 2015-05-07 22:05:12 +00:00
parent b88700dcd0
commit f30065c8b6
3 changed files with 82 additions and 4 deletions

View File

@ -307,6 +307,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6192. Create unit test to automatically compare
MR related classes and mapred-default.xml (rchiang via rkanter)
MAPREDUCE-6279. AM should explicity exit JVM after all services have
stopped (Eric Payne via jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -220,6 +220,7 @@ public class MRAppMaster extends CompositeService {
private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
private static boolean mainStarted = false;
@VisibleForTesting
protected AtomicBoolean successfullyUnregistered =
@ -605,9 +606,35 @@ public void shutDownJob() {
clientService.stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed. Exiting.. ", t);
ExitUtil.terminate(1, t);
exitMRAppMaster(1, t);
}
exitMRAppMaster(0, null);
}
/** MRAppMaster exit method which has been instrumented for both runtime and
* unit testing.
* If the main thread has not been started, this method was called from a
* test. In that case, configure the ExitUtil object to not exit the JVM.
*
* @param status integer indicating exit status
* @param t throwable exception that could be null
*/
private void exitMRAppMaster(int status, Throwable t) {
if (!mainStarted) {
ExitUtil.disableSystemExit();
}
try {
if (t != null) {
ExitUtil.terminate(status, t);
} else {
ExitUtil.terminate(status);
}
} catch (ExitUtil.ExitException ee) {
// ExitUtil.ExitException is only thrown from the ExitUtil test code when
// SystemExit has been disabled. It is always thrown in in the test code,
// even when no error occurs. Ignore the exception so that tests don't
// need to handle it.
}
}
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@ -1407,6 +1434,7 @@ private static void validateInputParam(String value, String param)
public static void main(String[] args) {
try {
mainStarted = true;
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr =
System.getenv(Environment.CONTAINER_ID.name());

View File

@ -20,9 +20,15 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -32,7 +38,6 @@
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -63,6 +68,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -437,6 +443,47 @@ public void testMRAppMasterCredentials() throws Exception {
}
@Test
public void testMRAppMasterShutDownJob() throws Exception,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
File stagingDir =
new File(MRApps.getStagingAreaDir(conf, userName).toString());
stagingDir.mkdirs();
MRAppMasterTest appMaster =
spy(new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), false, true));
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
doReturn(conf).when(appMaster).getConfig();
appMaster.isLastAMRetry = true;
doNothing().when(appMaster).serviceStop();
// Test normal shutdown.
appMaster.shutDownJob();
Assert.assertTrue("Expected shutDownJob to terminate.",
ExitUtil.terminateCalled());
Assert.assertEquals("Expected shutDownJob to exit with status code of 0.",
0, ExitUtil.getFirstExitException().status);
// Test shutdown with exception.
ExitUtil.resetFirstExitException();
String msg = "Injected Exception";
doThrow(new RuntimeException(msg))
.when(appMaster).notifyIsLastAMRetry(anyBoolean());
appMaster.shutDownJob();
assertTrue("Expected message from ExitUtil.ExitException to be " + msg,
ExitUtil.getFirstExitException().getMessage().contains(msg));
Assert.assertEquals("Expected shutDownJob to exit with status code of 1.",
1, ExitUtil.getFirstExitException().status);
}
private void verifyFailedStatus(MRAppMasterTest appMaster,
String expectedJobState) {
ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor