MAPREDUCE-5428. HistoryFileManager doesn't stop threads when service is stopped. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-08-01 19:57:31 +00:00
parent 4757963d32
commit bfe5a528d8
4 changed files with 25 additions and 6 deletions

View File

@ -636,6 +636,9 @@ Release 2.1.0-beta - 2013-08-06
MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker
via jlowe) via jlowe)
MAPREDUCE-5428. HistoryFileManager doesn't stop threads when service is
stopped (Karthik Kambatla via jlowe)
BREAKDOWN OF HADOOP-8562 SUBTASKS BREAKDOWN OF HADOOP-8562 SUBTASKS
MAPREDUCE-4739. Some MapReduce tests fail to find winutils. MAPREDUCE-4739. Some MapReduce tests fail to find winutils.

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -473,8 +474,8 @@ public synchronized Configuration loadConfFile() throws IOException {
private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
private FileContext intermediateDoneDirFc; // Intermediate Done Dir private FileContext intermediateDoneDirFc; // Intermediate Done Dir
// FileContext // FileContext
@VisibleForTesting
private ThreadPoolExecutor moveToDoneExecutor = null; protected ThreadPoolExecutor moveToDoneExecutor = null;
private long maxHistoryAge = 0; private long maxHistoryAge = 0;
public HistoryFileManager() { public HistoryFileManager() {
@ -543,6 +544,12 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override
public void serviceStop() throws Exception {
ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
super.serviceStop();
}
protected JobListCache createJobListCache() { protected JobListCache createJobListCache() {
return new JobListCache(conf.getInt( return new JobListCache(conf.getInt(
JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,

View File

@ -70,12 +70,15 @@ public void testHistoryEvents() throws Exception {
((JobHistory)context).start(); ((JobHistory)context).start();
Assert.assertTrue( context.getStartTime()>0); Assert.assertTrue( context.getStartTime()>0);
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED); Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
// get job before stopping JobHistory
Job parsedJob = context.getJob(jobId);
// stop JobHistory
((JobHistory)context).stop(); ((JobHistory)context).stop();
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED); Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
Job parsedJob = context.getJob(jobId);
Assert.assertEquals("CompletedMaps not correct", 2, Assert.assertEquals("CompletedMaps not correct", 2,
parsedJob.getCompletedMaps()); parsedJob.getCompletedMaps());
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName()); Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());

View File

@ -534,7 +534,10 @@ public void testScanningOldDirs() throws Exception {
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
fileInfo = hfm.getFileInfo(jobId); fileInfo = hfm.getFileInfo(jobId);
hfm.stop();
Assert.assertNotNull("Unable to locate old job history", fileInfo); Assert.assertNotNull("Unable to locate old job history", fileInfo);
Assert.assertTrue("HistoryFileManager not shutdown properly",
hfm.moveToDoneExecutor.isTerminated());
} finally { } finally {
LOG.info("FINISHED testScanningOldDirs"); LOG.info("FINISHED testScanningOldDirs");
} }
@ -637,6 +640,9 @@ public void testDeleteFileInfo() throws Exception {
// correct live time // correct live time
hfm.setMaxHistoryAge(-1); hfm.setMaxHistoryAge(-1);
hfm.clean(); hfm.clean();
hfm.stop();
Assert.assertTrue("Thread pool shutdown",
hfm.moveToDoneExecutor.isTerminated());
// should be deleted ! // should be deleted !
Assert.assertTrue("file should be deleted ", fileInfo.isDeleted()); Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());